### Import CICO data from .csv to MongoDB documents (short version)
#### CICO data: https://www.cliclavoro.gov.it/Barometro-Del-Lavoro/Pagine/Microdati-per-la-ricerca.aspx

In [2]:
# Required packages
import pandas as pd
import numpy as np
import json
import os
import pymongo

In [3]:
# Import data (.csv format)
df = pd.read_csv('C:/Users/Nicola Caravaggio/OneDrive/Desktop/Ciu_Tos_Roma3/CICO/data_cico.csv')
print ('Data correctly imported')
# to import all as string, add ", dtype = str" after the path

Data correctly imported


In [4]:
# Convert strings to date format
df['rapporto_datainizio'] =  pd.to_datetime(df['rapporto_datainizio'], format='%d%b%Y')
df['dtfineprevista'] =  pd.to_datetime(df['dtfineprevista'], format='%d%b%Y')
df['dttrasformazione'] =  pd.to_datetime(df['dttrasformazione'], format='%d%b%Y')
df['dtcessazioneeffettiva'] =  pd.to_datetime(df['dtcessazioneeffettiva'], format='%d%b%Y')
df['rapporto_datainizio'] = df['rapporto_datainizio'].dt.strftime('%Y-%m-%d %H:%M:%S')
df['dtfineprevista'] = df['dtfineprevista'].dt.strftime('%Y-%m-%d %H:%M:%S')
df['dttrasformazione'] = df['dttrasformazione'].dt.strftime('%Y-%m-%d %H:%M:%S')
df['dtcessazioneeffettiva'] = df['dtcessazioneeffettiva'].dt.strftime('%Y-%m-%d %H:%M:%S')

In [5]:
# Show first 5 rows of the dataframe
df.head()

Unnamed: 0,cfdatore_crip,cflavoratore_crip,rapporto_datainizio,annoattivazione,annonascita,codgenere,regione_nascita,codcittadinanza,codtitolostudio,codregionedomicilio,...,codccnl,codagevolazione,idsociolavoratore,dtcessazioneeffettiva,codmotivocessazioneco,dtfineprevista,dttrasformazione,codtipotrasformazione,numeroproroghe,coef_uni
0,48D49BDAC8D8F2B2D63A6122F2B4D332,4C30B254455CDB5FF6E53857746D633E,2017-07-06 00:00:00,2017,1953,Femmina,Campania,Italia,20,Campania,...,235,999,No,2017-07-10 00:00:00,254,2017-07-10 00:00:00,NaT,,0,7.604166
1,90FF9B696EC2BD0419792F33FA3EBCEE,4CC0E3D4A897A6661D35D4D30D07F57A,2011-12-01 00:00:00,2011,1988,Maschio,Sicilia,Italia,70,Sicilia,...,999,999,No,2012-03-31 00:00:00,254,2012-03-31 00:00:00,NaT,,0,7.625


