In [1]:
import csv
import os
from io import StringIO

import boto3
import numpy as np
import pandas as pd
import psycopg2
import psycopg2.extras as extras
from botocore import UNSIGNED
from botocore.client import Config

In [2]:
#VARIABLES
#
# Pipeline configuration. Credentials are read from the standard libpq
# environment variables so they never have to be typed into the notebook;
# the bracketed placeholders are only fallbacks for when a variable is unset.

#Postgres credentials
pw = os.environ.get('PGPASSWORD', '[password]')
db = os.environ.get('PGDATABASE', '[database]')
user = os.environ.get('PGUSER', '[user]')
host = os.environ.get('PGHOST', '[host]')
port = os.environ.get('PGPORT', '5432')


#s3 credentials
# Anonymous (unsigned) client, used for reading the source data lake.
s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))

bucket_name = os.environ.get('S3_BUCKET', '[bucket]')
Prefix = 'orders_data/'
suffix = 'analytics_export/'
# Listing of the source objects under Prefix. NOTE: a single
# list_objects_v2 call returns at most 1000 keys (no pagination here).
res = s3.list_objects_v2(
    Bucket=bucket_name,
    Prefix=Prefix)


#Storage Locations - LOCAL
#source_path = "../data/Webpages/"
#save_path = "output/"


#Storage Locations - CLOUD
source_path = 's3://' + bucket_name + '/' + Prefix
save_path = 'analytics_export/byroneji4734/'




In [3]:
def create_server_connection(host_name, user_name, user_password, database=None, port_number=None):
    """Open a psycopg2 connection to the PostgreSQL server.

    :param host_name: Database server host.
    :param user_name: Role to authenticate as.
    :param user_password: Password for ``user_name``.
    :param database: Database name; defaults to the module-level ``db``.
    :param port_number: Server port; defaults to the module-level ``port``.
    :return: An open psycopg2 connection, or ``None`` if connecting failed
        (the error is printed).
    """
    # Fall back to the config-cell globals only when the caller gives no value.
    database = db if database is None else database
    port_number = port if port_number is None else port_number

    connection = None
    try:
        connection = psycopg2.connect(
            database=database,
            user=user_name,
            password=user_password,
            host=host_name,
            port=port_number,
        )
        print("PostgreSql Database connection successful")
    except psycopg2.Error as err:
        # psycopg2.Error is the base class of psycopg2's exceptions; the
        # previous bare `Error` name was undefined and raised NameError here.
        print(f"Error: '{err}'")

    return connection

In [4]:
def download_csv_from_s3_bucket(listing=None):
    """List and download the source files from the data lake.

    Each object in ``listing`` is downloaded once into the current working
    directory, named after its key with ``Prefix`` stripped
    (``orders_data/orders.csv`` -> ``orders.csv``).

    :param listing: A ``list_objects_v2`` response; defaults to the
        module-level ``res``.
    :return: The downloaded file names without their extension
        (e.g. ``'orders'``).
    """
    listing = res if listing is None else listing

    file_list = []
    for content in listing.get('Contents', []):
        key = content['Key']
        # Skip "folder" placeholder objects rather than blindly dropping the
        # first listed object (which may be a real file).
        if key.endswith('/'):
            continue
        local_name = key[len(Prefix):] if key.startswith(Prefix) else key
        s3.download_file(bucket_name, key, local_name)
        file_list.append(os.path.splitext(local_name)[0])
    return file_list

In [4]:
def read_and_convert_csv_to_dataframe(df_name, csv_path, client=None, bucket=None, prefix=None):
    """Read a CSV object from the data lake into a DataFrame.

    :param df_name: Logical table name. For ``'shipment_deliveries'`` every
        missing value is replaced by the string ``'1000-01-01'``.
    :param csv_path: Object key relative to ``prefix``, e.g. ``'orders.csv'``.
    :param client: boto3 S3 client; defaults to the module-level ``s3``.
    :param bucket: Bucket name; defaults to the module-level ``bucket_name``.
    :param prefix: Key prefix; defaults to the module-level ``Prefix``.
    :return: The parsed ``pandas.DataFrame``.
    """
    client = s3 if client is None else client
    bucket = bucket_name if bucket is None else bucket
    prefix = Prefix if prefix is None else prefix

    obj = client.get_object(Bucket=bucket, Key=prefix + csv_path)
    frame = pd.read_csv(obj['Body'])

    if df_name == 'shipment_deliveries':
        # Missing values become a deliberately impossible date so the rows can
        # be inserted into date columns; the staging UPDATE queries later set
        # shipment_date / delivery_date back to NULL where they hold it.
        # NOTE: this is applied to every column, not only the date columns.
        frame = frame.replace(np.nan, '1000-01-01')

    return frame


