# Environment Setting

In [1]:
import boto3

import pandas as pd
import numpy as np
import os


import warnings
warnings.filterwarnings('ignore')

In [3]:
%load_ext autoreload
%autoreload 2

import sys

sys.path.append("..")

from src.utils import S3Utils, athena_to_pandas


The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


### Process Census Features

In [140]:
class TaxDeclarationsDataComuni(object):
    
    
    def pipe_data(self):
        
        print('Converting Data on Tax Declarations Comuni')
        s3_bucket = 'gimi-data'
        s3_in_directory = 'in/italy/tax-declarations/tax-declarations-comuni/2020/'
        s3_out_directory = 'out/italy/tax-declarations/tax-declarations-comuni/2020/'
        region = 'eu-south-1'
        
        self.pipe_tax_declarations(
            s3_bucket, s3_in_directory, s3_out_directory, region)
        print("---- End ----")
        
        return None
    
    
    def pipe_tax_declarations(
        self, s3_bucket, s3_in_directory, s3_out_directory, region):

        # Input files
        S3 = S3Utils(s3_bucket, region = region)
        paths = S3.bucket_content_keys(directory = s3_in_directory)

        for path in paths[1:]:
            print(path)

            # Download and unzip file
            fname = path.split('/')[-1]
            print(fname)
            S3.download_file(path, fname)

            # Read file from local
            temp = pd.read_csv(
                fname, sep = ';', encoding = 'iso-8859-1')
            temp = temp.reset_index()
            
            # Fix columns import issue from Csv
            cols = temp.columns
            temp = temp.reset_index()
            temp.columns = list(cols) + ['drop'] 
            temp = temp.iloc[:, :50]
            self.df = temp

            # Create output parquet file
            fname_final = fname.split('.')[0]
            fname_parquet = "{}.parquet".format(fname_final)
            print(fname_parquet)
            temp.to_parquet(fname_parquet)

            # Save to S3
            S3.upload_file(fname_parquet, os.path.join(s3_out_directory, fname_parquet))

            # Clean up folder
            os.remove(fname)
            os.remove(fname_parquet)
            
            
            
class TaxDeclarationsDataComuniSub(object):
    
    
    def pipe_data(self):
        
        print('Converting Data on Tax Declarations Comuni')
        s3_bucket = 'gimi-data'
        s3_in_directory = 'in/italy/tax-declarations/tax-declarations-comuni-sub/2020/'
        s3_out_directory = 'out/italy/tax-declarations/tax-declarations-comuni-sub/2020/'
        region = 'eu-south-1'
        
        self.pipe_tax_declarations(
            s3_bucket, s3_in_directory, s3_out_directory, region)
        print("---- End ----")
        
        return None
    
    
    def pipe_tax_declarations(
        self, s3_bucket, s3_in_directory, s3_out_directory, region):

        # Input files
        S3 = S3Utils(s3_bucket, region = region)
        paths = S3.bucket_content_keys(directory = s3_in_directory)

        for path in paths[1:]:
            print(path)

            # Download and unzip file
            fname = path.split('/')[-1]
            print(fname)
            S3.download_file(path, fname)

            # Read file from local
            temp = pd.read_csv(fname, sep = ';', encoding = 'iso-8859-1')
            self.df = temp
            
            # Fix columns import issue from Csv
            cols = temp.columns
            temp = temp.reset_index()
            temp.columns = list(cols) + ['drop'] 
            temp = temp.iloc[:, :51]
            self.df = temp

            # Create output parquet file
            fname_final = fname.split('.')[0]
            fname_parquet = "{}.parquet".format(fname_final)
            temp.to_parquet(fname_parquet)

            # Save to S3
            S3.upload_file(fname_parquet, os.path.join(s3_out_directory, fname_parquet))

            # Clean up folder
            os.remove(fname)
            os.remove(fname_parquet)

In [141]:
cl = TaxDeclarationsDataComuni()
cl.pipe_data()

Converting Data on Tax Declarations Comuni
in/italy/tax-declarations/tax-declarations-comuni/2020/Redditi_e_principali_variabili_IRPEF_su_base_comunale_CSV_2019.csv
Redditi_e_principali_variabili_IRPEF_su_base_comunale_CSV_2019.csv
Redditi_e_principali_variabili_IRPEF_su_base_comunale_CSV_2019.parquet
---- End ----


In [142]:
cl = TaxDeclarationsDataComuniSub()
cl.pipe_data()

Converting Data on Tax Declarations Comuni
in/italy/tax-declarations/tax-declarations-comuni-sub/2020/Redditi_e_principali_variabili_IRPEF_su_base_subcomunale_CSV_2019.csv
Redditi_e_principali_variabili_IRPEF_su_base_subcomunale_CSV_2019.csv
---- End ----


