# Load Data

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("model").config("fs.defaultFS", "file:///").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

df = spark.read.csv("/data/BigData/ecommerce_data_with_trends.csv", header=True)

24/12/07 17:27:59 WARN Utils: Your hostname, flo-ubuntu resolves to a loopback address: 127.0.1.1; using 192.168.1.162 instead (on interface wlp3s0)
24/12/07 17:27:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/07 17:27:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
from pyspark.sql.functions import isnan, when, count, col

# Check how many null values and nan values are there in the dataframe
df.select([count(when(col(c).isNull() | isnan(c), c)).alias(c) for c in df.columns]).show()

[Stage 1:====>                                                    (1 + 11) / 12]

+--------------+---------+-----------+-------------+----+-------------+------------+--------+-----+--------+------------+
|transaction_id|timestamp|customer_id|customer_name|city|customer_type|product_name|category|price|quantity|total_amount|
+--------------+---------+-----------+-------------+----+-------------+------------+--------+-----+--------+------------+
|             0|        0|          0|            0|   0|            0|           0|       0|    0|       0|           0|
+--------------+---------+-----------+-------------+----+-------------+------------+--------+-----+--------+------------+



                                                                                

# Churn Analysis

In [3]:
from churn import CustomerChurnMLAnalysis

# Assuming 'spark' is your SparkSession and 'df' is your DataFrame
ml_analysis = CustomerChurnMLAnalysis(spark, df)
results = ml_analysis.predict_customer_churn()





                                                                                

In [4]:
lr =  results['logistic_regression']
rf = results['random_forest']
for key in rf.keys():
    if key == 'predictions' or key == 'churn_summary'or key == 'feature_importance':
        print(key)
        print('Logistic Regression: ')
        lr[key].show(5)
        print('Random Forest: ')
        rf[key].show(5)
    else:
        print(key)
        print('Logistic Regression: ', lr[key])
        print('Random Forest: ', rf[key])
    print('')

lr['feature_importance'].show()


model
Logistic Regression:  LogisticRegressionModel: uid=LogisticRegression_b40995fae747, numClasses=2, numFeatures=5869
Random Forest:  RandomForestClassificationModel: uid=RandomForestClassifier_ef0574085214, numTrees=100, numClasses=2, numFeatures=5869

predictions
Logistic Regression: 


                                                                                

+-----------+--------------------+--------------+-------------+------------------+----------------------+---------------------+--------------------+-------------------------+-----------------+------------+-----+------------------------+-------------------------------+-----------------------+-------------------+----------+--------------------------+---------------------------------+-------------------------+---------------------+-------------------+--------------------+--------------------+--------------------+--------------------+----------+
|customer_id|       customer_name|          city|customer_type|total_transactions|avg_transaction_amount|total_purchase_amount|  last_purchase_date|unique_products_purchased|unique_categories|recency_days|churn|total_transactions_index|unique_products_purchased_index|unique_categories_index|customer_type_index|city_index|total_transactions_encoded|unique_products_purchased_encoded|unique_categories_encoded|customer_type_encoded|       city_encoded|

                                                                                

+-----------+--------------------+--------------+-------------+------------------+----------------------+---------------------+--------------------+-------------------------+-----------------+------------+-----+------------------------+-------------------------------+-----------------------+-------------------+----------+--------------------------+---------------------------------+-------------------------+---------------------+-------------------+--------------------+--------------------+--------------------+--------------------+----------+
|customer_id|       customer_name|          city|customer_type|total_transactions|avg_transaction_amount|total_purchase_amount|  last_purchase_date|unique_products_purchased|unique_categories|recency_days|churn|total_transactions_index|unique_products_purchased_index|unique_categories_index|customer_type_index|city_index|total_transactions_encoded|unique_products_purchased_encoded|unique_categories_encoded|customer_type_encoded|       city_encoded|

                                                                                

+-----+-----+-----------------+-------------------+------------------+
|churn|count| avg_transactions|avg_purchase_amount|  avg_recency_days|
+-----+-----+-----------------+-------------------+------------------+
|    1| 1488|99.51948924731182|  179854.8714247312|42.656586021505376|
|    0| 1451|99.86078566505859| 2833506.6143418327|42.648518263266716|
+-----+-----+-----------------+-------------------+------------------+

Random Forest: 


                                                                                

+-----+-----+-----------------+-------------------+------------------+
|churn|count| avg_transactions|avg_purchase_amount|  avg_recency_days|
+-----+-----+-----------------+-------------------+------------------+
|    1| 1488|99.51948924731182|  179854.8714247312|42.656586021505376|
|    0| 1451|99.86078566505859| 2833506.6143418327|42.648518263266716|
+-----+-----+-----------------+-------------------+------------------+


+--------------------+--------------------+
|             feature|          importance|
+--------------------+--------------------+
|  total_transactions|  0.9881058875513138|
|avg_transaction_a...|   0.979501918676392|
|total_purchase_am...| 0.06360423926452081|
|                city|0.024019819255055606|
|unique_products_p...|0.011588844957606854|
|       customer_type|0.011350763874787178|
|        recency_days|0.005564842811738...|
|   unique_categories|0.002451888369337...|
+--------------------+--------------------+



