## DATALAKE CREATION

- **Name:** Vikranth Ale
- **Email:** alevikranth407@gmail.com
- **Date:** July 24, 2023

In [0]:
# Importing required libraries for Layer 1
from pyspark.sql import SparkSession
from datetime import datetime, date
import pandas as pd
import json
from pyspark.sql import Row, Column
import pyspark.sql.functions as F


# Spark Configurations
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

spark = SparkSession.builder.appName("TopItemsNCustomers").getOrCreate()

In [0]:
# Checking available datasets (csv files)
dbutils.fs.ls("dbfs:/FileStore/tables")

Out[156]: [FileInfo(path='dbfs:/FileStore/tables/event.csv', name='event.csv', size=155970517, modificationTime=1689721794000),
 FileInfo(path='dbfs:/FileStore/tables/item.csv', name='item.csv', size=184207, modificationTime=1689721138000),
 FileInfo(path='dbfs:/FileStore/tables/order.csv', name='order.csv', size=4413105, modificationTime=1689721111000),
 FileInfo(path='dbfs:/FileStore/tables/user.csv', name='user.csv', size=8305657, modificationTime=1689721911000)]

### LAYER 1
- 1. Load RAW datasets
- 2. Verify the datatypes

##### LOADING RAW FILES AND CREATING SPARK DATAFRAMES

In [0]:
# Load csv files

file_location = [dataset.path for dataset in dbutils.fs.ls("dbfs:/FileStore/tables")[::-1]]

print(file_location)
print('')

file_type = "csv"
first_row_is_header = "True"
infer_schema = "False"
delimiter = ","

users_df = spark.read.format(file_type) \
          .option("inferSchema", infer_schema) \
          .option("header", first_row_is_header) \
          .option("sep", delimiter) \
          .load(file_location[0])
            
orders_df = spark.read.format(file_type) \
          .option("inferSchema", infer_schema) \
          .option("header", first_row_is_header) \
          .option("sep", delimiter) \
          .load(file_location[1])

items_df = spark.read.format(file_type) \
          .option("inferSchema", infer_schema) \
          .option("header", first_row_is_header) \
          .option("sep", delimiter) \
          .load(file_location[2])

events_df = spark.read.format(file_type) \
          .option("inferSchema", infer_schema) \
          .option("header", first_row_is_header) \
          .option("sep", delimiter) \
          .option("escape", "\"")   \
          .load(file_location[3])


['dbfs:/FileStore/tables/user.csv', 'dbfs:/FileStore/tables/order.csv', 'dbfs:/FileStore/tables/item.csv', 'dbfs:/FileStore/tables/event.csv']



##### VERIFYING THE SCHEMA FOR ALL THE DATASETS

In [0]:
# Printing the schema for USERS
print("SCHEMA FOR DATASET : {}".format("USERS"))
users_df.printSchema()

SCHEMA FOR DATASET : USERS
root
 |-- created_at: string (nullable = true)
 |-- deleted_at: string (nullable = true)
 |-- email_address: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- id: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- merged_at: string (nullable = true)
 |-- parent_user_id: string (nullable = true)



In [0]:
# visualize the dataset
users_df.show(5)

+-------------------+----------+--------------------+----------+--------+---------+---------+--------------+
|         created_at|deleted_at|       email_address|first_name|      id|last_name|merged_at|parent_user_id|
+-------------------+----------+--------------------+----------+--------+---------+---------+--------------+
|2014-12-20 07:07:45|      null|ArataHopper@earth...|     Arata| 51590.0|   Hopper|     null|          null|
|2016-10-14 05:39:20|      null|Riya_Gruber1974@m...|      Riya|158891.0|   Gruber|     null|          null|
|2017-01-21 10:20:09|      null|Peter_Sousa@yahoo...|     Peter|179949.0|    Sousa|     null|          null|
|2015-10-30 21:31:30|      null|D_Kowalski1962@gm...|         D| 98000.0| Kowalski|     null|          null|
|2015-10-24 16:27:27|      null| SNovak1966@mail.com|         S| 93994.0|    Novak|     null|          null|
+-------------------+----------+--------------------+----------+--------+---------+---------+--------------+
only showing top 5 

In [0]:
# Printing the schema for ORDERS
print("SCHEMA FOR DATASET : {}".format("ORDERS"))
orders_df.printSchema()

SCHEMA FOR DATASET : ORDERS
root
 |-- InvoiceId: string (nullable = true)
 |-- LineItemId: string (nullable = true)
 |-- UserId: string (nullable = true)
 |-- ItemId: string (nullable = true)
 |-- ItemName: string (nullable = true)
 |-- ItemCategory: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- CreatedAt: string (nullable = true)
 |-- PaidAt: string (nullable = true)



In [0]:
# visualize the dataset
orders_df.show(5)

