# Preprocessing for the Cities data

- First, clean up the files on the command line
    - Remove extra fields and rows
    - Remove Scandinavian and other special characters that Spark SQL does not accept
- Run queries to convert the data from a multi-row format into multiple files (one per data item)
    - Save the results to the file system

The next step is to join the individual files into a single joined data set.

## Command line preprocessing 
```
iconv -f ISO-8859-1 raw/003_139f_2040_20230919-193133.csv > Cleaned/Forecast_003_139f_2040_20230919-193133.csv

grep KO Cleaned/Forecast_003_139f_2040_20230919-193133.csv > Cleaned/Areas/Forecast_Areas.csv

grep -v MA tmp2.csv > Cleaned/Forecast_003_139f_2040_20230919-193133.csv 
```

In [2]:
##
## Imports and Spark initiation 
##
import pandas as pd
from pyspark.sql import SparkSession, Row
import pyspark.sql.functions as f

# Local Spark session using all available cores.
# NOTE(review): dynamic allocation / the external shuffle service are aimed at
# cluster managers and presumably have no effect with a local master - confirm.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("main")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.shuffle.service.enabled", "true")
    .getOrCreate()
)

# Input file locations, all relative to the notebook's working directory.
input_dir = "data/Input/"
general_file = f"{input_dir}Cities_general_data.csv"
population_file = f"{input_dir}Cities_population_data.csv"
forecast_file = f"{input_dir}Cities_Forecast.csv"


23/10/25 20:05:50 WARN Utils: Your hostname, Markos-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.0.10 instead (on interface en0)
23/10/25 20:05:50 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/10/25 20:05:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/10/25 20:05:55 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [9]:
def loadDataAndRegister(path, viewName):
    """Read a tab-separated CSV with a header row and register it as a temp view.

    Column types are inferred, and the literal string "NA" is read as null.

    Args:
        path: Path of the CSV file to read.
        viewName: Name under which the DataFrame is exposed to Spark SQL.

    Returns:
        The loaded Spark DataFrame.
    """
    reader = spark.read.options(
        delimiter="\t",
        inferSchema="true",
        header="true",
        nullValue="NA",
    )
    frame = reader.csv(path)
    frame.createOrReplaceTempView(viewName)
    return frame

# Load both inputs, register them as SQL views and preview the first rows.
cities_general_data = loadDataAndRegister(general_file, "general_view")
cities_general_data.show(5)
cities_population_data = loadDataAndRegister(population_file, "population_view")
cities_population_data.show(5)

+------+--------------------+------+------+------+------+------+------+------+------+------+------+------+------+
|  Alue|              Tiedot| Y2010| Y2011| Y2012| Y2013| Y2014| Y2015| Y2016| Y2017| Y2018| Y2019| Y2020| Y2021|
+------+--------------------+------+------+------+------+------+------+------+------+------+------+------+------+
|Askola|Asukkaat yhteensä...|4864.0|4911.0|4988.0|4991.0|5064.0|5104.0|5046.0|4990.0|4958.0|4943.0|4878.0|4847.0|
|Askola|         Miehet (HE)|2455.0|2482.0|2505.0|2518.0|2549.0|2565.0|2530.0|2519.0|2519.0|2496.0|2486.0|2453.0|
|Askola|         Naiset (HE)|2409.0|2429.0|2483.0|2473.0|2515.0|2539.0|2516.0|2471.0|2439.0|2447.0|2392.0|2394.0|
|Askola|Asukkaiden keski-...|  39.0|  40.0|  40.0|  40.0|  40.0|  40.0|  40.0|  41.0|  41.0|  42.0|  42.0|  43.0|
|Askola|18 vuotta täyttän...|3650.0|3696.0|3745.0|3742.0|3815.0|3832.0|3785.0|3755.0|3748.0|3774.0|3762.0|3777.0|
+------+--------------------+------+------+------+------+------+------+------+------+---

The original data set is quite large and many rows should be dropped. That is most likely easier to do directly on the CSV files from the command line.

```
grep -v vuotiaat Cities_general_data.csv > tmp.csv 
grep -v "85 vuotta" tmp.csv > Cities_general_data.csv

grep -v "Alle 15-vuotiaiden osuus, %" Cities_population_data.csv | grep -v "15-64-vuotiaiden osuus, %" | grep -v "65 vuotta täyttäneiden osuus, %" > tmp.csv
grep -v osuus tmp.csv > Cities_population_data.csv

tr -s ",%" "_"
sed -i "" "s/km[^\"]*/#/g" tmp.csv 
sed -i "" "s/_ _p/g" Cities_population_data.csv
sed -i "" "s/\"20/\"Y20/g" Cities_population_data.csv

cat Cities_Forecast.csv | cut -f 1,2,3 
```

