# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [None]:
%timeout 15
%idle_timeout 15
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
# Obtain the SparkContext for this session (getOrCreate reuses an existing one).
sc = SparkContext.getOrCreate()
# GlueContext wraps the SparkContext; the cells below use it for catalog reads and S3 sinks.
glueContext = GlueContext(sc)
# SparkSession taken from the GlueContext; used below for spark.read / DataFrame work.
spark = glueContext.spark_session
# Job handle bound to this GlueContext; committed later via job.commit().
job = Job(glueContext)

#### Create a new database imba_parquet ####

In [None]:
import boto3

# Initialize the Glue client
glue_client = boto3.client('glue')

# Create the 'imba_parquet' Data Catalog database.
# create_database raises AlreadyExistsException when the database is already
# present, which would break every re-run of this notebook. Treat that one
# case as success so the cell is idempotent; any other error still propagates.
try:
    response = glue_client.create_database(
        DatabaseInput={
            'Name': 'imba_parquet',
            'Description': 'a new database that points to the parquet files',
        }
    )
except glue_client.exceptions.AlreadyExistsException:
    # Nothing was created on this run, so there is no API response to keep.
    response = None
    print("database 'imba_parquet' already exists; skipping creation")

## Use GlueContext to convert the newly uploaded CSV data files to Parquet.

**(TODO: review the choices for `updateBehavior`, `partitionKeys`, and `enableUpdateCatalog`.)**

**aisles**

In [None]:
# Read the "aisles" table registered in the Glue Data Catalog database "imba2".
# transformation_ctx is the key Glue uses to identify per-node state (e.g. job
# bookmarks), so it is unique to this table rather than one label shared by
# every table in the notebook.
aisles_read = glueContext.create_dynamic_frame.from_catalog(
    database="imba2",
    table_name="aisles",
    transformation_ctx="aisles_source",
)

# Write the frame to S3 as snappy-compressed Parquet and register it in the
# Data Catalog as imba_parquet.aisles.
# NOTE(review): re-running this cell presumably adds another copy of the files
# under the same S3 prefix -- confirm, and purge the prefix first if so.
aisles_parquet = glueContext.getSink(
    path="s3://imba-tgou1055/data3/aisles/",
    connection_type="s3",
    updateBehavior="LOG",
    partitionKeys=[],
    enableUpdateCatalog=True,
    transformation_ctx="aisles_sink",
)
aisles_parquet.setCatalogInfo(catalogDatabase="imba_parquet", catalogTableName="aisles")
aisles_parquet.setFormat("glueparquet", compression="snappy")
aisles_parquet.writeFrame(aisles_read)

**departments**

In [None]:
# Read the "departments" table registered in the Glue Data Catalog database "imba2".
# transformation_ctx is the key Glue uses to identify per-node state (e.g. job
# bookmarks), so it is unique to this table rather than one label shared by
# every table in the notebook.
departments_read = glueContext.create_dynamic_frame.from_catalog(
    database="imba2",
    table_name="departments",
    transformation_ctx="departments_source",
)

# Write the frame to S3 as snappy-compressed Parquet and register it in the
# Data Catalog as imba_parquet.departments.
# NOTE(review): re-running this cell presumably adds another copy of the files
# under the same S3 prefix -- confirm, and purge the prefix first if so.
departments_parquet = glueContext.getSink(
    path="s3://imba-tgou1055/data3/departments/",
    connection_type="s3",
    updateBehavior="LOG",
    partitionKeys=[],
    enableUpdateCatalog=True,
    transformation_ctx="departments_sink",
)
departments_parquet.setCatalogInfo(catalogDatabase="imba_parquet", catalogTableName="departments")
departments_parquet.setFormat("glueparquet", compression="snappy")
departments_parquet.writeFrame(departments_read)


**orders**

In [None]:
# Read the "orders" table registered in the Glue Data Catalog database "imba2".
# transformation_ctx is the key Glue uses to identify per-node state (e.g. job
# bookmarks), so it is unique to this table rather than one label shared by
# every table in the notebook.
orders_read = glueContext.create_dynamic_frame.from_catalog(
    database="imba2",
    table_name="orders",
    transformation_ctx="orders_source",
)

