In [0]:
# Inspect the S3 mount point; the returned listing is the cell's last expression, so it is rendered.
mount_root = 'dbfs:/mnt/mount_s3/'
dbutils.fs.ls(mount_root)

# Layer 1 (all attributes are string type) 

In [0]:

from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import split
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType
from pyspark.sql import functions as F
from pyspark.sql.functions import from_json, regexp_replace
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import to_timestamp

# Layer 1 keeps event.csv exactly as delivered. The file is malformed CSV, so it is read as
# plain text: one row per line, in a single string column `value`. Pre-processing the file
# before Spark reads it would be cleaner, but the assignment asks for the raw form in this
# first layer; parsing happens in Layer 2.
df_event = spark.read.text("dbfs:/mnt/mount_s3/event.csv")
# Only a cell's last expression is rendered, so print the column list explicitly.
print(df_event.columns)
display(df_event)

In [0]:
# Layer 1: read item/order/user CSVs with explicit all-string schemas, so every attribute
# is stored as StringType; proper typing happens in Layer 2.
# File location for files except event.csv
file_location_item = "dbfs:/mnt/mount_s3/item.csv"
file_location_order = "dbfs:/mnt/mount_s3/order.csv"
file_location_user = "dbfs:/mnt/mount_s3/user.csv"
file_type = "csv"

# CSV options
first_row_is_header = "true"
delimiter = ","


def _string_schema(*column_names: str) -> StructType:
    """Build a StructType of nullable StringType fields, in the given column order."""
    return StructType([StructField(name, StringType(), True) for name in column_names])


merged_schema_item = _string_schema(
    "adjective", "category", "created_at", "id", "modifier", "name", "price"
)

merged_schema_order = _string_schema(
    "InvoiceId", "LineItemId", "UserId", "ItemId", "ItemName", "ItemCategory",
    "Price", "CreatedAt", "PaidAt"
)

merged_schema_user = _string_schema(
    "created_at", "deleted_at", "email_address", "first_name", "id",
    "last_name", "merged_at", "parent_user_id"
)


def _read_raw_csv(path: str, schema: StructType):
    """Read the CSV at `path` with the explicit `schema` and the shared CSV options above.

    The first row is treated as a header (first_row_is_header) and fields are split on
    `delimiter`; schema inference is disabled because the schema is given explicitly.
    """
    return (
        spark.read.format(file_type)
        .option("inferSchema", 'false')
        .schema(schema)
        .option("header", first_row_is_header)
        .option("sep", delimiter)
        .load(path)
    )


df_item = _read_raw_csv(file_location_item, merged_schema_item)
df_order = _read_raw_csv(file_location_order, merged_schema_order)
df_user = _read_raw_csv(file_location_user, merged_schema_user)

# Take a look at the data
display(df_item)
display(df_order)
display(df_user)
display(df_event)


In [0]:
# Create the `db` database that holds every table in this notebook (no-op if it already exists).
spark.sql(sqlQuery="CREATE DATABASE IF NOT EXISTS db")

In [0]:
%sql
-- Reset Layer 1: drop each raw table once so the next cell recreates them from scratch.
drop table if exists db.event_table;
drop table if exists db.order_table;
drop table if exists db.item_table;
drop table if exists db.user_table;

In [0]:
# Persist the raw (all-string) Layer 1 DataFrames as managed tables in `db`.
# The event table is the only one written with mergeSchema enabled.
df_event.write.option("mergeSchema", "true").mode('overwrite').saveAsTable("db.event_table")

layer1_tables = {
    "db.item_table": df_item,
    "db.order_table": df_order,
    "db.user_table": df_user,
}
for table_name, raw_df in layer1_tables.items():
    raw_df.write.mode('overwrite').saveAsTable(table_name)

# Layer 2 (data transformed, flattened, parsed with proper datatype)

