In [34]:
import numpy as np
import pandas as pd
import findspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *

### 1. С помощью модуля pandas преобразуйте файл из .xlsx в .csv формат

In [35]:
# Путь к файлу Excel
pathFile = '/content/online_retail.xlsx'

In [36]:
# Загрузка файла
df = pd.read_excel(pathFile)

In [37]:
# Путь для сохранения файла CSV
pathCSV = '/content/online_retail.csv'

In [38]:
# Сохранение DataFrame в формате CSV
df.to_csv(pathCSV, sep=";", index=False)

### 2. Инициализируйте Spark-сессию

In [39]:
# Инициализация Spark и создание сессии
findspark.init()
spark = SparkSession.builder.master("local[1]").appName("task_47").config("spark.executor.memory", "10g")\
    .config("spark.executor.cores", 5).config("spark.dynamicAllocation.enabled", "true")\
    .config("spark.dynamicAllocation.maxExecutors", 5).config("spark.shuffle.service.enabled", "true").getOrCreate()

In [40]:
# Определение схемы данных
data_schema = [
    StructField('InvoiceNo', StringType(), True),
    StructField('StockCode', StringType(), True),
    StructField('Description', StringType(), True),
    StructField('Quantity', IntegerType(), True),
    StructField('InvoiceDate', DateType(), True),
    StructField('UnitPrice', DoubleType(), True),
    StructField('CustomerID', StringType(), True),
    StructField('Country', StringType(), True),
]
final_struc = StructType(fields=data_schema)

### 3. Создайте dataframe из скачанного файла

In [46]:
# Чтение файла CSV в DataFrame
dfs = spark.read.csv(pathCSV, sep=";", header=True, schema=final_struc)

### 4. Подсчитайте следующие показатели:

#### a. Количество строк в файле

In [47]:
# Подсчет количества строк в DataFrame
row_count = dfs.count()
print(f"Количество строк в файле: {row_count}")

Количество строк в файле: 541909


#### b. Количество уникальных клиентов

In [48]:
# Подсчет количества уникальных клиентов
unique_customers_count = dfs.select('CustomerID').distinct().count()
print(f"Количество уникальных клиентов: {unique_customers_count}")

Количество уникальных клиентов: 4373


#### c. В какой стране совершается большинство покупок

In [49]:
from pyspark.sql.functions import desc

# Группировка данных по стране и подсчет количества покупок в каждой стране
purchase_count_by_country = dfs.groupBy('Country').count()

# Нахождение страны с наибольшим количеством покупок
most_purchases_country = purchase_count_by_country.orderBy(desc('count')).first()['Country']

print(f"Страна с наибольшим количеством покупок: {most_purchases_country}")

Страна с наибольшим количеством покупок: United Kingdom


#### d. Даты самой ранней и самой последней покупки на платформе

In [51]:
from pyspark.sql.functions import min, max

# Нахождение самой ранней и самой последней даты покупки
earliest_date = dfs.select(min('InvoiceDate')).first()[0]
latest_date = dfs.select(max('InvoiceDate')).first()[0]

print(f"Самая ранняя дата покупки: {earliest_date}")
print(f"Самая последняя дата покупки: {latest_date}")

Самая ранняя дата покупки: 2010-12-01
Самая последняя дата покупки: 2011-12-09


### 5. Проведите RFM-анализ клиентов платформы

#### a. Recency - Давность: как давно каждый покупатель совершил покупку

In [52]:
from pyspark.sql.functions import current_date, expr

# Создание временного представления
dfs.createOrReplaceTempView("online_retail_view")

In [55]:
# Расчет Recency
recency_query = """
    SELECT
        CustomerID,
        DATEDIFF(current_date(), MAX(InvoiceDate)) AS Recency
    FROM
        online_retail_view
    GROUP BY
        CustomerID
"""

recency_result = spark.sql(recency_query)
recency_result.show(5)

+----------+-------+
|CustomerID|Recency|
+----------+-------+
|   15039.0|   4370|
|   16553.0|   4524|
|   13178.0|   4387|
|   17786.0|   4446|
|   12891.0|   4546|
+----------+-------+
only showing top 5 rows



#### b. Frequency- Частота: Как часто они что-то покупали

In [56]:
# Расчет Frequency
frequency_query = """
    SELECT
        CustomerID,
        COUNT(InvoiceNo) AS Frequency
    FROM
        online_retail_view
    GROUP BY
        CustomerID
"""

frequency_result = spark.sql(frequency_query)
frequency_result.show(5)

+----------+---------+
|CustomerID|Frequency|
+----------+---------+
|   15039.0|     1508|
|   16553.0|       86|
|   13178.0|      265|
|   17786.0|       72|
|   12891.0|        3|
+----------+---------+
only showing top 5 rows



#### c. Monetary - Денежная ценность: сколько денег они в среднем тратят при совершении покупок

In [57]:
# Расчет Monetary
monetary_query = """
    SELECT
        CustomerID,
        AVG(Quantity * UnitPrice) AS Monetary
    FROM
        online_retail_view
    GROUP BY
        CustomerID
"""

monetary_result = spark.sql(monetary_query)
monetary_result.show(5)

+----------+------------------+
|CustomerID|          Monetary|
+----------+------------------+
|   15039.0|13.120981432360722|
|   16553.0|  65.8670930232558|
|   13178.0| 21.60554716981133|
|   17786.0| 3.871388888888889|
|   12891.0|110.33333333333333|
+----------+------------------+
only showing top 5 rows



### Добавьте в dataframe для каждого клиента 3 новых поля

In [62]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# 1. Добавление новых полей
df_with_rfm = dfs.groupBy("CustomerID") \
    .agg(
        F.max("InvoiceDate").alias("LastPurchaseDate"),
        F.countDistinct("InvoiceNo").alias("Frequency"),
        F.avg(F.expr("Quantity * UnitPrice")).alias("Monetary")
    ) \
    .withColumn("Recency", F.datediff(F.current_date(), F.col("LastPurchaseDate")).cast(IntegerType()))


In [64]:
# 2. Разбиение клиентов на группы для каждого показателя
# Recency
window_spec_recency = Window.orderBy("Recency")
df_with_rfm = df_with_rfm.withColumn("RecencyGroup", F.ntile(3).over(window_spec_recency))

# Frequency
window_spec_frequency = Window.orderBy(F.col("Frequency").desc())
df_with_rfm = df_with_rfm.withColumn("FrequencyGroup", F.ntile(3).over(window_spec_frequency))

# Monetary
window_spec_monetary = Window.orderBy(F.col("Monetary").desc())
df_with_rfm = df_with_rfm.withColumn("MonetaryGroup", F.ntile(3).over(window_spec_monetary))

In [65]:
# 3. Добавление итогового столбца с "суммой" значений групп
df_with_rfm = df_with_rfm.withColumn("TotalGroup",
    F.concat(F.col("RecencyGroup").cast("string"),
             F.col("FrequencyGroup").cast("string"),
             F.col("MonetaryGroup").cast("string")))

In [66]:
# 4. Сохранение только клиентов с группой "AAA" в отдельный CSV-файл
df_filtered = df_with_rfm.filter("TotalGroup = '111'")
df_filtered.write.csv('/content/filtered_customers.csv', header=True, mode='overwrite')