
## Reformat openCoesione dataset with Comune as key

In [None]:
## Runs with "daf" enviroment in repo

%config Completer.use_jedi = False #fix TAB slowness with big frames

import numpy as np
import pandas as pd
import seaborn as sns
import glob
import gc #garbage collector
from matplotlib import pyplot as plt
plt.rcParams["figure.figsize"] = (20,12)

columnsToDrop = {
    'soggetti': ['INDIRIZZO_SOGG','SOGG_DESCR_RUOLO'],
    'progetti':['OC_LINK'],
    'localizzazioni':['INDIRIZZO_PROG'],
    'pagamenti':[]}

def load_and_stack_data(prefix):
    fileList = glob.glob('./../pac_opencoesione/%s*.csv' % prefix)
    
    if prefix == 'progetti':
        dtypesDict = pd.read_csv(
            './openCoesione_dtypes.csv', index_col='Variabile', squeeze=True).to_dict()
        dateCols = [name for name in dtypesDict.keys() if dtypesDict[name] == 'datetime64']
        for name in dateCols: del dtypesDict[name]
        frames = list(map(lambda f: pd.read_csv(
            f, sep=';', dtype=dtypesDict, index_col=0, na_values=[' ', '  ']), fileList))
        data = pd.concat(frames, axis=0)
        data[dateCols] = data[dateCols].apply(lambda x: pd.to_datetime(x, infer_datetime_format=True))
        
        # drop duplicates including index when checking
        indexColName = data.index.name
        data.reset_index(inplace=True)
        data.drop_duplicates(inplace=True)
        data.set_index(indexColName, inplace=True)
        
        # clean remaining duplicate indexes by picking the row that has fewer N/A
        bDuplicate = data.index.duplicated(keep=False)
        if any(bDuplicate):
            print('%i duplicate indexes found' % sum(bDuplicate))
            duplRows = data[bDuplicate].copy()
            duplRows['NumberNA'] = duplRows.isna().sum(axis=1)
            duplRows.sort_values('NumberNA', inplace=True)
            # recomibine into final frame
            data = pd.concat([data[~bDuplicate], duplRows[duplRows.index.duplicated(keep='first')]])
    
    elif prefix == 'pagamenti':
        # pagamenti has many rows with no money amount. Drop them.
        frames = list(map(lambda f: pd.read_csv(f, sep=';', index_col=False), fileList))
        data = pd.concat(frames, axis=0)
        
        # drop duplicates excluding index
        data.drop_duplicates(inplace=True)
        data = data[~np.all(data.select_dtypes(float) == 0, axis=1)]
         
    else:
        # infer types for other datasets (temporary)
        frames = list(map(lambda f: pd.read_csv(f, sep=';', index_col=False, na_values=[' ', '  '], 
                                                dtype={'COD_COMUNE_SEDE_SOGG':float}), fileList))
        data = pd.concat(frames, axis=0)
        
        # drop duplicates excluding index
        data.drop_duplicates(inplace=True)
    
    data.drop(columns = columnsToDrop[prefix], inplace=True)
    print('Imported %s' % prefix)
    return data



In [None]:
# load datasets
progetti = load_and_stack_data('progetti')
luoghi = load_and_stack_data('localizzazioni')
soggetti = load_and_stack_data('soggetti')

# convert Comune fields:
# soggetti
soggetti.loc[soggetti['COD_COMUNE_SEDE_SOGG'].isna(), 'COD_COMUNE_SEDE_SOGG'] = -1
soggetti['COD_COMUNE_SEDE_SOGG'] = soggetti['COD_COMUNE_SEDE_SOGG'].astype(int) 
# luoghi
padZerosShort = np.vectorize(lambda x: str(x).rjust(3, '0'))
luoghi['COD_COMUNE_LUOGO'] = (luoghi.COD_REGIONE.astype('str') + 
                              padZerosShort(luoghi.COD_PROVINCIA) + padZerosShort(luoghi.COD_COMUNE)).astype(int)

gc.collect()


#### Caricamento anagrafica comuni

In [None]:
comuniAnagrafica = pd.read_csv('../ElencoComuniAttuali_20170918.csv', sep=';')
comuniAnagrafica.set_index('Codice Istat', inplace=True)
comuniAnagrafica = comuniAnagrafica[~comuniAnagrafica.index.isnull()]
#print(sum(comuniAnagrafica.index.isnull()))
assert not any(comuniAnagrafica.index.duplicated()), 'Duplicate Istat codes'

