In [1]:
# This script pulls data from the stg_* tables in the database, transforms it,
# and loads the results back into the database.

try:
    import pandas as pd
    # Python SQL toolkit and Object Relational Mapper
    import sqlalchemy
    from sqlalchemy.ext.automap import automap_base
    from sqlalchemy.orm import Session
    from sqlalchemy import create_engine, inspect, func
    import os, sys
    import datetime as dt

    # Import DB key
    from config import config 
    key = config.db_key

except Exception as e:
    # Report what failed, then re-raise: the following cells rely on the names
    # bound above (pd, create_engine, inspect, dt, key), so carrying on after a
    # failure here would only resurface later as an unrelated NameError.
    print(f"a module(s) have not been imported {e}" )
    raise

In [2]:
# generate connection to DB
from urllib.parse import quote

# Percent-encode the password before placing it in the URL so that characters
# such as '@', ':', '/' or '%' in the key are not mistaken for URL delimiters
# when the connection string is parsed. Keys made only of letters, digits and
# '_.-~' are left unchanged by quote().
# NOTE(review): assumes config.db_key holds the raw (not already URL-encoded)
# password - confirm against the config module.
engine = create_engine('postgresql://postgres:' + quote(key, safe='') + '@localhost:5432/Brazilian E-Commerce')
connection = engine.connect()


In [3]:
# Smoke test: fetch a single row from stg_olist_products_dataset.
# Engine.execute() with a raw SQL string is deprecated in SQLAlchemy 1.4 and
# removed in 2.0; executing a text() construct on the already-open connection
# is supported by both the 1.x and 2.x APIs.
connection.execute(sqlalchemy.text('select * from stg_olist_products_dataset')).fetchone()

('1e9e8ef04dbcff4541ed26657ea517e5', 'perfumaria', 40, 287, 1, 225, 16, 10, 14)

In [None]:
# Print the name and reflected type of every column in stg_olist_products_dataset
inspector = inspect(engine)
columns = inspector.get_columns('stg_olist_products_dataset')
for column_meta in columns:
    column_name, column_type = column_meta['name'], column_meta['type']
    print(column_name, column_type)
    

In [6]:

# Read stg_olist_products_dataset, keep the listed product columns,
# and append them to the products table.
stg_olist_products_dataset = pd.read_sql("select * from stg_olist_products_dataset", connection)

product_columns = [
    'product_id',
    'product_category_name',
    'product_name_lenght',
    'product_description_lenght',
    'product_photos_qty',
    'product_weight_g',
    'product_length_cm',
    'product_height_cm',
    'product_width_cm',
]
products_df = stg_olist_products_dataset.loc[:, product_columns].copy()
products_df.to_sql(name='products', con=engine, if_exists='append', index=False)

In [8]:
# Read stg_olist_sellers_dataset, keep the seller id and location columns,
# and append them to the sellers table.
stg_olist_sellers_dataset = pd.read_sql("select * from stg_olist_sellers_dataset ", connection)

seller_columns = ['seller_id', 'seller_zip_code_prefix', 'seller_city', 'seller_state']
sellers_df = stg_olist_sellers_dataset.loc[:, seller_columns].copy()
sellers_df.to_sql(name='sellers', con=engine, if_exists='append', index=False)

In [7]:
# Read stg_product_category_name_translation and shorten the English column name
stg_product_category_name_translation = pd.read_sql(
    "select * from stg_product_category_name_translation ", connection
).rename(columns={'product_category_name_english': 'product_category_name_en'})

# Keep only the two category-name columns and append them to product_category
category_columns = ['product_category_name', 'product_category_name_en']
product_category_df = stg_product_category_name_translation.loc[:, category_columns].copy()
product_category_df.to_sql(name='product_category', con=engine, if_exists='append', index=False)



In [None]:
# Pull data from stg_olist_marketing_qualified_leads_dataset table
stg_olist_marketing_qualified_leads_dataset = pd.read_sql("select * from stg_olist_marketing_qualified_leads_dataset ", connection)

mkt_qualified_leads = stg_olist_marketing_qualified_leads_dataset[['mql_id', 'first_contact_date', 'landing_page_id', 'origin']].copy()

