Copyright (c) Microsoft Corporation. 
Licensed under the MIT license. 
# Power BI Data Model

Transform the cleaned dataset for reporting in Power BI, storing tables as CSV files in the Data Lake.

The resulting data model includes four tables:

1. Customer: user info, growth label (growth, decline, or no change), and aggregated session metrics
1. Activity: user clickstream activity, e.g. product views, purchases
1. Products: reference table with additional product information
1. Categories: reference table with additional product category information


## Library Imports


In [1]:
import pyspark
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql import *
from pyspark.sql.types import *

## Read in Data from Delta Lake


In [None]:
data_lake_account_name = ''
file_system_name = ''

In [2]:
paths = [f'abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net/transformed_data/cleaned_data_electronics']
full_dataset = spark.read.format("delta").load(*paths)
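
The same `abfss://` URI pattern recurs in every read and write in this notebook. As a minimal sketch, a small helper could build it in one place. The name `abfss_path` is hypothetical (it is not part of the original notebook), and the account and file system names below are placeholders.

```python
def abfss_path(file_system_name: str, data_lake_account_name: str, relative_path: str) -> str:
    """Build an ADLS Gen2 URI in the form used throughout this notebook.

    Hypothetical helper for illustration; not part of the original notebook.
    """
    # Strip a leading slash so the result never contains a double slash.
    relative_path = relative_path.lstrip('/')
    return (
        f'abfss://{file_system_name}@{data_lake_account_name}'
        f'.dfs.core.windows.net/{relative_path}'
    )


# Placeholder names; substitute your own file system and storage account.
example = abfss_path('myfilesystem', 'myaccount', 'transformed_data/cleaned_data_electronics')
print(example)
```

The returned string can be passed directly to `spark.read.format("delta").load(...)` in place of the inline f-string.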

In [4]:
# add month & year, re-order columns
cleaned_df = full_dataset.withColumn('month', month('event_time')) \
    .withColumn('year', year('event_time')) \
    .drop('category_code') \
    .select('user_id', 'year', 'month', 'event_type', 'product_id', 'category_id', 'category', 'subcategory', 'brand', 'price', 'user_session', 'event_time')

In [5]:
# write cleaned_df to an intermediate Delta table in the Data Lake
cleaned_df.write.format("delta").mode("overwrite").save(f"abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net/intermediate_tables/cleaned_df")

In [2]:
# read cleaned_df back from the intermediate Delta table
cleaned_df = spark.read.format("delta").load(f"abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net/intermediate_tables/cleaned_df")

## Data Modeling

Transform data to create four tables for reporting, each with a unique identifier (UID, product_id, or category_id) to be linked in Power BI.

### Customer Table

**Growth indicator:** Classify each customer-month as growth, decline, or no change based on the month-over-month change in net revenue.

1. Growth if net revenue increased by more than 10%
1. Decline if net revenue decreased by more than 10%
1. No change otherwise


In [3]:
# get monthly revenue
growth = cleaned_df.filter(col('event_type') == 'purchase') \
    .withColumn('revenue', cleaned_df['price'].cast('double'))\
    .groupBy('user_id', 'year', 'month') \
    .sum('revenue') \
    .withColumnRenamed('sum(revenue)', 'total_net_revenue') \
    .orderBy('user_id', 'year', 'month')

In [4]:
# get deltas vs. the previous month with purchases
from pyspark.sql.window import Window
from pyspark.sql.functions import lag

# lag() returns the user's previous row in (year, month) order, i.e. the previous
# month that has a purchase, which is not necessarily the previous calendar month
window_specs = Window.partitionBy('user_id').orderBy('year', 'month')

growth_lag = growth.withColumn('last_month_revenue', lag(growth.total_net_revenue).over(window_specs).cast('double'))
growth_delta = growth_lag.withColumn('delta_net_revenue', (growth_lag.total_net_revenue - growth_lag.last_month_revenue).cast('double'))

In [5]:
# label each customer-month as growth, decline, or no change
# growth/decline defined as a revenue change of more than +/-10% month-over-month

df_growth_a = growth_delta.withColumn('percent_delta_revenue', growth_delta['delta_net_revenue']/growth_delta['last_month_revenue'].cast('double'))

# drop rows with no prior month to compare against (percent_delta_revenue is null);
# filtering on 'growth' would not remove them, since otherwise() labels nulls 'no change'
df_growth = df_growth_a.filter(col('percent_delta_revenue').isNotNull()) \
    .withColumn('growth',
        when(df_growth_a['percent_delta_revenue'] > .1, 'growth')
        .when(df_growth_a['percent_delta_revenue'] < -.1, 'decline')
        .otherwise('no change')) \
    .drop('last_month_revenue', 'delta_net_revenue')
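
To sanity-check the thresholds, the same rule can be sketched in plain Python for a single customer-month. This is an illustration, not the notebook's Spark code: the function name `classify_growth` is hypothetical, and returning `None` when there is no usable prior month is an assumption of this sketch.

```python
from typing import Optional


def classify_growth(current_revenue: float, last_month_revenue: Optional[float]) -> Optional[str]:
    """Label one customer-month using the +/-10% rule described above.

    Returns None when there is no usable prior month to compare against
    (an assumption of this sketch for how to treat that case).
    """
    if last_month_revenue is None or last_month_revenue == 0:
        return None
    percent_delta = (current_revenue - last_month_revenue) / last_month_revenue
    if percent_delta > 0.1:
        return 'growth'
    if percent_delta < -0.1:
        return 'decline'
    return 'no change'


print(classify_growth(120.0, 100.0))  # +20% -> growth
print(classify_growth(85.0, 100.0))   # -15% -> decline
print(classify_growth(105.0, 100.0))  # +5%  -> no change
print(classify_growth(50.0, None))    # no prior month -> None
```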

**Session & buying behavior:** Calculated on a per-user, per-month basis.

* Number of sessions
* Average session duration
* Average conversion rate
* Average order value
* Average cart abandon rate


In [6]:
# sessions per user
sessions_per_user_per_month = cleaned_df.groupBy('user_id', 'year', 'month') \
    .agg(countDistinct('user_session').alias('sessions_per_user_per_month')) \
    .fillna({'sessions_per_user_per_month': 0}) \
    .orderBy('user_id', 'year', 'month')

In [7]:
# avg session duration
# time between start & end of each session, aggregated per user per month
session_durations = cleaned_df.groupBy('user_id', 'year', 'month', 'user_session') \
    .agg(
        unix_timestamp(min('event_time')).alias('session_start_time'),
        unix_timestamp(max('event_time')).alias('session_end_time')) \
    .withColumn('session_duration', col('session_end_time')-col('session_start_time')) \
    .drop('user_session', 'session_start_time', 'session_end_time')

avg_session_duration_per_user_per_month = session_durations.groupBy('user_id', 'year', 'month') \
    .agg(mean('session_duration').cast('double').alias('avg_session_duration_per_user_per_month')) \
    .orderBy('user_id', 'year', 'month')

#avg_session_duration_per_user_per_month.orderBy(desc('avg_session_duration_per_user_per_month')).show(5)
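
The duration logic above (last event time minus first event time per session, then averaged) can be checked on a handful of rows in plain Python. The events below are made-up values for one hypothetical user-month.

```python
from collections import defaultdict
from datetime import datetime
from statistics import mean

# Hypothetical (user_session, event_time) rows; values are invented for illustration.
events = [
    ('s1', datetime(2020, 1, 5, 10, 0, 0)),
    ('s1', datetime(2020, 1, 5, 10, 4, 0)),
    ('s2', datetime(2020, 1, 9, 18, 30, 0)),
    ('s2', datetime(2020, 1, 9, 18, 32, 0)),
    ('s3', datetime(2020, 1, 20, 8, 15, 0)),  # single-event session
]

# Collect event times per session.
times_by_session = defaultdict(list)
for session_id, event_time in events:
    times_by_session[session_id].append(event_time)

# Session duration in seconds = last event time - first event time.
durations = {
    session_id: (max(times) - min(times)).total_seconds()
    for session_id, times in times_by_session.items()
}

avg_session_duration = mean(list(durations.values()))
print(durations)
print(avg_session_duration)
```

Note that a session with a single event has a duration of 0 seconds, so such sessions pull the average down.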

In [8]:
# avg conversion rate
# avg # purchases / # views per user per month
avg_conversion_rate_per_user_per_month = cleaned_df.groupBy('user_id', 'year', 'month') \
    .agg(
        count(when(col('event_type') == 'view', True)).alias('num_views'),
        count(when(col('event_type') == 'purchase', True)).alias('num_purchases')) \
    .fillna({'num_views': 0, 'num_purchases': 0}) \
    .withColumn('avg_conversion_rate_per_user_per_month', (col('num_purchases')/col('num_views')).cast('double')) \
    .drop('num_views', 'num_purchases') \
    .orderBy('user_id', 'year', 'month')

#avg_conversion_rate_per_user_per_month.orderBy(desc('avg_conversion_rate_per_user_per_month')).show(5)
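
As a worked example of the ratio above, the sketch below computes purchases divided by views for one user-month in plain Python. The function name `conversion_rate` is hypothetical. Returning `None` when there are no views is meant to mirror Spark returning null on division by zero, which is the behavior when ANSI mode is off.

```python
from typing import Optional


def conversion_rate(event_types: list) -> Optional[float]:
    """Purchases divided by views for one user-month (illustrative sketch)."""
    num_views = sum(1 for e in event_types if e == 'view')
    num_purchases = sum(1 for e in event_types if e == 'purchase')
    if num_views == 0:
        # No views recorded: the ratio is undefined.
        return None
    return num_purchases / num_views


rate_with_views = conversion_rate(['view', 'view', 'cart', 'purchase', 'view', 'view'])
rate_without_views = conversion_rate(['cart', 'purchase'])
print(rate_with_views)     # 1 purchase / 4 views
print(rate_without_views)  # undefined, no views
```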

In [9]:
# avg order value
# price per user per month, for purchases only
avg_order_value_per_user_per_month = cleaned_df.filter(col('event_type') == 'purchase') \
    .groupBy('user_id', 'year', 'month') \
    .agg(mean('price').cast('double').alias('avg_order_value_per_user_per_month')) \
    .orderBy('user_id', 'year', 'month')

#avg_order_value_per_user_per_month.show(5)

In [10]:
# avg_cart_abandon_rate
# items that were added to cart, but not purchased
abandon_rate_per_session = cleaned_df.filter((col('event_type') == 'purchase') | (col('event_type') == 'cart')) \
    .groupBy('user_id', 'year', 'month', 'user_session', 'product_id') \
    .pivot('event_type').agg(count('product_id')) \
    .fillna({'cart':0, 'purchase':0}) \
    .withColumn('cart_abandon_rate', (col('cart')-col('purchase'))/col('cart'))

avg_cart_abandon_rate = abandon_rate_per_session.groupBy('user_id', 'year', 'month') \
    .agg(mean('cart_abandon_rate').cast('double').alias('avg_cart_abandon_rate'))

#avg_cart_abandon_rate.show(5)
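
The abandon rate above is computed per session and product as `(cart - purchase) / cart`, then averaged per user-month. A plain-Python sketch on made-up rows shows the arithmetic. It assumes, as Spark's `mean` does, that undefined (null) rates are skipped when averaging.

```python
from collections import Counter
from statistics import mean

# Hypothetical (user_session, product_id, event_type) rows for one user-month.
events = [
    ('s1', 'A', 'cart'), ('s1', 'A', 'cart'), ('s1', 'A', 'purchase'),
    ('s1', 'B', 'cart'),
    ('s2', 'C', 'cart'), ('s2', 'C', 'purchase'),
    ('s2', 'D', 'purchase'),  # purchased without a recorded cart event
]

# Count events per (session, product, event_type), like the pivot + count above.
counts = Counter(events)
keys = sorted({(session, product) for session, product, _ in events})

rates = {}
for session, product in keys:
    cart = counts[(session, product, 'cart')]
    purchase = counts[(session, product, 'purchase')]
    # Undefined when nothing was added to the cart.
    rates[(session, product)] = (cart - purchase) / cart if cart else None

# Average only the defined rates.
valid_rates = [r for r in rates.values() if r is not None]
avg_rate = mean(valid_rates)
print(rates)
print(avg_rate)
```

With this formula the rate is negative whenever a session records more purchases than cart events for a product, which may be worth checking for in the real data.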

**Join all Customer DataFrames**


In [11]:
# join customer dfs
def join_dfs(df_list):
    # left-join each metric DataFrame onto df_growth (defined above)
    joined_df = df_growth
    for df in df_list:
        joined_df = joined_df.join(df, ['user_id', 'year', 'month'], how='left')
    return joined_df

customers_joined = join_dfs([
    sessions_per_user_per_month,
    avg_session_duration_per_user_per_month,
    avg_conversion_rate_per_user_per_month,
    avg_order_value_per_user_per_month,
    avg_cart_abandon_rate,
])

In [12]:
# add unique identifier
customers = customers_joined.withColumn('UID', concat(customers_joined['user_id'], lit('-'), customers_joined['year'], lit('-'), customers_joined['month']))
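
For reference, the key produced by the `concat` call above has the shape `user_id-year-month`. The plain-Python equivalent below uses a hypothetical `make_uid` helper and a made-up user ID.

```python
def make_uid(user_id, year, month) -> str:
    """Mirror the concat above: user_id, year, and month joined with '-'."""
    return f'{user_id}-{year}-{month}'


uid = make_uid(512345678, 2020, 1)
print(uid)
```

The month is not zero-padded, so January appears as `1` rather than `01`. Any other table that links on `UID` needs to build the key with the same expression.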

### Clickstream Activity Table

A transaction table that lists each clickstream event, including product views, add-to-cart events, and purchases.

In [13]:
# filter to only rows where 'growth' is applicable, i.e. rows in customer table
activity = cleaned_df.withColumn('UID', concat(cleaned_df['user_id'], lit('-'), cleaned_df['year'], lit('-'), cleaned_df['month'])) \
    .join(customers, ['UID'], how='right') \
    .select('UID', 'event_type', 'product_id')
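
Because every `UID` in `customers` is derived from events in `cleaned_df`, the right join above effectively keeps only the events whose `UID` also appears in the customer table. A plain-Python sketch of that filtering, on made-up rows:

```python
# Hypothetical customer keys and clickstream rows; values are invented for illustration.
customer_uids = {'1-2020-2', '2-2020-2'}
events = [
    {'UID': '1-2020-1', 'event_type': 'view', 'product_id': 10},
    {'UID': '1-2020-2', 'event_type': 'purchase', 'product_id': 10},
    {'UID': '2-2020-2', 'event_type': 'cart', 'product_id': 11},
    {'UID': '3-2020-2', 'event_type': 'view', 'product_id': 12},
]

# Keep only events whose UID exists in the customer table.
activity_rows = [e for e in events if e['UID'] in customer_uids]
print(activity_rows)
```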

### Products Table

A reference table with additional product information.

In [14]:
products = cleaned_df.select('product_id', 'brand', 'price', 'category_id').dropDuplicates(['product_id'])

### Categories Table

A reference table with additional product category information.

In [15]:
categories = cleaned_df.select('category_id', 'category', 'subcategory').dropDuplicates(['category_id'])

## Save Tables to Data Lake

Persist the four tables to CSV files in the Data Lake for reporting.


In [16]:
save_path = f'abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net/reporting/'

# customers, products, and categories are coalesced to a single CSV part file;
# activity keeps its partitioning and is written as multiple part files
customers.coalesce(1).write.option('header', 'true').mode('overwrite').csv(save_path+'customers')
activity.write.option('header', 'true').mode('overwrite').csv(save_path+'activity')
products.coalesce(1).write.option('header', 'true').mode('overwrite').csv(save_path+'products')
categories.coalesce(1).write.option('header', 'true').mode('overwrite').csv(save_path+'categories')