# Customer Segmentation

In [5]:
from customer_seg import CustomerSegMLAnalysis

ml_analysis = CustomerSegMLAnalysis(spark, df)
clustered_customers_df, silhouette = ml_analysis.cluster_customers(k=10)
print(silhouette)

                                                                                

-0.061620028883509544


In [6]:
# Assuming `clustered_customers_df` is a PySpark DataFrame
for cluster_id in range(10):  # Loop through all clusters
    print(f"Cluster {cluster_id}:")
    clustered_customers_df.filter(clustered_customers_df.cluster == cluster_id).show()

Cluster 0:


                                                                                

+-----------+--------------------+-------+--------------------+--------------------+
|customer_id|       customer_name|cluster|         top_product|   spending_variance|
+-----------+--------------------+-------+--------------------+--------------------+
|       1001|      Nicholas Wolfe|      0|Accessories Produ...|4.7139714837479126E8|
|       1009|  Mr. Martin Hammond|      0|Supplements Produ...|  1233305.8830931948|
|       1015|        William Tran|      0|     Audio Product_7|5.1819040292093205E8|
|       1029|     Nicole Carrillo|      0|Kitchen Appliance...|  1519026.2154582455|
|       1047|        Nathan Smith|      0|Personal Care Pro...| 4.597733439227818E8|
|       1049|         Julia Hines|      0|Home Decor Product_9| 5.657236510934278E8|
|        105|      Elizabeth Ruiz|      0|   Bedding Product_8|  1416754.4615485955|
|       1050|Christopher Anderson|      0|  Cameras Product_10|  1532092.4847565852|
|       1053|      Zachary Morris|      0|     Shoes Product_8|  

                                                                                

In [7]:
from pyspark.sql import functions as F

