In [0]:
spark
%pip install textblob

Python interpreter will be restarted.
Collecting textblob
  Downloading textblob-0.18.0.post0-py3-none-any.whl (626 kB)
Collecting nltk>=3.8
  Downloading nltk-3.8.1-py3-none-any.whl (1.5 MB)
Collecting tqdm
  Downloading tqdm-4.66.4-py3-none-any.whl (78 kB)
Collecting regex>=2021.8.3
  Downloading regex-2024.5.15-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (774 kB)
Installing collected packages: tqdm, regex, nltk, textblob
Successfully installed nltk-3.8.1 regex-2024.5.15 textblob-0.18.0.post0 tqdm-4.66.4
Python interpreter will be restarted.


##Importing the required libraries

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import *
from textblob import TextBlob
from pyspark.ml.feature import VectorAssembler, Tokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml.clustering import KMeans
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml import Pipeline

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

In [0]:
spark = SparkSession.builder.appName("E-Commerce Data Analysis").getOrCreate()

In [0]:
#spark.conf.set("fs.s3a.access.key", "Enter access key here")
#spark.conf.set("fs.s3a.secret.key", "Enter secret key here")
#spark.conf.set("fs.s3a.endpoint", "Enter s3 endpoint here")

##Schema Definition

In [0]:
olist_orders_dataset_schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("customer_id", StringType(), True),
    StructField("order_status", StringType(), True),
    StructField("order_purchase_timestamp", TimestampType(), True),
    StructField("order_approved_at", TimestampType(), True),
    StructField("order_delivered_carrier_date", TimestampType(), True),
    StructField("order_delivered_customer_date", TimestampType(), True),
    StructField("order_estimated_delivery_date", TimestampType(), True)
])

In [0]:
olist_order_reviews_dataset_schema =  StructType([
    StructField("review_id", StringType(), True),
    StructField("order_id", StringType(), True),
    StructField("review_score", IntegerType(), True),
    StructField("review_comment_title", StringType(), True),
    StructField("review_comment_message", StringType(), True),
    StructField("review_creation_date", TimestampType(), True),
    StructField("review_answer_timestamp", TimestampType(), True)
])

In [0]:
olist_order_items_dataset_schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("order_item_id", IntegerType(), True),
    StructField("product_id", StringType(), True),
    StructField("seller_id", StringType(), True),
    StructField("shipping_limit_date", TimestampType(), True),
    StructField("price", FloatType(), True),
    StructField("freight_value", FloatType(), True)
])

In [0]:
olist_products_dataset_schema = StructType([
    StructField("product_id", StringType(), True),
    StructField("product_category_name", StringType(), True),
    StructField("product_name_lenght", IntegerType(), True),
    StructField("product_description_length", IntegerType(), True),
    StructField("product_photos_qty", IntegerType(), True),
    StructField("product_weight_g", IntegerType(), True),
    StructField("product_length_cm", IntegerType(), True),
    StructField("product_height_cm", IntegerType(), True),
    StructField("product_width_cm", IntegerType(), True)
])

In [0]:
olist_order_payments_dataset_schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("payment_sequential", IntegerType(), True),
    StructField("payment_type", StringType(), True),
    StructField("payment_installments", IntegerType(), True),
    StructField("payment_value", FloatType(), True)
])

In [0]:
olist_orders_dataset_df = spark.read.schema(olist_orders_dataset_schema).format("csv").option("header","true").load("s3://e-commerce-pjt/olist_orders_dataset.csv")
olist_order_reviews_dataset_df = spark.read.schema(olist_order_reviews_dataset_schema).format("csv").option("header","true").load("s3://e-commerce-pjt/olist_order_reviews_dataset.csv")
olist_order_items_dataset_df = spark.read.schema(olist_order_items_dataset_schema).format("csv").option("header","true").load("s3://e-commerce-pjt/olist_order_items_dataset.csv")
olist_products_dataset_df = spark.read.schema(olist_products_dataset_schema).format("csv").option("header","true").load("s3://e-commerce-pjt/olist_products_dataset.csv")
olist_order_payments_dataset_df = spark.read.schema(olist_order_payments_dataset_schema).format("csv").option("header","true").load("s3://e-commerce-pjt/olist_order_payments_dataset.csv")