# Build an integer date key (digits of the date part, e.g. 20180201) from first_contact_date.
# The date part is everything before the first space. Splitting (rather than
# slicing with x[:x.find(' ')]) keeps the whole value when there is no time
# component: find() returns -1 in that case and the slice silently dropped the
# last digit of the day.
# NOTE(review): assumes first_contact_date is read back as text shaped like
# 'YYYY-MM-DD' or 'YYYY-MM-DD HH:MM:SS' - confirm against the staging schema.
mkt_qualified_leads['sk_first_contact_dt'] = stg_olist_marketing_qualified_leads_dataset['first_contact_date'].apply(
    lambda x: int(x.split(' ', 1)[0].replace('-', ''))
)

# Append the selected columns plus the date key to mkt_qualified_leads
mkt_qualified_leads.to_sql(name='mkt_qualified_leads', con=engine, if_exists='append', index=False)



In [None]:
# Pull data from stg_olist_closed_deals_dataset table
stg_olist_closed_deals_dataset = pd.read_sql("select * from stg_olist_closed_deals_dataset ", connection)

mkt_closed_deals = stg_olist_closed_deals_dataset[['mql_id', 'seller_id', 'sdr_id', 'sr_id','won_date','business_segment','lead_type','lead_behaviour_profile','has_company','has_gtin','average_stock','business_type','declared_product_catalog_size','declared_monthly_revenue']].copy()

# The written frame uses the shorter column name declared_prod_cat_size
mkt_closed_deals = mkt_closed_deals.rename(columns={'declared_product_catalog_size': 'declared_prod_cat_size'})

# Build an integer date key (digits of the date part, e.g. 20180226) from won_date.
# Splitting on the first space gives the same result as the previous
# x[:x.find(' ')] slice whenever a time component is present, and no longer
# truncates the last character when it is absent (find() returns -1 then).
# NOTE(review): assumes won_date is read back as text starting with 'YYYY-MM-DD' - confirm.
mkt_closed_deals['sk_won_dt'] = stg_olist_closed_deals_dataset['won_date'].apply(
    lambda x: int(x.split(' ', 1)[0].replace('-', ''))
)

# Append the result to mkt_closed_deals
mkt_closed_deals.to_sql(name='mkt_closed_deals', con=engine, if_exists='append', index=False)

In [None]:
# Read stg_olist_order_payments_dataset, keep the payment columns,
# and append them to the order_payments table.
stg_order_payments_df = pd.read_sql("select * from stg_olist_order_payments_dataset", connection)

payment_columns = [
    'order_id',
    'payment_sequential',
    'payment_type',
    'payment_installments',
    'payment_value',
]
order_payments_df = stg_order_payments_df.loc[:, payment_columns].copy()
order_payments_df.to_sql(name='order_payments', con=engine, if_exists='append', index=False)

In [5]:
# Read stg_olist_order_reviews_dataset, derive the date-key columns,
# and append the result to the order_reviews table.
stg_order_reviews_df = pd.read_sql("select * from stg_olist_order_reviews_dataset", connection)

review_columns = [
    'review_id',
    'order_id',
    'review_score',
    'review_comment_title',
    'review_comment_message',
    'review_creation_date',
]
order_reviews_df = stg_order_reviews_df.loc[:, review_columns].copy()

# review_creation_date values are formatted with strftime into a YYYYMMDD integer
creation_dates = stg_order_reviews_df['review_creation_date']
order_reviews_df['sk_review_creation_dt'] = creation_dates.apply(
    lambda created: int(created.strftime('%Y%m%d'))
)

# review_answer_timestamp values are parsed as 'YYYY-MM-DD HH:MM:SS' strings;
# the key column takes the text before the first space with the dashes removed
answer_texts = stg_order_reviews_df['review_answer_timestamp']
order_reviews_df['review_answer_timestamp'] = answer_texts.apply(
    lambda answered: dt.datetime.strptime(answered, '%Y-%m-%d %H:%M:%S')
)
order_reviews_df['sk_review_answer_dt'] = answer_texts.apply(
    lambda answered: int(answered[:answered.find(' ')].replace('-', ''))
)