In [0]:
# Layer 2 starts from the persisted Layer 1 tables rather than the in-memory DataFrames.
layer1_queries = (
    "SELECT * FROM db.event_table",
    "SELECT * FROM db.item_table",
    "SELECT * FROM db.order_table",
    "SELECT * FROM db.user_table",
)
df_event, df_item, df_order, df_user = [spark.sql(query) for query in layer1_queries]

# Print a text preview of each table, in the order event, item, order, user
for layer1_df in (df_event, df_item, df_order, df_user):
    layer1_df.show()

In [0]:
# Working copies of the Layer 1 DataFrames; the Layer 2 cells below transform these names.
(df_item_unprocessed, df_event_unprocessed,
 df_order_unprocessed, df_user_unprocessed) = (df_item, df_event, df_order, df_user)

In [0]:
# Layer 2 parsing of the raw event lines (one string column `value` per line).
# Each line is split on "," into positional fields:
#   0 event_id, 1 event_time, 2 user_id,
#   3..6 key:value fields for event_name, platform, parameter_name, parameter_value.
# For the key:value fields, the text between the first and second ":" is kept and every
# doubled quote ("") is stripped.
EVENT_CSV_PATH = "dbfs:/mnt/mount_s3/event.csv"

# Drop the header line by its content rather than by position. Filtering on
# monotonically_increasing_id() > 0 only drops the first row of the first partition, and
# after the round-trip through db.event_table that row is not guaranteed to be the header.
# NOTE(review): assumes the first row of a direct text read of the source file is its header.
event_header_row = spark.read.text(EVENT_CSV_PATH).first()
if event_header_row is None:  # empty file: no header to drop
    df_event_lines = df_event_unprocessed
else:
    df_event_lines = df_event_unprocessed.filter(F.col("value") != event_header_row["value"])

# Split whole text string by ","
event_fields = F.split(F.col("value"), ",")


def _event_kv_value(position):
    """Text between the first and second ':' of comma-field `position`, with all "" removed."""
    return F.regexp_replace(F.split(event_fields.getItem(position), ":").getItem(1), '""', '')


df_event_unprocessed = df_event_lines.select(
    event_fields.getItem(0).alias("event_id"),
    event_fields.getItem(1).alias("event_time"),
    event_fields.getItem(2).alias("user_id"),
    _event_kv_value(3).alias("event_name"),
    _event_kv_value(4).alias("platform"),
    _event_kv_value(5).alias("parameter_name"),
    # parameter_value is the last field, so also strip the trailing '}"' left on it
    F.regexp_replace(_event_kv_value(6), '}"', '').alias("parameter_value"),
)

display(df_event_unprocessed)

In [0]:
# Rename order.csv's CamelCase headers to the snake_case convention used by the other tables.
order_column_names = {
    "InvoiceId": "invoice_id",
    "LineItemId": "line_item_id",
    "UserId": "user_id",
    "ItemId": "item_id",
    "ItemName": "item_name",
    "ItemCategory": "item_category",
    "Price": "price",
    "CreatedAt": "created_at",
    "PaidAt": "paid_at",
}
for camel_name, snake_name in order_column_names.items():
    df_order_unprocessed = df_order_unprocessed.withColumnRenamed(camel_name, snake_name)

In [0]:
from pyspark.sql.functions import col, to_timestamp
from pyspark.sql import DataFrame

# Layer 2 typing, written in a functional style as a demonstration: each DataFrame is paired
# with its target schema via zip() and cast by one helper. The rest of the notebook is kept
# in plain code cells without functions.
def _cast_columns(df: DataFrame, schema: dict) -> DataFrame:
    '''
    Return `df` with every column named in `schema` cast to its mapped Spark SQL type.

    All casts happen in a single select() projection instead of one withColumn() call per
    column in a loop (each withColumn adds a projection and grows the query plan). Columns
    not listed in `schema` pass through unchanged and the original column order is kept.

    Raises:
        ValueError: if `schema` names a column that `df` does not have.
    '''
    missing = [name for name in schema if name not in df.columns]
    if missing:
        raise ValueError(f"Columns {missing} are not present in the DataFrame: {df.columns}")
    return df.select(
        *[col(name).cast(schema[name]).alias(name) if name in schema else col(name)
          for name in df.columns]
    )