In [0]:
# 'delivery_time_days' column to include time taken to deliver an order
olist_orders_dataset_df = olist_orders_dataset_df.withColumn("delivery_time_days", datediff(col("order_delivered_customer_date"), col("order_purchase_timestamp")))

olist_orders_dataset_df.select("delivery_time_days").show(5)

+------------------+
|delivery_time_days|
+------------------+
|                 8|
|                14|
|                 9|
|                14|
|                 3|
+------------------+
only showing top 5 rows



### 1. Order Value per Day

In [0]:
# Average order value per day for each customer, total order value and number of days for each customer

order_value_df = olist_order_items_dataset_df.groupBy("order_id").agg(sum(col("price")).alias("order_value"))
orders_with_value_df = olist_orders_dataset_df.join(order_value_df, "order_id")

customer_orders = orders_with_value_df.groupby("customer_id").agg(sum("order_value").alias("total_order_value"), (datediff(max("order_purchase_timestamp"), min("order_purchase_timestamp")) + 1).alias("days_since_first_order"))

customer_orders = customer_orders.withColumn("order_value_per_day", col("total_order_value") / col("days_since_first_order"))

customer_orders.show(5)

+--------------------+------------------+----------------------+-------------------+
|         customer_id| total_order_value|days_since_first_order|order_value_per_day|
+--------------------+------------------+----------------------+-------------------+
|f54a9f0e6b351c431...|19.899999618530273|                     1| 19.899999618530273|
|2a1dfb647f32f4390...|             445.0|                     1|              445.0|
|4f28355e5c17a4a42...|             139.0|                     1|              139.0|
|4632eb5a8f175f6fe...|  79.9000015258789|                     1|   79.9000015258789|
|843ff05b30ce4f75b...|  84.9000015258789|                     1|   84.9000015258789|
+--------------------+------------------+----------------------+-------------------+
only showing top 5 rows



### 2. Customer Recency, Frequency and Monetary (RFM) Features

In [0]:
# RFM features to analyze customer behavior.

# Assuming current date is 2023-07-01
rfm = orders_with_value_df.groupBy("customer_id").agg(
    datediff(max("order_purchase_timestamp"), to_date(lit('2023-07-01'))).alias("recency"),  
    count("order_id").alias("frequency"),
    sum("order_value").alias("monetary")
)

rfm.show(5)

+--------------------+-------+---------+------------------+
|         customer_id|recency|frequency|          monetary|
+--------------------+-------+---------+------------------+
|f54a9f0e6b351c431...|  -2350|        1|19.899999618530273|
|2a1dfb647f32f4390...|  -1856|        1|             445.0|
|4f28355e5c17a4a42...|  -2235|        1|             139.0|
|4632eb5a8f175f6fe...|  -2039|        1|  79.9000015258789|
|843ff05b30ce4f75b...|  -2056|        1|  84.9000015258789|
+--------------------+-------+---------+------------------+
only showing top 5 rows



### 3. Average Delivery Time

In [0]:
# Calculate average delivery time for each customer
avg_delivery_time = olist_orders_dataset_df.groupBy("customer_id").agg(avg("delivery_time_days").alias("avg_delivery_time")
)

avg_delivery_time.show(5)

+--------------------+-----------------+
|         customer_id|avg_delivery_time|
+--------------------+-----------------+
|f54a9f0e6b351c431...|             10.0|
|2a1dfb647f32f4390...|             17.0|
|4f28355e5c17a4a42...|             11.0|
|4632eb5a8f175f6fe...|              5.0|
|843ff05b30ce4f75b...|             28.0|
+--------------------+-----------------+
only showing top 5 rows



### 4. Total Items Ordered

In [0]:

# Calculate total items ordered by each customer
total_items_ordered = olist_order_items_dataset_df.groupBy("order_id").count()
total_items_ordered = total_items_ordered.join(orders_with_value_df, "order_id").groupBy("customer_id").sum("count")
total_items_ordered = total_items_ordered.select(col("customer_id"), col("sum(count)").alias("total_items_ordered"))

total_items_ordered.show(5)

+--------------------+-------------------+
|         customer_id|total_items_ordered|
+--------------------+-------------------+
|f54a9f0e6b351c431...|                  1|
|2a1dfb647f32f4390...|                  1|
|4f28355e5c17a4a42...|                  1|
|4632eb5a8f175f6fe...|                  1|
|843ff05b30ce4f75b...|                  1|
+--------------------+-------------------+
only showing top 5 rows



