In [1]:
from IPython.display import HTML, display

# Stretch the notebook's main container to the full browser width so wide
# frames and long SQL strings are easier to read.
full_width_css = "<style>.container { width:100% !important; }</style>"
display(HTML(full_width_css))

In [163]:
!pip install psycopg2



In [182]:
import os
from string import punctuation

import numpy as np
import pandas as pd
import psycopg2

# Every ASCII punctuation character except "_"; these are stripped when
# normalising table and column names into SQL identifiers.
ch_mapping = {symbol for symbol in punctuation if symbol != "_"}

# pandas dtype name -> PostgreSQL column type used in CREATE TABLE.
py_sql_variable_mapping = {
    "object": "varchar",
    "float64": "float",
    "int64": "bigint",
    "datetime64[ns]": "date",
}

# Per-table SQL statements (DROP / CREATE / COPY), filled in by the
# processing cells and keyed by the same names as output_files_dict.
delete_dict = {}
create_table_dict = {}
insert_dict = {}

# Processed CSV written for each target table.
output_files_dict = {
    "calidad_aire": "processed_calidad-aire.csv",
    "dataset_viajes_sube": "processed_dataset_viajes_sube.csv",
}

# Tables to load, in load order (dicts preserve insertion order).
tables = list(output_files_dict)

# Processing the air quality file

In [183]:
# Derive a SQL-safe table name: lower-case, turn spaces, dashes and dots into
# "_", then drop any remaining punctuation character.
underscored_table_name = (
    "calidad-aire".lower().replace(" ", "_").replace("-", "_").replace(".", "_")
)
sql_table_name = "".join(
    symbol for symbol in underscored_table_name if symbol not in ch_mapping
)
output_file_name = output_files_dict["calidad_aire"]

air_dataframe = pd.read_csv("calidad-aire.csv")

In [184]:
# Normalise column names into SQL identifiers: lower-case, spaces become "_",
# and every other punctuation character (including "-" and ".") is removed.
air_dataframe.columns = [
    "".join(
        symbol
        for symbol in raw_name.lower().replace(" ", "_")
        if symbol not in ch_mapping
    )
    for raw_name in air_dataframe.columns
]

In [185]:
# -1 is the sentinel used for missing readings throughout this notebook.
air_dataframe = air_dataframe.fillna(-1)

In [186]:
# Cells containing "s/d" in any letter case (presumably "sin dato", i.e. no
# data; confirm) are replaced as a whole by the -1 sentinel.
air_dataframe = air_dataframe.replace(to_replace="(?i)s/d", value=-1, regex=True)

In [187]:
def _clean_co_value(value):
    """Clean one CO reading before it is cast to float.

    - Text containing "#REF!" (a spreadsheet error) becomes the -1 sentinel.
    - Other text has every "<" removed and surrounding whitespace stripped,
      so "<0.5" becomes "0.5".
    - Non-text values (already-numeric readings, the -1 sentinel) are
      returned unchanged.
    """
    if not isinstance(value, str):
        return value
    if "#REF!" in value:
        return -1
    return value.replace("<", "").strip()


# Rewrite each CO column cell by cell. Assigning through `.loc[mask]` with no
# column label would overwrite every column of the matching rows, and
# `Series.replace(..., inplace=True)` returns None rather than the cleaned data.
for co_column in ["co_centenario", "co_cordoba", "co_la_boca"]:
    air_dataframe[co_column] = air_dataframe[co_column].map(_clean_co_value)

In [188]:
# Every pollutant reading is stored as float64 (the -1 sentinel included).
float_columns = [
    "co_centenario", "no2_centenario", "pm10_centenario",
    "co_cordoba", "no2_cordoba", "pm10_cordoba",
    "co_la_boca", "no2_la_boca", "pm10_la_boca",
    "co_palermo", "no2_palermo",
]
column_types = dict.fromkeys(float_columns, np.float64)

air_dataframe = air_dataframe.astype(column_types)

# Drop the last 9 characters of "fecha" and parse the rest as
# day / abbreviated month / 4-digit year (e.g. "01JAN2019"; the trailing 9
# characters are presumably a time part; confirm against the raw file).
air_dataframe["fecha"] = pd.to_datetime(
    air_dataframe["fecha"].str.slice(stop=-9),
    format="%d%b%Y",
    dayfirst=True,
)

In [190]:
# Write a ";"-separated UTF-8 CSV with a header row and no index column,
# matching the "CSV HEADER DELIMITER ';'" COPY statement built for this table.
air_dataframe.to_csv(
    output_file_name,
    sep=";",
    index=False,
    header=True,
    encoding="utf-8",
)

In [191]:
# Build the CREATE TABLE column list as "<column> <postgres type>", mapping each
# pandas dtype name through py_sql_variable_mapping and falling back to the
# dtype name itself when it has no entry.
column_definitions = []
for column_name, column_dtype in air_dataframe.dtypes.to_dict().items():
    dtype_name = str(column_dtype)
    sql_type = py_sql_variable_mapping.get(dtype_name, dtype_name)
    column_definitions.append(f"{column_name} {sql_type}")
sql_column_creation_str = ", ".join(column_definitions)