def process_dataframes(df_item_unprocessed: DataFrame, df_event_unprocessed: DataFrame,
                       df_order_unprocessed: DataFrame, df_user_unprocessed: DataFrame) -> tuple:
    '''
    Cast the four Layer 1 (string-typed) DataFrames to their Layer 2 column types.

    Args:
        df_item_unprocessed: item data (adjective ... price).
        df_event_unprocessed: parsed event data (event_id ... parameter_value).
        df_order_unprocessed: order data with snake_case column names.
        df_user_unprocessed: user data (created_at ... parent_user_id).

    Returns:
        Tuple (df_event_processed, df_item_processed, df_order_processed, df_user_processed).
        Note that the return order starts with events, unlike the parameter order.
    '''
    schema_event = {
        "event_id": "string",
        "event_time": "timestamp",
        "user_id": "integer",
        "event_name": "string",
        "platform": "string",
        "parameter_name": "string",
        "parameter_value": "integer"
    }

    # NOTE(review): casting price to "integer" assumes whole-number prices — TODO confirm.
    schema_item = {
        "adjective": "string",
        "category": "string",
        "created_at": "timestamp",
        "id": "integer",
        "modifier": "string",
        "name": "string",
        "price": "integer"
    }

    # created_at / paid_at stay strings here; they are parsed below with an explicit pattern.
    schema_order = {
        "invoice_id": "integer",
        "line_item_id": "integer",
        "user_id": "integer",
        "item_id": "integer",
        "item_name": "string",
        "item_category": "string",
        "price": "integer",
        "created_at": "string",
        "paid_at": "string"
    }

    # NOTE(review): merged_at / parent_user_id are left as strings — confirm that is intended.
    schema_user = {
        "created_at": "timestamp",
        "deleted_at": "timestamp",
        "email_address": "string",
        "first_name": "string",
        "id": "integer",
        "last_name": "string",
        "merged_at": "string",
        "parent_user_id": "string"
    }

    dataframes = [df_event_unprocessed, df_item_unprocessed, df_order_unprocessed, df_user_unprocessed]
    schemas = [schema_event, schema_item, schema_order, schema_user]

    # Loop over dfs and schemas concurrently; the results keep the event/item/order/user order.
    df_event_processed, df_item_processed, df_order_processed, df_user_processed = [
        _cast_columns(df, schema) for df, schema in zip(dataframes, schemas)
    ]

    # Order timestamps are parsed with the explicit pattern M/d/yyyy H:mm.
    df_order_processed = df_order_processed \
        .withColumn("created_at", to_timestamp(col("created_at"), "M/d/yyyy H:mm")) \
        .withColumn("paid_at", to_timestamp(col("paid_at"), "M/d/yyyy H:mm"))

    return df_event_processed, df_item_processed, df_order_processed, df_user_processed


# Call the function to process the dataframes
df_event_processed, df_item_processed, df_order_processed, df_user_processed = process_dataframes(
    df_item_unprocessed, df_event_unprocessed, df_order_unprocessed, df_user_unprocessed
)


In [0]:
# Preview each typed Layer 2 DataFrame, in the order event, item, order, user.
for processed_df in (df_event_processed, df_item_processed, df_order_processed, df_user_processed):
    display(processed_df)

In [0]:
# Persist the typed Layer 2 DataFrames as Delta tables in `db`
# (the cleaned tier in a bronze/silver/gold data-lake layout).
layer2_tables = {
    "db.event_processed_table": df_event_processed,
    "db.item_processed_table": df_item_processed,
    "db.order_processed_table": df_order_processed,
    "db.user_processed_table": df_user_processed,
}
for table_name, processed_df in layer2_tables.items():
    processed_df.write.format("delta").mode('overwrite').saveAsTable(table_name)


# Layer 3
##### top_items

