In [171]:
# import findspark
# findspark.init()
# findspark.find()

In [172]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

In [173]:
import pandas as pd


pd.read_excel('./online_retail.xlsx', index_col=0).to_csv('./online_retail.csv', sep=";")


spark = SparkSession.builder.master("local[8]")\
    .appName("SparkFirst")\
    .config("spark.executor.memory", "10g")\
    .config("spark.executor.cores", 5)\
    .config("spark.dynamicAllocation.enabled", "true")\
    .config("spark.dynamicAllocation.maxExecutors", 5)\
    .config("spark.shuffle.service.enabled", "true")\
    .getOrCreate()

data_schema = StructType([
               StructField('InvoiceNo', StringType(), True),
               StructField('StockCode', StringType(), True),
               StructField('Description', StringType(), True),
               StructField('Quantity', IntegerType(), True),
               StructField('InvoiceDate', DateType(), True),
               StructField('UnitPrice', DoubleType(), True),
               StructField('CustomerID', StringType(), True),
               StructField('Country', StringType(), True),
            ])
                
df = spark.read.csv("./online_retail.csv", sep=";", header=True, schema=data_schema)

In [174]:
df.createOrReplaceTempView("sales_df")
df.show()

+---------+---------+--------------------+--------+-----------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-----------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6| 2010-12-01|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6| 2010-12-01|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8| 2010-12-01|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6| 2010-12-01|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6| 2010-12-01|     3.39|   17850.0|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2| 2010-12-01|     7.65|   17850.0|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6| 2010-12-01|     4.25|   17850.0|United Kingdom|


In [175]:
print("Количество строк в файле: ", df.count())

Количество строк в файле:  541909


In [176]:
print("Количество уникальных клиентов: ",
      df.select("CustomerID")
          .groupby("CustomerID")
          .count()
          .count()
     )

Количество уникальных клиентов:  4373


In [177]:
print("В какой стране совершается большинство покупок:")
df.select("Country").groupby("Country").count().sort("count", ascending=False).limit(1).show()

В какой стране совершается большинство покупок:
+--------------+------+
|       Country| count|
+--------------+------+
|United Kingdom|495478|
+--------------+------+



In [178]:
print("Дата самой последней покупки на платформе:")
df.agg({"InvoiceDate": "max"}).show()

Дата самой последней покупки на платформе:
+----------------+
|max(InvoiceDate)|
+----------------+
|      2011-12-09|
+----------------+



In [179]:
print("Дата самой ранней покупки на платформе:")
# df.agg(min("InvoiceDate")).show()
df.agg({"InvoiceDate": "min"}).show()

Дата самой ранней покупки на платформе:
+----------------+
|min(InvoiceDate)|
+----------------+
|      2010-12-01|
+----------------+



In [217]:
recency = spark.sql("""
WITH sales AS (
    SELECT 
    CustomerID,
    sum(Quantity * UnitPrice) as sum_quantity,
    count(InvoiceNo) as sales_quantity,
    sum(Quantity) as frequency_quantity,
    CASE
        WHEN to_date(InvoiceDate) <= to_date("2011-12-09") AND to_date(InvoiceDate) > to_date("2011-12-02") THEN "A"
        WHEN to_date(InvoiceDate) <= to_date("2011-12-01") AND to_date(InvoiceDate) > to_date("2011-11-09") THEN "B"
        ELSE "C"
    END recency_group
    FROM global_temp.sales_df
    WHERE CustomerID IS NOT NULL
    GROUP BY CustomerID, recency_group
    ORDER BY CustomerID ASC
),
frequency_groups AS (
    SELECT
        avg(sales_quantity) * 2 as A,
        avg(sales_quantity) as B  
    FROM sales
),
customer_frequency_groups AS (
    SELECT
        CustomerID,
        CASE
            WHEN sales_quantity >= frequency_groups.A THEN "A"
            WHEN sales_quantity < frequency_groups.A AND sales_quantity >= frequency_groups.B THEN "B"
            ELSE "C"
        END as Frequency
    FROM sales
    JOIN frequency_groups
    ORDER BY CustomerID ASC
),
customer_frequency AS (
    select 
        CustomerID,
        min(Frequency) OVER (PARTITION BY CustomerID) as Frequency
    from customer_frequency_groups
    GROUP BY CustomerID, Frequency
),
customer_recency_group AS (
    SELECT
        CustomerID,
        min(recency_group) OVER (PARTITION BY CustomerID) as Recency
    FROM sales
),
customer_recency AS (
    SELECT
        CustomerID,
        Recency
    FROM customer_recency_group
    GROUP BY CustomerID, Recency
),
monetary_groups AS (
    SELECT
        avg(sum_quantity) * 2 as A,
        avg(sum_quantity) as B
    FROM sales
),
customer_monetary_groups AS (
    SELECT
        CustomerID,
        CASE
            WHEN sales_quantity >= monetary_groups.A THEN "A"
            WHEN sales_quantity < monetary_groups.A AND sales_quantity >= monetary_groups.B THEN "B"
            ELSE "C"
        END as Monetary
    FROM sales
    JOIN monetary_groups
    GROUP BY CustomerID, Monetary
    ORDER BY Monetary ASC
)

select
cast(m.CustomerID as int),
m.Monetary,
r.Recency,
f.Frequency
FROM customer_monetary_groups m
JOIN customer_recency r ON m.CustomerID = r.CustomerID
JOIN customer_frequency f ON m.CustomerID = f.CustomerID
WHERE r.Recency = "A" AND f.Frequency = "A" AND m.Monetary = "A"
GROUP BY m.CustomerID, m.Monetary, r.Recency, f.Frequency

""")

In [220]:
# recency.toDF('CustomerID', 'Monetary', 'Recency', 'Frequency')
pandasDF = recency.toDF('CustomerID', 'Monetary', 'Recency', 'Frequency').toPandas().to_csv('./rfmDF.csv', sep=";")

In [219]:
print(pandasDF)

   CustomerID Monetary Recency Frequency
0       17841        A       A         A
1       14911        A       A         A
2       12748        A       A         A
