In [2]:
from pyspark import SparkContext
sc = SparkContext()

In [3]:
from collections import namedtuple
from operator import itemgetter
import pandas as pd

Occupazione
===
Condizione professionale o non professionale della popolazione residente

In [7]:
def parse_occupazione(sc,filepath):
    rawRdd = sc.textFile(filepath)
    header = rawRdd.first()

    # Togliamo l'header
    rawRdd = rawRdd.filter(lambda x: x != header)
    return rawRdd.map(lambda x: x.split("|")).filter(lambda x: 
          x[0].find("I") == -1 and 
          x[3] == '"RESPOP_AV"' and #RESIDENTI
          x[6] == '"9"' and #SESSO
          x[9] == '"Y_GE15"' and #ETA
          x[12] == '"99"' and #STATO CIVILE                                            
          x[15] == '"TOTAL"' and #CITTADINANZA
          x[18] == '"ALL"' and #ISO1
          x[21] == '"99"' and  #TITOLO_STUDIO           
          x[24] == '"ALL"' #FORMAZIONE
        )

rddOccupazione = parse_occupazione(sc,"dataset/ISTAT/occupazione.csv")

In [8]:
lmbd_f = lambda x: (int(x[0][1:-1]),int(x[36]))

rddForzaLavoro = rddOccupazione.filter(lambda x: x[27] == '"14"').map(lmbd_f)
rddTotale = rddOccupazione.filter(lambda x: x[27] == '"99"').map(lmbd_f)
rddOccupato = rddOccupazione.filter(lambda x: x[27] == '"1"').map(lmbd_f)

istat = namedtuple("forza_lavoro",["codice_istat","totale","occupato","forza_lavoro"])
rddFinal = rddTotale.join(rddOccupato).join(rddForzaLavoro).map(lambda x: 
         istat(
            codice_istat = x[0],
            totale = x[1][0][0],
            occupato = x[1][0][1],
            forza_lavoro = x[1][1])
          )

In [9]:
# 48017 -> Firenze
rddFinal.filter(lambda x: x[0] == 48017).take(1)

[forza_lavoro(codice_istat=48017, totale=315401, occupato=154105, forza_lavoro=165358)]

In [10]:
# % forza lavoro = forza lavoro / totale
# % occupazione = occupato / forza lavoro
istat_indici = namedtuple("istat",["codice_istat","forza_lavoro","occupazione"])
rddIndici = rddFinal.map(lambda x: istat_indici(
    codice_istat = x.codice_istat,
    forza_lavoro = x.forza_lavoro / x.totale,
    occupazione = x.occupato / x.forza_lavoro))
rddIndici.take(10)

[istat(codice_istat=23040, forza_lavoro=0.5705852417302799, occupazione=0.9414912593649661),
 istat(codice_istat=65025, forza_lavoro=0.48482266482053465, occupazione=0.8455623901581723),
 istat(codice_istat=107010, forza_lavoro=0.4967897271268058, occupazione=0.7108239095315024),
 istat(codice_istat=60075, forza_lavoro=0.46562067374250116, occupazione=0.7948463825569871),
 istat(codice_istat=24075, forza_lavoro=0.5339985218033999, occupazione=0.9501730103806229),
 istat(codice_istat=66060, forza_lavoro=0.39059304703476483, occupazione=0.9476439790575916),
 istat(codice_istat=85005, forza_lavoro=0.3861490031479538, occupazione=0.7563405797101449),
 istat(codice_istat=6165, forza_lavoro=0.4411473788328388, occupazione=0.9394618834080718),
 istat(codice_istat=2070, forza_lavoro=0.5666003976143141, occupazione=0.9122807017543859),
 istat(codice_istat=21015, forza_lavoro=0.6256057526965765, occupazione=0.9782608695652174)]

In [11]:
collected_rdd = rddIndici.collect()
df = pd.DataFrame.from_records(collected_rdd,columns = istat_indici._fields)
df.to_csv('dataset/output/occupazione_processed.csv', sep = ",",index = False)

Reddito medio
===
Dichiarazioni IRPEF 2015 - Anno d'imposta 2014

http://www1.finanze.gov.it/finanze2/analisi_stat/index.php?tree=2015

In [92]:
import random
# Per questioni di privacy non possono essere comunicati valori con frequenza inferiore a 4. 
# Soluzione: lo tiriamo a caso nel range 1-4 (se è zero, viene scritto 0)
def prepare_gini_list(list_of_values):
    return list(map(lambda x: int(x) if x != '' else random.randint(1,4), list_of_values))

def gini_index(list_of_values):
    sorted_list = sorted(list_of_values)
    height, area = 0, 0
    for value in sorted_list:
        height += value
        area += height - value / 2.
    fair_area = height * len(list_of_values) / 2.
    return (fair_area - area) / fair_area

In [93]:
ipref = namedtuple("irpef",['codice_istat','reddito_medio','gini_index'])
def process_ipref(x):
    gini_list = prepare_gini_list([x[30],x[32],x[34],x[36],x[38],x[40],x[42],x[44]])
    
    return ipref(
        codice_istat = int(x[2]),
        reddito_medio = int(int(x[23])/int(x[22])),
        gini_index = gini_index(gini_list)
    )

def parse_ipref(sc,filepath):
    rawRdd = sc.textFile(filepath)
    header = rawRdd.first()

    # Togliamo l'header
    rawRdd = rawRdd.filter(lambda x: x != header)
    return rawRdd.map(lambda x: x.split(";")).map(lambda x: process_ipref(x))

rddIpref = parse_ipref(sc,"dataset/ISTAT/IRPEF_2014.csv")

In [94]:
rddIpref.take(1)

[irpef(codice_istat=28001, reddito_medio=22634, gini_index=0.535505347032921)]

In [95]:
collected_rdd = rddIpref.collect()
df = pd.DataFrame.from_records(collected_rdd,columns = ipref._fields)
df.to_csv('dataset/output/ipref_processed.csv', sep = ",",index = False)

Istruzione
===
Indice di possesso del diploma di scuola secondaria di 2°grado (19+ YE)

In [4]:
istruzione = namedtuple("istruzione", ['codice_istat','indice_istruzione'])

def do_parse_istruzione(x):
    return istruzione(
        codice_istat = int(x[0][1:-1]),
        indice_istruzione = float(x[15])
        )

def parse_istruzione(sc,filepath):
    rawRdd = sc.textFile(filepath)
    header = rawRdd.first()

    # Togliamo l'header
    rawRdd = rawRdd.filter(lambda x: x != header)
    return rawRdd.map(lambda x: x.split("|")).filter(lambda x: 
           x[3] == '"EDU_RATE_YGE19"' and 
           x[6] == '"9"' and 
           x[0].find("IT") == -1
           ).map(lambda x: do_parse_istruzione(x))

rddIstruzione = parse_istruzione(sc,"dataset/ISTAT/istruzione.csv")

In [5]:
rddIstruzione.take(5)

[istruzione(codice_istat=98001, indice_istruzione=31.02),
 istruzione(codice_istat=15002, indice_istruzione=40.75),
 istruzione(codice_istat=87001, indice_istruzione=49.07),
 istruzione(codice_istat=43001, indice_istruzione=36.54),
 istruzione(codice_istat=55001, indice_istruzione=39.59)]

In [6]:
collected_rdd = rddIstruzione.collect()
df = pd.DataFrame.from_records(collected_rdd,columns = istruzione._fields)
df.to_csv('dataset/output/istruzione_processed.csv', sep = ",",index = False)