##### The top_items fact table is built in a single statement from two named sub-queries, a per-year aggregation and an all-years aggregation, joined on item_id. Splitting the logic into named parts is not strictly necessary, but with many queries it makes the flow of information easier to read. At the end of Layer 3 I deconstruct this query into simple views, so that the output of each sub-query can be inspected on its own.

In [0]:
%sql

-- Temporary view acting as the item dimension table, to demonstrate a star schema
-- (dimension / fact tables) in the data lake.
CREATE OR REPLACE TEMPORARY VIEW v_test_dim_item AS
SELECT id AS item_id,
       name AS item_name,
       category AS item_category
FROM db.item_processed_table;

-- Fact table of top items, written with two CTEs joined on item_id:
--   yearly    : per (year, item_id) number of items sold, total sales and per-year ranks
--   all_years : per item, number of items sold, total sales and overall ranks
-- The join to v_test_dim_item keeps only orders whose item_id exists in the item dimension.
-- ROW_NUMBER() always yields distinct numbers, so o.item_id is added to its ORDER BY as a
-- tiebreaker; without it, items with equal counts/sales are ranked arbitrarily between runs.
-- The deconstruction at the end of Layer 3 walks through the same logic step by step.
CREATE OR REPLACE TABLE db.fact_top_items_table AS
WITH yearly AS (
    SELECT YEAR(o.created_at) AS year,
           o.item_id,
           COUNT(*) AS yearly_total_items_sold,
           SUM(o.price) AS yearly_total_sales,
           ROW_NUMBER() OVER (PARTITION BY YEAR(o.created_at) ORDER BY COUNT(*) DESC, o.item_id) AS yearly_item_rank,
           ROW_NUMBER() OVER (PARTITION BY YEAR(o.created_at) ORDER BY SUM(o.price) DESC, o.item_id) AS yearly_item_sales_rank
    FROM db.order_processed_table o
    JOIN v_test_dim_item i ON o.item_id = i.item_id
    GROUP BY YEAR(o.created_at), o.item_id
),
all_years AS (
    SELECT o.item_name,
           o.item_id AS item_id,
           COUNT(*) AS total_items_sold_all_years,
           SUM(o.price) AS total_sales_all_years_per_item,
           RANK() OVER (ORDER BY COUNT(*) DESC) AS rank_based_on_total_no_of_sales_all_years,
           RANK() OVER (ORDER BY SUM(o.price) DESC) AS rank_based_on_total_sales_all_years
    FROM db.order_processed_table o
    JOIN v_test_dim_item i ON o.item_id = i.item_id
    GROUP BY o.item_name, o.item_id
)
SELECT y.year,
       y.item_id,
       a.item_name,
       y.yearly_total_items_sold,
       y.yearly_total_sales,
       y.yearly_item_rank,
       y.yearly_item_sales_rank,
       a.total_items_sold_all_years,
       a.total_sales_all_years_per_item,
       a.rank_based_on_total_no_of_sales_all_years,
       a.rank_based_on_total_sales_all_years
FROM yearly AS y
LEFT JOIN all_years AS a ON y.item_id = a.item_id
ORDER BY y.year DESC, y.item_id;

SELECT * FROM db.fact_top_items_table;

##### top_buyers

###### This time I use separate, simple SQL statements (temporary views) instead of one nested query, to keep things simpler.

In [0]:
%sql

-- v_top_customers: the 20 users with the highest total order value.
--   SUM(o.price) AS total_sales_contributed: aggregate over all of a user's orders (all years).
--   RANK() OVER (ORDER BY SUM(o.price) DESC) AS sales_rank: window function over the grouped
--       rows; the highest spender gets rank 1 and tied totals share a rank.
--   MAX(o.created_at) AS last_order_creation_date: the user's most recent order creation time.
-- user_processed_table.id and order_processed_table.user_id identify the same user, so they
-- are the join keys; the inner join keeps only users with at least one order.
-- user_id is a tiebreaker in ORDER BY so the LIMIT 20 cut is deterministic when totals tie.
CREATE OR REPLACE TEMPORARY VIEW v_top_customers AS
SELECT
    u.id AS user_id,
    u.first_name,
    u.last_name,
    SUM(o.price) AS total_sales_contributed,
    RANK() OVER (ORDER BY SUM(o.price) DESC) AS sales_rank,
    MAX(o.created_at) AS last_order_creation_date