+---------+----------+------+------+--------------------+------------+-----+---------------+---------------+
|InvoiceId|LineItemId|UserId|ItemId|            ItemName|ItemCategory|Price|      CreatedAt|         PaidAt|
+---------+----------+------+------+--------------------+------------+-----+---------------+---------------+
|   192320|     83118|178481|  3526|   digital apparatus|   apparatus|  330|6/28/2017 21:14|6/27/2017 21:19|
|   192320|    207309|178481|  1514|miniature apparat...|   apparatus|   99|6/28/2017 21:14|6/27/2017 21:19|
|   192320|    392027|178481|  3712|miniature apparat...|   apparatus|   99|6/28/2017 21:14|6/27/2017 21:19|
|    80902|    243831|154133|  3586|reflective instru...|  instrument| 57.2| 10/9/2016 6:57|10/7/2016 10:08|
|    80902|    399806|154133|  1061|extra-strength in...|  instrument| 17.6| 10/9/2016 6:57|10/7/2016 10:08|
+---------+----------+------+------+--------------------+------------+-----+---------------+---------------+
only showing top 5 

In [0]:
# Printing the schema for ITEMS
print("SCHEMA FOR DATASET : {}".format("ITEMS"))
items_df.printSchema()

SCHEMA FOR DATASET : ITEMS
root
 |-- adjective: string (nullable = true)
 |-- category: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- id: string (nullable = true)
 |-- modifier: string (nullable = true)
 |-- name: string (nullable = true)
 |-- price: string (nullable = true)



In [0]:
# visualize the dataset
items_df.show(5)

+-------------------+-----------+-------------------+------+-------------+--------------------+-----+
|          adjective|   category|         created_at|    id|     modifier|                name|price|
+-------------------+-----------+-------------------+------+-------------+--------------------+-----+
|              fuzzy|contraption|2014-01-15 21:36:09|2512.0|carrying_case|fuzzy contraption...|150.0|
|               null| instrument|2013-05-14 05:20:50| 482.0|       refill|   instrument refill| 35.2|
|industrial-strength|     module|2014-02-04 19:28:32|2446.0|         null|industrial-streng...|300.0|
|            digital|       tool|2013-02-25 12:23:18|1312.0|carrying_case|digital tool carr...| 16.5|
|          miniature|     device|2013-08-05 17:20:45|3556.0|      cleaner|miniature device ...| 16.5|
+-------------------+-----------+-------------------+------+-------------+--------------------+-----+
only showing top 5 rows



In [0]:
# Printing the schema for EVENTS
print("SCHEMA FOR DATASET : {}".format("EVENTS"))
events_df.printSchema()

SCHEMA FOR DATASET : EVENTS
root
 |-- event_id: string (nullable = true)
 |-- event_time: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- event.payload: string (nullable = true)



In [0]:
# visualize the dataset
events_df.show(5)

