# Importing Necessary Dependencies

In [1]:
import pandas as pd 
import psycopg2
import boto3 # pip install boto3
from botocore.exceptions import NoCredentialsError
import os
import io
from io import StringIO
import redshift_connector    # pip install redshift-connector

# Extraction Layer

In [2]:
# Establishing connection
db_params = {
    'host': 'localhost',
    'database': 'postgres',
    'user': 'postgres',
    'password': 'Lanw122l3',
    'port': '5432'
}

In [3]:
# Function to extract all the data from the table
def extract_table_to_dataframe(table_name, conn):
    query = f"SELECT * FROM {table_name};"
    df = pd.read_sql_query(query, conn)
    return df

In [4]:
# Extracting the carts data into a dataframe
connection = psycopg2.connect(**db_params)

table_name = 'kongo.carts'

carts_df = extract_table_to_dataframe(table_name, connection)

  df = pd.read_sql_query(query, conn)


In [5]:
# Extracting the products data into a dataframe
connection = psycopg2.connect(**db_params)

table_name = 'kongo.products'

products_df = extract_table_to_dataframe(table_name, connection)

  df = pd.read_sql_query(query, conn)


In [6]:
# Extracting the sales data into a dataframe
connection = psycopg2.connect(**db_params)

table_name = 'kongo.sales'

sales_df = extract_table_to_dataframe(table_name, connection)

  df = pd.read_sql_query(query, conn)


In [7]:
# Extracting the users data into a dataframe
connection = psycopg2.connect(**db_params)

table_name = 'kongo.users'

users_df = extract_table_to_dataframe(table_name, connection)

  df = pd.read_sql_query(query, conn)


# Transformation Layer

In [8]:
carts_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7 entries, 0 to 6
Data columns (total 4 columns):
 #   Column    Non-Null Count  Dtype         
---  ------    --------------  -----         
 0   id        7 non-null      int64         
 1   userid    7 non-null      int64         
 2   date      7 non-null      datetime64[ns]
 3   products  7 non-null      object        
dtypes: datetime64[ns](1), int64(2), object(1)
memory usage: 352.0+ bytes


In [9]:
users_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 13 columns):
 #   Column                    Non-Null Count  Dtype 
---  ------                    --------------  ----- 
 0   id                        10 non-null     int64 
 1   email                     10 non-null     object
 2   username                  10 non-null     object
 3   password                  10 non-null     object
 4   phone                     10 non-null     object
 5   address_city              10 non-null     object
 6   address_street            10 non-null     object
 7   address_number            10 non-null     int64 
 8   address_zipcode           10 non-null     object
 9   address_geolocation_lat   10 non-null     object
 10  address_geolocation_long  10 non-null     object
 11  name_firstname            10 non-null     object
 12  name_lastname             10 non-null     object
dtypes: int64(2), object(11)
memory usage: 1.1+ KB


In [10]:
products_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 20 entries, 0 to 19
Data columns (total 6 columns):
 #   Column       Non-Null Count  Dtype  
---  ------       --------------  -----  
 0   id           20 non-null     int64  
 1   title        20 non-null     object 
 2   price        20 non-null     float64
 3   category     20 non-null     object 
 4   description  20 non-null     object 
 5   image        20 non-null     object 
dtypes: float64(1), int64(1), object(4)
memory usage: 1.1+ KB


In [11]:
sales_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 7 columns):
 #   Column            Non-Null Count  Dtype  
---  ------            --------------  -----  
 0   customer_id       1000 non-null   int64  
 1   customer_name     1000 non-null   object 
 2   transaction_date  1000 non-null   object 
 3   email_address     1000 non-null   object 
 4   phone_number      1000 non-null   int64  
 5   amount            1000 non-null   float64
 6   product           1000 non-null   object 
dtypes: float64(1), int64(2), object(4)
memory usage: 54.8+ KB


In [12]:
carts_dim = carts_df.copy().drop_duplicates()

In [13]:
products_dim = products_df.copy().drop_duplicates()

In [14]:
user_dim = users_df[['id', 'email', 'username', 'phone', 'address_street', 'name_lastname', 'name_firstname']]

In [15]:
location_dim = users_df[['address_city', 'address_street', 'address_number', 'address_zipcode', 'address_geolocation_lat', 'address_geolocation_long']]