convTable = pd.read_csv('../provToReg.csv', index_col='Sigla automobilistica', dtype='str')

comuniAnagrafica = comuniAnagrafica.join(convTable, on='Sigla Provincia')

In [None]:
formattedValues = comuniAnagrafica.index.values.astype('int')
padZeros = np.vectorize(lambda x: str(x).rjust(6, '0'))
comuniAnagrafica['Codice Istat Lungo'] = (comuniAnagrafica['Codice regione'] + padZeros(formattedValues)).astype(int)


#### Gestione progetti con piu' luoghi / soggetti

In [None]:
numeroSoggetti = soggetti[soggetti.SOGG_COD_RUOLO == 2].groupby('COD_LOCALE_PROGETTO').size()
bProgettoMoltiSoggetti = progetti.index.isin(numeroSoggetti.index[numeroSoggetti > 1])


In [None]:
numeroLuoghi = luoghi[luoghi.OC_FLAG_CAP_PROG==1].groupby('COD_LOCALE_PROGETTO').size()

bProgettoMoltiLuoghi = progetti.index.isin(numeroLuoghi.index[numeroLuoghi > 1])

In [None]:
bProgettoNonMappabile = bProgettoMoltiLuoghi | bProgettoMoltiSoggetti

print('---CONSIDERATI PROGETTI con solo un luogo e un soggetto attuatore. Esclusi EUR %.2f' %
      sum(progetti.TOT_PAGAMENTI[bProgettoMoltiLuoghi | bProgettoMoltiSoggetti]))
progettiMap = progetti[~bProgettoNonMappabile]


## join: pagamenti + (progetti con altre variabili uniche, soggetti attuatori e luoghi)

In [None]:
# Step 1: JOIN soggetti - progettiMap index
datiUniti = progettiMap[['CUP','TOT_PAGAMENTI']].join(soggetti[soggetti.SOGG_COD_RUOLO == 2].set_index('COD_LOCALE_PROGETTO'))
firstJoinTotPagamenti = datiUniti.TOT_PAGAMENTI.sum()

bMatch = datiUniti['COD_COMUNE_SEDE_SOGG'].isin(comuniAnagrafica['Codice Istat Lungo'].values)
bMatch = bMatch | (datiUniti['COD_COMUNE_SEDE_SOGG'] == -1)

print('---CONSIDERATI PROGETTI con comune soggetto che corrisponde ad anagrafica. Restano scollegati EUR %.6g' %
      datiUniti[~bMatch]['TOT_PAGAMENTI'].sum())

del soggetti
gc.collect()


In [None]:
# Step 2: JOIN previous union - luoghi
datiUniti = datiUniti.join(luoghi[luoghi.OC_FLAG_CAP_PROG==1].set_index('COD_LOCALE_PROGETTO'))
assert datiUniti.TOT_PAGAMENTI.sum() == firstJoinTotPagamenti, 'Inconsistent join'

bMatchLuogo = datiUniti['COD_COMUNE_LUOGO'].isin(comuniAnagrafica['Codice Istat Lungo'].values)

print('---CONSIDERATI PROGETTI con assegnazione a livello comunale che corrisponde ad anagrafica. Restano scollegati EUR %.6g' %
      datiUniti[~bMatchLuogo]['TOT_PAGAMENTI'].sum())

del luoghi
gc.collect()


In [None]:
# Step 3: JOIN pagamenti-progetti // fill pagamenti data with the above info

bTriggerJoinPagamenti = False
bTriggerJoinPagamenti = True
if bTriggerJoinPagamenti:
    # check pagamenti sums
    pagamenti = load_and_stack_data('pagamenti')
    origSum = pagamenti['TOT_PAGAMENTI'].sum() # ok, around 60B

    # do the join
    datiUnitiPagamenti = datiUniti.join(pagamenti.set_index('COD_LOCALE_PROGETTO'),  lsuffix='_summary')
    del pagamenti
    gc.collect()

    # Finally recheck the sums!
    assert 0 == datiUnitiPagamenti.TOT_PAGAMENTI.sum().round() - firstJoinTotPagamenti.round(), 'Bad pagamenti data'
    datiUnitiPagamenti.to_csv('../pac_opencoesione/pagamentiSoggettiLuoghi.csv')
else:
    datiUniti.to_csv('../pac_opencoesione/SoggettiELuoghi.csv')

In [None]:
# Summary on beneficiary types
datiUniti.groupby('COD_COMUNE_SEDE_SOGG')['COD_COMUNE_LUOGO'].nunique().sort_values(ascending=False)