# Write the frame to S3 as snappy-compressed Parquet and register it in the
# Data Catalog as imba_parquet.orders.
# NOTE(review): re-running this cell presumably adds another copy of the files
# under the same S3 prefix -- confirm, and purge the prefix first if so.
orders_parquet = glueContext.getSink(
    path="s3://imba-tgou1055/data3/orders/",
    connection_type="s3",
    updateBehavior="LOG",
    partitionKeys=[],
    enableUpdateCatalog=True,
    transformation_ctx="orders_sink",
)
orders_parquet.setCatalogInfo(catalogDatabase="imba_parquet", catalogTableName="orders")
orders_parquet.setFormat("glueparquet", compression="snappy")
orders_parquet.writeFrame(orders_read)

**products**

In [None]:
# Read the "products" table registered in the Glue Data Catalog database "imba2".
# transformation_ctx is the key Glue uses to identify per-node state (e.g. job
# bookmarks), so it is unique to this table rather than one label shared by
# every table in the notebook.
products_read = glueContext.create_dynamic_frame.from_catalog(
    database="imba2",
    table_name="products",
    transformation_ctx="products_source",
)

# Write the frame to S3 as snappy-compressed Parquet and register it in the
# Data Catalog as imba_parquet.products.
# NOTE(review): re-running this cell presumably adds another copy of the files
# under the same S3 prefix -- confirm, and purge the prefix first if so.
products_parquet = glueContext.getSink(
    path="s3://imba-tgou1055/data3/products/",
    connection_type="s3",
    updateBehavior="LOG",
    partitionKeys=[],
    enableUpdateCatalog=True,
    transformation_ctx="products_sink",
)
products_parquet.setCatalogInfo(catalogDatabase="imba_parquet", catalogTableName="products")
products_parquet.setFormat("glueparquet", compression="snappy")
products_parquet.writeFrame(products_read)

**order_products**

In [None]:
# Read the "order_products" table registered in the Glue Data Catalog database "imba2".
# transformation_ctx is the key Glue uses to identify per-node state (e.g. job
# bookmarks), so it is unique to this table rather than one label shared by
# every table in the notebook.
order_products_read = glueContext.create_dynamic_frame.from_catalog(
    database="imba2",
    table_name="order_products",
    transformation_ctx="order_products_source",
)

# Write the frame to S3 as snappy-compressed Parquet and register it in the
# Data Catalog as imba_parquet.order_products.
# NOTE(review): re-running this cell presumably adds another copy of the files
# under the same S3 prefix -- confirm, and purge the prefix first if so.
order_products_parquet = glueContext.getSink(
    path="s3://imba-tgou1055/data3/order_products/",
    connection_type="s3",
    updateBehavior="LOG",
    partitionKeys=[],
    enableUpdateCatalog=True,
    transformation_ctx="order_products_sink",
)
order_products_parquet.setCatalogInfo(catalogDatabase="imba_parquet", catalogTableName="order_products")
order_products_parquet.setFormat("glueparquet", compression="snappy")
order_products_parquet.writeFrame(order_products_read)

In [None]:
# commit job
# `job` is the Job(glueContext) handle created in the session-setup cell.
# NOTE(review): no job.init(...) call is visible in this notebook -- confirm
# that commit() is valid (and needed) without it.
job.commit()

## Perform Spark transformations of assignment 1

In [None]:
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.sql.functions import avg, sum, min, max, round, count, when, col, countDistinct, desc, asc
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

In [None]:
"""
Q1. Build order_products_prior: every 'prior' order joined to its order lines.

Reference SQL from the previous assignment (replace the "imba" S3 bucket name
with your own bucket name):

    CREATE TABLE order_products_prior AS
        (SELECT a.*,
                b.product_id,
                b.add_to_cart_order,
                b.reordered
         FROM orders a
         JOIN order_products b
           ON a.order_id = b.order_id
         WHERE a.eval_set = 'prior')
"""
# Source tables, read from the Parquet prefixes written by the conversion cells.
orders_df = spark.read.parquet("s3://imba-tgou1055/data3/orders/")
order_products_df = spark.read.parquet("s3://imba-tgou1055/data3/order_products/")

# WHERE a.eval_set = 'prior'
prior_orders_df = orders_df.filter(orders_df.eval_set == 'prior')

