In [3]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
[K     |████████████████████████████████| 212.4 MB 66 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 51.6 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=2a1dfb56be44176604c1b7bc808de7c89f609f6ec520a400969f94d475c281d9
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [4]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder\
        .master('local[4]')\
        .appName('beeline_test')\
        .config('spark.ui.port', '4050')\
        .config('spark.executor.instances', 2)\
        .config('spark.executor.memory', '5g')\
        .config('spark.executor.cores', 2)\
        .getOrCreate()

sc = spark.sparkContext

In [553]:
# Считываем данные, прописываем хэдеры.
df_customer = spark.read.csv('customer.csv', sep='\t').toDF('customer_id', 'name', 'email', 'joinDate', 'status')
df_order = spark.read.csv('order.csv', sep='\t').toDF('customer_id', 'order_id', 'product_id', 'numberOfProduct', 'orderDate', 'Status')
df_product = spark.read.csv('product.csv', sep='\t').toDF('product_id', 'product_name', 'price', 'numberOfProducts')

In [554]:
# Объядиняем таблицы
df_result = df_order.join(df_customer, 'customer_id', how='left').join(df_product, 'product_id', how='inner')

In [555]:
# Группируем по названию продукта и имени пользователя. Как метрику пополярности взял сумму купленных товаров по определенной позиции. Также можно было посчитать и по количеству
df_result = df_result.groupBy('product_name', 'name')\
                     .agg(F.sum('numberOfProduct').alias('total_buys'))

In [556]:
# Добавляем ранг по именам пользователей для последующей выборки самой популярной позиции.
# Использовал row_number() тк при использовании dense_rank() в выборке получалось несколько популярных значений в связи с равным объемом покупок
window_spec = Window\
              .partitionBy('name')\
              .orderBy(result.total_buys.desc())

result = result.withColumn('rank', F.row_number().over(window_spec))
# Фильтруем по рангу 
result_to_save = result.select('product_name', 'name').where(result.rank == 1)

In [559]:
result_to_save.show()

+-----------------+---------+
|     product_name|     name|
+-----------------+---------+
|   Apple iPhone 8|Anastasia|
|   Apple iPhone 7|     John|
|   Apple iPhone 7|   Vasili|
|   Apple iPhone 8|   Philip|
|Apple iPad mini 4|   Robert|
|    Apple AirPods|     Sara|
+-----------------+---------+



In [558]:
# Сохраняем в csv
result_to_save.write.csv('bestsellers.csv', sep=',', header=True, encoding='cp1251')