In [55]:
import pandas as pd

In [56]:
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import *
import pyspark.sql.functions as F

In [57]:
df = pd.read_excel("online_retail.xlsx")
df.to_csv('online_retail.csv', index=False)

In [58]:
spark = SparkSession.builder \
  .master("local[1]") \
  .appName("SparkFirst") \
  .appName("Timeout Troubleshooting") \
  .config("spark.executor.memory", "12g")\
  .config("spark.executor.cores", 8) \
  .config("spark.dynamicAllocation.enabled", "true") \
  .config("spark.dynamicAllocation.maxExecutors", 100) \
  .config("spark.network.timeout", "600s") \
  .config("spark.shuffle.service.enabled", "true") \
.getOrCreate()

In [59]:
df = spark.read.csv('online_retail.csv', header=True)

In [60]:
df.show(3)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 3 rows



Количество строк в файле

In [61]:
lines_cnt = df.count()
print(f'Кол-во строк = {lines_cnt}')

Кол-во строк = 541909


Уникальные клиенты

In [62]:
df.select("CustomerID").distinct().count()

4373

Страна с максимальными продажами

In [63]:
df.select("Country").groupby("Country").count().sort("count", ascending=False).limit(1).show()

+--------------+------+
|       Country| count|
+--------------+------+
|United Kingdom|495478|
+--------------+------+



Самая ранняя покупка

In [64]:
df.agg({'InvoiceDate':'min'}).show()

+-------------------+
|   min(InvoiceDate)|
+-------------------+
|2010-12-01 08:26:00|
+-------------------+



Самая поздняя покупка

In [65]:
df.agg({'InvoiceDate':'max'}).show()

+-------------------+
|   max(InvoiceDate)|
+-------------------+
|2011-12-09 12:50:00|
+-------------------+



RFM-анализ

1) Денежная ценность клиента (monetary_value) - сумма всех покупок клиента
2) Частота покупок клиента (frequency) - количество покупок
3) Давность (recency) - текущая дата минут дата покупки

In [66]:
df_mon=df.withColumn('Total_sum_order', round(df.UnitPrice * df.Quantity, 2))\
            .groupBy('CustomerID')\
            .agg(sum('Total_sum_order').alias('Monetary_value'))
df_mon.show(5)

+----------+-----------------+
|CustomerID|   Monetary_value|
+----------+-----------------+
|   15039.0|19786.43999999997|
|   16553.0|5664.569999999999|
|   13178.0|5725.470000000003|
|   17786.0|           278.74|
|   12891.0|            331.0|
+----------+-----------------+
only showing top 5 rows



In [67]:
df_freq = df.groupBy('CustomerID').agg(count('InvoiceNo').alias('Frequency'))
df_freq.show(5)

+----------+---------+
|CustomerID|Frequency|
+----------+---------+
|   15039.0|     1508|
|   16553.0|       86|
|   13178.0|      265|
|   17786.0|       72|
|   12891.0|        3|
+----------+---------+
only showing top 5 rows



In [68]:
today = current_date()

In [69]:
df_rec = df.groupBy('CustomerID')\
            .agg(max('InvoiceDate').alias('Max_date'))
df_rec = df_rec.withColumn('Recency', datediff(today, df_rec['Max_date']))

df_rec.show(5)

+----------+-------------------+-------+
|CustomerID|           Max_date|Recency|
+----------+-------------------+-------+
|      null|2011-12-09 10:26:00|   4296|
|   12346.0|2011-01-18 10:17:00|   4621|
|   12347.0|2011-12-07 15:52:00|   4298|
|   12348.0|2011-09-25 13:13:00|   4371|
|   12349.0|2011-11-21 09:51:00|   4314|
+----------+-------------------+-------+
only showing top 5 rows



In [70]:
df_new = df.join(df_mon, 'CustomerID', how='inner').join(df_freq, 'CustomerID', how='inner').join(df_rec, 'CustomerID', how='inner')

df_new.show(5)