In [12]:
df = spark.read.option("delimiter", "\t").csv(forecast_file, inferSchema="true", header="true")
# One row per area ("Alue"), one column per forecast year ("Vuosi") holding the
# "Yhteensa" value. NOTE(review): first() assumes a single row per
# (area, year) pair; with duplicates the chosen value is arbitrary - confirm.
pivot_df = df.groupBy("Alue").pivot("Vuosi").agg(f.first("Yhteensa"))

# Rename "Alue" to "City" and prefix every year column with "Y" (2021 -> Y2021).
renamed_columns = [
    "City" if name == "Alue" else "Y" + str(name)
    for name in pivot_df.columns
]
pivot_df = pivot_df.toDF(*renamed_columns)
pivot_df.show()

+----------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|            City|Y2021|Y2022|Y2023|Y2024|Y2025|Y2026|Y2027|Y2028|Y2029|Y2030|Y2031|
+----------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|         Kajaani|36429|36251|36069|35887|35702|35510|35321|35125|34928|34732|34533|
|       Enonkoski| 1356| 1342| 1327| 1314| 1300| 1287| 1274| 1259| 1246| 1230| 1214|
|       Parikkala| 4556| 4463| 4377| 4295| 4219| 4144| 4070| 4002| 3935| 3870| 3806|
|       Tuusniemi| 2377| 2329| 2283| 2244| 2207| 2174| 2142| 2113| 2084| 2058| 2033|
|        Kuortane| 3497| 3458| 3423| 3387| 3354| 3319| 3287| 3257| 3229| 3204| 3177|
|        Finström| 2625| 2636| 2649| 2660| 2671| 2678| 2683| 2689| 2695| 2700| 2704|
|     Taipalsaari| 4600| 4567| 4535| 4501| 4469| 4436| 4406| 4375| 4344| 4316| 4287|
|        Kyyjärvi| 1264| 1239| 1218| 1199| 1179| 1161| 1143| 1124| 1105| 1086| 1069|
|          Eckerö|  966|  970|  976|  982|  987|  993|  998| 1003

In [12]:
# Apartment data already merged with population figures (comma-separated file).
# Reuse input_dir rather than repeating the "data/Input/" literal.
apartments_data_file = input_dir + "preprocessed_merged_apartments_population_2011-2022.csv"
df = spark.read.option("delimiter", ",").csv(apartments_data_file, inferSchema="true", header="true")
# Mean rent and mean sales price per m2: one row per City, one column per Year.
# NOTE(review): unlike the forecast pivot, the year columns are not renamed to
# Y<year>, and pivot_sell is computed but not shown here - confirm how later
# cells use them.
pivot_rent = df.groupBy("City").pivot("Year").agg(f.mean("Rent per m2"))
pivot_rent.show()
pivot_sell = df.groupBy("City").pivot("Year").agg(f.mean("Sales price per m2"))


+----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|            City|              2011|              2012|              2013|              2014|              2015|              2016|              2017|              2018|              2019|              2020|              2021|              2022|
+----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|           Turku|11.213333333333333|11.590000000000002|11.913333333333334|12.146666666666668|             11.87|12.306666666666667|12.943333333333333|13.403333333333334|             13.75|             14.25|14.553333333333333|              15.1|
|       Jyvä

In [13]:
import io
import os 
import time
  

def convert(input):
    """Turn a free-text ``Tiedot`` label into a Spark-SQL-safe identifier suffix.

    The label is lower-cased and the Scandinavian letters ä/ö/å are mapped to
    a/o/a. Every run of characters outside ``[0-9a-z]`` becomes a single ``_``,
    and leading/trailing underscores are removed, e.g.
    ``"Taloudet yhteensä (TE)" -> "taloudet_yhteensa_te"``.

    Args:
        input: The original label. The parameter name is kept for backward
            compatibility even though it shadows the builtin.

    Returns:
        The sanitised name. It is empty if the label contains no letters or digits.
    """
    import re  # local import keeps this notebook cell self-contained

    folded = input.lower().translate(str.maketrans("äöå", "aoa"))
    # A single regex pass collapses separator runs of any length. A plain
    # replace("__", "_") leaves doubles behind for runs of three or more, and
    # characters such as "," or "%" would make the SQL alias invalid.
    return re.sub(r"[^0-9a-z]+", "_", folded).strip("_")

def get_col_names(s, e, c):
    """Build the SELECT-list fragment that renames year columns.

    For each year in the half-open range ``[s, e)`` the column ``Y<year>`` is
    aliased as ``<year>_<c>``. The fragments are joined with ``", "``.

    Args:
        s: First year (inclusive).
        e: Last year (exclusive).
        c: Suffix appended to each alias after the year.

    Returns:
        The comma-separated fragment, or an empty string when ``s >= e``.
    """
    return ", ".join(f"Y{year} as {year}_{c}" for year in range(s, e))