### LOAD DATA TO STAGING SCHEMA

In [5]:
def load_df_to_postgres(df, table, conn=None):
    """Bulk-insert every row of ``df`` into ``table`` in a single transaction.

    :param df: DataFrame whose column names match the target table's columns.
    :param table: Target table, optionally schema-qualified
        (e.g. ``'byroneji4734_staging.orders'``).
    :param conn: psycopg2 connection; defaults to the module-level ``connection``.
    :return: ``1`` if nothing could be loaded (the error is printed and the
        transaction rolled back), ``None`` on success.
    """
    if df is None:
        # The query helpers return None when they fail; report that instead of
        # crashing with AttributeError below.
        print("Error: no dataframe to load into %s" % table)
        return 1

    conn = connection if conn is None else conn

    rows = [tuple(row) for row in df.itertuples(index=False)]
    cols = ','.join(df.columns)

    # Only the row values are parameterised; table and column names are
    # interpolated verbatim, so they must come from a trusted source. The
    # doubled %%s survives formatting as the single %s execute_values expects.
    query = "INSERT INTO %s(%s) VALUES %%s" % (table, cols)

    cursor = conn.cursor()
    try:
        extras.execute_values(cursor, query, rows)
        conn.commit()
    except Exception as error:
        # Best effort: report, undo the failed transaction so the connection
        # stays usable for later statements, and signal failure to the caller.
        print("Error: %s" % error)
        conn.rollback()
        return 1
    finally:
        cursor.close()

    print("load to postgresql successful")

# DATA TRANSFORMATION AND ANALYTICS

In [6]:
def update_postgres_for_weird_dates(query, conn=None):
    """Execute and commit one UPDATE statement (the sentinel-date clean-up).

    :param query: SQL statement to run.
    :param conn: psycopg2 connection; defaults to the module-level ``connection``.
    """
    conn = connection if conn is None else conn
    cursor = conn.cursor()
    try:
        cursor.execute(query)
        conn.commit()
        print('weird dates casted to Null')
    except psycopg2.Error as err:
        # A failed statement leaves the transaction aborted; roll back so the
        # following queries on this connection can still run.
        conn.rollback()
        print(f"Error: '{err}'")
    finally:
        cursor.close()

In [7]:
def read_and_convert_query_to_dataframe(query, conn=None):
    """Run a SELECT and return its result set as a DataFrame.

    :param query: SQL query to execute.
    :param conn: psycopg2 connection; defaults to the module-level ``connection``.
    :return: A DataFrame with the query's column names, or ``None`` if the
        query failed (the error is printed).
    """
    conn = connection if conn is None else conn
    cursor = conn.cursor()
    try:
        cursor.execute(query)
        rows = cursor.fetchall()
        # Each cursor.description entry describes one result column; item 0
        # is the column name.
        col_names = [column[0] for column in cursor.description]
        return pd.DataFrame(rows, columns=col_names)
    except psycopg2.Error as err:
        # Roll back the aborted transaction so later statements still work.
        conn.rollback()
        print(f"Error: '{err}'")
        return None
    finally:
        cursor.close()

In [8]:
# Staging clean-up: set shipment_date back to NULL wherever it holds the
# '1000-01-01' placeholder that read_and_convert_csv_to_dataframe writes
# for missing values in shipment_deliveries.
query_update_for_null_shipment_date = """
UPDATE byroneji4734_staging.shipment_deliveries
    SET shipment_date = Null
WHERE shipment_date = '1000-01-01';
"""

