In [None]:
!pip install azure-storage-blob
!pip install python-dotenv


In [2]:
!pip install pyarrow



In [5]:
import psycopg2
import pandas as pd
import io
import os
from azure.storage.blob import BlobServiceClient, BlobClient
from dotenv import load_dotenv



## Extraction

In [4]:
# Define database parameters.
# Credentials come from the environment (optionally populated from a local .env
# file) instead of being hardcoded, so the notebook can be shared safely.
# The variable names are libpq's standard ones (PGUSER, PGPASSWORD, ...).
# NOTE(review): the password previously committed in this cell should be
# treated as leaked and rotated.
load_dotenv()
db_params = {
    'user': os.getenv('PGUSER', 'internship'),
    'password': os.getenv('PGPASSWORD'),
    'host': os.getenv('PGHOST', '10alyticsinternship.postgres.database.azure.com'),
    'port': os.getenv('PGPORT', '5432'),
    'database': os.getenv('PGDATABASE', 'kongoecommerce'),
}
if not db_params['password']:
    raise RuntimeError('Set PGPASSWORD in the environment or in a .env file before connecting.')

In [5]:
def extract_table_to_dataframe(table_name, conn):
    """Read every row of a database table into a DataFrame.

    Parameters
    ----------
    table_name : str
        Table to read, optionally schema-qualified (``schema.table``). Only plain
        SQL identifiers are accepted: the name is interpolated into the query
        text because identifiers cannot be passed as bound parameters.
    conn :
        An open DB-API connection accepted by ``pandas.read_sql_query``
        (a psycopg2 connection in this notebook).

    Returns
    -------
    pandas.DataFrame
        All rows and columns of ``table_name``.

    Raises
    ------
    ValueError
        If ``table_name`` is not a plain (optionally schema-qualified) identifier.
    """
    import re

    # Reject anything that isn't a bare identifier so the f-string below cannot
    # be abused to inject extra SQL (e.g. "users; DROP TABLE users").
    if not isinstance(table_name, str) or not re.fullmatch(
        r"[A-Za-z_][A-Za-z0-9_$]*(\.[A-Za-z_][A-Za-z0-9_$]*)?", table_name
    ):
        raise ValueError(f"Invalid table name: {table_name!r}")
    query = f"SELECT * FROM {table_name}"
    return pd.read_sql_query(query, conn)


In [6]:
# Open one connection to the source Postgres database; the extraction cells
# below share it.
connection = psycopg2.connect(**db_params)
# The pipeline only reads from the source. A read-only autocommit session makes
# accidental writes impossible. It also avoids leaving the session "idle in
# transaction" between cells (psycopg2 otherwise opens a transaction on the
# first query and keeps it open until commit/rollback).
connection.set_session(readonly=True, autocommit=True)

In [None]:
# Pull the full cart table from Postgres and preview the first rows.
cart_df = extract_table_to_dataframe(table_name='cart', conn=connection)
cart_df.head(n=5)

In [None]:
# Pull the full products table from Postgres and preview the first rows.
products_df = extract_table_to_dataframe(table_name='products', conn=connection)
products_df.head(n=5)

In [None]:
# Pull the full sales table from Postgres and preview the first rows.
sales_df = extract_table_to_dataframe(table_name='sales', conn=connection)
sales_df.head(n=5)

In [None]:
# Extracting users data (the last source table this pipeline needs)
users_df = extract_table_to_dataframe('users', connection)
# Every source table is now in memory, so release the Postgres connection
# instead of leaving it open for the rest of the session.
connection.close()
users_df.head()

In [11]:
# Save an untouched snapshot of each source table as CSV.
# Create the output folder first; to_csv does not create missing directories,
# so a fresh checkout would otherwise fail with OSError.
raw_dir = 'dataset/raw_data'
os.makedirs(raw_dir, exist_ok=True)
cart_df.to_csv(f'{raw_dir}/cart.csv', index=False)
products_df.to_csv(f'{raw_dir}/products.csv', index=False)
sales_df.to_csv(f'{raw_dir}/sales.csv', index=False)
users_df.to_csv(f'{raw_dir}/users.csv', index=False)