In [192]:
# SQL for the load loop: drop any previous copy of the table, recreate it from
# the inferred column types, and bulk-load the processed CSV from STDIN.
table_key = "calidad_aire"
delete_dict[table_key] = f"DROP TABLE IF EXISTS {sql_table_name}"
create_table_dict[table_key] = f"CREATE TABLE {sql_table_name} ({sql_column_creation_str});"
insert_dict[table_key] = f"COPY {sql_table_name} FROM STDIN WITH CSV HEADER DELIMITER AS ';'"

# Processing the trips file

In [193]:
# Derive a SQL-safe table name: lower-case, turn spaces, dashes and dots into
# "_", then drop any remaining punctuation character.
underscored_table_name = (
    "dataset_viajes_sube".lower().replace(" ", "_").replace("-", "_").replace(".", "_")
)
sql_table_name = "".join(
    symbol for symbol in underscored_table_name if symbol not in ch_mapping
)
output_file_name = output_files_dict["dataset_viajes_sube"]

trips_dataframe = pd.read_csv("dataset_viajes_sube.csv")

In [194]:
# Normalise column names into SQL identifiers: lower-case, spaces become "_",
# and every other punctuation character (including "-" and ".") is removed.
trips_dataframe.columns = [
    "".join(
        symbol
        for symbol in raw_name.lower().replace(" ", "_")
        if symbol not in ch_mapping
    )
    for raw_name in trips_dataframe.columns
]

In [195]:
# Keep only rows that have a trip count, then parse "dia" by dropping its last
# 9 characters and reading the rest as "%d%b%Y".
# .assign builds the column on a fresh frame. Assigning a column directly into
# the boolean-filtered result raises SettingWithCopyWarning instead.
trips_dataframe = (
    trips_dataframe
    .loc[trips_dataframe["cantidad"].notna()]
    .assign(
        dia=lambda frame: pd.to_datetime(
            frame["dia"].str[:-9], dayfirst=True, format="%d%b%Y"
        )
    )
)

In [196]:
# Write a ";"-separated UTF-8 CSV with a header row and no index column,
# matching the "CSV HEADER DELIMITER ';'" COPY statement built for this table.
trips_dataframe.to_csv(
    output_file_name,
    sep=";",
    index=False,
    header=True,
    encoding="utf-8",
)

In [197]:
# Build the CREATE TABLE column list as "<column> <postgres type>", mapping each
# pandas dtype name through py_sql_variable_mapping and falling back to the
# dtype name itself when it has no entry.
column_definitions = []
for column_name, column_dtype in trips_dataframe.dtypes.to_dict().items():
    dtype_name = str(column_dtype)
    sql_type = py_sql_variable_mapping.get(dtype_name, dtype_name)
    column_definitions.append(f"{column_name} {sql_type}")
sql_column_creation_str = ", ".join(column_definitions)

In [198]:
# SQL for the load loop: drop any previous copy of the table, recreate it from
# the inferred column types, and bulk-load the processed CSV from STDIN.
table_key = "dataset_viajes_sube"
delete_dict[table_key] = f"DROP TABLE IF EXISTS {sql_table_name}"
create_table_dict[table_key] = f"CREATE TABLE {sql_table_name} ({sql_column_creation_str});"
insert_dict[table_key] = f"COPY {sql_table_name} FROM STDIN WITH CSV HEADER DELIMITER AS ';'"

# Connecting to PostgreSQL

In [199]:
# Read connection settings from the standard libpq environment variables
# (PGHOST, PGDATABASE, PGUSER, PGPASSWORD) so credentials are never stored in
# the notebook. Host and database fall back to the values previously
# hard-coded here. psycopg2 drops keyword arguments whose value is None, so
# an unset user/password leaves libpq to apply its own defaults.
conn = psycopg2.connect(
    host=os.environ.get("PGHOST", "srubio-instance.cr6pizokfpvj.sa-east-1.rds.amazonaws.com"),
    database=os.environ.get("PGDATABASE", "scvChallenge"),
    user=os.environ.get("PGUSER"),
    password=os.environ.get("PGPASSWORD"),
)

cur = conn.cursor()

print("Connection was successful")

Connection was successful


In [203]:
# Recreate and bulk-load every table. For each table the DROP, CREATE and COPY
# statements are committed together before moving on. If any step fails, the
# open transaction is rolled back so `conn` stays usable, and the cursor is
# closed either way.
try:
    for table in tables:
        print(table)
        # The processed CSVs were written as UTF-8, so read them back as UTF-8
        # rather than the platform's default encoding. `with` closes the file
        # even if a statement fails.
        with open(output_files_dict[table], encoding="utf-8") as my_file:
            cur.execute(delete_dict[table])
            print(f"{table} deleted")
            cur.execute(create_table_dict[table])
            print(f"{table} created")
            cur.copy_expert(sql=insert_dict[table], file=my_file)
            print(f"{table} loaded")
            conn.commit()
            print("Transactions commited")
        print("file closed")
except Exception:
    conn.rollback()
    raise
finally:
    cur.close()

calidad_aire
calidad_aire deleted
calidad_aire created
calidad_aire loaded
Transactions commited
file closed
dataset_viajes_sube
dataset_viajes_sube deleted
dataset_viajes_sube created
dataset_viajes_sube loaded
Transactions commited
file closed


In [204]:
# Close the PostgreSQL connection opened earlier; the load loop has already
# committed each table, so nothing is pending here.
conn.close()