In [1]:
import pandas as pd
import unicodedata
import string
from sqlalchemy import create_engine

In [2]:
engine = create_engine('postgresql://postgres:s3nh4n3t@database-1.chcqqyli2cjd.us-east-2.rds.amazonaws.com:5432/olist')
conn = engine.connect()
table = str.maketrans('', '', string.punctuation)

#### Lets truncate all tables so this can be reexecuted without duplicating data

In [3]:
conn.execute("""truncate table order_review CASCADE;
truncate table order_payment CASCADE; 
truncate table payment_type CASCADE;
truncate table order_item CASCADE;
truncate table "order" CASCADE;
truncate table order_status CASCADE;
truncate table product CASCADE;
truncate table internationalization CASCADE;
truncate table "language" CASCADE;
truncate table category CASCADE;
truncate table customer CASCADE;
truncate table seller CASCADE;
truncate table geolocation CASCADE;
truncate table zip CASCADE;
truncate table city CASCADE;
truncate table state CASCADE;
""")

<sqlalchemy.engine.result.ResultProxy at 0x7f82e5224e48>

#### Load the datasets

In [4]:
df_seller = pd.read_csv('../datasets/olist_sellers_dataset.csv')
df_product = pd.read_csv('../datasets/olist_products_dataset.csv')
df_category_translation = pd.read_csv('../datasets/product_category_name_translation.csv')
df_customers = pd.read_csv('../datasets/olist_customers_dataset.csv')
df_geolocation = pd.read_csv('../datasets/olist_geolocation_dataset.csv')
df_order = pd.read_csv('../datasets/olist_orders_dataset.csv')
df_order_item = pd.read_csv('../datasets/olist_order_items_dataset.csv')
df_order_payment = pd.read_csv('../datasets/olist_order_payments_dataset.csv')
# The next dataframe has strings without delimiters which is breaking import.
# getting all cols as string to perform basic checks
df_order_review = pd.read_csv('../datasets/olist_order_reviews_dataset.csv', dtype=str, skipinitialspace = True, quotechar = '"')


#### Normalize zip, city, state and get all of them

In [5]:
# first lets normalize data on dataframes
def normalize_text(text):
    return unicodedata.normalize('NFKD', text).encode('ASCII', 'ignore').decode().translate(table).upper().strip()

In [6]:
# first lets normalize data on dataframes
df_geolocation['geolocation_city'] = df_geolocation['geolocation_city'].map(lambda x: normalize_text(x))
df_geolocation['geolocation_state'] = df_geolocation['geolocation_state'].map(lambda x: normalize_text(x))
df_seller['seller_city'] = df_seller['seller_city'].map(lambda x: normalize_text(x))
df_seller['seller_state'] = df_seller['seller_state'].map(lambda x: normalize_text(x))
df_customers['customer_city'] = df_customers['customer_city'].map(lambda x: unicodedata.normalize('NFKD', x).encode('ASCII', 'ignore').decode().translate(table).upper().strip())
df_customers['customer_state'] = df_customers['customer_state'].map(lambda x: unicodedata.normalize('NFKD', x).encode('ASCII', 'ignore').decode().translate(table).upper().strip())

# Now lets get all unique zip/city/state

df_all_zip = df_customers.iloc[:, 2:5]
df_all_zip.columns = ['zip_prefix','city', 'state']
df_tmp_zip = df_seller.iloc[:, 1:4]
df_tmp_zip.columns = ['zip_prefix','city', 'state']
df_all_zip = pd.concat([df_all_zip, df_tmp_zip])
df_tmp_zip = df_geolocation.drop(['geolocation_lat', 'geolocation_lng'], axis=1)
df_tmp_zip.columns = ['zip_prefix','city', 'state']
df_all_zip = pd.concat([df_all_zip, df_tmp_zip] )
df_all_zip = df_all_zip.groupby(['zip_prefix','city', 'state']).count().reset_index()


### CATEGORY

**get all categories from products dataset to ensure every possibel category in inserted**

In [7]:
category_list = df_product.product_category_name.unique()