FROM
    db.user_processed_table u
JOIN
    db.order_processed_table o
ON
    u.id = o.user_id
GROUP BY
    u.id, u.first_name, u.last_name
-- Sort the customers by total sales in descending order
ORDER BY
    total_sales_contributed DESC,
    user_id
-- Select the top 20 customers
LIMIT 20;

select * from v_top_customers;


-- v_top_customers_2: the 20 users with the most 'view_item' events (count per user_id).
-- user_id is a tiebreaker so the LIMIT 20 cut is deterministic when view counts tie.
CREATE OR REPLACE TEMPORARY VIEW v_top_customers_2 AS
SELECT e.user_id, COUNT(*) AS view_count
FROM db.event_processed_table e
WHERE e.event_name = 'view_item'
GROUP BY e.user_id
ORDER BY view_count DESC, user_id
LIMIT 20;


-- db.top_customers_table stacks the two top-20 lists with UNION ALL, as the assignment requires.
-- They are combined row-wise instead of joined: the top 20 buyers are not necessarily the top
-- 20 viewers, so an inner join on user_id would keep only users present in both lists.
-- Columns that one side does not have are filled with NULL.
CREATE OR REPLACE TABLE db.top_customers_table AS
-- Part 1: top buyers (no view_count)
SELECT
    user_id,
    first_name,
    last_name,
    total_sales_contributed,
    sales_rank,
    last_order_creation_date,
    NULL AS view_count
FROM v_top_customers
UNION ALL
-- Part 2: top viewers (only user_id and view_count; purchase columns are NULL)
SELECT
    user_id,
    NULL AS first_name,
    NULL AS last_name,
    NULL AS total_sales_contributed,
    NULL AS sales_rank,
    NULL AS last_order_creation_date,
    view_count
FROM v_top_customers_2;

-- Show the top viewers here; query db.top_customers_table to see the combined result.
SELECT * FROM v_top_customers_2;

In [0]:
%sql
-- Show the top 20 buyers with all of their columns.
SELECT
    user_id,
    first_name,
    last_name,
    total_sales_contributed,
    sales_rank,
    last_order_creation_date
FROM v_top_customers;

#### End of Project
##### Sample queries that deconstruct the top_items query above. Both approaches compute the same columns in a different fashion: one uses a single nested query, the other uses simple step-by-step views.

In [0]:
%sql

-- v_dim_item: item dimension with columns renamed to the item_* convention.
CREATE OR REPLACE TEMPORARY VIEW v_dim_item AS
SELECT
    id       AS item_id,
    name     AS item_name,
    category AS item_category
FROM db.item_processed_table;

-- v_all_year: all-years figures per item (sales count, total sales and their ranks),
-- restricted to orders whose item_id exists in v_dim_item.
CREATE OR REPLACE TEMPORARY VIEW v_all_year AS
SELECT
    ords.item_name,
    ords.item_id AS item_id,
    COUNT(*)        AS total_items_sold_all_years,
    SUM(ords.price) AS total_sales_all_years_per_item,
    RANK() OVER (ORDER BY COUNT(*) DESC)        AS rank_based_on_total_no_of_sales_all_years,
    RANK() OVER (ORDER BY SUM(ords.price) DESC) AS rank_based_on_total_sales_all_years
FROM db.order_processed_table AS ords
INNER JOIN v_dim_item AS items
    ON items.item_id = ords.item_id
GROUP BY ords.item_name, ords.item_id;

-- Preview, highest item_id first
SELECT
    item_name,
    item_id,
    total_sales_all_years_per_item,
    total_items_sold_all_years,
    rank_based_on_total_no_of_sales_all_years,
    rank_based_on_total_sales_all_years
FROM v_all_year
ORDER BY item_id DESC;

