# Робота з Spark RDD

## Найпопулярніші пари продуктів

В якості датасету для завдання необхідно використати [Amazon Reviews](https://www.kaggle.com/snap/amazon-fine-food-reviews).

> Для більш зручної розробки логіки в додатково в Класрумі є скорочений файл схожої структури `sample.csv`.

Датасет має наступну структуру. Друга колонка - ідентифікатор продукту, третя - ідентифікатор юзера:

```
Id,ProductId,UserId,ProfileName,HelpfulnessNumerator,HelpfulnessDenominator,Score,Time,Summary,Text
```

Наприклад:

| Id | ProductId | UserId | ProfileName | ...інші колонки |
|----|-----------|--------|-------------|-----------------|
| 1  | B1        | A2     | Patron      | ...             |

### Опис завдання

1. Зчитати скорочений або повний датасет як RDD. Після читання вирізати лише потрібні колонки: `UserId` та `ProductId`.
2. Створити `RDD`, що містить пари (`tuple`) `UserId` та список всіх `ProductId` для всіх продуктів, які придбав цей юзер. В списку повинні бути лише **унікальні** продукти (`ProductId` для одного юзера не повинні повторюватись). Наприклад:

```python
("A1", ["B1", "B2", "B5"])
("A2", ["B1", "B3", "B5"])
...
```

3. Взявши списки продуктів для кожного юзера, отримати всі пари продуктів які він міг купувати разом. Для кожної такої пари створити `tuple` де першим елементом є пара, другим число `1`. Наприклад для попереднього списку:
```python
("B1,B2", 1)
("B1,B5", 1)
("B2,B5", 1)
("B1,B3", 1)
("B1,B5", 1)
("B3,B5", 1)
...
```

> Два продукти вважаються придбаними разом, якщо вони обидва з’являються у списку, який ви отримали на попередньому кроці.

4. Підрахувати кількість всіх пар продуктів, відсортувати їх за кількістю від найбільшої до найменшої.
5. Взяти лише перші `10` пар продуктів та їх кількість. Наприклад:
```python
("B1,B5", 23495)
("B2,B5", 3340)
("B3,B5", 217)
...
```
6. Зберегти результат в текстовий файл.

## Конфігурація

- `number_cores`: Кількість ядер, виділених під Spark
- `memory_gb`: Обʼєм оперативної памʼяті, виділеної під Spark (в Гб)
- `is_full_dataset`: Читати повний чи скорочений датасет.

In [None]:
number_cores = 2
memory_gb = 4
is_full_dataset = True

In [None]:
from pyspark import SparkConf, SparkContext
import os

number_cores = 2
memory_gb = 4
conf = (
    SparkConf()
        .setAppName("Spark Rdd Task")
        .setMaster(f'local[{number_cores}]')
        .set('spark.driver.memory', f'{memory_gb}g')
)

sc = SparkContext(conf=conf)

## Рішення

In [None]:
if is_full_dataset:
    if not os.path.exists('Reviews.csv'):
        sc.stop()
        raise Exception("""
            Download the 'Reviews.csv' file from https://www.kaggle.com/datasets/snap/amazon-fine-food-reviews
            and put it in 'input' folder
        """)
    else:
        inputRdd = sc.textFile("Reviews.csv")
else:
    inputRdd = sc.textFile("sample.csv")

In [None]:
# Видалення рядку заголовку
filteredInput = inputRdd.filter(lambda line: line.startswith("Id,") == False)

In [None]:
from itertools import combinations

user_product_pairs = filteredInput.map(lambda line: line.split(",")).map(lambda x: (x[2], x[1]))
user_products = user_product_pairs.groupByKey().mapValues(set)
user_product_pairs_combinations = user_products.flatMap(lambda x: [(pair, 1) for pair in combinations(sorted(x[1]), 2)])
product_pair_counts = user_product_pairs_combinations.reduceByKey(lambda x, y: x + y)
sorted_product_pair_counts = product_pair_counts.sortBy(lambda x: x[1], ascending=False)
top_10_product_pairs = sorted_product_pair_counts.take(10)
sc.parallelize(top_10_product_pairs).coalesce(1).saveAsTextFile("output")

## Зупинка Spark

In [None]:
sc.stop()