In [1]:
from pyspark.sql import SparkSession
import csv
import pandas as pd

spark = SparkSession.builder.master("local").appName("lopputyo").getOrCreate()

df = spark.read.csv("merged0_21.csv", header=True, inferSchema=True)
df.show(vertical=True)

df.registerTempTable("example")


-RECORD 0-----------------------------
 id                 | 0               
 aikatyyppi         | Vuosi           
 kuukausi_nro       | 12              
 vuosikuukausi      | 200312          
 vuosi              | 2003            
 kunta_nro          | 257             
 kunta_nimi         | Kirkkonummi     
 ikaryhma           | 35-39           
 sukupuoli          | Nainen          
 etuus              | Työmarkkinatuki 
 korvausperuste     | Yhteensä        
 saaja_lkm          | 45              
 korvattu_paiva_lkm | 8277            
 maksettu_eur       | 215964.47       
-RECORD 1-----------------------------
 id                 | 1               
 aikatyyppi         | Vuosi           
 kuukausi_nro       | 12              
 vuosikuukausi      | 200312          
 vuosi              | 2003            
 kunta_nro          | 256             
 kunta_nimi         | Kinnula         
 ikaryhma           | 35-39           
 sukupuoli          | Nainen          
 etuus              | Työ



In [2]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- aikatyyppi: string (nullable = true)
 |-- kuukausi_nro: integer (nullable = true)
 |-- vuosikuukausi: integer (nullable = true)
 |-- vuosi: integer (nullable = true)
 |-- kunta_nro: integer (nullable = true)
 |-- kunta_nimi: string (nullable = true)
 |-- ikaryhma: string (nullable = true)
 |-- sukupuoli: string (nullable = true)
 |-- etuus: string (nullable = true)
 |-- korvausperuste: string (nullable = true)
 |-- saaja_lkm: integer (nullable = true)
 |-- korvattu_paiva_lkm: integer (nullable = true)
 |-- maksettu_eur: double (nullable = true)



In [24]:
df.groupBy('etuus').count().orderBy('count',ascending=False).show()
df.groupBy('korvausperuste').count().orderBy('count',ascending=False).show(50)

+--------------------+-------+
|               etuus|  count|
+--------------------+-------+
|            Yhteensä|2915766|
|Työmarkkinatukiet...|2756622|
|     Työmarkkinatuki|2719127|
|      Peruspäiväraha|1010349|
|      Kotoutumistuki|  75271|
|      Yhdistelmätuki|  55317|
+--------------------+-------+



In [30]:
new_df = df.withColumn("maksu_per_hlo", (df["maksettu_eur"]/df["saaja_lkm"])/12).orderBy('id',ascending=False).filter((df["aikatyyppi"] == "Vuosi") & (df["korvausperuste"] == "Työttömyysajalta"))
new_df = new_df.filter(new_df["maksettu_eur"] > 0)
new_df.show()


+-------+----------+------------+-------------+-----+---------+-----------+--------+---------+--------------+----------------+---------+------------------+------------+------------------+
|     id|aikatyyppi|kuukausi_nro|vuosikuukausi|vuosi|kunta_nro| kunta_nimi|ikaryhma|sukupuoli|         etuus|  korvausperuste|saaja_lkm|korvattu_paiva_lkm|maksettu_eur|     maksu_per_hlo|
+-------+----------+------------+-------------+-----+---------+-----------+--------+---------+--------------+----------------+---------+------------------+------------+------------------+
|9355733|     Vuosi|          12|       202212| 2022|      678|      Raahe|   45-49|   Nainen|Peruspäiväraha|Työttömyysajalta|       13|              1007|    32151.64| 206.1002564102564|
|9355732|     Vuosi|          12|       202212| 2022|      680|     Raisio|   45-49|   Nainen|Peruspäiväraha|Työttömyysajalta|       13|              1521|    44221.29| 283.4698076923077|
|9355731|     Vuosi|          12|       202212| 2022|      6

In [26]:
new_df.select("saaja_lkm", "maksettu_eur", "maksu_per_hlo").describe().show()

+-------+------------------+-----------------+--------------------+
|summary|         saaja_lkm|     maksettu_eur|       maksu_per_hlo|
+-------+------------------+-----------------+--------------------+
|  count|           1978652|          1978652|             1978652|
|   mean|27.794614212099955|111642.4409874503|    287.226645767111|
| stddev| 95.24945749808242|471120.7129070449|  149.27381797607882|
|    min|                 1|             0.05|8.333333333333334E-4|
|    max|              5931|    2.597016758E7|  1415.9533333333331|
+-------+------------------+-----------------+--------------------+