In [9]:
# Staging clean-up: set delivery_date back to NULL wherever it holds the
# '1000-01-01' placeholder that read_and_convert_csv_to_dataframe writes
# for missing values in shipment_deliveries.
query_update_for_null_delivery_date = """
UPDATE byroneji4734_staging.shipment_deliveries
    SET delivery_date = Null
WHERE delivery_date = '1000-01-01';
"""

In [10]:
# Shipment aggregates as one row, stamped with CURRENT_DATE as ingestion_date.
# x and y are each an ungrouped count (one row), so the comma join yields one row.
#   tt_late_shipments    - shipped on or after order_date + 6 days and
#                          delivery_date still NULL.
#   tt_undelivered_items - neither shipment_date nor delivery_date set, and
#                          order_date + 15 days equal to 2022-09-05.
# NOTE(review): "= '2022-09-05'" only counts orders placed on 2022-08-21.
# If the intent is "at least 15 days old as of 2022-09-05", it should be "<=".
# The reference date is also hard-coded. Confirm both against the spec.
query_agg_shipments = """
SELECT CURRENT_DATE AS ingestion_date, tt_late_shipments,  tt_undelivered_items
FROM 
(
    SELECT count(orders.order_date) AS tt_late_shipments
    FROM byroneji4734_staging.shipment_deliveries

    JOIN byroneji4734_staging.orders
    ON orders.order_id = shipment_deliveries.order_id
    WHERE shipment_date >= (orders.order_date + 6)
    AND shipment_deliveries.delivery_date IS Null
) 
    as x, 

(

    SELECT count(orders.order_date) as tt_undelivered_items
    FROM byroneji4734_staging.shipment_deliveries

    JOIN byroneji4734_staging.orders
    ON orders.order_id = shipment_deliveries.order_id
    WHERE shipment_date IS Null
    AND shipment_deliveries.delivery_date IS Null
    AND (order_date) + 15 = '2022-09-05'

) 

as y
"""

In [11]:
# Orders placed on "public holidays", counted per calendar month, returned as
# one row stamped with CURRENT_DATE as ingestion_date.
# A public holiday here is a dim_dates day with working_day = False and
# day_of_the_week_num between 1 and 5 (presumably Mon-Fri; confirm the
# numbering in if_common.dim_dates), within the year up to CURRENT_DATE.
# NOTE(review): this window uses the run date (CURRENT_DATE), whereas
# query_agg_shipments uses the fixed reference date 2022-09-05. Confirm which
# "today" is intended. The GROUP BY / ORDER BY inside IN (...) do not
# affect the membership test.
query_agg_public_holiday = """
SELECT
    CURRENT_DATE AS ingestion_date, 
    count(order_id) FILTER (WHERE month_of_the_year_num = 1) AS "tt_order_hol_jan",
    count(order_id) FILTER (WHERE month_of_the_year_num = 2) AS "tt_order_hol_feb",
    count(order_id) FILTER (WHERE month_of_the_year_num = 3) AS "tt_order_hol_mar",
    count(order_id) FILTER (WHERE month_of_the_year_num = 4) AS "tt_order_hol_apr",
    count(order_id) FILTER (WHERE month_of_the_year_num = 5) AS "tt_order_hol_may",
    count(order_id) FILTER (WHERE month_of_the_year_num = 6) AS "tt_order_hol_jun",
    count(order_id) FILTER (WHERE month_of_the_year_num = 7) AS "tt_order_hol_jul",
    count(order_id) FILTER (WHERE month_of_the_year_num = 8) AS "tt_order_hol_aug",
    count(order_id) FILTER (WHERE month_of_the_year_num = 9) AS "tt_order_hol_sep",
    count(order_id) FILTER (WHERE month_of_the_year_num = 10) AS "tt_order_hol_oct",
    count(order_id) FILTER (WHERE month_of_the_year_num = 11) AS "tt_order_hol_nov",
    count(order_id) FILTER (WHERE month_of_the_year_num = 12) AS "tt_order_hol_dec"
FROM if_common.dim_dates
JOIN byroneji4734_staging.orders
ON order_date = dim_dates.calendar_dt
WHERE orders.order_date IN(

    SELECT dim_dates.calendar_dt
    FROM if_common.dim_dates
    WHERE dim_dates.calendar_dt >= CURRENT_DATE - INTERVAL '1 year'
    AND dim_dates.calendar_dt <= CURRENT_DATE
    AND working_day = False
    AND day_of_the_week_num BETWEEN 1 AND 5
    GROUP BY month_of_the_year_num, dim_dates.calendar_dt
    ORDER BY month_of_the_year_num
)

"""