In [0]:
%sql
-- v_yealy (sic; the name is kept because v_top_mart reads it): per (year, item_id) number of
-- items sold, total sales and their per-year ranks. o.item_id is a tiebreaker in the
-- ROW_NUMBER() ordering so items with equal counts/sales get a deterministic rank.
CREATE OR REPLACE TEMPORARY VIEW v_yealy AS
SELECT YEAR(o.created_at) AS year,
       o.item_id,
       COUNT(*) AS yearly_total_items_sold,
       SUM(o.price) AS yearly_total_sales,
       ROW_NUMBER() OVER (PARTITION BY YEAR(o.created_at) ORDER BY COUNT(*) desc, o.item_id) AS yearly_item_rank,
       ROW_NUMBER() OVER (PARTITION BY YEAR(o.created_at) ORDER BY SUM(o.price) desc, o.item_id) AS yearly_item_sales_rank
FROM db.order_processed_table o
JOIN v_dim_item i ON o.item_id = i.item_id
GROUP BY YEAR(o.created_at), o.item_id;

-- Retrieve the total number of items sold for every year
SELECT year,
       item_id,
       yearly_total_items_sold,
       yearly_total_sales,
       yearly_item_rank,
       yearly_item_sales_rank
FROM v_yealy
ORDER BY year desc, item_id;

In [0]:
%sql
-- v_top_mart: each yearly per-item row from v_yealy, enriched with that item's all-years
-- totals and ranks from v_all_year. The LEFT JOIN keeps every yearly row.
CREATE OR REPLACE TEMPORARY VIEW v_top_mart AS
SELECT
    yr.year,
    yr.item_id,
    ay.item_name,
    yr.yearly_total_items_sold,
    yr.yearly_total_sales,
    yr.yearly_item_rank,
    yr.yearly_item_sales_rank,
    ay.total_items_sold_all_years,
    ay.total_sales_all_years_per_item,
    ay.rank_based_on_total_no_of_sales_all_years,
    ay.rank_based_on_total_sales_all_years
FROM v_yealy AS yr
LEFT JOIN v_all_year AS ay
    ON ay.item_id = yr.item_id;

SELECT * FROM v_top_mart;


>### Explanation

#### Yearly data
Some conclusions:
1. This is top_mart
2. yearly_total_items_sold: this is the `COUNT(*)` aggregate (not a window function) computed for each group of `GROUP BY YEAR(o.created_at), o.item_id`, i.e. once for each item_id in each year.
For example, for item_id 3997 it shows 1 in 2013, 1 in 2016, 5 in 2018, etc. Try this code:

            %sql
            select * from v_top_mart
            where item_id=3997;

3. yearly_total_sales: the `SUM(o.price)` aggregate, again computed per `GROUP BY YEAR(o.created_at), o.item_id` group, i.e. for each item_id in each year.
For example, item 3997 (price 3) was sold once in 2013, so its 2013 total is 3; it was sold twice in 2014, so its 2014 total is 3 + 3 = 6, and so on. The underlying rows can be inspected with:

            %sql
            select * from db.order_processed_table o
            inner join db.item_processed_table i on o.item_id = i.id
            where i.id=3997

4. yearly_item_rank: `ROW_NUMBER() OVER (PARTITION BY YEAR(o.created_at) ORDER BY COUNT(*) desc) AS yearly_item_rank` uses the `ROW_NUMBER()` window function.
`ROW_NUMBER()` takes an `OVER()` clause. `PARTITION BY YEAR(o.created_at)` splits the grouped rows by year, and
`ORDER BY COUNT(*) desc` inside `OVER()` orders each year's items by their total count `(COUNT(*))` in descending order.
So `COUNT(*)` gives, for each year, the number of times each item_id was sold. The items are then sorted in descending order, so the maximum comes first, and `ROW_NUMBER()` assigns rank 1 to it, 2 to the next, and so on. Items with equal counts still get distinct numbers, in an arbitrary order unless the `ORDER BY` includes a tiebreaker.