In [6]:
new_df.select("kuukausi_nro", "maksu_per_hlo").groupBy('kuukausi_nro').avg().orderBy("kuukausi_nro").show()

+------------+-----------------+------------------+
|kuukausi_nro|avg(kuukausi_nro)|avg(maksu_per_hlo)|
+------------+-----------------+------------------+
|          12|             12.0| 277.5315352094855|
+------------+-----------------+------------------+



In [31]:
new_df.select("ikaryhma", "maksu_per_hlo").groupBy('ikaryhma').avg().orderBy("ikaryhma").show()

+-------------+------------------+
|     ikaryhma|avg(maksu_per_hlo)|
+-------------+------------------+
|        17-19| 93.42080871439161|
|        20-24|151.48152544101694|
|        25-29|234.28955195404185|
|        30-34|284.85334953572874|
|        35-39| 309.4914551153294|
|        40-44|322.18938807476997|
|        45-49|324.93702899782664|
|        50-54|321.71786386443415|
|        55-59| 337.1820575132441|
|        60-64| 371.9260410009473|
|        65-67| 265.4385823729428|
|Tieto puuttuu|234.36835351374052|
+-------------+------------------+



In [32]:
new_df.select("vuosi", "maksu_per_hlo").groupBy('vuosi').avg().orderBy("vuosi").show(22)

+-----+----------+------------------+
|vuosi|avg(vuosi)|avg(maksu_per_hlo)|
+-----+----------+------------------+
| 2000|    2000.0| 214.3009272201384|
| 2001|    2001.0| 229.2741695158546|
| 2002|    2002.0|244.21413805947293|
| 2003|    2003.0| 245.6212348566159|
| 2004|    2004.0| 252.1342380520088|
| 2005|    2005.0| 236.6508998354385|
| 2006|    2006.0|223.59936649891227|
| 2007|    2007.0|207.57079499108855|
| 2008|    2008.0| 210.3546281077419|
| 2009|    2009.0|244.14535526845918|
| 2010|    2010.0|243.93056784037847|
| 2011|    2011.0|237.77987898728895|
| 2012|    2012.0| 293.4462309585595|
| 2013|    2013.0|  329.338275681236|
| 2014|    2014.0|337.64962810084745|
| 2015|    2015.0|343.41267682200146|
| 2016|    2016.0| 342.3772649574245|
| 2017|    2017.0| 327.6929111520672|
| 2018|    2018.0|305.05861028038544|
| 2019|    2019.0| 306.0715370954488|
| 2020|    2020.0| 338.3236944961862|
| 2021|    2021.0|371.34971417799005|
+-----+----------+------------------+
only showing

In [121]:
new_df.select("Kunta_nimi", "maksu_per_hlo").groupBy("Kunta_nimi").avg().orderBy("avg(maksu_per_hlo)", ascending=False).show(300)


+------------------+------------------+
|        Kunta_nimi|avg(maksu_per_hlo)|
+------------------+------------------+
|         Ylitornio| 816.7491616413304|
|             Luoto|   812.91044608515|
|            Oripää| 810.6171082227651|
|        Kauniainen| 808.9986567222611|
|     Uusikaarlepyy| 803.8866197301277|
|         Siikainen| 803.0896097624169|
|          Utajärvi| 801.2340629758378|
|          Marttila| 796.5370195550943|
|       Punkalaidun| 793.4276284923425|
|            Kerava| 791.2382816333173|
|             Perho| 790.6351208876331|
|             Hanko| 785.7905625278909|
|            Raisio| 785.3242139856061|
|           Nakkila| 783.7887981172889|
|        Tuntematon| 783.6719292385851|
|  Pedersören kunta| 782.8731303450713|
|         Raasepori| 781.4985503427436|
|        Nurmijärvi| 781.3491708837942|
|       Pietarsaari| 780.9747688739747|
|             Inkoo| 780.8041376523386|
|          Pomarkku| 780.6214664157011|
|             Ypäjä| 779.4557838697059|


In [122]:
data_for_json = new_df.select("Kunta_nro", "maksu_per_hlo").groupBy("Kunta_nro").avg().orderBy("avg(Kunta_nro)", ascending=True)
data_for_json.show()


