## DATA MART ASSIGNMENT

### Import Libraries

In [0]:
from functools import reduce
from pyspark.sql.functions import *
from pyspark.sql.types import DoubleType, IntegerType, DateType, StringType, TimestampType, DateType
from pyspark.sql import *

### Configs and Constants

In [0]:
spark.conf.set("spark.sql.legacy.timeParserPolicy",'Legacy')

# File location and type
LIST_OF_TABLES = [ 
    "user",
    "order",
    "item",
    "event"
]

RAW_PREFIX = "_raw"
STAGE_PREFIX = "_stage"
S3_SOURCE_PREFIX = "s3://data-mart-source/"
S3_OUTPUT_PREFIX = "s3://data-mart-source/databricks_delta_lake/"
FILE_TYPE = "csv"
INFER_SCHEMA = "true"
FIRST_ROW_IS_HEADER = "true"
DELIMETER = ","

## Layer 1
▪ Contains external tables for all prerequisite files </br>
▪ All attributes are of STRING type. No transformations are applied

In [0]:
# Iterating all tables.
for table_name in LIST_OF_TABLES:    
    file_location = f"{S3_SOURCE_PREFIX}{table_name}.{FILE_TYPE}"

    # The applied options are for CSV files. For other file types, these will be ignored.
    df = spark.read.format(FILE_TYPE) \
      .option("header", FIRST_ROW_IS_HEADER) \
      .option("sep", DELIMETER) \
      .option("quote", "\"") \
      .option("escape", "\"") \
      .load(file_location)
    
    # store the data in parquet format and creating external tables.
    df.write.mode("OVERWRITE").option("path", f"{S3_OUTPUT_PREFIX}{table_name}{RAW_PREFIX}").saveAsTable(f"{table_name}{RAW_PREFIX}")
    
    # unpersist the df from the memory
    df.unpersist()

## Layer 2 
▪ Contains all datasets from the first layer </br>
▪ All attributes have common naming convention </br>
▪ All attributes have proper datatypes based on the attribute name and common logic </br>
▪ All struct collection attributes are flattened and transformed to proper data
types </br>
▪ Fact tables are properly partitioned based on meaningful attributes </br>

In [0]:
# All column names in snake case. (Following same convention for all attributes)
column_names = {
    "user":  [
                "created_at",
                "deleted_at",
                "email_address",
                "first_name",
                "id",
                "last_name",
                "merged_at",
                "parent_user_id"
            ],    
    "order": [
                "invoice_id",
                "line_item_id",
                "user_id",
                "item_id",
                "item_name",
                "item_category",
                "price",
                "created_at",
                "paid_at"
            ],
    "item": [
                "adjective",
                "category",
                "created_at",
                "id",
                "modifier",
                "name",
                "price"
            ],
    "event": [
                "event_id",
                "event_time",
                "user_id",
                "payload"
            ]
}

all_dataframes = {}

for table_name in LIST_OF_TABLES:
    df_table=spark.read.table(f"{table_name}{RAW_PREFIX}")
    old_columns = df_table.columns
    new_columns = column_names[table_name]
    stage_df = reduce(lambda df_table, idx: df_table.withColumnRenamed(old_columns[idx], new_columns[idx]), range(len(old_columns)), df_table)
    all_dataframes[table_name] = stage_df


#  All attributes have proper datatypes based on the attribute name and common logic
all_dataframes["user"] = all_dataframes["user"] \
    .withColumn('created_year',year(to_timestamp(col('created_at'), "yyyy-MM-dd HH:mm:ss"))) \
    .withColumn('created_at', to_timestamp(col('created_at'), "yyyy-MM-dd HH:mm:ss")) \
    .withColumn('deleted_at', to_timestamp(col('deleted_at'), "yyyy-MM-dd HH:mm:ss")) \
    .withColumn('id', col('id').cast(IntegerType())) \
    .withColumn('merged_at', to_timestamp(col('merged_at'), "yyyy-MM-dd HH:mm:ss"))

all_dataframes["order"]= all_dataframes["order"] \
    .withColumn('created_year',year(to_timestamp(col('created_at'), "MM/dd/yyyy HH:mm"))) \
    .withColumn('created_at', to_timestamp(col('created_at'), "MM/dd/yyyy HH:mm")) \
    .withColumn('paid_at', to_timestamp(col('paid_at'), "MM/dd/yyyy HH:mm")) \
    .withColumn('price', col('price').cast(IntegerType())) \
    .withColumn('item_id', col('item_id').cast(IntegerType())) \
    .withColumn('invoice_id', col('invoice_id').cast(IntegerType())) \
    .withColumn('user_id', col('user_id').cast(IntegerType())) \
    .withColumn('line_item_id', col('line_item_id').cast(IntegerType()))