# JOIN order_products b ON a.order_id = b.order_id
same_order = orders_df.order_id == order_products_df.order_id
prior_with_lines_df = prior_orders_df.join(order_products_df, same_order, 'inner')

# SELECT a.*, b.product_id, b.add_to_cart_order, b.reordered
# (only the orders side contributes order_id, so the result has no duplicate column)
order_products_prior_df = prior_with_lines_df.select(
    orders_df["*"],
    order_products_df.product_id,
    order_products_df.add_to_cart_order,
    order_products_df.reordered,
)

# Persist the joined table, replacing any previous output.
parquet_file_path = "s3://imba-tgou1055/data3/order_products_prior/"
order_products_prior_df.write.mode("overwrite").parquet(parquet_file_path)

# Rebind the name to the stored Parquet data; the later feature cells read from it.
order_products_prior_df = spark.read.parquet(parquet_file_path)
print("transformation job finished")

In [None]:
"""
Q2. Create a SQL query (user_features_1). Based on table orders, for each user, calculate the
max order_number, the sum of days_since_prior_order and the average of
days_since_prior_order.

SELECT user_id,
       MAX(order_number) as max_order_number,
       CAST(SUM(days_since_prior_order) AS INT) as sum_days_prior,
       ROUND(AVG(days_since_prior_order),2) as avg_days_prior
FROM orders
GROUP BY user_id
ORDER BY user_id;

"""

orders_df = spark.read.parquet("s3://imba-tgou1055/data3/orders/")

# Cast days_since_prior_order to integer before aggregating so the sum comes
# out as an integer, as the reference SQL's CAST(... AS INT) asks for.
days_as_int = orders_df["days_since_prior_order"].cast(IntegerType())

# NOTE: max / sum / round / avg here are the pyspark.sql.functions column
# functions imported earlier in the notebook, not the Python builtins.
user_features_1_df = (
    orders_df
    .withColumn("days_since_prior_order", days_as_int)
    .groupBy("user_id")
    .agg(
        max("order_number").alias("max_order_number"),
        sum("days_since_prior_order").alias("sum_days_prior"),
        # Scale 2 matches ROUND(AVG(...), 2); omitting the scale rounds the
        # average to a whole number.
        round(avg("days_since_prior_order"), 2).alias("avg_days_prior"),
    )
)

# Persist the feature table, replacing any previous output, then rebind the
# name to the stored Parquet data like the other feature cells do.
parquet_file_path = "s3://imba-tgou1055/data3/user_features_1/"
user_features_1_df.write.mode("overwrite").parquet(parquet_file_path)
user_features_1_df = spark.read.parquet(parquet_file_path)

print("transformation job finished")

In [None]:
"""
Q3. Create a SQL query (user_features_2). Based on table order_products_prior,
for each user calculate the total number of products, the total number of
distinct products, and the user reorder ratio (number of rows with
reordered = 1 divided by number of rows with order_number > 1).

Reference SQL:

    WITH user_ratio AS (
        SELECT user_id,
               COUNT(*) AS product_bought,
               COUNT(DISTINCT(product_id)) AS unique_product_bought,
               COUNT(CASE WHEN reordered = 1 THEN 1 ELSE NULL END) AS num_reordered,
               COUNT(CASE WHEN order_number > 1 THEN 1 ELSE NULL END) AS num_order_number
        FROM order_products_prior
        GROUP BY user_id
        ORDER BY user_id)
    SELECT user_id,
           product_bought,
           unique_product_bought,
           num_reordered, num_order_number,
           ROUND(CAST(num_reordered AS DOUBLE) / num_order_number, 4) AS reorder_ratio
    FROM user_ratio
"""

# when(...) without otherwise() yields NULL for non-matching rows, and count()
# skips NULLs, so each expression counts only the rows meeting its condition.
reordered_rows = count(when(col("reordered") == 1, True))
repeat_order_rows = count(when(col("order_number") > 1, True))

per_user = order_products_prior_df.groupBy("user_id")
user_features_2_df = per_user.agg(
    count("*").alias("num_product_bought"),
    countDistinct("product_id").alias("num_distinct_product_bought"),
    round(reordered_rows / repeat_order_rows, 4).alias("reordered_ratio"),
)