In [8]:
sql = 'BEGIN;\n'
for cat in category_list:
    sql += f"insert into category(name) values('{cat}');\n"
sql += 'COMMIT;'
conn.execute(sql)

<sqlalchemy.engine.result.ResultProxy at 0x7f82a82631d0>

### LANGUAGE
**for now I'll insert only english, which is the only translation present in the dataset**

In [9]:
sql = "insert into language(code) values('EN-US');"
conn.execute(sql)


<sqlalchemy.engine.result.ResultProxy at 0x7f82b4a52a58>

In [10]:
sql = "select * from language where code = 'EN-US'"
result = conn.execute(sql)
lang_id, _ = result.first()

### INTERNATIONALIZATION

**Lets insert the translations for product categories**

In [11]:
sql = 'BEGIN;\n'
for index, row in df_category_translation.iterrows():
    sql += f"""insert into internationalization(reference_table, reference_id, language_id, text) 
            VALUES('category', (select id from category where name='{row['product_category_name']}'), {lang_id}, '{row['product_category_name_english']}');\n"""            
sql += 'COMMIT;'
result = conn.execute(sql)


#### STATE

In [12]:
state_list = df_all_zip.state.unique()

In [13]:
sql = 'BEGIN;\n'
for state in state_list:
    sql += f"""insert into state(name) VALUES('{state}');\n"""            
sql += 'COMMIT;'
result = conn.execute(sql)

#### CITY

In [14]:
df_cities = df_all_zip.drop(['zip_prefix'], axis=1).groupby(['city', 'state']).count().reset_index()

In [15]:
sql = 'BEGIN;\n'
for index, row in df_cities.iterrows():
    sql += f"""insert into city(name, state_id) VALUES('{row['city']}', (select id from state where name='{row['state']}'));\n"""
sql += 'COMMIT;'
result = conn.execute(sql)

#### ZIP

In [16]:
sql = 'BEGIN;\n'
for index, row in df_all_zip.iterrows():
    sql += f"insert into zip(prefix, city_id) VALUES('{row['zip_prefix']}', (select id from city where name='{row['city']}' and state_id=(select id from state where name ='{row['state']}')));\n"
sql += 'COMMIT;'
result = conn.execute(sql)

In [17]:
sql = """select z.id zip_id, cast(z.prefix as int8) geolocation_zip_code_prefix, 
c.name geolocation_city, s."name" geolocation_state
from zip as z
join city as c on z.city_id = c.id
join state as s on c.state_id = s.id
"""
df_zip_id = pd.read_sql(sql, conn)

In [18]:
df_geo_zip = pd.merge(df_geolocation, df_zip_id, how='left', on=['geolocation_zip_code_prefix', 'geolocation_city', 'geolocation_state'])

In [19]:
df_geo_zip1 = df_geo_zip.drop(['geolocation_zip_code_prefix', 'geolocation_city', 'geolocation_state'], axis=1)
df_geo_zip1.columns = ['latitude','longitude','zip_id']
df_geo_zip1.to_sql('geolocation', conn, if_exists='append', chunksize = 1000, index=False, method='multi')


#### SELLER

In [20]:
df_seller_1 = pd.merge(df_seller, df_zip_id, how='left', left_on=['seller_zip_code_prefix', 'seller_city', 'seller_state'], right_on=['geolocation_zip_code_prefix', 'geolocation_city', 'geolocation_state'])
df_seller_1 = df_seller_1.drop(['seller_city', 'seller_state', 'seller_zip_code_prefix', 'geolocation_zip_code_prefix', 'geolocation_city', 'geolocation_state'], axis=1)
df_seller_1.columns = ['id', 'zip_id']
df_seller_1.to_sql('seller', conn, if_exists='append', chunksize = 1000, index=False, method='multi')

#### CUSTOMER

In [21]:
df_customer_1 = pd.merge(df_customers, df_zip_id, how='left', left_on=['customer_zip_code_prefix', 'customer_city', 'customer_state'], right_on=['geolocation_zip_code_prefix', 'geolocation_city', 'geolocation_state'])
df_customer_1 = df_customer_1.drop(['customer_city', 'customer_state', 'customer_zip_code_prefix', 'geolocation_zip_code_prefix', 'geolocation_city', 'geolocation_state'], axis=1)
df_customer_1.columns = ['id', 'unique_id', 'zip_id']
df_customer_1.to_sql('customer', conn, if_exists='append', chunksize = 1000, index=False, method='multi')