In [33]:
# Re-load the raw CSV snapshots; these untransformed copies are what gets
# uploaded under "rawdata/" in the Loading section.
raw_dir = 'dataset/raw_data'
r_cart = pd.read_csv(f'{raw_dir}/cart.csv')
r_product = pd.read_csv(f'{raw_dir}/products.csv')
r_sales = pd.read_csv(f'{raw_dir}/sales.csv')
r_user = pd.read_csv(f'{raw_dir}/users.csv')

## Transformation

In [12]:
cart_df.info(buf=None)  # dtype / non-null summary, printed to stdout

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7 entries, 0 to 6
Data columns (total 4 columns):
 #   Column    Non-Null Count  Dtype 
---  ------    --------------  ----- 
 0   id        7 non-null      int64 
 1   userId    7 non-null      int64 
 2   date      7 non-null      object
 3   products  7 non-null      object
dtypes: int64(2), object(2)
memory usage: 356.0+ bytes


In [13]:
cart_df = cart_df.assign(date=pd.to_datetime(cart_df['date']))  # parse cart dates into datetime64

In [14]:
products_df.info(buf=None)  # dtype / non-null summary, printed to stdout

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 20 entries, 0 to 19
Data columns (total 6 columns):
 #   Column       Non-Null Count  Dtype  
---  ------       --------------  -----  
 0   id           20 non-null     int64  
 1   title        20 non-null     object 
 2   price        20 non-null     float64
 3   description  20 non-null     object 
 4   category     20 non-null     object 
 5   image        20 non-null     object 
dtypes: float64(1), int64(1), object(4)
memory usage: 1.1+ KB


In [15]:
products_df = products_df.rename(columns={'id': 'product_id'})  # disambiguate the product key

In [16]:
sales_df.info(buf=None)  # dtype / non-null summary, printed to stdout

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 7 columns):
 #   Column            Non-Null Count  Dtype  
---  ------            --------------  -----  
 0   customer_id       1000 non-null   int64  
 1   customer_name     1000 non-null   object 
 2   Transaction_Date  1000 non-null   object 
 3   email_address     1000 non-null   object 
 4   phone_number      1000 non-null   int64  
 5   amount            1000 non-null   float64
 6   product           1000 non-null   object 
dtypes: float64(1), int64(2), object(4)
memory usage: 54.8+ KB


In [17]:
sales_df = sales_df.assign(Transaction_Date=pd.to_datetime(sales_df['Transaction_Date']))  # parse transaction dates

In [18]:
users_df.info(buf=None)  # dtype / non-null summary, printed to stdout

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 13 columns):
 #   Column                    Non-Null Count  Dtype  
---  ------                    --------------  -----  
 0   id                        10 non-null     int64  
 1   email                     10 non-null     object 
 2   username                  10 non-null     object 
 3   password                  10 non-null     object 
 4   phone                     10 non-null     object 
 5   address_city              10 non-null     object 
 6   address_street            10 non-null     object 
 7   address_number            10 non-null     int64  
 8   address_zipcode           10 non-null     object 
 9   address_geolocation_lat   10 non-null     float64
 10  address_geolocation_long  10 non-null     float64
 11  name_firstname            10 non-null     object 
 12  name_lastname             10 non-null     object 
dtypes: float64(2), int64(2), object(9)
memory usage: 1.1+ KB


In [19]:
users_df.keys()  # column labels of the flattened users table

Index(['id', 'email', 'username', 'password', 'phone', 'address_city',
       'address_street', 'address_number', 'address_zipcode',
       'address_geolocation_lat', 'address_geolocation_long', 'name_firstname',
       'name_lastname'],
      dtype='object')

In [20]:
# Location dimension: one row per distinct address combination found in users_df.
location_columns = [
    'address_city', 'address_street', 'address_number', 'address_zipcode',
    'address_geolocation_lat', 'address_geolocation_long',
]
location_dim = users_df[location_columns].drop_duplicates().reset_index(drop=True)


In [21]:
# Turn the 0..n-1 row position into an explicit `location_id` surrogate key column.
location_dim = location_dim.rename_axis('location_id').reset_index()
location_dim.head(n=5)