In [16]:
location_dim = location_dim.reset_index(drop=True)
location_dim['locationID'] = range(1, len(location_dim) + 1)

In [17]:
location_dim.head()

Unnamed: 0,address_city,address_street,address_number,address_zipcode,address_geolocation_lat,address_geolocation_long,locationID
0,kilcoole,new road,7682,12926-3874,-37.3159,81.1496,1
1,kilcoole,Lovers Ln,7267,12926-3874,-37.3159,81.1496,2
2,Cullman,Frances Ct,86,29567-1452,40.3467,-30.131,3
3,San Antonio,Hunters Creek Dr,6454,98234-1734,50.3467,-20.131,4
4,san Antonio,adams St,245,80796-1234,40.3467,-40.131,5


In [18]:
user_dim = user_dim.merge(location_dim, on = 'address_street', how='inner')

In [19]:
user_dim = user_dim.drop(['address_city', 'address_street', 'address_number', 'address_zipcode', 'address_geolocation_lat', 'address_geolocation_long'], axis=1)

In [20]:
user_dim.head()

Unnamed: 0,id,email,username,phone,name_lastname,name_firstname,locationID
0,1,john@gmail.com,johnd,1-570-236-7033,doe,john,1
1,2,morrison@gmail.com,mor_2314,1-570-236-7033,morrison,david,2
2,3,kevin@gmail.com,kevinryan,1-567-094-1345,ryan,kevin,3
3,4,don@gmail.com,donero,1-765-789-6734,romer,don,4
4,5,derek@gmail.com,derek,1-956-001-1945,powell,derek,5


In [21]:
sales_fact = sales_df[['customer_id', 'transaction_date', 'amount', 'product']]

In [22]:
sales_fact = sales_fact.reset_index(drop=True)
sales_fact['salesID'] = range(1, len(sales_fact) + 1)

In [23]:
# Drop the 'title' column from the DataFrame
products_dim = products_dim.drop(columns=['category', 'description', 'image'])

# Now, products_dim DataFrame doesn't contain the 'title' column


In [24]:
products_dim.head()

Unnamed: 0,id,title,price
0,1,"Fjallraven - Foldsack No. 1 Backpack, Fits 15 ...",109.95
1,2,Mens Casual Premium Slim Fit T-Shirts,22.3
2,3,Mens Cotton Jacket,55.99
3,4,Mens Casual Slim Fit,15.99
4,5,John Hardy Women's Legends Naga Gold & Silver ...,695.0


In [25]:
carts_dim = carts_dim.drop(columns=['products'])

In [26]:
carts_dim.to_csv('carts_dim.csv', index=False)
user_dim.to_csv('user_dim.csv', index=False)
products_dim.to_csv('products_dim.csv', index=False)
sales_fact.to_csv('sales_fact.csv', index=False)
location_dim.to_csv('location_dim.csv', index=False)

# Loading Layer

In [27]:
# 1st save to s3 bucket........ boto3
# 2nd save to aws redshift

##  Loading into S3

In [22]:
# Initialize boto3 client
s3_client = boto3.client(
    's3',
    aws_access_key_id='*************',
    aws_secret_access_key='*************',
    region_name='*************'
)

# Function to upload Dataframe to s3 bucket as a parquet     [! pip install pyarrow]
def upload_df_to_s3_as_parquet(df, bucket, key):
    buffer = io.BytesIO()
    df.to_parquet(buffer, index=False)
    buffer.seek(0)
    s3_client.upload_fileobj(buffer, bucket, key)
    print(f'{key} uploaded to S3 bucket {bucket} successfully')


# S3 bucket name 
bucket_name = 'mykongo-ecommerce'

# Upload the dataframe to s3 bucket
upload_df_to_s3_as_parquet(carts_df, bucket_name, 'rawdata/carts.parquet')
upload_df_to_s3_as_parquet(sales_df, bucket_name, 'rawdata/sales.parquet')
upload_df_to_s3_as_parquet(products_df, bucket_name, 'rawdata/products.parquet')
upload_df_to_s3_as_parquet(users_df, bucket_name, 'rawdata/users.parquet')

  if _pandas_api.is_sparse(col):


