In [None]:
'''
This script is designed to automate the process of creating tables 
in a PostgreSQL database and populating them with data from CSV files using Python libraries. 
It iterates through tables, creates the corresponding table on PostgreSQL and then populates them
'''

# Importing the necessary libraries
# Numpy for data manipulation (just in case) 
# Pandas for reading CSV into a DF and analyzing it
# Psycopg2 and sqlalchemy to interact with PostgreSQL database
# OS to interact with the, well, OS and to analyze/manipulate files

import numpy as np
import pandas as pd
import psycopg2 as pg2
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

In [None]:
# Creating a dictionary of dataframes from the .csv files
df_dict = {file.split(".csv")[0]:pd.read_csv(file) for file in os.listdir() if file.endswith(".csv")}

# Configuring pg2
conn_pg2 = pg2.connect(database='BrazilEcom', user='postgres',password='****')

#Connecting with the DB using sql_alch
conn_string = 'postgresql://postgres:****@localhost/BrazilEcom'
db = create_engine(conn_string)
conn_sql_alch = db.connect()
#FINALLY SQLALCHEMY WORKS

In [None]:
for file in df_dict:
    print(file)
    df_dict[file].info()
    print("------\n")

In [None]:
# Printing all the .csv files in the current dir
for file in os.listdir():
    if file.endswith(".csv"):
        print(file)

In [None]:
# Let's see how many null rows we have in each table
print("Total number of NaN rows: \n")
for file in df_dict:
    print(f"{file} : {df_dict[file].isnull().sum().sum()}")

In [None]:
# Now we'll create a table in PostgreSQL for each DataFrame and then load it

command = '''
CREATE TABLE IF NOT EXISTS olist_customers_dataset (
index INTEGER,
customer_id VARCHAR(50) PRIMARY KEY,
customer_unique_id VARCHAR(50),
customer_zip_code_prefix INTEGER,
customer_city VARCHAR(50),
customer_state VARCHAR(50)
)
'''
cur = conn_pg2.cursor()
cur.execute(command)
cur.close()
conn_pg2.commit()
print("Successfully Created Table in SQL")

# !!IMPORTANT!!

# we "APPEND" the data into the table because it already exists (and has 0 entries)
# Have we set if_exists to "REPLACE", it would Drop the table before inserting new values.
# If the REPLACE option is selected, the PK CONSTRAINT that we've set before dissapears
# Check out https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html for more info

# !!IMPORTANT!!

# Here we set index True since we don't originally have a PK and we want to create it while loading the data
# We want the index to start from 1, so we change the df index
df_dict["olist_customers_dataset"].index += 1
# We also added this while creating the table so the .to_sql can find that column when appending data
df_dict["olist_customers_dataset"].to_sql("olist_customers_dataset", con=conn_sql_alch, if_exists='append', index=True)
conn_sql_alch.autocommit = True
print("Successfully Uploaded values in SQL")



In [None]:
command = '''
CREATE TABLE IF NOT EXISTS olist_geolocation_dataset (
index INTEGER,
geolocation_zip_code_prefix INTEGER,
geolocation_lat DOUBLE PRECISION,
geolocation_lng DOUBLE PRECISION,
geolocation_city VARCHAR(50),
geolocation_state VARCHAR(50)
)
''' 
cur = conn_pg2.cursor()
cur.execute(command)
cur.close()
conn_pg2.commit()
print("Successfully Created Table in SQL")

# Here we set index True since we don't originally have a PK and we want to create it while loading the data
# We want the index to start from 1, so we change the df index
df_dict["olist_geolocation_dataset"].index += 1
# We also added this while creating the table so the .to_sql can find that column when appending data
df_dict["olist_geolocation_dataset"].to_sql("olist_geolocation_dataset", con=conn_sql_alch, if_exists='append', index=True)
conn_sql_alch.autocommit = True
print("Successfully Uploaded values in SQL")

In [None]:
command = '''
CREATE TABLE IF NOT EXISTS olist_orders_dataset (
index INTEGER,
order_id VARCHAR(50) PRIMARY KEY,
customer_id VARCHAR(50),
order_status VARCHAR(50),
order_purchase_timestamp TIMESTAMP,
order_approved_at TIMESTAMP,
order_delivered_carrier_date TIMESTAMP,
order_delivered_customer_date TIMESTAMP,
order_estimated_delivery_date TIMESTAMP
)
''' 
cur = conn_pg2.cursor()
cur.execute(command)
cur.close()
conn_pg2.commit()
print("Successfully Created Table in SQL")


df_dict["olist_orders_dataset"].index += 1
df_dict["olist_orders_dataset"].to_sql("olist_orders_dataset", con=conn_sql_alch, if_exists='append', index=True)
conn_sql_alch.autocommit = True
print("Successfully Uploaded values in SQL")

