In [1]:
#################################
# Task 1
#################################

from pyspark import SparkContext, SparkConf
from datetime import datetime
# Configure the application name and obtain the SparkContext.
# getOrCreate reuses the context already running in this kernel, so this cell can be
# executed again without trying to start a second SparkContext.
conf = SparkConf().setAppName("Lab7")
sc = SparkContext.getOrCreate(conf=conf)

In [2]:
# Set input and output folders
# The input file is looked up one directory above the current one
# (the trailing comment keeps the same-directory alternative).
inputPath  = "../ReviewsSample.csv" # "ReviewsSample.csv"
# NOTE(review): the folder name says "Lab6" while the Spark application is named "Lab7" - confirm the intended output folder
outputPath = "res_out_Lab6/" 

In [3]:
# Read the content of the input file into an RDD
# (the following cells treat each element as one comma-separated text line)
reviewsRDD = sc.textFile(inputPath)

In [4]:
# Discard the header: keep only the lines that do not start with the "Id," column name
reviewsRDDnoHeader = reviewsRDD.filter(lambda line: not line.startswith("Id,"))

In [15]:
# This Python function splits the input line and returns a tuple (userId, productId)
def extractUserIdProductID(line):
    """Split one comma-separated line and return the tuple (userId, productId).

    The user id is taken from the third field (index 2) and the product id from
    the second field (index 1). An IndexError is raised when the line has fewer
    than three comma-separated fields.
    """
    fields = line.split(",")
    return (fields[2], fields[1])


# Generate one pair (UserId, ProductId) from each input line
pairUserProductRDD = reviewsRDDnoHeader.map(extractUserIdProductID)
# collect() brings every pair back to the driver to display it.
# NOTE(review): fine for the sample file; prefer take(n) before pointing inputPath at a larger file
pairUserProductRDD.collect()

[('A2', 'B1'),
 ('A4', 'B1'),
 ('A5', 'B1'),
 ('A1', 'B2'),
 ('A2', 'B3'),
 ('A3', 'B3'),
 ('A4', 'B3'),
 ('A5', 'B3'),
 ('A4', 'B4'),
 ('A2', 'B5'),
 ('A4', 'B5'),
 ('A2', 'B1'),
 ('A4', 'B5'),
 ('A5', 'B5')]

In [14]:
# Remove duplicate pairs, if any, so that each (UserId, ProductId) pair appears only once
pairUserProductDistinctRDD = pairUserProductRDD.distinct()
# collect() brings every distinct pair back to the driver to display it.
# NOTE(review): fine for the sample file; prefer take(n) on a larger input
pairUserProductDistinctRDD.collect()

[('A2', 'B1'),
 ('A4', 'B1'),
 ('A2', 'B3'),
 ('A3', 'B3'),
 ('A4', 'B3'),
 ('A4', 'B4'),
 ('A5', 'B5'),
 ('A5', 'B1'),
 ('A1', 'B2'),
 ('A5', 'B3'),
 ('A2', 'B5'),
 ('A4', 'B5')]

In [9]:
# Build one "transaction" per user:
# (user_id, list of the product_ids reviewed by user_id)
# groupByKey gathers the product ids of each user; mapValues(list) turns each group into a plain list
UserIDListOfReviewedProductsRDD = (
    pairUserProductDistinctRDD
    .groupByKey()
    .mapValues(list)
)
UserIDListOfReviewedProductsRDD.take(5)

[('A2', ['B1', 'B3', 'B5']),
 ('A4', ['B1', 'B3', 'B4', 'B5']),
 ('A3', ['B3']),
 ('A1', ['B2']),
 ('A5', ['B5', 'B1', 'B3'])]

In [10]:
# We are interested only in the value part (the lists of products that have been reviewed together),
# so the user ids (the keys) are dropped here
transactionsRDD = UserIDListOfReviewedProductsRDD.values()
# Preview up to five transactions
transactionsRDD.take(5)

[['B1', 'B3', 'B5'],
 ['B1', 'B3', 'B4', 'B5'],
 ['B3'],
 ['B2'],
 ['B5', 'B1', 'B3']]

In [11]:
# Given an input transaction (i.e., a list of products reviewed by the same user),
# this Python function returns all the possible pairs of products. Each pair of products is associated with
# a frequency equal to 1. Hence, this function returns a list of (key, value) pairs, where
# - key = pair of products
# - value = 1
def extractPairsOfProducts(transaction):
    """Return a list of ((product1, product2), 1) entries for one transaction.

    Every combination of two elements of the transaction is considered, and an
    entry is emitted only when product1 < product2, so each pair is reported
    with its smaller element first and an element is never paired with itself.
    """
    items = list(transaction)

    return [
        ((first, second), 1)
        for first in items
        for second in items
        if first < second
    ]


# Generate an RDD of (key, value) pairs, where
# - key = pair of products
# - value = 1

# One (key, value) pair is returned for each combination of two products appearing in the same transaction;
# flatMap flattens the list returned for each transaction into a single RDD
pairsOfProductsOneRDD = transactionsRDD.flatMap(extractPairsOfProducts)
pairsOfProductsOneRDD.take(5)

[(('B1', 'B3'), 1),
 (('B1', 'B5'), 1),
 (('B3', 'B5'), 1),
 (('B1', 'B3'), 1),
 (('B1', 'B4'), 1)]

In [12]:
# Sum the 1s attached to each key (= pair of products) to obtain its frequency (i.e., number of occurrences)
pairsFrequenciesRDD = pairsOfProductsOneRDD.reduceByKey(lambda left, right: left + right)
pairsFrequenciesRDD.take(5)

[(('B1', 'B3'), 3),
 (('B1', 'B4'), 1),
 (('B3', 'B4'), 1),
 (('B1', 'B5'), 3),
 (('B3', 'B5'), 3)]

In [13]:
# Keep only the (pair, frequency) entries whose frequency is greater than 1
atLeast2PairsFrequenciesRDD = pairsFrequenciesRDD.filter(
    lambda pairAndFrequency: pairAndFrequency[1] > 1
)
atLeast2PairsFrequenciesRDD.take(5)

[(('B1', 'B3'), 3), (('B1', 'B5'), 3), (('B3', 'B5'), 3)]

In [None]:
# Order the pairs of products from the most frequent to the least frequent.
# The sorted RDD is cached because it is used twice below (saved to outputPath and read with take).
atLeast2PairsFrequenciesSortedRDD = (
    atLeast2PairsFrequenciesRDD
    .sortBy(lambda pairAndFrequency: pairAndFrequency[1], ascending=False)
    .cache()
)

In [None]:
# Store the result in the output folder
# NOTE(review): Spark normally refuses to write into a folder that already exists - confirm,
# and remove outputPath before re-running this cell
atLeast2PairsFrequenciesSortedRDD.saveAsTextFile(outputPath)

In [None]:
#################################
# Task 2 - Bonus track
#################################

In [None]:
# The pairs of products in atLeast2PairsFrequenciesSortedRDD are already sorted by decreasing frequency,
# so the first 10 elements are the top 10 pairs.
# take(10) returns a local list on the driver (it holds fewer elements when the RDD has fewer than 10)
topPairsOfProducts = atLeast2PairsFrequenciesSortedRDD.take(10)

In [None]:
# Print the selected pairs of products on the standard output of the driver, one per line
for pairOfProducts in topPairsOfProducts:
    print(pairOfProducts)