order_reviews_df.to_sql(name='order_reviews', con=engine, if_exists='append', index=False)



In [5]:
# Pull data from stg_olist_orders_dataset table

def _parse_order_timestamp(value):
    """Parse a 'YYYY-MM-DD HH:MM:SS' string into a datetime; missing values (None/NaN) give None."""
    if pd.isna(value):
        return None
    return dt.datetime.strptime(value, '%Y-%m-%d %H:%M:%S')


def _order_date_key(value):
    """Return the date part of a timestamp string as an int of its digits (e.g. 20180226); missing values give None."""
    if pd.isna(value):
        return None
    # Everything before the first space is the date part; strip the dashes.
    return int(value.split(' ', 1)[0].replace('-', ''))


stg_orders_df = pd.read_sql("select * from stg_olist_orders_dataset", connection)
orders_df = stg_orders_df[['order_id', 'customer_id', 'order_status']].copy()

# (staging timestamp column, date-key column) pairs. Each staging column is
# handled as text: it is parsed into a datetime under the same name and also
# reduced to an integer date key. Missing values stay missing in both outputs.
timestamp_columns = (
    ('order_purchase_timestamp', 'sk_order_purchase_dt'),
    ('order_approved_at', 'sk_order_approved_at_dt'),
    ('order_delivered_carrier_date', 'sk_order_delivered_carrier_dt'),
    ('order_delivered_customer_date', 'sk_order_delivered_customer_dt'),
)
for source_column, key_column in timestamp_columns:
    orders_df[source_column] = stg_orders_df[source_column].apply(_parse_order_timestamp)
    orders_df[key_column] = stg_orders_df[source_column].apply(_order_date_key)

# order_estimated_delivery_date is copied unchanged; its values are formatted
# with strftime to build the key, and no missing-value guard is applied here.
orders_df['order_estimated_delivery_date'] = stg_orders_df['order_estimated_delivery_date']
orders_df['sk_order_estimated_delivery_dt'] = stg_orders_df['order_estimated_delivery_date'].apply(lambda x: int(x.strftime('%Y%m%d')))

# Append the transformed rows to the orders table
orders_df.to_sql(name='orders', con=engine, if_exists='append', index=False)

In [None]:
# Copy every row and column of stg_olist_customer_dataset into the customer table
customer_query = "select * from stg_olist_customer_dataset"
customer_df = pd.read_sql(sql=customer_query, con=connection)
customer_df.to_sql('customer', con=engine, if_exists='append', index=False)



In [None]:
# Copy every row and column of stg_olist_geolocation_dataset into the geolocation table
geolocation_query = "select * from stg_olist_geolocation_dataset"
geolocation_df = pd.read_sql(sql=geolocation_query, con=connection)
geolocation_df.to_sql('geolocation', con=engine, if_exists='append', index=False)

In [None]:
# Read stg_olist_order_items_dataset, derive the shipping-limit columns,
# and append the result to the order_items table.
stg_order_items_df = pd.read_sql("select * from stg_olist_order_items_dataset", connection)

item_key_columns = ['order_id', 'order_item_id', 'product_id', 'seller_id']
order_items_df = stg_order_items_df.loc[:, item_key_columns].copy()

# shipping_limit_date values are parsed as 'YYYY-MM-DD HH:MM:SS' strings;
# the key column takes the text before the first space with the dashes removed
shipping_limit_texts = stg_order_items_df['shipping_limit_date']
order_items_df['shipping_limit_date'] = shipping_limit_texts.apply(
    lambda limit: dt.datetime.strptime(limit, '%Y-%m-%d %H:%M:%S')
)
order_items_df['sk_shipping_limit_dt'] = shipping_limit_texts.apply(
    lambda limit: int(limit[:limit.find(' ')].replace('-', ''))
)

# price and freight_value are carried over unchanged
for passthrough_column in ('price', 'freight_value'):
    order_items_df[passthrough_column] = stg_order_items_df[passthrough_column]

order_items_df.to_sql(name='order_items', con=engine, if_exists='append', index=False)