all_dataframes["item"] = all_dataframes["item"] \
    .withColumn('created_year',year(to_timestamp(col('created_at'), "yyyy-MM-dd HH:mm:ss"))) \
    .withColumn('created_at', to_timestamp(col('created_at'), "yyyy-MM-dd HH:mm:ss")) \
    .withColumn('id', col('id').cast(IntegerType())) \
    .withColumn('price', col('price').cast(DoubleType()))

all_dataframes["event"] = all_dataframes["event"] \
    .withColumn('created_year',year(to_timestamp(col('event_time'), "yyyy-MM-dd HH:mm:ss"))) \
    .withColumn('event_time', to_timestamp(col('event_time'), "yyyy-MM-dd HH:mm:ss")) \
    .withColumn('user_id', col('user_id').cast(IntegerType()))


all_dataframes["event"] = all_dataframes["event"].select(col('event_id'),
                        col('created_year'),
                        col('event_time'),
                        col('user_id'),
                        get_json_object(col('`payload`'), '$.event_name').alias('event_name'),
                        get_json_object(col('`payload`'), '$.parameter_name').alias('parameter_name'),
                        get_json_object(col('`payload`'), '$.platform').alias('platform'),
                        get_json_object(col('`payload`'), '$.parameter_value').alias('parameter_value'))


In [0]:
# partitioning raw tables based on year 
for all_df in all_dataframes.keys():
    print(all_df)
    all_dataframes[all_df].write.option("path", f"{S3_OUTPUT_PREFIX}{all_df}{STAGE_PREFIX}").partitionBy("created_year").mode("OVERWRITE").saveAsTable(f"{all_df}{STAGE_PREFIX}")


d
##Top_Buyers

##### data mart with top 20 customers who contributed on the total sales the most with additional attributes:
o Total sales contributed </br>
o Rank based on the total sales </br>
o Last order creation date </br>
o The overall most viewed item of a customer </br>

In [0]:
#reading event_staging table

event_raw=spark.read.table('event_stage') \
              .select(col('event_id'),
                        col('event_time'),
                        col('user_id'),
                      col('parameter_name'),
                      col('parameter_value'))
                        
event_raw.display()

event_id,event_time,user_id,parameter_name,parameter_value
dde18e8c56e241d3964c97f4f27f2feb,2013-11-06T02:57:17.000+0000,5735,item_id,2580
8c3ff296e9cf42cba8143453bec4ff3d,2013-11-09T19:19:52.000+0000,10358,item_id,1163
05aec475712443a898ea89b2b73f8497,2013-11-09T22:37:57.000+0000,5948,item_id,1409
d0be5a7af54e4b34a2bd9d82aa654015,2013-11-11T04:41:57.000+0000,5948,item_id,1994
f8563221a0b0400494e9a90fbb826810,2013-11-10T19:25:57.000+0000,5948,item_id,680
4f4ad5e8745c488d8759faf77365841e,2013-11-12T11:20:57.000+0000,5948,item_id,2013
8a67a52751ea4522816298309d8ccf21,2013-09-26T21:20:52.000+0000,4813,item_id,2215
4e1ad173d7b1486d89c1f87654a53c9a,2013-09-26T19:17:52.000+0000,4813,item_id,3063
4b319a19c1e44b658255f9e0281225aa,2013-12-02T21:37:26.000+0000,9559,item_id,2808
d26f7f30fe614c738f28b4b14fb799dd,2013-12-07T12:11:17.000+0000,3138,item_id,3436


In [0]:
#extracting all the items viewed most by any user
top_buyers_event=event_raw.select(col('user_id'),
                                 col('parameter_value'),
                                 col('parameter_name')) \
                          .filter(col('parameter_name')=='item_id') \
                          .groupBy(col('user_id'),col('parameter_name'),col('parameter_value')) \
                          .agg(count(col('parameter_value')).alias('total_number_of_times_item_viewed')) \
                          .withColumn('most_viewed_item',dense_rank().over(Window.partitionBy(col('user_id')).orderBy(col('total_number_of_times_item_viewed')))) \
                          .filter(col('most_viewed_item')==1) \
                          .select(col('user_id'),
                                 col('parameter_value').alias('item_id'))