rawdata/carts.parquet uploaded to S3 bucket mykongo-ecommerce successfully
rawdata/sales.parquet uploaded to S3 bucket mykongo-ecommerce successfully
rawdata/products.parquet uploaded to S3 bucket mykongo-ecommerce successfully
rawdata/users.parquet uploaded to S3 bucket mykongo-ecommerce successfully


In [23]:
# Initialize boto3 client
s3_client = boto3.client(
    's3',
    aws_access_key_id='*************',
    aws_secret_access_key='*************',
    region_name='*************'
)

# Function to upload Dataframe to s3 bucket as a parquet
def upload_df_to_s3_as_parquet(df, bucket, key):
    buffer = io.BytesIO()
    df.to_parquet(buffer, index=False)
    buffer.seek(0)
    s3_client.upload_fileobj(buffer, bucket, key)
    print(f'{key} uploaded to S3 bucket {bucket} successfully')


# S3 bucket name 
bucket_name = 'mykongo-ecommerce'

# Upload the dataframe to s3 bucket
upload_df_to_s3_as_parquet(carts_dim, bucket_name, 'transformeddata/carts_dim.parquet')
upload_df_to_s3_as_parquet(location_dim, bucket_name, 'transformeddata/location_dim.parquet')
upload_df_to_s3_as_parquet(sales_fact, bucket_name, 'transformeddata/sales_fact.parquet')
upload_df_to_s3_as_parquet(products_dim, bucket_name, 'transformeddata/products_dim.parquet')
upload_df_to_s3_as_parquet(user_dim, bucket_name, 'transformeddata/user_dim.parquet')

transformeddata/carts_dim.parquet uploaded to S3 bucket mykongo-ecommerce successfully
transformeddata/location_dim.parquet uploaded to S3 bucket mykongo-ecommerce successfully
transformeddata/sales_fact.parquet uploaded to S3 bucket mykongo-ecommerce successfully
transformeddata/products_dim.parquet uploaded to S3 bucket mykongo-ecommerce successfully
transformeddata/user_dim.parquet uploaded to S3 bucket mykongo-ecommerce successfully


In [24]:
# AWS credentials
aws_access_key_id = '*************'
aws_secret_access_key = '*************'

files = ['location_dim', 'carts_dim', 'products_dim', 'sales_fact', 'user_dim']

for file in files:
    # S3 bucket and object key for the data file
    s3_bucket = 'mykongo-ecommerce'
    s3_key = f'transformeddata/{file}.csv'

    # Local path to the data file
    local_data_file_path = f"{file}.csv"

    def upload_to_s3(file_path, bucket, object_key):
        try:
            s3 = boto3.client('s3', aws_access_key_id=aws_access_key_id,
                            aws_secret_access_key=aws_secret_access_key)
            s3.upload_file(file_path, bucket, object_key)
            print(f"File uploaded to S3://{bucket}/{object_key}")
        except NoCredentialsError:
            print("Credentials not available or incorrect.")

    upload_to_s3(local_data_file_path, s3_bucket, s3_key)

File uploaded to S3://mykongo-ecommerce/transformeddata/location_dim.csv
File uploaded to S3://mykongo-ecommerce/transformeddata/carts_dim.csv
File uploaded to S3://mykongo-ecommerce/transformeddata/products_dim.csv
File uploaded to S3://mykongo-ecommerce/transformeddata/sales_fact.csv
File uploaded to S3://mykongo-ecommerce/transformeddata/user_dim.csv


In [None]:
# Load transformed data to redshift
#! pip install redshift-connector

In [25]:
# Connect to Redshift cluster using AWS credentials
conn = redshift_connector.connect(
    host='************************',
    database='*****',
    user='**********',
    password='***********'
    )


In [26]:
conn.autocommit = True

## Generate Create table query satements for our dimension tables

In [27]:
carts_dimsql = pd.io.sql.get_schema(carts_dim, 'carts_dim')
print(''.join(carts_dimsql)) 

CREATE TABLE "carts_dim" (
"id" INTEGER,
  "userid" INTEGER,
  "date" TIMESTAMP
)


In [28]:
products_dimsql = pd.io.sql.get_schema(products_dim, 'products_dim')
print(''.join(products_dimsql)) 