In [12]:
# Best-performing product report, built by comma-joining the sub-results a..g:
#   a    - the product with the highest sum(review); its most-ordered day
#          (order_date with the highest sum(quantity) for that product);
#          whether that day is a non-working day with day_of_the_week_num
#          1-5; and its total review points.
#   b..f - pct_one_star_review .. pct_five_star_review (each LIMIT 1).
#   g    - pct_early_shipments / pct_late_shipments.
# If any of b..f returns no row (e.g. no reviews with that star value), the
# comma join returns no rows at all.
# NOTE(review): several parts look suspect. Confirm before relying on them:
#   * The "best product" subquery is repeated four times; a CTE would keep
#     the copies from drifting apart.
#   * In a, nothing reads the FULL JOIN to shipment_deliveries, but it
#     repeats each review row once per delivery on that date, which can
#     inflate sum(review).
#   * b..f group by product_id and keep an arbitrary row (LIMIT 1 with no
#     ORDER BY). Because every row in the partition has the same review
#     value, each figure is one arbitrary product's share of all reviews of
#     that star value, not the best product's distribution. c partitions by
#     "review=1", which is always false under "WHERE review = 2" (likely a
#     typo, same result).
#   * g's WHERE already keeps only late rows, so the denominator counts late
#     shipments only. The two FILTERs are identical, so
#     pct_early_shipments = 1 - pct_late_shipments. If there are no late
#     shipments, the float division by zero errors.
query_best_performing_product = """
SELECT *

     

FROM

(

SELECT CURRENT_DATE AS ingestion_date, product_name, calendar_dt AS most_ordered_day,
     bool_and(working_day = False AND day_of_the_week_num BETWEEN 1 AND 5)AS is_public_holiday,
     sum(review) AS tt_review_points


FROM if_common.dim_dates

FULL JOIN byroneji4734_staging.shipment_deliveries
ON shipment_deliveries.delivery_date = calendar_dt


JOIN byroneji4734_staging.reviews
ON reviews.product_id = 
(
    SELECT reviews.product_id
    FROM byroneji4734_staging.reviews
    GROUP BY reviews.product_id
    ORDER BY sum(review) DESC
    LIMIT 1


)

LEFT JOIN if_common.dim_products
ON dim_products.product_id = reviews.product_id

WHERE dim_dates.calendar_dt = 
(

SELECT order_date
FROM byroneji4734_staging.orders


WHERE orders.product_id = 
(
    SELECT reviews.product_id
    FROM byroneji4734_staging.reviews
    GROUP BY reviews.product_id
    ORDER BY sum(review) DESC
    LIMIT 1
    
)

GROUP BY  order_date, orders.product_id
ORDER BY sum (quantity) DESC
LIMIT 1

)
GROUP BY calendar_dt, product_name, reviews.product_id

) AS a,



(

SELECT 
                  ROUND(COUNT(*) * 1.0 / SUM(COUNT(*)) 
          OVER (PARTITION BY review), 3) as pct_one_star_review

FROM byroneji4734_staging.reviews
WHERE review = 1
GROUP BY  product_id, review
LIMIT 1
     
) AS b,


(

SELECT 
                  ROUND(COUNT(*) * 1.0 / SUM(COUNT(*)) 
          OVER (PARTITION BY review=1), 3) as pct_two_star_review

FROM byroneji4734_staging.reviews
WHERE review = 2
GROUP BY  product_id, review
LIMIT 1
     
) AS c,


(

SELECT 
                  ROUND(COUNT(*) * 1.0 / SUM(COUNT(*)) 
          OVER (PARTITION BY review), 3) as pct_three_star_review

FROM byroneji4734_staging.reviews
WHERE review = 3
GROUP BY  product_id, review
LIMIT 1
     
) AS d,


(

SELECT 
                  ROUND(COUNT(*) * 1.0 / SUM(COUNT(*)) 
          OVER (PARTITION BY review), 3) as pct_four_star_review

FROM byroneji4734_staging.reviews
WHERE review = 4
GROUP BY  product_id, review
LIMIT 1
     
) AS e,


(

SELECT 
                  ROUND(COUNT(*) * 1.0 / SUM(COUNT(*)) 
          OVER (PARTITION BY review), 3) as pct_five_star_review

FROM byroneji4734_staging.reviews
WHERE review = 5
GROUP BY  product_id, review
LIMIT 1
     
) AS f,


(


     SELECT    1 - (count(orders.order_id) FILTER (WHERE shipment_date >= (orders.order_date + 6)
                AND shipment_deliveries.delivery_date IS Null AND orders.product_id = (

                            SELECT reviews.product_id
                            FROM byroneji4734_staging.reviews
                            GROUP BY reviews.product_id
                            ORDER BY sum(review) DESC
                            LIMIT 1

                )
                
                )/CAST(COUNT(orders.order_id) AS float)) AS pct_early_shipments,


                ((count(orders.order_id) FILTER (WHERE shipment_date >= (orders.order_date + 6)
                AND shipment_deliveries.delivery_date IS Null AND orders.product_id = (

                            SELECT reviews.product_id
                            FROM byroneji4734_staging.reviews
                            GROUP BY reviews.product_id
                            ORDER BY sum(review) DESC
                            LIMIT 1

                )
                
                )/CAST(COUNT(orders.order_id) AS float))) AS pct_late_shipments        
    


    FROM byroneji4734_staging.shipment_deliveries

    JOIN byroneji4734_staging.orders
    ON orders.order_id = shipment_deliveries.order_id
    WHERE shipment_date >= (orders.order_date + 6)
    AND shipment_deliveries.delivery_date IS Null





) as g

"""