### 5. Unique Categories Purchased

In [0]:

# Join order_items_df with products_df to get product categories
order_items_with_categories = olist_order_items_dataset_df.join(olist_products_dataset_df, "product_id")

# Calculate unique categories purchased by each customer
unique_categories_purchased = order_items_with_categories.groupBy("order_id", "product_category_name").count()
unique_categories_purchased = unique_categories_purchased.join(orders_with_value_df, "order_id").groupBy("customer_id").count().alias("unique_categories")
unique_categories_purchased = unique_categories_purchased.select(col("customer_id"), col("count").alias("unique_categories_purchased"))

unique_categories_purchased.show(5)

+--------------------+---------------------------+
|         customer_id|unique_categories_purchased|
+--------------------+---------------------------+
|61c7dca833a12eaf7...|                          1|
|0c30d6931a3ecd772...|                          1|
|8626a2b044790a9ee...|                          1|
|30f523f7def36192e...|                          1|
|6aa0f075a2225b4dd...|                          1|
+--------------------+---------------------------+
only showing top 5 rows



### 6. Average Product Price


In [0]:

# Join olist_order_items_dataset_df with olist_products_dataset_df to get product prices
order_items_with_prices = olist_order_items_dataset_df.join(olist_products_dataset_df, "product_id")

# Calculate average product price for each customer
avg_product_price_per_order = order_items_with_prices.groupBy("order_id").agg(avg("price").alias("avg_price_per_order"))

order_avg_price_with_customer = avg_product_price_per_order.join(olist_orders_dataset_df, "order_id")

average_product_price = order_avg_price_with_customer.groupBy("customer_id").agg(avg("avg_price_per_order").alias("average_product_price"))

average_product_price.show(5)

+--------------------+---------------------+
|         customer_id|average_product_price|
+--------------------+---------------------+
|f82d1df095bf073a1...|    89.98999786376953|
|6442504c76c94895e...|   59.900001525878906|
|e139948a02a65f3d6...|                117.0|
|fb6f8e0cb2c65dfad...|    116.9000015258789|
|8baeca32aac79a831...|   45.970001220703125|
+--------------------+---------------------+
only showing top 5 rows



### 7. Preferred Payment Type


In [0]:

# Calculate the most frequently used payment type for each customer
payment_type_count = olist_order_payments_dataset_df.groupBy("order_id", "payment_type").count()
payment_type_count = payment_type_count.join(olist_orders_dataset_df, "order_id").groupBy("customer_id", "payment_type").sum("count").alias("total_count")

# Use window function to rank payment types by count for each customer
window = Window.partitionBy("customer_id").orderBy(col("sum(count)").desc())
ranked_payment_types = payment_type_count.withColumn("rank", row_number().over(window))

# Select the most frequently used payment type for each customer
preferred_payment_type = ranked_payment_types.filter(col("rank") == 1).select("customer_id", "payment_type")

preferred_payment_type.show(5)

+--------------------+------------+
|         customer_id|payment_type|
+--------------------+------------+
|00050bf6e01e69d5c...| credit_card|
|000598caf2ef41174...| credit_card|
|000bf8121c3412d30...| credit_card|
|00114026c1b7b52ab...| credit_card|
|0013cd8e350a7cc76...| credit_card|
+--------------------+------------+
only showing top 5 rows



##Sentiment Scores for Customer Reviews
Apply the UDF to the customer reviews data and calculate the sentiment scores.

In [0]:
# Define UDF to calculate sentiment polarity
def get_sentiment(text):
    try:
        return TextBlob(text).sentiment.polarity
    except:
        return None

# Register the UDF
sentiment_udf = udf(get_sentiment, FloatType())

In [0]:
# Apply UDF to calculate sentiment polarity
reviews_df = olist_order_reviews_dataset_df.join(olist_orders_dataset_df, "order_id")
reviews_df = reviews_df.withColumn("sentiment_score", sentiment_udf(col("review_comment_message")))

# Calculate average sentiment score for each customer
customer_sentiment = reviews_df.groupBy("customer_id").agg(avg("sentiment_score").alias("avg_sentiment_score"))

customer_sentiment.show(5)


