# Install and Imports

In [1]:
isNeedInstall = False
if isNeedInstall:
    import subprocess
    import sys

    def install(package):
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])

    install("pyspark")
    install("psutil")
    install("nbconvert")
    install("ipykernel")
    install("py4j")

In [2]:
from pyspark import SparkConf, SparkContext
import os
import shutil

In [3]:
# Change to True to run the program on full dataset
isProd = True

In [4]:
number_cores = 2
memory_gb = 4
# Create a configuration object and
# set the name of the application
conf = (
    SparkConf()
        .setAppName("mandarin")
        .setMaster('local[{}]'.format(number_cores))
        .set('spark.driver.memory', '{}g'.format(memory_gb))
)
# Create a Spark Context object
sc = SparkContext.getOrCreate()#(conf=conf)

# Solution

In [5]:
# Read the input file
if isProd:
    if not os.path.exists('input/Reviews.csv'):
        sc.stop()
        raise Exception("""
            Download the 'Reviews.csv' file from https://www.kaggle.com/datasets/snap/amazon-fine-food-reviews
            and put it in 'input' folder
        """)
    else:
        inputRdd = sc.textFile("input/Reviews.csv")
else:
    inputRdd = sc.textFile("input/Sample.csv")

In [6]:
# Remove the header
filteredInput = inputRdd.filter(lambda line: line.startswith("Id,") == False)

1. З вхідного датасету створити RDD, що містить пару (tuple) UserId та список всіх ProductId для всіх продуктів, які купував/ревьював цей юзер. В списку повинні бути лише унікальні продукти (ProductId  для одного юзера не повинні повторюватись). Наприклад:
("A1", ["B1", "B2", "B5"])
("A2", ["B1", "B3", "B5"])

In [7]:
userProductMap = filteredInput.map(lambda x: x.split(",")[2] + "," + x.split(",")[1]).map(lambda x: x.split(","))
userProductMap.collect()

[['A2', 'B1'],
 ['A4', 'B1'],
 ['A5', 'B1'],
 ['A1', 'B2'],
 ['A2', 'B3'],
 ['A3', 'B3'],
 ['A4', 'B3'],
 ['A5', 'B3'],
 ['A4', 'B4'],
 ['A2', 'B5'],
 ['A4', 'B5'],
 ['A2', 'B1'],
 ['A4', 'B5'],
 ['A5', 'B5']]

In [8]:
zero_value = set()

def seq_op(x,y):
    x.add(y)
    return x

def comb_op(x,y):
    return x.union(y)

userProducts = userProductMap.aggregateByKey(zero_value, seq_op, comb_op).sortByKey()
userProducts.collect()

[('A1', {'B2'}),
 ('A2', {'B1', 'B3', 'B5'}),
 ('A3', {'B3'}),
 ('A4', {'B1', 'B3', 'B4', 'B5'}),
 ('A5', {'B1', 'B3', 'B5'})]

2. Маючи списки продуктів для кожного юзера, отримати всі пари продуктів які він міг купувати разом. Для кожної такої пари створити tuple де першим елементом є пара, другим число 1. Наприклад для попереднього списку:
("B1,B2", 1)
("B1,B5", 1)
("B2,B5", 1)
("B1,B3", 1)
("B1,B5", 1)
("B3,B5", 1)

In [9]:
productPairsMap = list(userProducts.reduceByKey(lambda a,b: b.lookup(a)).map(lambda r: r[1]).filter(lambda x: len(x)>1).collect())
print(productPairsMap)

[{'B5', 'B1', 'B3'}, {'B5', 'B1', 'B3', 'B4'}, {'B5', 'B1', 'B3'}]


In [10]:
i = 0
j = 0 
tupleProduct = []
tempList = []
for x in productPairsMap:
    tempList.append(list(x))
while i < len(tempList):
    while j<len(tempList[i])-1:
        tupleProduct.append(tempList[i][j]+tempList[i][j+1])
        j+=1
    i+=1
    j=0
print(tupleProduct)

['B5B1', 'B1B3', 'B5B1', 'B1B3', 'B3B4', 'B5B1', 'B1B3']


In [11]:
nextStep = sc.parallelize(tupleProduct)
tupleProductWithOne = nextStep.map(lambda x: (x,1))
tupleProductWithOne.collect()

[('B5B1', 1),
 ('B1B3', 1),
 ('B5B1', 1),
 ('B1B3', 1),
 ('B3B4', 1),
 ('B5B1', 1),
 ('B1B3', 1)]

3. Підрахувати кількість всіх пар продуктів, відсортувати їх за кількістю.

In [12]:
allTupleProd = list(tupleProductWithOne.countByKey().items())
tempProd = sc.parallelize(allTupleProd)
productPairsCounts = tempProd.sortBy(lambda x: -x[1])
productPairsCounts.collect()

[('B5B1', 3), ('B1B3', 3), ('B3B4', 1)]

4. Взяти лише перші 10 пар продуктів та їх кількість. Зберегти в файл. Наприклад:
("B1,B5", 23495)
("B2,B5", 3340)
("B3,B5", 217)

In [13]:
result = productPairsCounts.zipWithIndex().filter(lambda vi: vi[1] < 10).keys()
result.collect()

[('B5B1', 3), ('B1B3', 3), ('B3B4', 1)]

In [14]:
outpath = 'output/first_output'
if os.path.exists(outpath) and os.path.isdir(outpath):
    shutil.rmtree(outpath)

result.saveAsTextFile(outpath)

# Stop the Spark Context

In [15]:
sc.stop()