top_buyers_event.display()

user_id,item_id
9,3953
9,3780
9,156
9,3928
17,3266
17,3964
17,3219
34,3605
41,299
41,2794


In [0]:
# Aggregating data and calculating:
# 1) total orders placed by users.
# 2) total sales contributed by users.
# 3) last order date by that user.
# 4) rank of user based on total sales made.

top_buyers_raw=spark.read.table('order_stage') \
                    .select(col('user_id'),
                            col('invoice_id'),
                            col('price'),
                            col('created_at')) \
                    .groupBy(col('user_id')) \
                    .agg(count(col('invoice_id')).alias('total_orders_placed'),sum(col('price')).alias('total_sales_contributed'),max(to_date(col('created_at'),"MM/dd/yyyy HH:mm")).alias('last_order_created')) \
                    .withColumn('rank_of_user_based_on_total_sales',dense_rank().over(Window.orderBy(col('total_sales_contributed').desc()))) \
                    .filter(col('rank_of_user_based_on_total_sales')<=20)
top_buyers_raw.display()

user_id,total_orders_placed,total_sales_contributed,last_order_created,rank_of_user_based_on_total_sales
166460,8,4027,2017-03-31,1
201255,4,3675,2017-08-27,2
226790,10,3637,2018-02-24,3
81386,5,3534,2015-10-28,4
22576,4,3320,2014-05-12,5
117506,3,3306,2016-05-25,6
268611,4,3300,2018-04-15,7
105080,5,3142,2016-06-21,8
17805,6,3099,2014-05-30,9
180997,4,3082,2017-04-28,10


In [0]:
#appending most viewed item for that particular user
top_buyers=top_buyers_raw.join(top_buyers_event,['user_id'],how='left') \
                         .groupby('user_id','total_orders_placed','total_sales_contributed','rank_of_user_based_on_total_sales','last_order_created') \
                         .agg (collect_list('item_id').alias('most_viewed_item_id'))
top_buyers.display()


user_id,total_orders_placed,total_sales_contributed,rank_of_user_based_on_total_sales,last_order_created,most_viewed_item_id
166460,8,4027,1,2017-03-31,"List(2468, 537, 2516, 2714, 2694, 1257, 2821, 1322, 2292, 2899, 1981, 3769)"
201255,4,3675,2,2017-08-27,"List(1543, 2993, 2354, 2417)"
226790,10,3637,3,2018-02-24,"List(3469, 3280, 1679, 2354, 2647, 1295, 2855, 3509, 2676, 3223)"
81386,5,3534,4,2015-10-28,"List(537, 2257, 2354, 3601, 2810, 3279, 104, 105, 3041)"
22576,4,3320,5,2014-05-12,"List(1083, 698, 669, 2515)"
117506,3,3306,6,2016-05-25,"List(2354, 2292, 105)"
268611,4,3300,7,2018-04-15,"List(3466, 1510, 2354, 3884, 1771)"
105080,5,3142,8,2016-06-21,"List(1405, 373, 553, 326, 669, 2901, 131, 1409, 2062)"
17805,6,3099,9,2014-05-30,"List(1003, 3691, 3698, 3341, 466, 2354, 173, 3952)"
180997,4,3082,10,2017-04-28,"List(2354, 41, 2550, 2577)"


## Top_Items

##### “top_items” data mart with all sold items with additional attributes:
o For every year (based on the created_at attribute): <br>
-- Total number of an items sold in a particular year <br>
-- Rank of an item based on the total number of items sold in a particular year <br>
-- Total sales from an item in a particular year <br>
-- Rank of an item based on the total sales in a particular year <br>
o Total number of items sold in all years <br>
o Rank of an item based on the total number of sales <br>
o Total sales of an item in all years <br>
o Rank of an item based on the total sales <br>

### For a particular year
 takes *order_year* as parameter declared as a widget variable in the notebook

In [0]:
# Total number of items sold in a partuclar year
items_raw=spark.read.table('item_stage')
total_number_of_items_sold_in_year=items_raw.select(col('id').alias('item_id'),
                                                   year(to_timestamp(col('created_at'),"MM/dd/yyyy HH:mm")).alias('order_year')) \
                                            .groupBy(col('order_year')) \
                                            .agg(count(col('item_id')).alias('total_number_of_items_sold'))
total_number_of_items_sold_in_year.display()

order_year,total_number_of_items_sold
2013,1861
2014,337