In [None]:
command = '''
CREATE TABLE IF NOT EXISTS olist_order_items_dataset (
index INTEGER,
order_id VARCHAR(50),
order_item_id SMALLINT,
product_id VARCHAR(50),
seller_id VARCHAR(50),
shipping_limit_date TIMESTAMP,
price MONEY,
freight_value DOUBLE PRECISION
)
''' 
cur = conn_pg2.cursor()
cur.execute(command)
cur.close()
conn_pg2.commit()
print("Successfully Created Table in SQL")


df_dict["olist_order_items_dataset"].index += 1
df_dict["olist_order_items_dataset"].to_sql("olist_order_items_dataset", con=conn_sql_alch, if_exists='append', index=True)
conn_sql_alch.autocommit = True
print("Successfully Uploaded values in SQL")

In [None]:
command = '''
CREATE TABLE IF NOT EXISTS olist_order_payments_dataset (
index INTEGER,
order_id VARCHAR(50),
payment_sequential SMALLINT,
payment_type VARCHAR(50),
payment_installments SMALLINT,
payment_value MONEY
)
'''
cur = conn_pg2.cursor()
cur.execute(command)
cur.close()
conn_pg2.commit()
print("Successfully Created Table in SQL")



df_dict["olist_order_payments_dataset"].index += 1
df_dict["olist_order_payments_dataset"].to_sql("olist_order_payments_dataset", con=conn_sql_alch, if_exists='append', index=True)
conn_sql_alch.autocommit = True
print("Successfully Uploaded values in SQL")

In [None]:
command = '''
CREATE TABLE IF NOT EXISTS olist_order_reviews_dataset (
index INTEGER,
review_id VARCHAR(50),
order_id VARCHAR(50),
review_score SMALLINT,
review_comment_title text,
review_comment_message text,
review_creation_date TIMESTAMP,
review_answer_timestamp TIMESTAMP
)
'''
cur = conn_pg2.cursor()
cur.execute(command)
cur.close()
conn_pg2.commit()
print("Successfully Created Table in SQL")



df_dict["olist_order_reviews_dataset"].index += 1
df_dict["olist_order_reviews_dataset"].to_sql("olist_order_reviews_dataset", con=conn_sql_alch, if_exists='append', index=True)
conn_sql_alch.autocommit = True
print("Successfully Uploaded values in SQL")

In [None]:
command = '''
CREATE TABLE IF NOT EXISTS olist_products_dataset (
index INTEGER,
product_id VARCHAR(50) PRIMARY KEY,
product_category_name VARCHAR(50),
product_name_lenght SMALLINT,
product_description_lenght INTEGER,
product_photos_qty SMALLINT,
product_weight_g INTEGER,
product_length_cm SMALLINT,
product_height_cm SMALLINT,
product_width_cm SMALLINT
)
''' 
cur = conn_pg2.cursor()
cur.execute(command)
cur.close()
conn_pg2.commit()
print("Successfully Created Table in SQL")



df_dict["olist_products_dataset"].index += 1
df_dict["olist_products_dataset"].to_sql("olist_products_dataset", con=conn_sql_alch, if_exists='append', index=True)
conn_sql_alch.autocommit = True
print("Successfully Uploaded values in SQL")

In [None]:
command = '''
CREATE TABLE IF NOT EXISTS olist_sellers_dataset (
index INTEGER,
seller_id VARCHAR(50) PRIMARY KEY,
seller_zip_code_prefix INTEGER,
seller_city VARCHAR(50),
seller_state VARCHAR(50)
)
''' 
cur = conn_pg2.cursor()
cur.execute(command)
cur.close()
conn_pg2.commit()
print("Successfully Created Table in SQL")


df_dict["olist_sellers_dataset"].index += 1
df_dict["olist_sellers_dataset"].to_sql("olist_sellers_dataset", con=conn_sql_alch, if_exists='append', index=True)
conn_sql_alch.autocommit = True
print("Successfully Uploaded values in SQL")

In [None]:
command = '''
CREATE TABLE IF NOT EXISTS product_category_name_translation (
index INTEGER,
product_category_name VARCHAR(50) PRIMARY KEY,
product_category_name_english VARCHAR(50)
)
''' 
cur = conn_pg2.cursor()
cur.execute(command)
cur.close()
conn_pg2.commit()
print("Successfully Created Table in SQL")



df_dict["product_category_name_translation"].index += 1
df_dict["product_category_name_translation"].to_sql("product_category_name_translation", con=conn_sql_alch, if_exists='append', index=True)
conn_sql_alch.autocommit = True
print("Successfully Uploaded values in SQL")