+---------+--------------+------------------+
|Kunta_nro|avg(Kunta_nro)|avg(maksu_per_hlo)|
+---------+--------------+------------------+
|        5|           5.0| 738.1062025981265|
|        9|           9.0| 727.9863972345393|
|       10|          10.0| 724.4748109763534|
|       16|          16.0| 738.7130065449091|
|       18|          18.0| 762.2738952728189|
|       19|          19.0| 754.4751288407152|
|       20|          20.0| 755.9141478496157|
|       46|          46.0| 751.8476794519348|
|       47|          47.0| 773.8661063921106|
|       49|          49.0| 621.0150824867189|
|       50|          50.0| 761.0217238217496|
|       51|          51.0| 769.7265981401238|
|       52|          52.0| 739.0193623854537|
|       61|          61.0| 775.3656286398626|
|       69|          69.0| 754.1616488032081|
|       71|          71.0| 711.5508237623047|
|       72|          72.0| 736.6703001983361|
|       74|          74.0| 753.0327456299973|
|       75|          75.0| 743.124

In [133]:
import json
with open("kunnat.json", "r") as kunnat:
    kuntajson = json.load(kunnat)
    

{'area005': '123', 'area009': 'Alavieska', 'area010': 'Alavus', 'area016': 'Asikkala', 'area018': 'Askola', 'area019': 'Aura', 'area020': 'Akaa', 'area035': 'BrÃ¤ndÃ¶', 'area043': 'EckerÃ¶', 'area046': 'Enonkoski', 'area047': 'EnontekiÃ¶', 'area049': 'Espoo', 'area050': 'Eura', 'area051': 'Eurajoki', 'area052': 'EvijÃ¤rvi', 'area060': 'FinstrÃ¶m', 'area061': 'Forssa', 'area062': 'FÃ¶glÃ¶', 'area065': 'Geta', 'area069': 'HaapajÃ¤rvi', 'area071': 'Haapavesi', 'area072': 'Hailuoto', 'area074': 'Halsua', 'area075': 'Hamina', 'area076': 'Hammarland', 'area077': 'Hankasalmi', 'area078': 'Hanko', 'area079': 'Harjavalta', 'area081': 'Hartola', 'area082': 'Hattula', 'area086': 'HausjÃ¤rvi', 'area090': 'HeinÃ¤vesi', 'area091': 'Helsinki', 'area092': 'Vantaa', 'area097': 'Hirvensalmi', 'area098': 'Hollola', 'area099': 'Honkajoki', 'area102': 'Huittinen', 'area103': 'Humppila', 'area105': 'Hyrynsalmi', 'area106': 'HyvinkÃ¤Ã¤', 'area108': 'HÃ¤meenkyrÃ¶', 'area109': 'HÃ¤meenlinna', 'area111': 'Heino

In [141]:



string = "{\n"
for row in data_for_json.rdd.collect():
    val1 = row["Kunta_nro"]
    val2 = row["avg(maksu_per_hlo)"]
    if f'area{val1:03d}' in kuntajson:
        string += f'"area{val1:03d}": "{val2:.2f}",\n'
string += "\n}"
print(string)


with open("kunnat2.json", "w") as kunnat2:
    json.dump(string, kunnat2)

{
"area005": "738.11",
"area009": "727.99",
"area010": "724.47",
"area016": "738.71",
"area018": "762.27",
"area019": "754.48",
"area020": "755.91",
"area046": "751.85",
"area047": "773.87",
"area049": "621.02",
"area050": "761.02",
"area051": "769.73",
"area052": "739.02",
"area061": "775.37",
"area069": "754.16",
"area071": "711.55",
"area072": "736.67",
"area074": "753.03",
"area075": "743.12",
"area077": "749.89",
"area078": "785.79",
"area079": "753.26",
"area081": "753.43",
"area082": "736.99",
"area086": "756.84",
"area090": "772.05",
"area091": "359.08",
"area092": "617.22",
"area097": "755.00",
"area098": "724.92",
"area099": "766.74",
"area102": "762.81",
"area103": "749.55",
"area105": "761.31",
"area106": "779.36",
"area108": "753.99",
"area109": "730.10",
"area111": "742.59",
"area139": "721.43",
"area140": "761.21",
"area142": "752.22",
"area143": "748.82",
"area145": "740.19",
"area146": "739.97",
"area148": "724.22",
"area149": "780.80",
"area151": "773.01",
"area152": 

In [144]:
jsondict = {}

for row in data_for_json.rdd.collect():
    val1 = row["Kunta_nro"]
    val2 = row["avg(maksu_per_hlo)"]
    #if f'area{val1:03d}' in kuntajson:
        #string += f'"area{val1:03d}": "{val2:.2f}",\n'
    jsondict[f'area{val1:03d}'] = f'{val2:.2f}'

with open("kunnat3.json", "w") as kunnat3:
    json.dump(jsondict, kunnat3)