# Persist the feature table, replacing any previous output, then rebind the
# name to the stored Parquet data.
parquet_file_path = "s3://imba-tgou1055/data3/user_features_2/"
user_features_2_df.write.mode("overwrite").parquet(parquet_file_path)
user_features_2_df = spark.read.parquet(parquet_file_path)
print("transformation job finished")

In [None]:
"""
Q4. Create a SQL query (up_features). Based on table order_products_prior, for
each user and product, calculate the total number of orders, the minimum
order_number, the maximum order_number and the average add_to_cart_order.

Reference SQL:

    SELECT user_id,
           product_id,
           COUNT(*) AS num_of_orders,
           MIN(order_number) AS min_order_num,
           MAX(order_number) AS max_order_num,
           ROUND(AVG(add_to_cart_order), 2) AS seq_add_to_order
    FROM order_products_prior
    GROUP BY user_id, product_id
    ORDER BY user_id, product_id;
"""

# One output column per aggregate, in the order they appear in the result.
# (min / max / round / avg are the pyspark.sql.functions column functions.)
up_aggregates = [
    count("*").alias("number_of_orders"),
    min("order_number").alias("min_order_num"),
    max("order_number").alias("max_order_num"),
    round(avg("add_to_cart_order"), 2).alias("seq_add_to_order"),
]

per_user_product = order_products_prior_df.groupBy("user_id", "product_id")
up_features_df = per_user_product.agg(*up_aggregates)

# Persist the feature table, replacing any previous output, then rebind the
# name to the stored Parquet data.
parquet_file_path = "s3://imba-tgou1055/data3/up_features/"
up_features_df.write.mode("overwrite").parquet(parquet_file_path)
up_features_df = spark.read.parquet(parquet_file_path)
print("transformation job finished")

In [None]:
"""
Q5. Create a SQL query (prd_features). Based on table order_products_prior,
first calculate the sequence of product purchases for each user and name it
product_seq_time. Then, on top of that query, for each product calculate the
count, the sum of reordered, the count of product_seq_time = 1 and the count
of product_seq_time = 2.

Reference SQL:

    WITH product_seq AS (
        SELECT user_id,
               order_number,
               product_id,
               ROW_NUMBER() OVER (PARTITION BY user_id, product_id
                                  ORDER BY order_number ASC) AS product_seq_time,
               reordered
        FROM order_products_prior
        ORDER BY user_id, order_number, product_seq_time)
    SELECT product_id,
           COUNT(*) AS num_product_ordered,
           SUM(reordered) AS sum_reordered,
           COUNT(CASE WHEN product_seq_time = 1 THEN 1 ELSE NULL END) AS seq_is_one,
           COUNT(CASE WHEN product_seq_time = 2 THEN 1 ELSE NULL END) AS seq_is_two
    FROM product_seq
    GROUP BY product_id
    ORDER BY product_id;
"""

# Step 1: number each (user, product) purchase by ascending order_number.
windowSpec = Window.partitionBy("user_id", "product_id").orderBy("order_number")
purchase_rank = row_number().over(windowSpec)
prd_features_df = order_products_prior_df.withColumn("product_seq_time", purchase_rank)

# Step 2: aggregate per product. when(...) without otherwise() yields NULL for
# non-matching rows and count() skips NULLs, so these count matching rows only.
first_purchases = count(when(col("product_seq_time") == 1, True))
second_purchases = count(when(col("product_seq_time") == 2, True))

prd_features_df = prd_features_df.groupBy("product_id").agg(
    count("*").alias("num_product_ordered"),
    sum("reordered").alias("sum_reordered"),
    first_purchases.alias("seq_is_one"),
    second_purchases.alias("seq_is_two"),
)

# Persist the feature table, replacing any previous output, then rebind the
# name to the stored Parquet data.
parquet_file_path = "s3://imba-tgou1055/data3/prd_features/"
prd_features_df.write.mode("overwrite").parquet(parquet_file_path)
prd_features_df = spark.read.parquet(parquet_file_path)
print("transformation job finished")

In [None]:
# stop sparkSession
# NOTE(review): the cells above all use `spark`, so re-running any of them
# after this point presumably requires re-running the session-setup cell
# first -- confirm.
spark.stop()