# Group by cluster and compute statistics
cluster_summary = clustered_customers_df.groupBy("cluster").agg(
    F.mean("total_transactions").alias("avg_transactions"),
    F.mean("avg_transaction_amount").alias("avg_transaction_amount"),
    F.mean("total_purchase_amount").alias("avg_purchase_amount"),
    F.mean("recency_days").alias("avg_recency_days"),
    F.mean("unique_products_purchased").alias("avg_unique_products"),
    F.count("*").alias("customer_count")
)
cluster_summary.show()

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `total_transactions` cannot be resolved. Did you mean one of the following? [`top_product`, `cluster`, `customer_id`, `customer_name`, `spending_variance`].;
'Aggregate [cluster#7051], [cluster#7051, avg('total_transactions) AS avg_transactions#7725, avg('avg_transaction_amount) AS avg_transaction_amount#7727, avg('total_purchase_amount) AS avg_purchase_amount#7729, avg('recency_days) AS avg_recency_days#7731, avg('unique_products_purchased) AS avg_unique_products#7733, count(1) AS customer_count#7735L]
+- Project [customer_id#19, customer_name#20, cluster#7051, top_product#5321, spending_variance#5337]
   +- Project [customer_id#19, customer_name#20, city#21, customer_type#22, total_transactions#5299L, avg_transaction_amount#5301, total_purchase_amount#5303, last_purchase_date#5305, unique_products_purchased#5306L, unique_categories#5307L, purchase_frequency#5309L, avg_transaction_value#5311, spending_in_specific_category#5313, spending_25th_percentile#5315, spending_50th_percentile#5317, spending_75th_percentile#5319, top_product#5321, product_diversity#5322L, category_engagement#5323L, category_frequency#5325L, customer_lifetime_value#5327, spending_variance#5337, median_spend#5338, recency_days#5383, ... 5 more fields]
      +- Project [customer_id#19, customer_name#20, city#21, customer_type#22, total_transactions#5299L, avg_transaction_amount#5301, total_purchase_amount#5303, last_purchase_date#5305, unique_products_purchased#5306L, unique_categories#5307L, purchase_frequency#5309L, avg_transaction_value#5311, spending_in_specific_category#5313, spending_25th_percentile#5315, spending_50th_percentile#5317, spending_75th_percentile#5319, top_product#5321, product_diversity#5322L, category_engagement#5323L, category_frequency#5325L, customer_lifetime_value#5327, spending_variance#5337, median_spend#5338, recency_days#5383, ... 4 more fields]
         +- Project [customer_id#19, customer_name#20, city#21, customer_type#22, total_transactions#5299L, avg_transaction_amount#5301, total_purchase_amount#5303, last_purchase_date#5305, unique_products_purchased#5306L, unique_categories#5307L, purchase_frequency#5309L, avg_transaction_value#5311, spending_in_specific_category#5313, spending_25th_percentile#5315, spending_50th_percentile#5317, spending_75th_percentile#5319, top_product#5321, product_diversity#5322L, category_engagement#5323L, category_frequency#5325L, customer_lifetime_value#5327, spending_variance#5337, median_spend#5338, recency_days#5383, ... 3 more fields]
            +- Project [customer_id#19, customer_name#20, city#21, customer_type#22, total_transactions#5299L, avg_transaction_amount#5301, total_purchase_amount#5303, last_purchase_date#5305, unique_products_purchased#5306L, unique_categories#5307L, purchase_frequency#5309L, avg_transaction_value#5311, spending_in_specific_category#5313, spending_25th_percentile#5315, spending_50th_percentile#5317, spending_75th_percentile#5319, top_product#5321, product_diversity#5322L, category_engagement#5323L, category_frequency#5325L, customer_lifetime_value#5327, spending_variance#5337, median_spend#5338, recency_days#5383, ... 2 more fields]
               +- Project [customer_id#19, customer_name#20, city#21, customer_type#22, total_transactions#5299L, avg_transaction_amount#5301, total_purchase_amount#5303, last_purchase_date#5305, unique_products_purchased#5306L, unique_categories#5307L, purchase_frequency#5309L, avg_transaction_value#5311, spending_in_specific_category#5313, spending_25th_percentile#5315, spending_50th_percentile#5317, spending_75th_percentile#5319, top_product#5321, product_diversity#5322L, category_engagement#5323L, category_frequency#5325L, customer_lifetime_value#5327, spending_variance#5337, median_spend#5338, recency_days#5383, UDF(cast(top_product#5321 as string)) AS top_product_index#5695]
                  +- Project [customer_id#19, customer_name#20, city#21, customer_type#22, total_transactions#5299L, avg_transaction_amount#5301, total_purchase_amount#5303, last_purchase_date#5305, unique_products_purchased#5306L, unique_categories#5307L, purchase_frequency#5309L, avg_transaction_value#5311, spending_in_specific_category#5313, spending_25th_percentile#5315, spending_50th_percentile#5317, spending_75th_percentile#5319, top_product#5321, product_diversity#5322L, category_engagement#5323L, category_frequency#5325L, customer_lifetime_value#5327, spending_variance#5337, median_spend#5338, datediff(current_date(Some(Europe/Paris)), to_date(last_purchase_date#5305, None, Some(Europe/Paris), false)) AS recency_days#5383]
                     +- Aggregate [customer_id#19, customer_name#20, city#21, customer_type#22], [customer_id#19, customer_name#20, city#21, customer_type#22, count(transaction_id#17) AS total_transactions#5299L, avg(total_amount#5275) AS avg_transaction_amount#5301, sum(total_amount#5275) AS total_purchase_amount#5303, max(to_timestamp(timestamp#5239, None, TimestampType, Some(Europe/Paris), false)) AS last_purchase_date#5305, count(distinct product_name#23) AS unique_products_purchased#5306L, count(distinct category#24) AS unique_categories#5307L, count(transaction_id#17) AS purchase_frequency#5309L, avg(total_amount#5275) AS avg_transaction_value#5311, sum(CASE WHEN (category#24 = specific_category) THEN total_amount#5275 ELSE cast(0 as double) END) AS spending_in_specific_category#5313, percentile_approx(total_amount#5275, 0.25, 10000, 0, 0) AS spending_25th_percentile#5315, percentile_approx(total_amount#5275, 0.5, 10000, 0, 0) AS spending_50th_percentile#5317, percentile_approx(total_amount#5275, 0.75, 10000, 0, 0) AS spending_75th_percentile#5319, first(product_name#23, false) AS top_product#5321, count(distinct product_name#23) AS product_diversity#5322L, count(distinct category#24) AS category_engagement#5323L, count(category#24) AS category_frequency#5325L, sum(total_amount#5275) AS customer_lifetime_value#5327, var_samp(total_amount#5275) AS spending_variance#5337, percentile(total_amount#5275, cast(0.5 as double), 1, 0, 0, false) AS median_spend#5338]
                        +- Project [transaction_id#17, timestamp#5239, customer_id#19, customer_name#20, city#21, customer_type#22, product_name#23, category#24, price#5251, quantity#5263, cast(total_amount#27 as double) AS total_amount#5275]
                           +- Project [transaction_id#17, timestamp#5239, customer_id#19, customer_name#20, city#21, customer_type#22, product_name#23, category#24, price#5251, cast(quantity#26 as int) AS quantity#5263, total_amount#27]
                              +- Project [transaction_id#17, timestamp#5239, customer_id#19, customer_name#20, city#21, customer_type#22, product_name#23, category#24, cast(price#25 as double) AS price#5251, quantity#26, total_amount#27]
                                 +- Project [transaction_id#17, cast(timestamp#18 as timestamp) AS timestamp#5239, customer_id#19, customer_name#20, city#21, customer_type#22, product_name#23, category#24, price#25, quantity#26, total_amount#27]
                                    +- Relation [transaction_id#17,timestamp#18,customer_id#19,customer_name#20,city#21,customer_type#22,product_name#23,category#24,price#25,quantity#26,total_amount#27] csv