+--------------------+-------------------+
|         customer_id|avg_sentiment_score|
+--------------------+-------------------+
|f54a9f0e6b351c431...|               null|
|2a1dfb647f32f4390...|              0.375|
|4f28355e5c17a4a42...|                0.0|
|4632eb5a8f175f6fe...|               null|
|843ff05b30ce4f75b...|                0.0|
+--------------------+-------------------+
only showing top 5 rows



##Combining all the features into a single dataframe

In [0]:
# Combine all features into a single DataFrame
features_df = customer_orders \
    .join(rfm, "customer_id", "left") \
    .join(avg_delivery_time, "customer_id", "left") \
    .join(customer_sentiment, "customer_id", "left") \
    .join(total_items_ordered, "customer_id", "left") \
    .join(unique_categories_purchased, "customer_id", "left") \
    .join(average_product_price, "customer_id", "left") \
    .join(preferred_payment_type, "customer_id", "left")
features_df = features_df.fillna(0)
features_df.show(5)

+--------------------+------------------+----------------------+-------------------+-------+---------+------------------+-----------------+-------------------+-------------------+---------------------------+---------------------+------------+
|         customer_id| total_order_value|days_since_first_order|order_value_per_day|recency|frequency|          monetary|avg_delivery_time|avg_sentiment_score|total_items_ordered|unique_categories_purchased|average_product_price|payment_type|
+--------------------+------------------+----------------------+-------------------+-------+---------+------------------+-----------------+-------------------+-------------------+---------------------------+---------------------+------------+
|f82d1df095bf073a1...| 89.98999786376953|                     1|  89.98999786376953|  -1796|        1| 89.98999786376953|              9.0|                0.0|                  1|                          1|    89.98999786376953| credit_card|
|6442504c76c94895e...|59.900

#Spark ML portion

## 1. Customer Segmentation using KMeans


In [0]:

assembler = VectorAssembler(
    inputCols=["total_order_value", "days_since_first_order", "order_value_per_day", "recency", "frequency", "monetary", "avg_delivery_time", "avg_sentiment_score", "total_items_ordered", "unique_categories_purchased", "average_product_price"],
    outputCol="features"
)
kmeans = KMeans(k=3, seed=1)
pipeline = Pipeline(stages=[assembler, kmeans])

model = pipeline.fit(features_df)
predictions = model.transform(features_df)

# Show cluster centers
centers = model.stages[-1].clusterCenters()
print("Cluster Centers:")
for center in centers:
    print(center)

Cluster Centers:
[ 9.16727452e+01  1.00000000e+00  9.16727452e+01 -2.00712401e+03
  1.00000000e+00  9.16727452e+01  1.20858998e+01  1.06732130e-02
  1.12422895e+00  1.00759878e+00  8.50056097e+01]
[ 4.73584930e+02  1.00000000e+00  4.73584930e+02 -2.01376068e+03
  1.00000000e+00  4.73584930e+02  1.35080053e+01  1.01336108e-02
  1.30937763e+00  1.01408451e+00  4.22240725e+02]
[ 1.66930318e+03  1.00000000e+00  1.66930318e+03 -2.01089552e+03
  1.00000000e+00  1.66930318e+03  1.37072331e+01  9.65963461e-03
  1.34098737e+00  1.00803674e+00  1.50334501e+03]


Model Evaluation for KMeans

In [0]:
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette Score: {silhouette}")


Silhouette Score: 0.8351127057076846


## 2. Predictive Modeling for Customer Lifetime Value (CLV)


In [0]:
assembler = VectorAssembler(
    inputCols=["total_order_value", "days_since_first_order", "order_value_per_day", "recency", "frequency", "monetary", "avg_delivery_time", "avg_sentiment_score", "total_items_ordered", "unique_categories_purchased", "average_product_price"],
    outputCol="features"
)
output = assembler.transform(features_df)
final_data = output.select("features", "total_order_value")

train_data, test_data = final_data.randomSplit([0.8, 0.2])

lr = LinearRegression(labelCol="total_order_value")
lr_model = lr.fit(train_data)

test_results = lr_model.evaluate(test_data)
print(f"R2: {test_results.r2}, RMSE: {test_results.rootMeanSquaredError}")

R2: 1.0, RMSE: 3.639715960453995e-07