+----------+---------+---------+--------------------+--------+-------------------+---------+--------------+-----------------+---------+-------------------+-------+
|CustomerID|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|       Country|   Monetary_value|Frequency|           Max_date|Recency|
+----------+---------+---------+--------------------+--------+-------------------+---------+--------------+-----------------+---------+-------------------+-------+
|   17850.0|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|United Kingdom|5288.630000000009|      312|2011-02-10 14:38:00|   4598|
|   17850.0|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|United Kingdom|5288.630000000009|      312|2011-02-10 14:38:00|   4598|
|   17850.0|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|United Kingdom|5288.630000000009|      312|2011-02-10 14:38:00|   4598|
|   17850.0|   5

In [71]:
quantiles_r = df_new.approxQuantile('Recency', [0.33, 0.66], 0.01)
quantiles_f = df_new.approxQuantile('Frequency', [0.33, 0.66], 0.01)
quantiles_m = df_new.approxQuantile('Monetary_value', [0.33, 0.66], 0.01)

In [72]:
df_new = df_new.withColumn('Recency_factor', when(df_new['Recency'] <= quantiles_r[0], 'C')
                .when((df_new['Recency'] > quantiles_r[0]) & (df_new['Recency'] <= quantiles_r[1]), 'B')
                .otherwise('A'))

In [73]:
df_new = df_new.withColumn('Frequency_factor', when(df_new['Frequency'] <= quantiles_f[0], 'C')
                .when((df_new['Frequency'] > quantiles_f[0]) & (df_new['Frequency'] <= quantiles_f[1]), 'B')
                .otherwise('A'))

In [74]:
df_new = df_new.withColumn('Monetary_factor', when(df_new['Monetary_value'] <= quantiles_m[0], 'C')
                .when((df_new['Monetary_value'] > quantiles_m[0]) & (df_new['Monetary_value'] <= quantiles_m[1]), 'B')
                .otherwise('A'))

In [75]:
df_new.show(5)

+----------+---------+---------+--------------------+--------+-------------------+---------+--------------+-----------------+---------+-------------------+-------+--------------+----------------+---------------+
|CustomerID|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|       Country|   Monetary_value|Frequency|           Max_date|Recency|Recency_factor|Frequency_factor|Monetary_factor|
+----------+---------+---------+--------------------+--------+-------------------+---------+--------------+-----------------+---------+-------------------+-------+--------------+----------------+---------------+
|   17850.0|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|United Kingdom|5288.630000000009|      312|2011-02-10 14:38:00|   4598|             A|               B|              A|
|   17850.0|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|United Kingdom|5288.630000000009|      312|2011-02-10 14:38:

In [76]:
rfm_table = df_new.withColumn('groups', F.concat(F.col('Recency_factor'),F.col('Frequency_factor'),F.col('Monetary_factor')))

In [77]:
rfm_table.select('CustomerID', 
                 'Recency', 'Frequency', 'Monetary_value', 
                 'Recency_factor', 'Frequency_factor', 'Monetary_factor', 
                 'groups').show(5)

+----------+-------+---------+-----------------+--------------+----------------+---------------+------+
|CustomerID|Recency|Frequency|   Monetary_value|Recency_factor|Frequency_factor|Monetary_factor|groups|
+----------+-------+---------+-----------------+--------------+----------------+---------------+------+
|   15039.0|   4305|     1508|19786.43999999997|             B|               A|              A|   BAA|
|   15039.0|   4305|     1508|19786.43999999997|             B|               A|              A|   BAA|
|   15039.0|   4305|     1508|19786.43999999997|             B|               A|              A|   BAA|
|   15039.0|   4305|     1508|19786.43999999997|             B|               A|              A|   BAA|
|   15039.0|   4305|     1508|19786.43999999997|             B|               A|              A|   BAA|
+----------+-------+---------+-----------------+--------------+----------------+---------------+------+
only showing top 5 rows



In [78]:
best_clients = rfm_table.select('CustomerID').filter(rfm_table.groups == 'AAA').distinct()

In [79]:
best_clients.toPandas().to_csv('best_clients.csv', index=False)