In [1]:
adls_account_name = "ecommerceanalytics23"
adls_container_name = "ecommerce-logs"
file_path = "enrichedData"

# Construct the full ADLS Gen2 (abfss://) path to the enriched parquet dataset.
adls_path = f"abfss://{adls_container_name}@{adls_account_name}.dfs.core.windows.net/{file_path}"

try:
    df = spark.read.parquet(adls_path)

except Exception as e:
    print(f"Error reading file: {e}")
    print("Please double-check your ADLS account name, container name, and file path.")
    # Re-raise: every later cell uses `df`, so continuing here would only resurface
    # as a confusing NameError further down the notebook.
    raise

StatementMeta(ECommSparkPool, 8, 2, Finished, Available, Finished)

In [6]:
# Show the column names and types of the raw enriched dataset as loaded from ADLS.
df.printSchema()

StatementMeta(ECommSparkPool, 7, 7, Finished, Available, Finished)

root
 |-- event_time: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: double (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: float (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)
 |-- Hour: integer (nullable = true)
 |-- weekNumber: integer (nullable = true)



In [7]:
# Total number of rows before cleaning (used as the baseline for the null-drop below).
df.count()

StatementMeta(ECommSparkPool, 7, 8, Finished, Available, Finished)

42089570

In [2]:
# Remove every row that has a null in any column, then display how many rows remain.
df_no_nulls = df.dropna()
df_no_nulls.count()

StatementMeta(ECommSparkPool, 8, 3, Finished, Available, Finished)

42089564

In [3]:
from pyspark.sql.functions import col, hour, weekofyear

# Derive hour-of-day and week-of-year from event_time.
# NOTE(review): the loaded schema already contains `Hour` and `weekNumber`; with Spark's
# default case-insensitive column resolution, withColumn("hour", ...) presumably replaces
# `Hour` — confirm that recomputing both columns here is intended.
event_ts = col("event_time")
df_no_nulls = (
    df_no_nulls
    .withColumn("hour", hour(event_ts))
    .withColumn("weekNumber", weekofyear(event_ts))
)


StatementMeta(ECommSparkPool, 8, 4, Finished, Available, Finished)

### Unique visitors per day

In [4]:
from pyspark.sql.functions import to_date, col, countDistinct 

# Number of distinct users seen on each calendar day, sorted by date.
uniqueVisitorPerDay = (
    df_no_nulls
    .select(to_date(col("event_time")).alias("event_date"), "user_id")
    .groupBy("event_date")
    .agg(countDistinct("user_id").alias("unique_visitor"))
    .orderBy("event_date")
)

StatementMeta(ECommSparkPool, 8, 5, Finished, Available, Finished)

In [5]:
# Persist the daily unique-visitor counts; mode("overwrite") replaces any previous run's output.
output_path = f"abfss://{adls_container_name}@{adls_account_name}.dfs.core.windows.net/BatchProcessingResult/uniqueVisitorPerDay/"
try:
    uniqueVisitorPerDay.write.mode("overwrite").parquet(output_path)
except Exception as e:
    # Say which path failed, then re-raise so the cell (and any scheduled run) fails
    # instead of silently continuing without the result being written.
    print(f"Error writing to {output_path}: {e}")
    raise

StatementMeta(ECommSparkPool, 8, 6, Finished, Available, Finished)

In [11]:
# Cross-check the daily unique-visitor metric with Spark SQL over a temp view of the cleaned data.
df_no_nulls.createOrReplaceTempView("CleanData")

daily_unique_visitors_sql = """
    SELECT
        CAST(event_time AS DATE) AS event_date,
        COUNT(DISTINCT user_id) AS unique_visitors
    FROM
        CleanData
    GROUP BY
        CAST(event_time AS DATE)
    ORDER BY
        event_date
"""
spark.sql(daily_unique_visitors_sql).show()

StatementMeta(ECommSparkPool, 7, 12, Finished, Available, Finished)

+----------+---------------+
|event_date|unique_visitors|
+----------+---------------+
|2019-11-01|         160828|
|2019-11-02|         167895|
|2019-11-03|         174567|
|2019-11-04|         197915|
|2019-11-05|         183132|
|2019-11-06|         179224|
|2019-11-07|         194694|
|2019-11-08|         205990|
|2019-11-09|         193528|
|2019-11-10|         200168|
|2019-11-11|         210147|
|2019-11-12|         205921|
|2019-11-13|         207896|
|2019-11-14|         236287|
|2019-11-15|         315108|
|2019-11-16|         393822|
|2019-11-17|         388098|
|2019-11-18|         226772|
|2019-11-19|         196928|
|2019-11-20|         194896|
+----------+---------------+
only showing top 20 rows



### Abandoned carts: products users add to their cart but don't purchase in the same session

In [12]:
# List the distinct event_type values present in the cleaned data.
event_type_values_sql = """
    SELECT
        DISTINCT(event_type)
    FROM
        CleanData
"""
spark.sql(event_type_values_sql).show()

StatementMeta(ECommSparkPool, 7, 13, Finished, Available, Finished)

+----------+
|event_type|
+----------+
|  purchase|
|      view|
|      cart|
+----------+



In [6]:
# Split the cleaned events into cart additions and purchases.
df_cart_event = df_no_nulls.filter(col("event_type") == "cart")
df_purchase_event = df_no_nulls.filter(col("event_type") == "purchase")

# A cart event is "abandoned" when no purchase exists for the same user, product and session.
# NOTE(review): purchase time is not compared, so any purchase in that session clears the cart event.
cart_matches_purchase = (
    (col("cart.user_id") == col("purchase.user_id"))
    & (col("cart.product_id") == col("purchase.product_id"))
    & (col("cart.user_session") == col("purchase.user_session"))
)

# left_anti keeps only the cart rows with no matching purchase row.
abandoned_carts = (
    df_cart_event.alias("cart")
    .join(df_purchase_event.alias("purchase"), cart_matches_purchase, "left_anti")
    .select(
        *[col(f"cart.{name}").alias(name)
          for name in ("user_id", "product_id", "category_id", "brand", "event_time")]
    )
    .distinct()
)

StatementMeta(ECommSparkPool, 8, 7, Finished, Available, Finished)

In [7]:
# Persist the abandoned-cart rows; mode("overwrite") replaces any previous run's output.
output_path = f"abfss://{adls_container_name}@{adls_account_name}.dfs.core.windows.net/BatchProcessingResult/abandoned_carts/"
try:
    abandoned_carts.write.mode("overwrite").parquet(output_path)
except Exception as e:
    # Say which path failed, then re-raise so a failed write is not mistaken for success.
    print(f"Error writing to {output_path}: {e}")
    raise

StatementMeta(ECommSparkPool, 8, 8, Finished, Available, Finished)

### Top categories per hour and per weekday (e.g. to time discount promotions to purchasing trends)

In [10]:
from pyspark.sql.functions import col, hour, count, desc

# Purchases per (hour, category_id); within each hour, categories run from most to least purchased.
top_categories_hourly = (
    df_purchase_event
    .groupBy("hour", "category_id")
    .agg(count("*").alias("purchase_count"))
    .orderBy("hour", col("purchase_count").desc())
)

StatementMeta(ECommSparkPool, 8, 11, Finished, Available, Finished)

In [11]:
from pyspark.sql import Window
from pyspark.sql.functions import rank, col, hour, count, desc

# Rank categories inside each hour by purchase count and keep ranks 1-5.
# rank() gives tied counts the same rank, so an hour can yield more than five rows.
window_spec = Window.partitionBy("hour").orderBy(col("purchase_count").desc())

top_n_categories_hourly = (
    top_categories_hourly
    .withColumn("rank", rank().over(window_spec))
    .where(col("rank") <= 5)
)

# Persist the hourly top-5 categories; mode("overwrite") replaces any previous run's output.
output_path = f"abfss://{adls_container_name}@{adls_account_name}.dfs.core.windows.net/BatchProcessingResult/top_n_categories_hourly/"
try:
    top_n_categories_hourly.write.mode("overwrite").parquet(output_path)
except Exception as e:
    # Say which path failed, then re-raise so a failed write is not mistaken for success.
    print(f"Error writing to {output_path}: {e}")
    raise

StatementMeta(ECommSparkPool, 8, 12, Finished, Available, Finished)

In [12]:
from pyspark.sql import Window
from pyspark.sql.functions import col, dayofweek, count, desc, when, rank

# Label each purchase with its weekday name. Spark's dayofweek() numbers days
# 1 (Sunday) through 7 (Saturday). Every day is matched explicitly: the previous
# `.otherwise("Saturday")` also labelled rows whose event_time could not be parsed
# (dayofweek -> null) as Saturday. Such rows now get a null weekDay instead.
purchase_day_of_week = dayofweek(col("event_time"))
df_withWeekDays = df_purchase_event.withColumn(
    "weekDay",
    when(purchase_day_of_week == 1, "Sunday")
    .when(purchase_day_of_week == 2, "Monday")
    .when(purchase_day_of_week == 3, "Tuesday")
    .when(purchase_day_of_week == 4, "Wednesday")
    .when(purchase_day_of_week == 5, "Thursday")
    .when(purchase_day_of_week == 6, "Friday")
    .when(purchase_day_of_week == 7, "Saturday"),
)

# Purchases per (weekDay, category_id). Note: ordering by weekDay is alphabetical
# (Friday, Monday, ...), not calendar order.
top_categories_weekDay = df_withWeekDays.groupBy("weekDay", "category_id") \
                                    .agg(count("*").alias("purchase_count")) \
                                    .orderBy("weekDay", desc("purchase_count"))

window_spec_weekDay = Window.partitionBy("weekDay").orderBy(desc("purchase_count"))

# Keep ranks 1-5 per weekday; ties share a rank, so a day can yield more than five rows.
top_n_categories_weekDay = top_categories_weekDay.withColumn("rank",rank().over(window_spec_weekDay))  \
                                                .filter(col("rank") <= 5)

# Persist the per-weekday top-5 categories; mode("overwrite") replaces any previous run's output.
output_path = f"abfss://{adls_container_name}@{adls_account_name}.dfs.core.windows.net/BatchProcessingResult/top_n_categories_weekDay/"
try:
    top_n_categories_weekDay.write.mode("overwrite").parquet(output_path)
except Exception as e:
    # Say which path failed, then re-raise so a failed write is not mistaken for success.
    print(f"Error writing to {output_path}: {e}")
    raise

StatementMeta(ECommSparkPool, 8, 13, Finished, Available, Finished)

### Brands that need more marketing (view-to-purchase conversion below the brand average)

In [13]:
from pyspark.sql.functions import col, count, avg

df_view_event = df_no_nulls.filter(col("event_type") == "view")

# Per-brand totals of view and purchase events.
view_counts = df_view_event.groupBy("brand").agg(count("*").alias("view_count"))
purchase_counts = df_purchase_event.groupBy("brand").agg(count("*").alias("purchase_count"))

# Only viewed brands are kept (left join); brands viewed but never bought get
# purchase_count = 0. view_count is >= 1 for every kept brand, so the division is safe.
brand_performance = (
    view_counts
    .join(purchase_counts, on="brand", how="left")
    .fillna(0, subset=["purchase_count"])
    .withColumn("conversion_rate", col("purchase_count") / col("view_count"))
)

# Unweighted mean of the per-brand rates: every brand counts equally, regardless of traffic.
average_conversion_rate = (
    brand_performance
    .agg(avg("conversion_rate").alias("avg_conversion"))
    .first()["avg_conversion"]
)

# Brands converting below that mean, worst first.
brands_need_marketing_conversion = (
    brand_performance
    .where(col("conversion_rate") < average_conversion_rate)
    .orderBy("conversion_rate")
)

StatementMeta(ECommSparkPool, 8, 14, Finished, Available, Finished)

In [15]:
# Persist the below-average-conversion brands; mode("overwrite") replaces any previous run's output.
output_path = f"abfss://{adls_container_name}@{adls_account_name}.dfs.core.windows.net/BatchProcessingResult/brands_need_marketing_conversion/"
try:
    brands_need_marketing_conversion.write.mode("overwrite").parquet(output_path)
except Exception as e:
    # Say which path failed, then re-raise so a failed write is not mistaken for success.
    print(f"Error writing to {output_path}: {e}")
    raise

StatementMeta(ECommSparkPool, 8, 16, Finished, Available, Finished)