In [13]:
# Read back every row (all ingestion dates) of the analytics table for the S3 export.
query_all_agg_public_holiday = """
SELECT *
FROM byroneji4734_analytics.agg_public_holiday
"""

In [14]:
# Read back every row (all ingestion dates) of the analytics table for the S3 export.
query_all_agg_shipments = """
SELECT *
FROM byroneji4734_analytics.agg_shipments
"""

In [15]:
# Read back every row (all ingestion dates) of the analytics table for the S3 export.
query_all_best_performing_product = """
SELECT *
FROM byroneji4734_analytics.best_performing_product
"""

### LOAD ANALYSIS DATAFRAMES TO S3 BUCKET AS CSV

In [16]:
def upload_to_dataframe_bucket_as_csv(df_file, file_name, s3_resource=None, bucket=None, prefix=None):
    """Upload a DataFrame to an S3 bucket as a CSV file.

    :param df_file: Dataframe to upload.
    :param file_name: Desired object name in the bucket, without ``.csv``.
    :param s3_resource: boto3 S3 resource; a new ``boto3.resource('s3')``
        (default credential chain) when omitted.
    :param bucket: Target bucket; defaults to the module-level ``bucket_name``.
    :param prefix: Key prefix; defaults to the module-level ``save_path``.
    :return: ``True`` on success, ``False`` if there was no DataFrame or the
        upload failed (the reason is printed).
    """
    if df_file is None:
        # The query helper returns None on failure; don't crash on to_csv.
        print('failed to upload csv to s3: no dataframe given')
        return False

    bucket = bucket_name if bucket is None else bucket
    prefix = save_path if prefix is None else prefix

    # Serialise in memory. pandas' default to_csv also writes the index column.
    csv_buffer = StringIO()
    df_file.to_csv(csv_buffer)

    try:
        s3_resource = boto3.resource('s3') if s3_resource is None else s3_resource
        s3_resource.Object(bucket, prefix + file_name + '.csv').put(Body=csv_buffer.getvalue())
    except Exception as err:
        # Report the cause instead of a bare except that hid it (and also
        # swallowed KeyboardInterrupt / SystemExit).
        print(f'failed to upload csv to s3: {err}')
        return False

    print('Successfully uploaded dataframe as CSV to S3 bucket')
    return True
    