CREATE TABLE "products_dim" (
"id" INTEGER,
  "title" TEXT,
  "price" REAL
)


In [29]:
location_dimsql = pd.io.sql.get_schema(location_dim, 'location_dim')
print(''.join(location_dimsql)) 

CREATE TABLE "location_dim" (
"address_city" TEXT,
  "address_street" TEXT,
  "address_number" INTEGER,
  "address_zipcode" TEXT,
  "address_geolocation_lat" TEXT,
  "address_geolocation_long" TEXT,
  "locationID" INTEGER
)


In [30]:
user_dimsql = pd.io.sql.get_schema(user_dim, 'user_dim')
print(''.join(user_dimsql)) 

CREATE TABLE "user_dim" (
"id" INTEGER,
  "email" TEXT,
  "username" TEXT,
  "phone" TEXT,
  "name_lastname" TEXT,
  "name_firstname" TEXT,
  "locationID" INTEGER
)


In [31]:
sales_factsql = pd.io.sql.get_schema(sales_fact, 'sales_fact')
print(''.join(sales_factsql)) 

CREATE TABLE "sales_fact" (
"customer_id" INTEGER,
  "transaction_date" DATE,
  "amount" REAL,
  "product" TEXT,
  "salesID" INTEGER
)


In [63]:
! pip install os

ERROR: Could not find a version that satisfies the requirement os (from versions: none)
ERROR: No matching distribution found for os


## Next step to create tables on redshift

In [32]:
cursor = redshift_connector.Cursor = conn.cursor()

In [33]:
cursor.execute("""
CREATE TABLE "carts_dim" (
"id" INTEGER,
  "userid" INTEGER,
  "date" TIMESTAMP
)
""")

<redshift_connector.cursor.Cursor at 0x21911720d00>

In [34]:
cursor.execute("""
CREATE TABLE "products_dim" (
"id" INTEGER,
  "title" varchar(65535),
  "price" REAL
)
""")

<redshift_connector.cursor.Cursor at 0x21911720d00>

In [35]:
cursor.execute("""
CREATE TABLE "location_dim" (
"address_city" TEXT,
  "address_street" TEXT,
  "address_number" INTEGER,
  "address_zipcode" TEXT,
  "address_geolocation_lat" TEXT,
  "address_geolocation_long" TEXT,
  "locationID" INTEGER
)
""")

<redshift_connector.cursor.Cursor at 0x21911720d00>

In [36]:
cursor.execute("""
CREATE TABLE "user_dim" (
"id" INTEGER,
  "email" TEXT,
  "username" TEXT,
  "phone" TEXT,
  "name_lastname" TEXT,
  "name_firstname" TEXT,
  "locationID" INTEGER
)
""")

<redshift_connector.cursor.Cursor at 0x21911720d00>

In [37]:
cursor.execute("""
CREATE TABLE "sales_fact" (
"customer_id" INTEGER,
  "transaction_date" DATE,
  "amount" REAL,
  "product" TEXT,
  "salesID" INTEGER
)
""")

<redshift_connector.cursor.Cursor at 0x21911720d00>

## Copy the data from the csv file in s3 to our redshift tables

In [39]:
cursor.execute("""
copy carts_dim from '******************************' 
credentials '***********************************************************************'
delimiter ','
region 'us-east-1'
IGNOREHEADER 1
""")

<redshift_connector.cursor.Cursor at 0x21911720d00>

In [40]:
cursor.execute("""
COPY products_dim FROM '**************************************************'
CREDENTIALS '***********************************************************************'
delimiter ','
region 'us-east-1'
IGNOREHEADER 2
""")

<redshift_connector.cursor.Cursor at 0x21911720d00>

In [41]:
cursor.execute("""
copy location_dim from '******************************************'
credentials '***********************************************************************'
delimiter ','
region 'us-east-1'
IGNOREHEADER 1
""")

<redshift_connector.cursor.Cursor at 0x21911720d00>

In [42]:
cursor.execute("""
copy sales_fact from '*************************************************'
credentials '***********************************************************************'
delimiter ','
region 'us-east-1'
IGNOREHEADER 1
""")

<redshift_connector.cursor.Cursor at 0x21911720d00>