In [1]:
from __future__ import print_function

import sys

from pyspark.sql import SparkSession

from math import sqrt

from heapq import nlargest

In [2]:
#   DEFINE your input path
# A Jupyter kernel's sys.argv typically holds the kernel's own launch arguments
# (e.g. "-f <connection file>"), so sys.argv[1] is not usable here. Instead the path
# can be overridden with the BOOKS_INPUT_PATH environment variable, falling back to
# the original default location.
import os

DEFAULT_INPUT_PATH = "D:/Academic/SCU_Q3/BigData/Assignments/HW3/customers_books.txt"
input_path = os.environ.get("BOOKS_INPUT_PATH", DEFAULT_INPUT_PATH)
print("input_path: ", input_path)


input_path:  D:/Academic/SCU_Q3/BigData/Assignments/HW3/customers_books.txt


In [3]:
#   Build (or reuse an already running) SparkSession for this application
spark = (
    SparkSession.builder
    .appName("Book_Recommendation_System")
    .getOrCreate()
)

#   Keep the console readable: only log messages at WARN level and above are shown
sc = spark.sparkContext
sc.setLogLevel("WARN")

In [4]:
#   Read the input file into an RDD[String]; each element is one "user:book" line
rdd = spark.sparkContext.textFile(input_path)
first_five_lines = rdd.take(5)
print("\n First five Output from rdd: \n", first_five_lines, "\n")

#   Drop identical lines so a repeated (user, book) purchase is only counted once
rdd_unique = rdd.distinct()


 First five Output from rdd: 
 ['u1:book1', 'u1:book2', 'u1:book2', 'u1:book3', 'u1:book3'] 



## Apply a set of Transformations

In [5]:
#   Parse each "user:book" line into a (book, user) key-value pair.
#   Lines without a ':' separator (e.g. stray blank lines) are skipped rather than
#   raising IndexError inside a Spark task.
rdd1 = (
    rdd_unique
    .map(lambda line: line.split(':'))
    .filter(lambda parts: len(parts) >= 2)
    .map(lambda parts: (parts[1], parts[0]))
)
print("Mapper Output key-value : \n" , rdd1.take(5), "\n")

#   Count the total number of unique users.
#   count() is evaluated on the executors; collecting every user id to the driver
#   just to take len() of the resulting list does not scale with the user base.
unique_users = rdd1.map(lambda pair: pair[1]).distinct().count()
print("Number of Unique Users: \n" , unique_users , "\n")

Mapper Output key-value : 
 [('book2', 'u1'), ('book3', 'u1'), ('book4', 'u1'), ('book5', 'u1'), ('book0', 'u2')] 

Number of Unique Users: 
 14 



In [6]:
#   For every book, gather the ids of all users who purchased it.
#   Key: book number; value: list of user ids  ->  (book#, [user1, user2, ...])
rdd_grouped = rdd1.groupByKey().mapValues(list)
first_five_grouped = rdd_grouped.takeOrdered(5)
print("First five output of (books, userid of users purchased this book): \n",
      first_five_grouped, "\n")


First five output of (books, userid of users purchased this book): 
 [('book0', ['u2', 'u6', 'u3']), ('book1', ['u1', 'u6', 'u7', 'u8', 'u10', 'u12']), ('book10', ['u8']), ('book11', ['u8', 'u12', 'u9', 'u10', 'u11', 'u1']), ('book12', ['u9', 'u11', 'u3', 'u19', 'u8', 'u12', 'u13'])] 



In [7]:
#   Pair every (book, users) record with every (book, users) record: all book x book combinations.
cartesian_product_rdd = rdd_grouped.cartesian(rdd_grouped)
cartesian_preview = cartesian_product_rdd.take(5)
print("First five output of Cartesian Product : \n" , cartesian_preview, "\n")

#   Keep each unordered pair exactly once, and drop book-with-itself pairs, by requiring
#   the left book name to sort strictly before the right one. Book names are unique keys
#   after groupByKey, so comparing the names alone decides the order.
filtered_cartesian_rdd = cartesian_product_rdd.filter(
    lambda pair: pair[0][0] < pair[1][0]
)


First five output of Cartesian Product : 
 [(('book3', ['u1', 'u5', 'u6', 'u12', 'u13', 'u7']), ('book3', ['u1', 'u5', 'u6', 'u12', 'u13', 'u7'])), (('book3', ['u1', 'u5', 'u6', 'u12', 'u13', 'u7']), ('book4', ['u1', 'u4'])), (('book3', ['u1', 'u5', 'u6', 'u12', 'u13', 'u7']), ('book7', ['u2', 'u1'])), (('book3', ['u1', 'u5', 'u6', 'u12', 'u13', 'u7']), ('book9', ['u3', 'u5', 'u6', 'u1', 'u2', 'u4'])), (('book3', ['u1', 'u5', 'u6', 'u12', 'u13', 'u7']), ('book12', ['u9', 'u11', 'u3', 'u19', 'u8', 'u12', 'u13']))] 



##   Computing the Correlation for each pair of books 
#### Create 4 functions that return the counts A, B, C and D, which arrange the data into a 2×2 contingency table for an easier correlation calculation.


In [8]:
def a_same_purchase(y,n):
    """Count A of the contingency table: users who purchased BOTH books.

    ex: BookA: Yes, BookB: Yes

    Args:
        y: (book, [user ids]) record for the first book of the pair.
        n: (book, [user ids]) record for the second book of the pair.

    Returns:
        Number of distinct users present in both user lists.
    """
    # Set intersection is O(len(y) + len(n)) instead of the O(len(y) * len(n)) nested loop,
    # and each user is counted once even if an id were repeated in a list.
    return len(set(y[1]) & set(n[1]))

def b_purchase(y,n):
    """Count B of the contingency table: users who bought the first book but not the second.

    ex: BookA: Yes, BookB: No

    Args:
        y: (book, [user ids]) record for the first book of the pair.
        n: (book, [user ids]) record for the second book of the pair.

    Returns:
        Number of distinct users in y's list that do not appear in n's list.
    """
    # Set difference avoids a linear list scan for every user of the first book.
    return len(set(y[1]) - set(n[1]))

def c_purchase(y,n):
    """Count C of the contingency table: users who bought the second book but not the first.

    ex: BookA: No, BookB: Yes

    Args:
        y: (book, [user ids]) record for the first book of the pair.
        n: (book, [user ids]) record for the second book of the pair.

    Returns:
        Number of distinct users in n's list that do not appear in y's list.
    """
    # Mirror image of b_purchase: set difference in the other direction.
    return len(set(n[1]) - set(y[1]))

def d_no_purchase(y, n, total_users=None):
    """Count D of the contingency table: users who purchased NEITHER book.

    ex: BookA: No, BookB: No

    Every user outside the union of the two buyer lists bought neither book, so
    D = total_users - |buyers(y) U buyers(n)|, which equals total_users - (A + B + C).

    Args:
        y: (book, [user ids]) record for the first book of the pair.
        n: (book, [user ids]) record for the second book of the pair.
        total_users: Total number of distinct users. Defaults to the notebook-level
            ``unique_users`` computed from the input data.

    Returns:
        Number of users who bought neither book.
    """
    if total_users is None:
        total_users = unique_users
    # One set union replaces recomputing A, B and C through the three helper functions.
    return total_users - len(set(y[1]) | set(n[1]))


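## Calculate the phi correlation
- Plug the values returned by the functions above into the phi coefficient formula (Pearson's correlation for two binary variables): $\phi = \frac{AD - BC}{\sqrt{(A+B)(C+D)(A+C)(B+D)}}$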

In [9]:
def phi_corr(val):
    """Phi correlation for one ((bookA, users), (bookB, users)) pair from the cartesian product.

    Uses the contingency counts A, B, C, D from a_same_purchase, b_purchase,
    c_purchase and d_no_purchase:

        phi = (A*D - B*C) / sqrt((A+B) * (C+D) * (A+C) * (B+D))

    The value is symmetric, corr(bookA, bookB) == corr(bookB, bookA). Both directed
    triples are therefore returned, so flatMap over the de-duplicated pairs yields the
    full sparse similarity matrix.

    Args:
        val: ((bookA, [user ids]), (bookB, [user ids])).

    Returns:
        ((bookA, bookB, phi), (bookB, bookA, phi)). phi is 0.0 when it is undefined,
        i.e. when any margin of the table is zero (for example, a book bought by every user).
    """
    first, second = val
    a = a_same_purchase(first, second)
    b = b_purchase(first, second)
    c = c_purchase(first, second)
    d = d_no_purchase(first, second)
    denominator = sqrt((a + b) * (c + d) * (a + c) * (b + d))
    # A zero margin makes phi 0/0 or x/0; treat the pair as uncorrelated instead of
    # letting ZeroDivisionError abort the whole Spark job.
    phi_correlation = ((a * d) - (b * c)) / denominator if denominator else 0.0
    return (first[0], second[0], phi_correlation), (second[0], first[0], phi_correlation)

In [10]:
#   Score every de-duplicated book pair. phi_corr emits both directions, so flatMap
#   produces (bookA, bookB, phi) and (bookB, bookA, phi) for each pair.
book_corr_rdd = filtered_cartesian_rdd.flatMap(phi_corr)

#   Preview the five smallest (book, other_book, phi) triples in sorted order
first_five_corr = book_corr_rdd.takeOrdered(5)
print("First 5 output of all pairs of book purchased with their phi correlation: \n",
      first_five_corr, "\n")


First 5 output of all pairs of book purchased with their phi correlation: 
 [('book0', 'book1', -0.10050378152592121), ('book0', 'book10', -0.14484136487558028), ('book0', 'book11', -0.45226701686664544), ('book0', 'book12', -0.17407765595569785), ('book0', 'book13', 0.5310850045437943)] 



In [11]:
#   Top-N most correlated partner books for each book (N = TOP_N).
#   Each group holds every (book, other_book, phi) triple for one book. Partners are ranked
#   by phi descending, and ties are broken by the other book's name, so the result is the
#   same on every run. heapq.nlargest keeps whichever tied element it meets first, and that
#   depends on Spark's shuffle order.
TOP_N = 2
top_two_phi = (
    book_corr_rdd
    .groupBy(lambda x: x[0])
    .flatMap(lambda g: sorted(g[1], key=lambda x: (-x[2], x[1]))[:TOP_N])
)
#   Printing Top two pairs of book purchased with their phi correlation
print("First 5 output of Top two pairs of book purchased with highest correlation value : \n",\
      top_two_phi.takeOrdered(5), "\n")


First 5 output of Top two pairs of book purchased with highest correlation value : 
 [('book0', 'book13', 0.5310850045437943), ('book0', 'book9', 0.6030226891555273), ('book1', 'book11', 0.4166666666666667), ('book1', 'book3', 0.4166666666666667), ('book10', 'book11', 0.32025630761017426)] 



### Similarities have been calculated for every pair of books; the top 2 most correlated books are the recommendations based on user purchases.
- If a user buys a particular book, the algorithm recommends the two other books whose purchases are most highly correlated with that book (by phi correlation), based on what all users have bought.


In [12]:

#   Group each book's recommended partners into a list ordered by phi, strongest first.
#   groupByKey does not guarantee the order of the values it gathers, so the phi score is
#   carried along and each list is re-sorted explicitly. Ties fall back to the book name.
book_rdd = (
    top_two_phi
    .map(lambda x: (x[0], (x[2], x[1])))
    .groupByKey()
    .mapValues(lambda recs: [other for phi, other in sorted(recs, key=lambda r: (-r[0], r[1]))])
    .sortBy(lambda a: a[0])
)

#   Collect values of each book recommending other two books as per user purchase.
book_recommended = book_rdd.collect()

In [13]:
#   For every purchased book, list the books recommended alongside it
print("\n Books Recommendation as per customer purchase : \n")
for purchased, recommended in book_recommended:
    print("Customers Who Bought ", purchased ,"Also Bought: ",' , '.join(recommended), "\n")


 Books Recommendation as per customer purchase : 

Customers Who Bought  book0 Also Bought:  book9 , book13 

Customers Who Bought  book1 Also Bought:  book3 , book11 

Customers Who Bought  book10 Also Bought:  book6 , book11 

Customers Who Bought  book11 Also Bought:  book19 , book22 

Customers Who Bought  book12 Also Bought:  book16 , book15 

Customers Who Bought  book13 Also Bought:  book0 , book16 

Customers Who Bought  book15 Also Bought:  book16 , book12 

Customers Who Bought  book16 Also Bought:  book15 , book13 

Customers Who Bought  book17 Also Bought:  book7 , book0 

Customers Who Bought  book18 Also Bought:  book19 , book11 

Customers Who Bought  book19 Also Bought:  book18 , book11 

Customers Who Bought  book2 Also Bought:  book5 , book9 

Customers Who Bought  book21 Also Bought:  book22 , book23 

Customers Who Bought  book22 Also Bought:  book21 , book23 

Customers Who Bought  book23 Also Bought:  book22 , book21 

Customers Who Bought  book3 Also Bought:  bo

In [14]:
#   Shut down the SparkSession (and its underlying SparkContext) now that all results have been printed
spark.stop()