+--------------------+-------------------+--------+--------------------+
|            event_id|         event_time| user_id|       event.payload|
+--------------------+-------------------+--------+--------------------+
|b9de71c5c3cc4cd7a...|2017-06-26 11:23:39|178481.0|{"event_name":"vi...|
|23267713c9ea44419...|2017-06-27 10:46:39|178481.0|{"event_name":"vi...|
|1b7822fa7b854e019...|2017-06-27 11:15:39|178481.0|{"event_name":"vi...|
|2a7a188a626841ac9...|2016-10-05 20:43:10|154133.0|{"event_name":"vi...|
|631d657264cc4616a...|2016-10-04 03:29:10|154133.0|{"event_name":"vi...|
+--------------------+-------------------+--------+--------------------+
only showing top 5 rows



### LAYER 2

#### APPLYING TRANSFORMATIONS TO DATA TYPES
- 1. Include all datasets from layer 1
- 2. Rename columns to follow common naming conventions
- 3. Change data types based on fields
- 4. Flatten struct type for events dataset
- 5. Partition FACT tables

In [0]:
# Importing required libraries for layer 2 transformations
from pyspark.sql.types import ( StringType, IntegerType, FloatType, DateType ,TimestampType, StructType, StructField )
from pyspark.sql.functions import col, from_json, to_timestamp, when, expr , date_format, concat_ws, year

##### Applying transformations & Creating Dimensions for USERS dataset

In [0]:
# set the timestamp format
set_format = "yyyy-MM-dd HH:mm:ss"

users_df2  = users_df.withColumn("UserId", col("id").cast(IntegerType())) \
                     .withColumn("UserParentId", col("parent_user_id").cast(IntegerType())) \
                     .withColumn("UserFirstName", col("first_name").cast(StringType())) \
                     .withColumn("UserLastName", col("last_name").cast(StringType())) \
                     .withColumn("UserFullName", concat_ws(" ", col("UserFirstName"), col("UserLastName"))) \
                     .withColumn("UserEmailAddress", col("email_address").cast(StringType())) \
                     .withColumn("UserCreatedAt", to_timestamp(col("created_at"),set_format)) \
                     .withColumn("UserMergedAt", to_timestamp(col("merged_at"), set_format)) \
                     .withColumn("UserDeletedAt", to_timestamp(col("deleted_at"),set_format)) \
                     .drop("id","first_name", "last_name", "email_address", "created_at","merged_at","deleted_at")
                    
usersDIM_df = users_df2.select("UserId","UserFullName","UserEmailAddress","UserCreatedAt","UserMergedAt","UserDeletedAt")
usersDIM_df.show(5)     

+------+------------+--------------------+-------------------+------------+-------------+
|UserId|UserFullName|    UserEmailAddress|      UserCreatedAt|UserMergedAt|UserDeletedAt|
+------+------------+--------------------+-------------------+------------+-------------+
| 51590|Arata Hopper|ArataHopper@earth...|2014-12-20 07:07:45|        null|         null|
|158891| Riya Gruber|Riya_Gruber1974@m...|2016-10-14 05:39:20|        null|         null|
|179949| Peter Sousa|Peter_Sousa@yahoo...|2017-01-21 10:20:09|        null|         null|
| 98000|  D Kowalski|D_Kowalski1962@gm...|2015-10-30 21:31:30|        null|         null|
| 93994|     S Novak| SNovak1966@mail.com|2015-10-24 16:27:27|        null|         null|
+------+------------+--------------------+-------------------+------------+-------------+
only showing top 5 rows



In [0]:
# verify the updated schema 
usersDIM_df.printSchema()

root
 |-- UserId: integer (nullable = true)
 |-- UserFullName: string (nullable = false)
 |-- UserEmailAddress: string (nullable = true)
 |-- UserCreatedAt: timestamp (nullable = true)
 |-- UserMergedAt: timestamp (nullable = true)
 |-- UserDeletedAt: timestamp (nullable = true)



##### Applying transformations to ORDERS dataset

In [0]:
# Schema before applying transformations
orders_df.printSchema()

root
 |-- InvoiceId: string (nullable = true)
 |-- LineItemId: string (nullable = true)
 |-- UserId: string (nullable = true)
 |-- ItemId: string (nullable = true)
 |-- ItemName: string (nullable = true)
 |-- ItemCategory: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- CreatedAt: string (nullable = true)
 |-- PaidAt: string (nullable = true)



In [0]:
# Applying data transformations

orders_df2 = orders_df.withColumn("OrderInvoiceId", col("InvoiceId").cast(IntegerType())) \
                      .withColumn("OrderLineItemId", col("LineItemId").cast(IntegerType())) \
                      .withColumn("UserId", col("UserId").cast(IntegerType())) \
                      .withColumn("ItemId", col("ItemId").cast(IntegerType())) \
                      .withColumn("OrderItemName", col("ItemName").cast(StringType())) \
                      .withColumn("OrderItemCategory", col("ItemCategory").cast(StringType())) \
                      .withColumn("OrderPrice", col("Price").cast(FloatType())) \
                      .withColumn("tempCreatedAt", to_timestamp("CreatedAt", "M/d/yyyy H:mm")) \
                      .withColumn("OrderCreatedAt", to_timestamp(col("tempCreatedAt"), set_format)) \
                      .withColumn("tempPaidAt", to_timestamp("PaidAt", "M/d/yyyy H:mm")) \
                      .withColumn("OrderPaidAt", to_timestamp(col("tempPaidAt"), set_format)) \
                      .withColumn("OrderYear", year(col("OrderCreatedAt"))) \
                      .drop("InvoiceId","LineItemId", "ItemName","ItemCategory","Price","CreatedAt","tempCreatedAt","PaidAt","tempPaidAt")
                    
ordersDIM_df = orders_df2.select("OrderInvoiceId","UserId","ItemId","OrderItemName","OrderItemCategory","OrderPrice","OrderYear","OrderCreatedAt")

ordersDIM_df.show(5)

+--------------+------+------+--------------------+-----------------+----------+---------+-------------------+
|OrderInvoiceId|UserId|ItemId|       OrderItemName|OrderItemCategory|OrderPrice|OrderYear|     OrderCreatedAt|
+--------------+------+------+--------------------+-----------------+----------+---------+-------------------+
|        192320|178481|  3526|   digital apparatus|        apparatus|     330.0|     2017|2017-06-28 21:14:00|
|        192320|178481|  1514|miniature apparat...|        apparatus|      99.0|     2017|2017-06-28 21:14:00|
|        192320|178481|  3712|miniature apparat...|        apparatus|      99.0|     2017|2017-06-28 21:14:00|
|         80902|154133|  3586|reflective instru...|       instrument|      57.2|     2016|2016-10-09 06:57:00|
|         80902|154133|  1061|extra-strength in...|       instrument|      17.6|     2016|2016-10-09 06:57:00|
+--------------+------+------+--------------------+-----------------+----------+---------+-------------------+
o

In [0]:
# Schema After applying transformations
ordersDIM_df.printSchema()

root
 |-- OrderInvoiceId: integer (nullable = true)
 |-- UserId: integer (nullable = true)
 |-- ItemId: integer (nullable = true)
 |-- OrderItemName: string (nullable = true)
 |-- OrderItemCategory: string (nullable = true)
 |-- OrderPrice: float (nullable = true)
 |-- OrderYear: integer (nullable = true)



##### Applying transformations to Items dataset

In [0]:
# Printing schema before transformations
items_df.printSchema()

root
 |-- adjective: string (nullable = true)
 |-- category: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- id: string (nullable = true)
 |-- modifier: string (nullable = true)
 |-- name: string (nullable = true)
 |-- price: string (nullable = true)



In [0]:
# Applying data transformations

items_df2 = items_df.withColumn("ItemId", col("id").cast(IntegerType())) \
                      .withColumn("ItemName", col("name").cast(StringType())) \
                      .withColumn("ItemPrice", col("price").cast(FloatType())) \
                      .withColumn("ItemCategory", col("category").cast(StringType())) \
                      .withColumn("ItemModifier", col("modifier").cast(StringType())) \
                      .withColumn("ItemAdjective", col("adjective").cast(StringType())) \
                      .withColumn("ItemCreatedAt", col("created_at").cast(TimestampType()))\
                      .drop("id","name", "price", "category", "modifier","adjective","created_at")
                      
itemsDIM_df = items_df2.select("ItemId","ItemName","ItemPrice","ItemCategory","ItemModifier", "ItemCreatedAt")

itemsDIM_df.show(5)                                            

+------+--------------------+---------+------------+-------------+-------------------+
|ItemId|            ItemName|ItemPrice|ItemCategory| ItemModifier|      ItemCreatedAt|
+------+--------------------+---------+------------+-------------+-------------------+
|  2512|fuzzy contraption...|    150.0| contraption|carrying_case|2014-01-15 21:36:09|
|   482|   instrument refill|     35.2|  instrument|       refill|2013-05-14 05:20:50|
|  2446|industrial-streng...|    300.0|      module|         null|2014-02-04 19:28:32|
|  1312|digital tool carr...|     16.5|        tool|carrying_case|2013-02-25 12:23:18|
|  3556|miniature device ...|     16.5|      device|      cleaner|2013-08-05 17:20:45|
+------+--------------------+---------+------------+-------------+-------------------+
only showing top 5 rows



In [0]:
# Schema After applying transformations
itemsDIM_df.printSchema()

root
 |-- ItemId: integer (nullable = true)
 |-- ItemName: string (nullable = true)
 |-- ItemPrice: float (nullable = true)
 |-- ItemCategory: string (nullable = true)
 |-- ItemModifier: string (nullable = true)
 |-- ItemCreatedAt: timestamp (nullable = true)



##### Applying transformations to EVENTS dataset

In [0]:
# Printing data structure before transformations
events_df.show(5)

+--------------------+-------------------+--------+--------------------+
|            event_id|         event_time| user_id|       event.payload|
+--------------------+-------------------+--------+--------------------+
|b9de71c5c3cc4cd7a...|2017-06-26 11:23:39|178481.0|{"event_name":"vi...|
|23267713c9ea44419...|2017-06-27 10:46:39|178481.0|{"event_name":"vi...|
|1b7822fa7b854e019...|2017-06-27 11:15:39|178481.0|{"event_name":"vi...|
|2a7a188a626841ac9...|2016-10-05 20:43:10|154133.0|{"event_name":"vi...|
|631d657264cc4616a...|2016-10-04 03:29:10|154133.0|{"event_name":"vi...|
+--------------------+-------------------+--------+--------------------+
only showing top 5 rows



For EVENTS dataset, before applying transformations to datatypes, we need to do a bit preprocessing and flatten the nested struct collection

##### FLATTENING THE STRUCT COLLECTION

In [0]:
# Defining the schema based on the structure of JSON string
json_schema = StructType([
    StructField("event_name", StringType(), True),
    StructField("platform", StringType(), True),
    StructField("parameter_name", StringType(), True),
    StructField("parameter_value", StringType(), True)
    
])

# Parsing the JSON string into a StructType and creating new columns
events_df = events_df.withColumn("json_string", from_json(col("`event.payload`"), json_schema))

# Extracting the individual fields into separate columns
events_df = events_df.withColumn("event_name", col("json_string.event_name")) \
        .withColumn("platform", col("json_string.platform"))\
        .withColumn("parameter_name", col("json_string.parameter_name")) \
        .withColumn("parameter_value", col("json_string.parameter_value"))
       # Add more withColumn calls for other fields in your JSON data

# dropping original columns
drop_columns = ["event.payload","json_string"]

# Dropping the original JSON string column
events_df = events_df.drop(*drop_columns)

events_df.show(5)


+--------------------+-------------------+--------+----------+--------+--------------+---------------+
|            event_id|         event_time| user_id|event_name|platform|parameter_name|parameter_value|
+--------------------+-------------------+--------+----------+--------+--------------+---------------+
|b9de71c5c3cc4cd7a...|2017-06-26 11:23:39|178481.0| view_item| android|       item_id|           3526|
|23267713c9ea44419...|2017-06-27 10:46:39|178481.0| view_item| android|       item_id|           1514|
|1b7822fa7b854e019...|2017-06-27 11:15:39|178481.0| view_item| android|       item_id|           3712|
|2a7a188a626841ac9...|2016-10-05 20:43:10|154133.0| view_item| android|       item_id|           3586|
|631d657264cc4616a...|2016-10-04 03:29:10|154133.0| view_item| android|       item_id|           1061|
+--------------------+-------------------+--------+----------+--------+--------------+---------------+
only showing top 5 rows



##### APPLYING THE TRANSFORMATIONS TO FLATTENED DATASET

In [0]:
# Printing the schema after transformations
events_df.printSchema()

root
 |-- event_id: string (nullable = true)
 |-- event_time: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- platform: string (nullable = true)
 |-- parameter_name: string (nullable = true)
 |-- parameter_value: string (nullable = true)



In [0]:
# Applying datatype transformations to events dataset
events_df2 =  events_df.withColumn("EventId", col("event_id").cast(StringType())) \
                        .withColumn("UserId", col("user_id").cast(IntegerType())) \
                        .withColumn("EventName", col("event_name").cast(StringType())) \
                        .withColumn("EventPlatform", col("platform").cast(StringType())) \
                        .withColumn("EventParameterName", col("parameter_name").cast(StringType())) \
                        .withColumn("EventParameterValue", col("parameter_value").cast(StringType())) \
                        .withColumn("EventTime", col("event_time").cast(TimestampType()))\
                        .drop("event_id","event_name", "user_id", "platform", "parameter_name","parameter_value","event_time")


eventsDIM_df = events_df2.select("EventId","UserId","EventName","EventPlatform", "EventParameterValue","EventTime")                    
                        
eventsDIM_df.show(5)

+--------------------+------+---------+-------------+-------------------+-------------------+
|             EventId|UserId|EventName|EventPlatform|EventParameterValue|          EventTime|
+--------------------+------+---------+-------------+-------------------+-------------------+
|b9de71c5c3cc4cd7a...|178481|view_item|      android|               3526|2017-06-26 11:23:39|
|23267713c9ea44419...|178481|view_item|      android|               1514|2017-06-27 10:46:39|
|1b7822fa7b854e019...|178481|view_item|      android|               3712|2017-06-27 11:15:39|
|2a7a188a626841ac9...|154133|view_item|      android|               3586|2016-10-05 20:43:10|
|631d657264cc4616a...|154133|view_item|      android|               1061|2016-10-04 03:29:10|
+--------------------+------+---------+-------------+-------------------+-------------------+
only showing top 5 rows



In [0]:
# Printing schema after applying transformations
eventsDIM_df.printSchema()

root
 |-- EventId: string (nullable = true)
 |-- UserId: integer (nullable = true)
 |-- EventName: string (nullable = true)
 |-- EventPlatform: string (nullable = true)
 |-- EventParameterValue: string (nullable = true)
 |-- EventTime: timestamp (nullable = true)



### LAYER 3

#### CREATING DATA MARTS


##### DATAMART 1 
- TOP_ITEMS

##### RESEARCH QUESTIONS

- **top_items** data mart with all sold items with additional attributes:
  - For every year (based on the created_at attribute):
  - Total number of an items sold in a particular year
  - Rank of an item based on the total number of items sold in a particular year
  - Total sales from an item in a particular year
  - Rank of an item based on the total sales in a particular year
  - Total number of items sold in all years
  - Rank of an item based on the total number of sales
  - Total sales of an item in all years
  - Rank of an item based on the total sales

In [0]:
# Importing required libraries for Layer 3 
from pyspark.sql.functions import year, month, count, sum 
from pyspark.sql import functions as F
from pyspark.sql import Window

In [0]:
# Creating a dimension table for orders and items 
items_sold_DM_df = (
    ordersDIM_df
    .join(itemsDIM_df, "ItemId", "inner")
    .select("OrderInvoiceId","UserId","ItemId","ItemName","ItemCategory","ItemPrice","OrderYear")
)
items_sold_DM_df.show(5)

+--------------+------+------+--------------------+------------+---------+---------+
|OrderInvoiceId|UserId|ItemId|            ItemName|ItemCategory|ItemPrice|OrderYear|
+--------------+------+------+--------------------+------------+---------+---------+
|        192320|178481|  3526|   digital apparatus|   apparatus|    330.0|     2017|
|        192320|178481|  1514|miniature apparat...|   apparatus|     99.0|     2017|
|        192320|178481|  3712|miniature apparat...|   apparatus|     99.0|     2017|
|         80902|154133|  3586|reflective instru...|  instrument|     57.2|     2016|
|         80902|154133|  1061|extra-strength in...|  instrument|     17.6|     2016|
+--------------+------+------+--------------------+------------+---------+---------+
only showing top 5 rows



- **1.1** For every year (based on the created_at attribute):
  - **1.1.1** Total number of an items sold in a particular year

In [0]:
# Total number of sales in a particular year

TotalItemsCountByYear_df = items_sold_DM_df.groupBy("OrderYear") \
                      .agg(count("ItemId").alias("ItemCounts")) \
                      .orderBy("OrderYear")

#Sorting the output based on the recent most year
TotalItemsCountByYear_df = TotalItemsCountByYear_df.orderBy(col("OrderYear").desc())            
TotalItemsCountByYear_df.show()            

+---------+----------+
|OrderYear|ItemCounts|
+---------+----------+
|     2018|      6202|
|     2017|     13577|
|     2016|     11647|
|     2015|      8989|
|     2014|      5637|
|     2013|      1350|
+---------+----------+



- **1.1.2** Rank of an item based on the total number of items sold in a particular year

In [0]:
# Rank of an item based on the total number of items sold in a particular year e.g 2018
# Filter the DataFrame to get data for the year e.g 2018
items_sold_DM_df_2018 = items_sold_DM_df.filter(items_sold_DM_df["OrderYear"] == 2018)

# Get the total number of items sold for each item for the year 2018
item_quantity_df = items_sold_DM_df_2018.groupBy("ItemId", "OrderYear").agg(
    F.count("ItemId").alias("Quantity")
)

# Create a partition to extract the data
window_spec = Window.partitionBy("OrderYear").orderBy(F.col("Quantity").desc())

# Rank the items based on year
RankTotalItemsCountByYear_df = item_quantity_df.withColumn("item_rank", F.rank().over(window_spec)) \
                                               .select("ItemId", "Quantity", "item_rank", "OrderYear")

RankTotalItemsCountByYear_df.show(10)

+------+--------+---------+---------+
|ItemId|Quantity|item_rank|OrderYear|
+------+--------+---------+---------+
|  2454|       9|        1|     2018|
|   194|       9|        1|     2018|
|  3882|       9|        1|     2018|
|   132|       9|        1|     2018|
|  1586|       8|        5|     2018|
|  3277|       8|        5|     2018|
|  1793|       8|        5|     2018|
|  1295|       8|        5|     2018|
|  2647|       8|        5|     2018|
|  2260|       8|        5|     2018|
+------+--------+---------+---------+
only showing top 10 rows



- **1.1.3** Total sales from an item in a particular year

In [0]:
# Get the DataFrame to get data for the year 2013
items_sold_DM_df_2013 = items_sold_DM_df.filter(items_sold_DM_df["OrderYear"] == 2013)

# Grouping the data and calculating aggregations
TotalSalesItems_df = items_sold_DM_df_2013.groupBy("ItemId").agg(
                                            F.count("ItemId").alias("Quantity"),
                                            F.first("ItemPrice").alias("ItemUnitPrice"),
                                            F.sum("ItemPrice").alias("TotalSales")
                                        ).orderBy(F.col("Quantity").desc())

TotalSalesItems_df.show(10)

+------+--------+-------------+------------------+
|ItemId|Quantity|ItemUnitPrice|        TotalSales|
+------+--------+-------------+------------------+
|  3192|       6|        165.0|             990.0|
|  3264|       5|         18.0|              90.0|
|  1149|       5|         35.0|             175.0|
|  2261|       4|          9.0|              36.0|
|   731|       4|        330.0|            1320.0|
|  1975|       4|         20.0|              80.0|
|  1944|       4|         11.7| 46.79999923706055|
|   774|       4|         13.2| 52.79999923706055|
|  3978|       4|         30.0|             120.0|
|   760|       4|          2.7|10.800000190734863|
+------+--------+-------------+------------------+
only showing top 10 rows



- **1.1.4**  Rank of an item based on the total sales in a particular year

In [0]:
# Filter the DataFrame to get data for the year e.g 2013
items_sold_DM_df_2013 = items_sold_DM_df.filter(ordersDIM_df["OrderYear"] == 2013)

# Group the data and calculate the total sales
item_total_sales_df = items_sold_DM_df_2013.groupBy("ItemId", "ItemPrice", "OrderYear") \
                                           .agg(F.sum("ItemPrice").alias("TotalSales"))

# Create a partition of OrderYear and order by TotalSales
window_spec = Window.partitionBy("OrderYear").orderBy(F.col("TotalSales").desc())

# Creating new column and ranking the total sales
RankTotalSalesItems_df = item_total_sales_df.withColumn("item_rank", F.rank().over(window_spec)) \
                                            .select("ItemId", "TotalSales", "item_rank", "OrderYear")

RankTotalSalesItems_df.show(10)

+------+----------+---------+---------+
|ItemId|TotalSales|item_rank|OrderYear|
+------+----------+---------+---------+
|  2592|    2500.0|        1|     2013|
|   844|    1800.0|        2|     2013|
|   746|    1500.0|        3|     2013|
|   649|    1500.0|        3|     2013|
|   457|    1500.0|        3|     2013|
|  2676|    1500.0|        3|     2013|
|   183|    1375.0|        7|     2013|
|  2516|    1350.0|        8|     2013|
|   181|    1335.0|        9|     2013|
|   731|    1320.0|       10|     2013|
+------+----------+---------+---------+
only showing top 10 rows



- **1.2** Total number of items sold in all years

In [0]:
# Total number of items sold in all years
TotalItemsAllYears_df = ordersDIM_df.agg(F.count("ItemId").alias("Quantity"))
TotalItemsAllYears_df.show()

+--------+
|Quantity|
+--------+
|   47402|
+--------+



- **1.3** Rank of an item based on the total number of sales

In [0]:
# Rank of an item based on the total number of sales in all years
group_TotalItemsAllYears_df = ordersDIM_df.groupBy("ItemId").agg(F.count("ItemId").alias("Quantity"))
window_spec = Window.orderBy(F.desc("Quantity"))
RankTotalItemsAllYears_df = group_TotalItemsAllYears_df.withColumn("item_rank", F.rank().over(window_spec))

RankTotalItemsAllYears_df.show(10)

+------+--------+---------+
|ItemId|Quantity|item_rank|
+------+--------+---------+
|  3486|      40|        1|
|  1679|      39|        2|
|  1119|      38|        3|
|  1812|      37|        4|
|  2647|      36|        5|
|   730|      36|        5|
|  3455|      36|        5|
|  3733|      36|        5|
|  3033|      36|        5|
|  3639|      35|       10|
+------+--------+---------+
only showing top 10 rows



- **1.4** Total sales of an item in all years

In [0]:
# Total sales of an item in all years
totalSalesAllYears_df = ordersDIM_df.groupBy("ItemId").agg(F.sum("OrderPrice").alias("TotalSales"))                                                      
totalSalesAllYears_df.show(10)

+------+------------------+
|ItemId|        TotalSales|
+------+------------------+
|   471|             630.0|
|  1238|             165.0|
|  3794|            2640.0|
|  2366|             840.0|
|  2659|            2887.5|
|  3918|2418.1299362182617|
|  2142|               0.0|
|  1088|            5346.0|
|   463|             522.5|
|  1342|           12000.0|
+------+------------------+
only showing top 10 rows



- **1.5** Rank of an item based on the total sales

In [0]:
# Rank of an item based on the total sales in all years

grouped_totalSalesAllYears_df = ordersDIM_df.groupBy("ItemId").agg(F.sum("OrderPrice").alias("TotalSales"))
window_spec = Window.orderBy(F.desc("TotalSales"))
RanktotalSalesAllYears_df = grouped_totalSalesAllYears_df.withColumn("item_rank", F.rank().over(window_spec))

RanktotalSalesAllYears_df.show(10)

+------+----------+---------+
|ItemId|TotalSales|item_rank|
+------+----------+---------+
|  2354|   69750.0|        1|
|   669|   46875.0|        2|
|  3878|   29700.0|        3|
|  2592|   28750.0|        4|
|   427|   24000.0|        5|
|  3719|   24000.0|        5|
|  2492|   23625.0|        7|
|  2225|   23562.5|        8|
|  2397|   23250.0|        9|
|  2676|   22500.0|       10|
+------+----------+---------+
only showing top 10 rows



##### DATAMART 2 
- TOP_BUYERS

#### RESEARCH QUESTIONS
- **top_buyers** data mart with top 20 customers who contributed on the total sales the most with additional attributes:
  - Total sales contributed
  - Rank based on the total sales
  - Last order creation date
  - The overall most viewed item of a customer

In [0]:
 # Creating a Dimension table for top 20 customers with required information for further analysis
top_buyers_DIM_df = (
                    ordersDIM_df
                    .join(usersDIM_df, "UserId", "inner")
                    .join(eventsDIM_df, "UserId", "inner")
                    .select("UserId","UserFullName","UserEmailAddress","ItemId","OrderItemName","OrderPrice","OrderCreatedAt","EventName","EventParameterValue")
                )
top_buyers_DIM_df.show(5)

+------+------------+--------------------+------+--------------------+----------+-------------------+---------+-------------------+
|UserId|UserFullName|    UserEmailAddress|ItemId|       OrderItemName|OrderPrice|     OrderCreatedAt|EventName|EventParameterValue|
+------+------------+--------------------+------+--------------------+----------+-------------------+---------+-------------------+
|231502|     R Rossi|R_Rossi@earthlink...|  1921|  instrument charger|       8.8|2017-12-08 09:00:00|view_item|               1921|
|231502|     R Rossi|R_Rossi@earthlink...|  2416|    fuzzy instrument|      44.0|2017-12-08 09:00:00|view_item|               1921|
|231502|     R Rossi|R_Rossi@earthlink...|  3901|rechargable instr...|     45.76|2017-12-08 09:00:00|view_item|               1921|
|231502|     R Rossi|R_Rossi@earthlink...|  2298|reflective instru...|     11.44|2017-12-08 09:00:00|view_item|               1921|
|231502|     R Rossi|R_Rossi@earthlink...|  1921|  instrument charger|      

- **2.1** Total sales by each customer for Top 20 Customers
- **2.2** Rank based on the total sales

In [0]:
# calculating the total sales and ranking each customer
total_sales_df  =   ordersDIM_df.groupby("UserId").agg(F.sum("OrderPrice").alias("TotalSales"))
window_spec = Window.orderBy(F.desc("TotalSales"))
rank_Buyers_df = total_sales_df.withColumn("User_rank", F.rank().over(window_spec))
rank_top20_buyers_df = rank_Buyers_df.filter(F.col("User_rank") <= 20)                      
rank_top20_buyers_df.show()

+------+------------------+---------+
|UserId|        TotalSales|User_rank|
+------+------------------+---------+
|166460|            4027.5|        1|
|201255|            3675.0|        2|
|226790| 3637.664999961853|        3|
| 81386|            3534.0|        4|
| 22576|            3320.0|        5|
|117506|            3306.0|        6|
|268611|            3300.0|        7|
|105080|           3143.25|        8|
| 17805|3099.9500007629395|        9|
|180997|            3082.5|       10|
| 79463|            3015.0|       11|
|214827|            3013.5|       12|
|156167|            3000.0|       13|
| 52567|            2992.5|       14|
|197634|            2860.0|       15|
| 76381|            2850.0|       16|
| 46550|            2850.0|       16|
|127166|            2838.0|       18|
|227472|            2835.0|       19|
| 41147|            2800.0|       20|
+------+------------------+---------+



- **2.3** Last order creation date

In [0]:
# extracing last order creation date for each customer 
lastOrderCreatedDate_df = ordersDIM_df.groupBy("UserId").agg(F.max("OrderCreatedAt").alias("LastOrderCreationDate"))
lastOrderCreatedDate_df.show()

+------+---------------------+
|UserId|LastOrderCreationDate|
+------+---------------------+
|231502|  2017-12-08 09:00:00|
|237568|  2017-11-18 09:31:00|
| 34239|  2014-10-17 16:11:00|
|176073|  2017-07-08 07:15:00|
| 18979|  2014-07-03 02:49:00|
|241533|  2018-02-22 14:26:00|
| 29601|  2014-07-10 12:05:00|
|  4900|  2013-11-20 13:31:00|
| 89878|  2015-12-21 13:32:00|
|207103|  2017-07-06 13:00:00|
| 68098|  2015-09-03 13:27:00|
|148267|  2016-11-24 00:14:00|
| 78478|  2015-09-03 19:28:00|
| 11858|  2013-12-07 18:42:00|
|  4101|  2014-01-15 15:51:00|
|167621|  2017-04-26 07:06:00|
| 51415|  2015-03-31 18:36:00|
|195263|  2017-07-24 08:41:00|
|144475|  2016-12-08 20:04:00|
| 82672|  2015-12-29 23:49:00|
+------+---------------------+
only showing top 20 rows



- **2.4** The overall most viewed item of a customer

In [0]:
# Filtering the data on events dataset based on the event type = view_item
# Cleaning the EventParameterValue column as it contains string like google_search, promo_click etc which is not of our interest
# So Im extracting only Integer value which is a ItemId and getting their view counts
#   
item_views_df = eventsDIM_df.filter(eventsDIM_df["EventName"] == "view_item") \
                            .select("UserId","EventName","EventParameterValue")
item_views_df = item_views_df.filter(item_views_df["EventParameterValue"].rlike("^[0-9]+$")) \
                             .groupBy("EventParameterValue","UserId") \
                             .agg(F.count("EventParameterValue").alias("ViewCount"))  
                                                                                
most_viewed_items_df = item_views_df.orderBy(F.col("ViewCount").desc()) \
                                    .select("UserId","EventParameterValue","ViewCount")

# # Show the most viewed items
most_viewed_items_df.show()

+------+-------------------+---------+
|UserId|EventParameterValue|ViewCount|
+------+-------------------+---------+
| 45418|               1671|        3|
|105745|               3932|        3|
| 63987|               3426|        3|
| 15170|               2810|        3|
|212503|               3942|        3|
|236421|                228|        2|
|120305|               3042|        2|
|105527|               3806|        2|
|103381|               2751|        2|
| 24540|               2088|        2|
| 76547|                 55|        2|
|166012|                760|        2|
|106525|               3036|        2|
| 52870|               3193|        2|
|229913|               3035|        2|
|194168|               1254|        2|
|202451|               3328|        2|
| 62998|               1968|        2|
|247359|                153|        2|
|201288|                719|        2|
+------+-------------------+---------+
only showing top 20 rows



##### CREATION OF DATAMART: TOP BUYERS

- **DATAMART**: TOP_BUYERS
- **Attributes**
    - **UserID**:  Id of the customer in users dataset joined with orders and events
    - **UserFullName**: Fullname of customer combining the columns firstname and last name 
    - **TotalSales**: Total sales made by user based on the count of items purchased multiplied by item price 
    - **LastOrderCreationDate**: Most recent order purchased by the user 
    - **EventParameterValue**: This column is preprocessed to extract integer value, it can be joined with items dataframe to get full information of the item

In [0]:
# Merging all the data frames created from research questions to capture information like
# Top customers and their total sales, their ranking, last order creation date, viewed item and view count

top_buyers_DM_df = rank_top20_buyers_df.join(usersDIM_df, on="UserId").select(
    "UserId",
    "UserFullName",
    "TotalSales",
    "User_rank"
)

# merging last order create date and most viewd items
top_buyers_DM_df = top_buyers_DM_df.join(lastOrderCreatedDate_df, on="UserId")
top_buyers_DM_df = top_buyers_DM_df.join(most_viewed_items_df, on="UserId")

top_buyers_DM_df.show(20)

+------+-------------+------------------+---------+---------------------+-------------------+---------+
|UserId| UserFullName|        TotalSales|User_rank|LastOrderCreationDate|EventParameterValue|ViewCount|
+------+-------------+------------------+---------+---------------------+-------------------+---------+
| 52567|      K Saito|            2992.5|       14|  2015-06-17 04:25:00|               2354|        1|
| 52567|      K Saito|            2992.5|       14|  2015-06-17 04:25:00|               3124|        1|
| 52567|      K Saito|            2992.5|       14|  2015-06-17 04:25:00|                318|        1|
| 52567|      K Saito|            2992.5|       14|  2015-06-17 04:25:00|               1852|        1|
| 52567|      K Saito|            2992.5|       14|  2015-06-17 04:25:00|               3243|        1|
| 52567|      K Saito|            2992.5|       14|  2015-06-17 04:25:00|               3757|        1|
| 52567|      K Saito|            2992.5|       14|  2015-06-17 

**Note** : I have done my analysis keeping research questions in mind, but it would be interesting to dive deeper and perform more data exploration, apply transformations and extract many more meaningful insights like what kind of products are customers mainly interested, customer shopping pattern, ordering pattern based on device type etc.  