So for example, in this query:

      %sql
      SELECT YEAR(o.created_at) AS year,
            o.item_id,
            COUNT(*) AS yearly_total_items_sold,
            ROW_NUMBER() OVER (PARTITION BY YEAR(o.created_at) ORDER BY COUNT(*) desc) AS yearly_item_rank
            FROM db.order_processed_table o
            JOIN v_dim_item i ON o.item_id = i.item_id
            GROUP BY YEAR(o.created_at), o.item_id;

 For example, item_id 3192 occurs 6 times in 2013, so the result shows yearly_total_items_sold = 6. This value comes from `COUNT(*) AS yearly_total_items_sold` together with `GROUP BY YEAR(o.created_at), o.item_id`. Then `ROW_NUMBER() OVER (PARTITION BY YEAR(o.created_at) ORDER BY COUNT(*) desc)` assigns item 3192 its rank within 2013 based on that count of 6. The rank is 1, because sorting the 2013 counts in descending order (6, 5, 4, 3, ...) puts it first. So the maximum gets rank 1.

5. yearly_item_sales_rank: combines `ROW_NUMBER() OVER (PARTITION BY YEAR(o.created_at) ORDER BY SUM(o.price) desc) AS yearly_item_sales_rank` with `GROUP BY YEAR(o.created_at), o.item_id`. `SUM(o.price)` computes, for each year, the total sales of each item. `ROW_NUMBER()` then numbers the items within that year in descending order of total sales, so the item with the highest total gets number 1.

            %sql
            SELECT YEAR(o.created_at) AS year,
                  o.item_id,
                  COUNT(*) AS yearly_total_items_sold,
                  SUM(o.price) AS yearly_total_sales, 
                  ROW_NUMBER() OVER (PARTITION BY YEAR(o.created_at) ORDER BY SUM(o.price) desc) AS yearly_item_sales_rank
                  FROM db.order_processed_table o
                  JOIN v_dim_item i ON o.item_id = i.item_id
                  GROUP BY YEAR(o.created_at), o.item_id;

                  --output: 
                  --     year item_id yearly_total_items_sold yearly_total_sales yearly_item_sales_rank
                  --     2013 2592      2                       2500               1
                  --     2013 844       2                       1800               2

Makes sense :)


#### All year data

1. total_items_sold_all_years: the total over all years, so every sale of each item_id is counted. It is not grouped by created_at, only by item:
`COUNT(*) AS total_items_sold_all_years ... GROUP BY o.item_name, o.item_id;`. item_name is included in the GROUP BY so that it can appear in the SELECT list. Assuming each item_id has a single name, this produces the same groups as grouping by item_id alone.

2. total_sales_all_years_per_item: again `SUM(o.price) AS total_sales_all_years_per_item`, grouped by item.
3. rank_based_on_total_no_of_sales_all_years: `RANK() OVER (ORDER BY COUNT(*) DESC) AS rank_based_on_total_no_of_sales_all_years`; here too the rows are grouped by item. `COUNT(*)` counts how often each item_id occurs in total: not per year, but across all the years in which the item was sold (2013, 2014, 2015, ...). The items are ordered by that count in descending order, so the most frequently sold item comes first, and `RANK()` assigns the ranks. `RANK()`, `DENSE_RANK()` and `ROW_NUMBER()` differ only in how ties are handled:
- `RANK()` gives tied rows the same rank and skips the following ranks (1, 1, 3).
- `DENSE_RANK()` gives tied rows the same rank without gaps (1, 1, 2).
- `ROW_NUMBER()` always gives distinct numbers (1, 2, 3).

See https://stackoverflow.com/questions/7747327/sql-rank-versus-row-number for a longer explanation.

4. rank_based_on_total_sales_all_years: `RANK() OVER (ORDER BY SUM(o.price) DESC) AS rank_based_on_total_sales_all_years` computes each item's total sales over all years and orders the items, regardless of year, by that total in descending order. The item with the highest total gets rank 1, and items with equal totals share a rank.