In [18]:
if __name__ == "__main__":

    # End-to-end run: S3 CSVs -> staging tables -> analytics queries ->
    # analytics tables -> CSV exports back to S3. The helper functions fall
    # back to the module-level `connection`, so it is assigned at top level.
    connection = create_server_connection(host, user, pw)
    if connection is None:
        # create_server_connection returns None when connecting fails. Every
        # step below needs the database, so stop here instead of failing
        # later with AttributeError on connection.cursor().
        raise RuntimeError("PostgreSQL connection failed; aborting pipeline")

    #download_csv_from_s3_bucket()

    # Extract: read the raw CSVs straight from the data lake.
    orders = read_and_convert_csv_to_dataframe('orders','orders.csv')

    reviews = read_and_convert_csv_to_dataframe('reviews','reviews.csv')

    shipment_deliveries = read_and_convert_csv_to_dataframe('shipment_deliveries','shipment_deliveries.csv')

    # Load into the staging schema. These are plain INSERTs: re-running
    # against already-loaded tables hits the primary keys, and that batch is
    # rolled back and reported while the pipeline carries on.
    load_df_to_postgres(orders, 'byroneji4734_staging.orders')

    load_df_to_postgres(reviews, 'byroneji4734_staging.reviews')

    load_df_to_postgres(shipment_deliveries, 'byroneji4734_staging.shipment_deliveries')

    # Turn the '1000-01-01' placeholder dates back into NULL.
    update_postgres_for_weird_dates(query_update_for_null_shipment_date)

    update_postgres_for_weird_dates(query_update_for_null_delivery_date)

    # Transform: compute the aggregates stamped with today's ingestion_date.
    agg_shipments = read_and_convert_query_to_dataframe(query_agg_shipments)

    agg_public_holiday = read_and_convert_query_to_dataframe(query_agg_public_holiday)

    best_performing_product = read_and_convert_query_to_dataframe(query_best_performing_product)


    # Store the aggregates. ingestion_date is the primary key of these tables,
    # so a second run on the same day is rejected.
    load_df_to_postgres(agg_shipments, 'byroneji4734_analytics.agg_shipments')

    load_df_to_postgres(agg_public_holiday, 'byroneji4734_analytics.agg_public_holiday')

    load_df_to_postgres(best_performing_product, 'byroneji4734_analytics.best_performing_product')



    # Export: read back the full analytics tables and publish them as CSV.
    all_agg_shipments = read_and_convert_query_to_dataframe(query_all_agg_shipments)

    all_agg_public_holiday = read_and_convert_query_to_dataframe(query_all_agg_public_holiday)

    all_best_performing_product = read_and_convert_query_to_dataframe(query_all_best_performing_product)


    upload_to_dataframe_bucket_as_csv(all_agg_shipments,'agg_shipments')

    upload_to_dataframe_bucket_as_csv(all_agg_public_holiday,'agg_public_holiday')

    upload_to_dataframe_bucket_as_csv(all_best_performing_product,'best_performing_product')


    
    

PostgreSql Database connection successful
Error: duplicate key value violates unique constraint "orders_pkey"
DETAIL:  Key (order_id)=(1) already exists.

load to postgresql successful
Error: duplicate key value violates unique constraint "shipment_deliveries_pkey"
DETAIL:  Key (shipment_id)=(1) already exists.

weird dates casted to Null
weird dates casted to Null
Error: duplicate key value violates unique constraint "agg_shipments_pkey"
DETAIL:  Key (ingestion_date)=(2022-10-02) already exists.

Error: duplicate key value violates unique constraint "agg_public_holiday_pkey"
DETAIL:  Key (ingestion_date)=(2022-10-02) already exists.

Error: duplicate key value violates unique constraint "best_performing_product_pkey"
DETAIL:  Key (ingestion_date)=(2022-10-02) already exists.

Successfully uploaded dataframe as CSV to S3 bucket
Successfully uploaded dataframe as CSV to S3 bucket
Successfully uploaded dataframe as CSV to S3 bucket