Unnamed: 0,location_id,address_city,address_street,address_number,address_zipcode,address_geolocation_lat,address_geolocation_long
0,0,kilcoole,new road,7682,12926-3874,-37.3159,81.1496
1,1,kilcoole,Lovers Ln,7267,12926-3874,-37.3159,81.1496
2,2,Cullman,Frances Ct,86,29567-1452,40.3467,-30.131
3,3,San Antonio,Hunters Creek Dr,6454,98234-1734,50.3467,-20.131
4,4,san Antonio,adams St,245,80796-1234,40.3467,-40.131


In [22]:
# Sanity-check the location dimension before it is used as a join target:
# the surrogate key must be unique and each address combination must appear once.
assert location_dim['location_id'].is_unique, 'location_id must be unique'
assert not location_dim.drop(columns='location_id').duplicated().any(), 'duplicate addresses in location_dim'
location_dim.head()

Unnamed: 0,location_id,address_city,address_street,address_number,address_zipcode,address_geolocation_lat,address_geolocation_long
0,0,kilcoole,new road,7682,12926-3874,-37.3159,81.1496
1,1,kilcoole,Lovers Ln,7267,12926-3874,-37.3159,81.1496
2,2,Cullman,Frances Ct,86,29567-1452,40.3467,-30.131
3,3,San Antonio,Hunters Creek Dr,6454,98234-1734,50.3467,-20.131
4,4,san Antonio,adams St,245,80796-1234,40.3467,-40.131


In [23]:
users_df.keys()  # re-list the column labels before building user_dim

Index(['id', 'email', 'username', 'password', 'phone', 'address_city',
       'address_street', 'address_number', 'address_zipcode',
       'address_geolocation_lat', 'address_geolocation_long', 'name_firstname',
       'name_lastname'],
      dtype='object')

In [24]:
# Attach each user's surrogate location_id by joining on the full address.
# validate='many_to_one' raises if an address maps to more than one location
# row, which would otherwise silently duplicate users.
location_keys = [
    'address_city', 'address_street', 'address_number', 'address_zipcode',
    'address_geolocation_lat', 'address_geolocation_long',
]
# NOTE(review): 'password' is carried into user_dim, which is later written to CSV
# and uploaded to Blob storage; consider dropping or hashing it.
user_dim = users_df.merge(location_dim, on=location_keys, how='left', validate='many_to_one')[
    ['id', 'email', 'username', 'password', 'phone', 'name_firstname', 'name_lastname', 'location_id']
]
assert len(user_dim) == len(users_df), 'merge changed the number of users'
assert user_dim['location_id'].notna().all(), 'some users did not match a location'


In [25]:
user_dim.head(n=5)  # preview the user dimension

Unnamed: 0,id,email,username,password,phone,name_firstname,name_lastname,location_id
0,1,john@gmail.com,johnd,m38rmF$,1-570-236-7033,john,doe,0
1,2,morrison@gmail.com,mor_2314,83r5^_,1-570-236-7033,david,morrison,1
2,3,kevin@gmail.com,kevinryan,kev02937@,1-567-094-1345,kevin,ryan,2
3,4,don@gmail.com,donero,ewedon,1-765-789-6734,don,romer,3
4,5,derek@gmail.com,derek,jklg*_56,1-956-001-1945,derek,powell,4


In [26]:
# Name the user key `customer_id` to match sales_df's `customer_id` column
# (previously misspelled with a double underscore: 'customer__id').
user_dim = user_dim.rename(columns={'id': 'customer_id'})

In [27]:
cart_df.info(buf=None)  # confirm `date` is now datetime64[ns]

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7 entries, 0 to 6
Data columns (total 4 columns):
 #   Column    Non-Null Count  Dtype         
---  ------    --------------  -----         
 0   id        7 non-null      int64         
 1   userId    7 non-null      int64         
 2   date      7 non-null      datetime64[ns]
 3   products  7 non-null      object        
dtypes: datetime64[ns](1), int64(2), object(1)
memory usage: 356.0+ bytes