def write_metadata(df, export_dir):
    """Append a pandas ``info()`` summary of *df* to ``<export_dir>/metadata.txt``.

    Each call appends a line of 20 ``#`` characters followed by the verbose
    ``DataFrame.info`` output, so repeated calls accumulate one section per export.

    Args:
        df: An object exposing ``toPandas()`` (a Spark DataFrame in this notebook).
            The whole DataFrame is converted, i.e. collected to the driver.
        export_dir: Existing directory in which ``metadata.txt`` is created or extended.
    """
    metadata_file = "%s/metadata.txt" % (export_dir)
    buffer = io.StringIO()
    df.toPandas().info(buf=buffer, verbose=True)
    # The context manager closes the file even if a write fails. The handle is
    # named fh so it does not shadow the module-level `f` (pyspark functions) alias.
    with open(metadata_file, "a", encoding="utf-8") as fh:
        fh.write("#" * 20 + "\n" + buffer.getvalue())

def export_data(data_items, years, export_dir, view):
    """Export one CSV directory per distinct ``Tiedot`` value of *view*.

    For every row of *data_items*, a Spark SQL query selects ``Alue`` (as
    ``City``) and the year columns ``Y<s>`` .. ``Y<e-1>`` of the matching rows.
    Each year column is aliased as ``<year>_<sanitised Tiedot>``. The result is
    written with a header row to ``<export_dir>/<sanitised Tiedot>``, and its
    schema summary is appended to ``<export_dir>/metadata.txt``.

    Args:
        data_items: DataFrame with a ``Tiedot`` column
            (e.g. ``select distinct Tiedot from <view>``).
        years: ``(s, e)`` half-open range of years to export.
        export_dir: Existing output directory.
        view: Name of the registered temp view to query.
    """
    s, e = years
    for row in data_items.collect():
        org_col_name = row["Tiedot"]
        if org_col_name is None:
            # The loaders read "NA" as null. A null label cannot be matched
            # with `=` in SQL and would crash convert(), so skip it visibly.
            print("Skipping null Tiedot in %s" % (view))
            continue
        new_col_name = convert(org_col_name)
        all_cols = get_col_names(s, e, new_col_name)
        # Escape backslashes and single quotes so a label containing an
        # apostrophe cannot terminate the SQL string literal early.
        literal = org_col_name.replace("\\", "\\\\").replace("'", "\\'")
        sql_query = "Select Alue as City, %s from %s where Tiedot='%s'" % (all_cols, view, literal)
        print(sql_query)
        df = spark.sql(sql_query)
        export_file = "%s/%s" % (export_dir, new_col_name)
        df.write.option("header", True).csv(export_file)
        write_metadata(df, export_dir)
        print("Exporting: %s" % (export_file))


# Timestamped output directory so repeated runs never collide.
export_dir = f"spark_to_csv_{time.time():f}"
os.mkdir(export_dir)

# Export every distinct data item of each view. The year ranges are half-open,
# so (2010, 2022) selects Y2010..Y2021 and (2011, 2023) selects Y2011..Y2022.
for view_name, year_range in (
    ("general_view", (2010, 2022)),
    ("population_view", (2011, 2023)),
):
    data_items = spark.sql("select distinct Tiedot from " + view_name)
    export_data(data_items, year_range, export_dir, view_name)

# The forecast pivot is already one row per city, so it is written as-is.
export_file = export_dir + "/forecast_data"
pivot_df.write.option("header", True).csv(export_file)
write_metadata(pivot_df, export_dir)
print("Exporting: " + export_file)



    
    

Select Alue as City, Y2010 as 2010_taloudet_yhteensa_te, Y2011 as 2011_taloudet_yhteensa_te, Y2012 as 2012_taloudet_yhteensa_te, Y2013 as 2013_taloudet_yhteensa_te, Y2014 as 2014_taloudet_yhteensa_te, Y2015 as 2015_taloudet_yhteensa_te, Y2016 as 2016_taloudet_yhteensa_te, Y2017 as 2017_taloudet_yhteensa_te, Y2018 as 2018_taloudet_yhteensa_te, Y2019 as 2019_taloudet_yhteensa_te, Y2020 as 2020_taloudet_yhteensa_te, Y2021 as 2021_taloudet_yhteensa_te from general_view where Tiedot='Taloudet yhteensä (TE)'
Exporting: spark_to_csv_1696257401.110592/taloudet_yhteensa_te
Select Alue as City, Y2010 as 2010_asukkaat_yhteensa_he, Y2011 as 2011_asukkaat_yhteensa_he, Y2012 as 2012_asukkaat_yhteensa_he, Y2013 as 2013_asukkaat_yhteensa_he, Y2014 as 2014_asukkaat_yhteensa_he, Y2015 as 2015_asukkaat_yhteensa_he, Y2016 as 2016_asukkaat_yhteensa_he, Y2017 as 2017_asukkaat_yhteensa_he, Y2018 as 2018_asukkaat_yhteensa_he, Y2019 as 2019_asukkaat_yhteensa_he, Y2020 as 2020_asukkaat_yhteensa_he, Y2021 as 202