#### PRODUCT

In [22]:
df_cat_id = pd.read_sql("""select id as category_id, name as category_name from category""", conn)
df_product_1 = pd.merge(df_product, df_cat_id, how='left', left_on=['product_category_name'], right_on=['category_name'])
df_product_1 = df_product_1.drop(['product_category_name','category_name'], axis=1)
df_product_1.columns = ['id', 'name_length', 'description_length', 'photos_qty', 'weight_g', 'length_cm', 'height_cm', 'width_cm', 'category_id']
df_product_1.to_sql('product', conn, if_exists='append', chunksize = 1000, index=False, method='multi')

#### ORDER STATUS

In [23]:
status_list = df_order.order_status.unique()
sql = 'BEGIN;\n'
for cat in status_list:
    sql += f"insert into order_status(description) values('{cat}');\n"
sql += 'COMMIT;'
conn.execute(sql)

<sqlalchemy.engine.result.ResultProxy at 0x7f82a1878588>

#### ORDER

In [24]:
df_order_1 = df_order.copy()
df_order_1.columns = [column_name.replace('order_', '') for column_name in df_order.columns]
df_status_id = pd.read_sql("""select id as status_id, description as status from order_status""", conn)
df_order_1 = pd.merge(df_order_1, df_status_id, how='left', on=['status'])
df_order_1 = df_order_1.drop(['status'], axis=1)
df_order_1.to_sql('order', conn, if_exists='append', chunksize = 1000, index=False, method='multi')

#### ORDER ITEM

In [25]:
df_order_item_1 = df_order_item.copy()
df_order_item_1.columns = ['order_id', 'item_id', 'product_id', 'seller_id', 'shipping_limit_date', 'price', 'freight_value']
df_order_item_1.to_sql('order_item', conn, if_exists='append', chunksize = 1000, index=False, method='multi')

#### PAYMENT TYPE

In [26]:
payment_type_list = df_order_payment.payment_type.unique()
sql = 'BEGIN;\n'
for cat in payment_type_list:
    sql += f"insert into payment_type(description) values('{cat}');\n"
sql += 'COMMIT;'
conn.execute(sql)

<sqlalchemy.engine.result.ResultProxy at 0x7f82a0a57ef0>

#### ORDER PAYMENT

In [27]:
df_order_payment_1 = df_order_payment.copy()
df_order_payment_1.columns = ['order_id', 'sequential', 'payment_type', 'installments', 'value']
df_payment_type_id = pd.read_sql("""select id as payment_type_id, description as payment_type from payment_type""", conn)
df_order_payment_1 = pd.merge(df_order_payment_1, df_payment_type_id, how='left', on=['payment_type'])
df_order_payment_1 = df_order_payment_1.drop(['payment_type'], axis=1)
df_order_payment_1.to_sql('order_payment', conn, if_exists='append', chunksize = 1000, index=False, method='multi')

#### ORDER REVIEW

In [28]:
df_order_review_1 = df_order_review.copy()
print(f"Deleting a total of {df_order_review[df_order_review['Unnamed: 7'].notnull()]['review_id'].count()} reviews which have bad formmating due to time constraints")
df_order_review_1 = df_order_review_1.drop(df_order_review_1[df_order_review_1['Unnamed: 7'].notnull()].index)
df_order_review_1.columns = ['review_id'] + [column_name.replace('review_', '').replace('creation_', 'create_') for column_name in df_order_review_1.columns if column_name != 'review_id']
df_order_review_1.columns
df_order_review_1 = df_order_review_1.iloc[:, 0:7]
df_order_review_1.to_sql('order_review', conn, if_exists='append', chunksize = 1000, index=False, method='multi')

Deleting a total of 60 reviews which have bad formmating due to time constraints


In [29]:
conn.close()
engine.dispose()