In [50]:
TaxDeclarationsData().tax_declarations_comuni_sub()

Converting Data on Tax Declarations Comuni - Cap Level
in/italy/tax-declarations/tax-declarations-comuni-sub/2020/Redditi_e_principali_variabili_IRPEF_su_base_subcomunale_CSV_2019.csv
Redditi_e_principali_variabili_IRPEF_su_base_subcomunale_CSV_2019.csv
---- End ----


In [51]:
TaxDeclarationsData().tax_declarations_comuni()

Converting Data on Tax Declarations Comuni
in/italy/tax-declarations/tax-declarations-comuni/2020/Redditi_e_principali_variabili_IRPEF_su_base_comunale_CSV_2019.csv
Redditi_e_principali_variabili_IRPEF_su_base_comunale_CSV_2019.csv
---- End ----


In [1]:
! pip install awswrangler

Collecting awswrangler
  Downloading awswrangler-2.9.0-py3-none-any.whl (183 kB)
[K     |████████████████████████████████| 183 kB 21.1 MB/s eta 0:00:01
[?25hCollecting pymysql<1.1.0,>=0.9.0
  Downloading PyMySQL-1.0.2-py3-none-any.whl (43 kB)
[K     |████████████████████████████████| 43 kB 156 kB/s  eta 0:00:01
Collecting redshift-connector~=2.0.0
  Downloading redshift_connector-2.0.882-py3-none-any.whl (93 kB)
[K     |████████████████████████████████| 93 kB 1.9 MB/s  eta 0:00:01
Collecting pg8000<1.20.0,>=1.16.0
  Downloading pg8000-1.19.5-py3-none-any.whl (34 kB)
Collecting scramp==1.4.0
  Downloading scramp-1.4.0-py3-none-any.whl (8.4 kB)
Installing collected packages: scramp, redshift-connector, pymysql, pg8000, awswrangler
Successfully installed awswrangler-2.9.0 pg8000-1.19.5 pymysql-1.0.2 redshift-connector-2.0.882 scramp-1.4.0
You should consider upgrading via the '/home/ec2-user/anaconda3/envs/python3/bin/python -m pip install --upgrade pip' command.[0m


In [3]:
import awswrangler as wr
import pandas as pd

In [7]:
comuni_path = 'Redditi_e_principali_variabili_IRPEF_su_base_comunale_CSV_2019.csv'
comuni = pd.read_csv(comuni_path, sep = ";")

In [8]:
comuni.head()

Unnamed: 0,Anno di imposta,Codice catastale,Codice Istat Comune,Denominazione Comune,Sigla Provincia,Regione,Codice Istat Regione,Numero contribuenti,Reddito da fabbricati - Frequenza,Reddito da fabbricati - Ammontare in euro,...,Reddito complessivo da 15000 a 26000 euro - Ammontare in euro,Reddito complessivo da 26000 a 55000 euro - Frequenza,Reddito complessivo da 26000 a 55000 euro - Ammontare in euro,Reddito complessivo da 55000 a 75000 euro - Frequenza,Reddito complessivo da 55000 a 75000 euro - Ammontare in euro,Reddito complessivo da 75000 a 120000 euro - Frequenza,Reddito complessivo da 75000 a 120000 euro - Ammontare in euro,Reddito complessivo oltre 120000 euro - Frequenza,Reddito complessivo oltre 120000 euro - Ammontare in euro,Unnamed: 21
2019,A001,28001,ABANO TERME,PD,Veneto,5,14954,7477.0,10358625.0,7847,...,3472.0,121942029.0,438.0,27886866.0,396.0,36808564.0,187.0,38228141.0,,
2019,A004,98001,ABBADIA CERRETO,LO,Lombardia,3,205,86.0,66213.0,102,...,46.0,1489603.0,,,,,0.0,0.0,,
2019,A005,97001,ABBADIA LARIANA,LC,Lombardia,3,2441,1193.0,1723810.0,1342,...,671.0,23138246.0,74.0,4698905.0,48.0,4328711.0,31.0,6397476.0,,
2019,A006,52001,ABBADIA SAN SALVATORE,SI,Toscana,9,4876,2635.0,2827574.0,2412,...,796.0,27215546.0,68.0,4342264.0,32.0,2960972.0,9.0,1435917.0,,
2019,A007,95001,ABBASANTA,OR,Sardegna,20,1731,875.0,766745.0,817,...,433.0,14540500.0,24.0,1572949.0,10.0,908389.0,,,,


In [11]:

wr.s3.to_parquet(
    df=comuni,
    path="s3://gimi-data/out/italy/tax-declarations/tax-declarations-comuni/",
    dataset=True,
    database="gimi",
    table="irpef-comuni",
)

TypeError: got an unexpected keyword argument 'region'