In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PySpark").getOrCreate()
spark


In [2]:
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql import Window


In [3]:
def printdf(df, l=5):
    return df.limit(l).toPandas()

def nullcount(df):
    return {col: df.filter(df[col].isNull()).count() for col in df.columns}

def shape(df):
    # df.toPandas().shape
    print((df.count(), len(df.columns)))

In [4]:
data = spark.read.csv("2019-Oct.csv", 
                      inferSchema=True, 
                      header=True)

#print('Dataframe dimensions:', (data.count(), len(data.columns)))
#printdf(data)

In [5]:
data.columns

['event_time',
 'event_type',
 'product_id',
 'category_id',
 'category_code',
 'brand',
 'price',
 'user_id',
 'user_session']

In [6]:
new_data = data.withColumn("date_type",F.to_date("event_time"))
new_data.show()

+--------------------+----------------+----------+-------------------+-------------+--------+-----+---------+--------------------+----------+
|          event_time|      event_type|product_id|        category_id|category_code|   brand|price|  user_id|        user_session| date_type|
+--------------------+----------------+----------+-------------------+-------------+--------+-----+---------+--------------------+----------+
|2019-10-01 00:00:...|            cart|   5773203|1487580005134238553|         null|  runail| 2.62|463240011|26dd6e6e-4dac-477...|2019-10-01|
|2019-10-01 00:00:...|            cart|   5773353|1487580005134238553|         null|  runail| 2.62|463240011|26dd6e6e-4dac-477...|2019-10-01|
|2019-10-01 00:00:...|            cart|   5881589|2151191071051219817|         null|  lovely|13.48|429681830|49e8d843-adf3-428...|2019-10-01|
|2019-10-01 00:00:...|            cart|   5723490|1487580005134238553|         null|  runail| 2.62|463240011|26dd6e6e-4dac-477...|2019-10-01|
|2019-

In [7]:
len(new_data.columns)

10

In [8]:
nullcount(new_data)

{'event_time': 0,
 'event_type': 0,
 'product_id': 0,
 'category_id': 0,
 'category_code': 4034806,
 'brand': 1659261,
 'price': 0,
 'user_id': 0,
 'user_session': 637,
 'date_type': 0}

In [9]:

rtl_data = new_data\
    .groupBy("event_type")\
    .agg(F.count("user_id").alias("Count"))\
    .withColumn('Total', F.sum('Count').over(Window.partitionBy()))\
    .withColumn('%', (F.col('Count')/F.col('Total'))*100)\
    .sort("Count", ascending=False)

rtl_data.show()


+----------------+-------+-------+------------------+
|      event_type|  Count|  Total|                 %|
+----------------+-------+-------+------------------+
|            view|1862164|4102283|45.393357796134495|
|            cart|1232385|4102283|30.041442777107285|
|remove_from_cart| 762110|4102283|18.577704171067673|
|        purchase| 245624|4102283| 5.987495255690551|
+----------------+-------+-------+------------------+



In [10]:
#data.count()
#len(data.columns)
new_data.select(F.countDistinct("user_id","user_session","event_type","event_time","product_id")).show()

+-------------------------------------------------------------------------+
|count(DISTINCT user_id, user_session, event_type, event_time, product_id)|
+-------------------------------------------------------------------------+
|                                                                  3888554|
+-------------------------------------------------------------------------+



In [11]:


# Filter out null customer ids

rtl_data = new_data.filter(F.col("user_session").isNotNull())
rtl_data = new_data.filter(F.col("category_code").isNotNull())
rtl_data = new_data.filter(F.col("brand").isNotNull())

rtl_data.show()
# rtl_data.toPandas().describe()

+--------------------+----------------+----------+-------------------+--------------------+---------+-----+---------+--------------------+----------+
|          event_time|      event_type|product_id|        category_id|       category_code|    brand|price|  user_id|        user_session| date_type|
+--------------------+----------------+----------+-------------------+--------------------+---------+-----+---------+--------------------+----------+
|2019-10-01 00:00:...|            cart|   5773203|1487580005134238553|                null|   runail| 2.62|463240011|26dd6e6e-4dac-477...|2019-10-01|
|2019-10-01 00:00:...|            cart|   5773353|1487580005134238553|                null|   runail| 2.62|463240011|26dd6e6e-4dac-477...|2019-10-01|
|2019-10-01 00:00:...|            cart|   5881589|2151191071051219817|                null|   lovely|13.48|429681830|49e8d843-adf3-428...|2019-10-01|
|2019-10-01 00:00:...|            cart|   5723490|1487580005134238553|                null|   runail

In [12]:
nullcount(rtl_data)

{'event_time': 0,
 'event_type': 0,
 'product_id': 0,
 'category_id': 0,
 'category_code': 2394337,
 'brand': 0,
 'price': 0,
 'user_id': 0,
 'user_session': 424,
 'date_type': 0}

In [13]:
rtl_data.select(F.max('date_type')).collect()

[Row(max(date_type)=datetime.date(2019, 10, 31))]

In [14]:
# Recency   = Overall latest invoice date - individual customer's last invoice date
# Frequency = count of invoice no. of transaction(s)
# Monetary = Sum of Total amount for each customer

# to calculate recency in days.
latest_date = F.to_date(F.lit("2019-11-01"), 'yyyy-MM-dd')

# Create RFM Modelling scores for each customer


rfm_scores = (rtl_data.groupBy("user_id")
              .agg((F.datediff(latest_date, F.max(F.col("date_type")))).alias("Recency"),
                   F.count("*").alias("Frequency"),
                   F.sum(F.col("price")).alias("Monetary")).sort("user_id"))



rfm_scores.show()

+--------+-------+---------+------------------+
| user_id|Recency|Frequency|          Monetary|
+--------+-------+---------+------------------+
| 4103071|     11|        5|             16.57|
| 8846226|     30|       32|            140.19|
| 9794320|      9|       19|16.359999999999996|
|10280338|      5|       79| 418.7999999999998|
|12055855|     16|        2|              6.46|
|13383118|     20|        1|              8.89|
|13392135|      5|        2|              9.83|
|14297993|     10|       11|147.17000000000002|
|16263196|      8|        2|              96.5|
|20554973|      7|       74| 351.2100000000001|
|22050296|      9|        2|45.879999999999995|
|23668204|     21|        1|              3.76|
|23843972|     26|        3|              6.33|
|25302217|     22|        2|              1.12|
|25893719|     29|       15|49.959999999999994|
|26690649|     13|        5|             80.15|
|26795186|     23|        2|             19.04|
|26964783|      7|        1|            

In [15]:
rfm_scores.describe().show()

+-------+--------------------+------------------+------------------+------------------+
|summary|             user_id|           Recency|         Frequency|          Monetary|
+-------+--------------------+------------------+------------------+------------------+
|  count|              285460|            285460|            285460|            285460|
|   mean|5.2606660292191553E8|16.314608001120998| 8.558193792475302| 78.95023589991192|
| stddev| 6.632501508238726E7|  9.41055013666885|28.147795732899606|253.51984993536047|
|    min|             4103071|                 1|                 1|               0.0|
|    max|           566280278|                31|              2403|22205.659999999843|
+-------+--------------------+------------------+------------------+------------------+