In [40]:
# Save the transformed tables as CSV.
# Create the output folder first; to_csv does not create missing directories.
clean_dir = 'dataset/cleaned_data'
os.makedirs(clean_dir, exist_ok=True)
cart_df.to_csv(f'{clean_dir}/clean_cart.csv', index=False)
user_dim.to_csv(f'{clean_dir}/clean_user.csv', index=False)
location_dim.to_csv(f'{clean_dir}/clean_location.csv', index=False)
products_df.to_csv(f'{clean_dir}/clean_products.csv', index=False)
sales_df.to_csv(f'{clean_dir}/clean_sales.csv', index=False)

## Loading

In [None]:
# Set up the Azure Blob Storage client.
# load_dotenv() copies values from a local .env file (if present) into the
# environment; without it the imported helper is never used and os.getenv only
# sees variables exported in the shell.
load_dotenv()
connection_string = os.getenv('AZURE_STORAGE_CONNECTION_STRING')
container_name = os.getenv('AZURE_STORAGE_CONTAINER_NAME')

# Fail early with a clear message instead of an opaque error from the SDK.
missing = [
    name
    for name, value in (
        ('AZURE_STORAGE_CONNECTION_STRING', connection_string),
        ('AZURE_STORAGE_CONTAINER_NAME', container_name),
    )
    if not value
]
if missing:
    raise RuntimeError(
        f"Missing required environment variable(s): {', '.join(missing)} "
        "(export them or add them to a .env file)"
    )

blob_service_client = BlobServiceClient.from_connection_string(connection_string)
container_client = blob_service_client.get_container_client(container_name)


In [49]:
def upload_df_to_blob_container(df, container_client, blob_name):
    """Serialize a DataFrame to Parquet in memory and upload it as a block blob.

    Parameters
    ----------
    df : pandas.DataFrame
        Data to upload; the index is not written.
    container_client :
        Azure container client; ``get_blob_client(blob_name)`` is used to obtain
        the destination blob.
    blob_name : str
        Destination path inside the container (e.g. ``'rawdata/cart.parquet'``).
        An existing blob with the same name is overwritten.
    """
    with io.BytesIO() as parquet_bytes:
        df.to_parquet(parquet_bytes, index=False)
        parquet_bytes.seek(0)  # rewind so the upload reads from the first byte
        container_client.get_blob_client(blob_name).upload_blob(
            parquet_bytes, blob_type="BlockBlob", overwrite=True
        )
    print(f'{blob_name} uploaded to Blob storage successfully')

In [60]:
# Upload the raw (re-read from CSV) snapshots under the rawdata/ prefix.
raw_uploads = [
    (r_cart, 'rawdata/cart.parquet'),
    (r_product, 'rawdata/product.parquet'),
    (r_sales, 'rawdata/sales.parquet'),
    (r_user, 'rawdata/user.parquet'),
]
for frame, blob_path in raw_uploads:
    upload_df_to_blob_container(frame, container_client, blob_path)

rawdata/cart.parquet uploaded to Blob storage successfully
rawdata/product.parquet uploaded to Blob storage successfully
rawdata/sales.parquet uploaded to Blob storage successfully
rawdata/user.parquet uploaded to Blob storage successfully


In [61]:
# Upload the transformed dimension and fact tables under the cleandata/ prefix.
clean_uploads = [
    (cart_df, 'cleandata/cart_dim.parquet'),
    (products_df, 'cleandata/product_dim.parquet'),
    (sales_df, 'cleandata/sales_fact.parquet'),
    (user_dim, 'cleandata/user_dim.parquet'),
    (location_dim, 'cleandata/location_dim.parquet'),
]
for frame, blob_path in clean_uploads:
    upload_df_to_blob_container(frame, container_client, blob_path)

cleandata/cart_dim.parquet uploaded to Blob storage successfully
cleandata/product_dim.parquet uploaded to Blob storage successfully
cleandata/sales_fact.parquet uploaded to Blob storage successfully
cleandata/user_dim.parquet uploaded to Blob storage successfully
cleandata/location_dim.parquet uploaded to Blob storage successfully