In [6]:
# Check columns' types
df.info(verbose = True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2 entries, 0 to 1
Data columns (total 26 columns):
cfdatore_crip                2 non-null object
cflavoratore_crip            2 non-null object
rapporto_datainizio          2 non-null object
annoattivazione              2 non-null int64
annonascita                  2 non-null int64
codgenere                    2 non-null object
regione_nascita              2 non-null object
codcittadinanza              2 non-null object
codtitolostudio              2 non-null int64
codregionedomicilio          2 non-null object
codprovincialavoro           2 non-null int64
codregionelavoro             2 non-null object
codsettore                   2 non-null object
codtipocontratto             2 non-null int64
codtipoorario                2 non-null object
codqualificaprofessionale    2 non-null object
codccnl                      2 non-null int64
codagevolazione              2 non-null int64
idsociolavoratore            2 non-null object
dtcessazionee

In [7]:
# Convert int64 (not supported in MongoDB) into float64
df["annoattivazione"] = df["annoattivazione"].astype(np.float64)
df["annonascita"] = df["annonascita"].astype(np.float64)
df["codtitolostudio"] = df["codtitolostudio"].astype(np.float64)
df["codprovincialavoro"] = df["codprovincialavoro"].astype(np.float64)
df["codtipocontratto"] = df["codtipocontratto"].astype(np.float64)
df["codccnl"] = df["codccnl"].astype(np.float64)
df["codagevolazione"] = df["codagevolazione"].astype(np.float64)
df["numeroproroghe"] = df["numeroproroghe"].astype(np.float64)

In [8]:
# Descriptive statistics
df.describe()

Unnamed: 0,annoattivazione,annonascita,codtitolostudio,codprovincialavoro,codtipocontratto,codccnl,codagevolazione,codmotivocessazioneco,codtipotrasformazione,numeroproroghe,coef_uni
count,2.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,0.0,2.0,2.0
mean,2014.0,1970.5,45.0,72.5,10.5,617.0,999.0,254.0,,0.0,7.614583
std,4.242641,24.748737,35.355339,13.435029,12.020815,540.229581,0.0,0.0,,0.0,0.014732
min,2011.0,1953.0,20.0,63.0,2.0,235.0,999.0,254.0,,0.0,7.604166
25%,2012.5,1961.75,32.5,67.75,6.25,426.0,999.0,254.0,,0.0,7.609375
50%,2014.0,1970.5,45.0,72.5,10.5,617.0,999.0,254.0,,0.0,7.614583
75%,2015.5,1979.25,57.5,77.25,14.75,808.0,999.0,254.0,,0.0,7.619792
max,2017.0,1988.0,70.0,82.0,19.0,999.0,999.0,254.0,,0.0,7.625


In [9]:
# Generate a subset of DataFrames clustered based on the initial values of 'cflavoratore_crip'
df_A = df[df['cflavoratore_crip'].str.match('A')]
df_B = df[df['cflavoratore_crip'].str.match('B')]
df_C = df[df['cflavoratore_crip'].str.match('C')]
df_D = df[df['cflavoratore_crip'].str.match('D')]
df_E = df[df['cflavoratore_crip'].str.match('E')]
df_F = df[df['cflavoratore_crip'].str.match('F')]
df_G = df[df['cflavoratore_crip'].str.match('G')]
df_H = df[df['cflavoratore_crip'].str.match('H')]
df_I = df[df['cflavoratore_crip'].str.match('I')]
df_J = df[df['cflavoratore_crip'].str.match('J')]
df_K = df[df['cflavoratore_crip'].str.match('K')]
df_L = df[df['cflavoratore_crip'].str.match('L')]
df_M = df[df['cflavoratore_crip'].str.match('M')]
df_N = df[df['cflavoratore_crip'].str.match('N')]
df_O = df[df['cflavoratore_crip'].str.match('O')]
df_P = df[df['cflavoratore_crip'].str.match('P')]
df_Q = df[df['cflavoratore_crip'].str.match('Q')]
df_R = df[df['cflavoratore_crip'].str.match('R')]
df_S = df[df['cflavoratore_crip'].str.match('S')]
df_T = df[df['cflavoratore_crip'].str.match('T')]
df_U = df[df['cflavoratore_crip'].str.match('U')]
df_V = df[df['cflavoratore_crip'].str.match('V')]
df_W = df[df['cflavoratore_crip'].str.match('W')]
df_X = df[df['cflavoratore_crip'].str.match('X')]
df_Y = df[df['cflavoratore_crip'].str.match('Y')]
df_Z = df[df['cflavoratore_crip'].str.match('Z')]
df_1 = df[df['cflavoratore_crip'].str.match('1')]
df_2 = df[df['cflavoratore_crip'].str.match('2')]
df_3 = df[df['cflavoratore_crip'].str.match('3')]
df_4 = df[df['cflavoratore_crip'].str.match('4')]
df_5 = df[df['cflavoratore_crip'].str.match('5')]
df_6 = df[df['cflavoratore_crip'].str.match('6')]
df_7 = df[df['cflavoratore_crip'].str.match('7')]
df_8 = df[df['cflavoratore_crip'].str.match('8')]
df_9 = df[df['cflavoratore_crip'].str.match('9')]

In [10]:
# Eliminate empty DataFrames  
df_names = ['df_A','df_B','df_C','df_D','df_E','df_F','df_G','df_H','df_I',
            'df_J','df_K','df_L','df_M','df_N','df_O','df_P','df_Q','df_R',
            'df_S','df_T','df_U','df_V','df_W','df_X','df_Y','df_Z',
            'df_1','df_2','df_3','df_4','df_5','df_6','df_7','df_8','df_9']
for df_ in df_names:
    if locals()[df_].empty:
        del locals()[df_]

In [11]:
# Remaining DataFrame      
alldfs = [var for var in dir() if isinstance(eval(var), pd.core.frame.DataFrame)]
for i in alldfs:
    if i[:1] != '_':
        print (i)

df
df_4


In [13]:
# Connect to MongoDB and clean existing collections
# -------------------------------------------------
client = pymongo.MongoClient("mongodb://localhost:27017/")

db = client["cico"]

print(db.list_collection_names())
print("The collection does exist?")
print('datacico' in db.list_collection_names())    
collection = db['datacico']
collection.estimated_document_count() == 0
collection.drop()  

col = db['datacico']

['datacico', 'datacico2']
The collection does exist?
True


### Import data into MongoDB

In [14]:
# Select the directory from which derive the script
os.chdir("C:/Users/Nicola Caravaggio/OneDrive/Desktop/MongoDB/Scripts") 

try: 
    df = df_A
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

try: 
    df = df_B
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

try: 
    df = df_C
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

try: 
    df = df_D
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

try: 
    df = df_E
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

try: 
    df = df_F
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

try: 
    df = df_G
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

try: 
    df = df_H
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

try: 
    df = df_I
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

try: 
    df = df_J
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

try: 
    df = df_K
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

try: 
    df = df_L
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

try: 
    df = df_M
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

try: 
    df = df_N
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

try: 
    df = df_O
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

try: 
    df = df_P
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

try: 
    df = df_Q
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

try: 
    df = df_R
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

try: 
    df = df_S
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

try: 
    df = df_T
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

try: 
    df = df_U
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

try: 
    df = df_V
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

try: 
    df = df_W
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

try: 
    df = df_X
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

try: 
    df = df_Y
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

try: 
    df = df_Z
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

try: 
    df = df_1
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

try: 
    df = df_2
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

try: 
    df = df_3
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

try: 
    df = df_4
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

try: 
    df = df_5
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

try: 
    df = df_6
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

try: 
    df = df_7
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

try: 
    df = df_8
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

try: 
    df = df_9
    exec(open("script_import_to_mongo.py").read());
except NameError: print("no var")

no var
no var
no var
no var
no var
no var
no var
no var
no var
no var
no var
no var
no var
no var
no var
no var
no var
no var
no var
no var
no var
no var
no var
no var
no var
no var
no var
no var
no var
Number of inserted objects:
2
no var
no var
no var
no var
no var


In [15]:
# Modify strings into Date ISO format (into MongoDB)

client = pymongo.MongoClient('localhost', 27017)
db = client['cico']
datacico = db['datacico']
pipe = [{
        '$addFields': {
            'attivazioni': {
                '$map': {
                    'input': '$attivazioni', 
                    'as': 'att', 
                    'in': {  
                        'cfdatore_crip': '$$att.cfdatore_crip',
                        'rapporto_datainizio': 
                            { '$toDate': { '$substr': [ '$$att.rapporto_datainizio', 0, { '$subtract': [ {
                                '$strLenCP': '$$att.rapporto_datainizio' }, 1 ] } ] } },
                        'codregionedomicilio': '$$att.codregionedomicilio', 
                        'codregionelavoro': '$$att.codregionelavoro',
                        'codprovincialavoro': '$$att.codprovincialavoro', 
                        'codsettore': '$$att.codsettore', 
                        'codtipocontratto': '$$att.codtipocontratto', 
                        'codtipoorario': '$$att.codtipoorario', 
                        'codqualificaprofessionale': '$$att.codqualificaprofessionale', 
                        'codccnl': '$$att.codccnl', 
                        'codagevolazione': '$$att.codagevolazione',  
                        'idsociolavoratore': '$$att.idsociolavoratore',
                        'dtcessazioneeffettiva': 
                            { '$cond': [ { '$eq': [ "$$att.dtcessazioneeffettiva", "NaT" ] }, "NaT", 
                                { '$toDate': { '$substr': [ '$$att.dtcessazioneeffettiva', 0, { '$subtract': [ {
                                    '$strLenCP': '$$att.dtcessazioneeffettiva' }, 1 ] } ] } }
                                 ] },
                        'dtfineprevista': 
                            { '$cond': [ { '$eq': [ "$$att.dtfineprevista", "NaT" ] }, "NaT", 
                                { '$toDate': { '$substr': [ '$$att.dtfineprevista', 0, { '$subtract': [ {
                                    '$strLenCP': '$$att.dtfineprevista' }, 1 ] } ] } }
                                 ] },
                        'dttrasformazione': 
                            { '$cond': [ { '$eq': [ "$$att.dttrasformazione", "NaT" ] }, "NaT", 
                                { '$toDate': { '$substr': [ '$$att.dttrasformazione', 0, { '$subtract': [ {
                                    '$strLenCP': '$$att.dttrasformazione' }, 1 ] } ] } }
                                 ] },
                        'codtipotrasformazione': '$$att.codtipotrasformazione', 
                        'numeroproroghe': '$$att.numeroproroghe'
                    }
                }
            }
        }
    }, {
        '$out': 'datacico'
    }]

TestOutput = datacico.aggregate(pipeline = pipe)
client.close()
print(list(TestOutput))
print("Query correctly read")

[]
Query correctly read