In [0]:
# Total sales from an item in a particular year
total_sales_of_items_sold_in_year=items_raw.select(col('price'),
                                                   year(to_timestamp(col('created_at'),"MM/dd/yyyy HH:mm")).alias('order_year')) \
                                            .groupBy(col('order_year')) \
                                            .agg(sum(col('price')).alias('total_sales_of_items_sold'))
total_sales_of_items_sold_in_year.display()

order_year,total_sales_of_items_sold
2013,182348.325
2014,33401.68500000002


In [0]:
# Rank of an item based on the total no of items sold in a particular year

rank_items_total_no_sold=items_raw.select(col('id').alias('item_id'),
                                                   year(to_timestamp(col('created_at'),"MM/dd/yyyy HH:mm")).alias('order_year')) \
                                            .groupBy(col('item_id'),col('order_year')) \
                                            .agg(count(col('item_id')).alias('total_number_of_items_sold')) \
                                            .withColumn('rank_of_an_item',dense_rank().over(Window.partitionBy('order_year').orderBy(col('total_number_of_items_sold').desc()))) 
rank_items_total_no_sold.display()

item_id,order_year,total_number_of_items_sold,rank_of_an_item
3148,2013,1,1
2906,2013,1,1
3914,2013,1,1
570,2013,1,1
129,2013,1,1
2730,2013,1,1
222,2013,1,1
2075,2013,1,1
3841,2013,1,1
3077,2013,1,1


In [0]:
# Rank of an item based on the total sales in a particular year

rank_item_total_sales_made=items_raw.select(col('id').alias('item_id'),
                                                   year(to_timestamp(col('created_at'),"MM/dd/yyyy HH:mm")).alias('order_year')) \
                                            .groupBy(col('item_id'),col('order_year')) \
                                            .agg(count(col('item_id')).alias('total_number_of_items_sold')) \
                                            .withColumn('rank_of_an_item',dense_rank().over(Window.partitionBy('order_year').orderBy(col('total_number_of_items_sold').desc()))) 
rank_item_total_sales_made.display()

item_id,order_year,total_number_of_items_sold,rank_of_an_item
3148,2013,1,1
2906,2013,1,1
3914,2013,1,1
570,2013,1,1
129,2013,1,1
2730,2013,1,1
222,2013,1,1
2075,2013,1,1
3841,2013,1,1
3077,2013,1,1


In [0]:
#Total number of items sold in all years

total_number_of_items_sold=items_raw.select(count(col('id').alias('item_id')).alias('total_number_of_items_sold_in_all_years'))
total_number_of_items_sold.display()

total_number_of_items_sold_in_all_years
2198


In [0]:
#Total sales of an item in all years
total_sales_in_all_years=items_raw.select(col('price'),col('id').alias('item_id')) \
                                  .groupBy(col('item_id')) \
                                  .agg(sum(col('price')).alias('total_sales_of_items_sold'))
total_sales_in_all_years.display()

item_id,total_sales_of_items_sold
2122,31.25
3794,120.0
1645,58.74
1238,7.5
1342,600.0
3918,127.27
1088,198.00000000000003
2366,42.0
2142,0.0
471,30.0


In [0]:
# Rank of an item based on the total number of sales

item_rank_no_item_sold=items_raw.select(col('id').alias('item_id')) \
                                            .groupBy(col('item_id')) \
                                            .agg(count(col('item_id')).alias('total_number_of_items_sold')) \
                                            .withColumn('rank_of_an_item',dense_rank().over(Window.orderBy(col('total_number_of_items_sold').desc())))
item_rank_no_item_sold.display()

item_id,total_number_of_items_sold,rank_of_an_item
2122,1,1
3794,1,1
1645,1,1
1238,1,1
1342,1,1
3918,1,1
1088,1,1
2366,1,1
2142,1,1
471,1,1


In [0]:
#Rank of an item based on the total sales

item_rank_total_sales=items_raw.select(col('id').alias('item_id'),col('Price').alias('price')) \
                                            .groupBy(col('item_id')) \
                                            .agg(sum(col('price')).alias('total_sales_of_items_sold')) \
                                            .withColumn('rank_of_an_item',dense_rank().over(Window.orderBy(col('total_sales_of_items_sold').desc())))
item_rank_total_sales.display()

item_id,total_sales_of_items_sold,rank_of_an_item
2354,2250.0,1
669,1875.0,2
2592,1250.0,3
2292,990.0,4
3241,937.5,5
690,937.5,5
844,900.0,6
2326,900.0,6
817,900.0,6
3658,900.0,6
