In [1]:
from pyspark.sql.functions import col 
metadata = spark.read.json("/amazon-meta/parsed_metadata")

#meta_rdd_dict = metadata.select('product_id', 'title').rdd.collectAsMap()

In [2]:
from itertools import combinations
def parse_line(line):
    infos = line.split('\t')
    # date, products
    return (infos[0] + infos[1], infos[2])

def distinct_products(products):
    products_list = set(products)
    distinct = []
    for p in products:
        distinct.append((p, 1))
    return distinct

def combine_products(products): # [products]
    products_list = list(set(products))
    products_list.sort()
    return combinations(products_list, 2)

In [3]:
grouped_data = sc.textFile('/amazon-reviews/parsed_data/') \
    .map(lambda line: parse_line(line)).groupByKey()

two_products_occ = grouped_data\
    .flatMap(lambda row: combine_products(row[1]))\
    .map(lambda comb: (comb, 1))\
    .reduceByKey(lambda a, b: a + b)\
    .map(lambda reduced: (reduced[0][0], reduced[0][1], reduced[1]))\
    .filter(lambda row: row[2] > 10) \


In [4]:
products_count = grouped_data\
    .flatMap(lambda row: distinct_products(row[1]))\
    .reduceByKey(lambda a, b: a + b)\


In [5]:
products_count = sc.broadcast(products_count.collectAsMap())

In [6]:
def cal_relationship(row):
    p1 = row[0]
    p2 = row[1]
    occurrence = row[2]
    products_count_dict = products_count.value
    p12_sum = products_count_dict[p1] + products_count_dict[p2]
    relevance = (occurrence / (p12_sum - occurrence)) * 100
    return (p1, p2, relevance)

products_relevance = two_products_occ \
    .map(lambda row: cal_relationship(row)) \
    .sortBy(lambda row: -row[2]) \

products_relevance.take(10)

    #.map(lambda row: get_product_info(row[0], metadata) + '\t' + get_product_info(row[1], metadata)  + '\t' + "{:.2f}".format(row[2]))

[('B000JXN21W', 'B000JXOSYC', 100.0),
 ('B000006A93', 'B0026P9Q0W', 100.0),
 ('B000065UKU', 'B001NS5IIW', 100.0),
 ('0692365877', 'B00RYPV9PE', 100.0),
 ('B00GYHBZP2', 'B00GYHC04W', 100.0),
 ('0752894226', 'B003TO5AJK', 100.0),
 ('B000002564', 'B000008KG0', 100.0),
 ('0940232766', 'B0006QZITQ', 100.0),
 ('B009F3PUP8', 'B014RL77M6', 100.0),
 ('1512310565', 'B00YDWQFYQ', 100.0)]

In [7]:
products_relevance.take(20)

[('B000JXN21W', 'B000JXOSYC', 100.0),
 ('B000006A93', 'B0026P9Q0W', 100.0),
 ('B000065UKU', 'B001NS5IIW', 100.0),
 ('0692365877', 'B00RYPV9PE', 100.0),
 ('B00GYHBZP2', 'B00GYHC04W', 100.0),
 ('0752894226', 'B003TO5AJK', 100.0),
 ('B000002564', 'B000008KG0', 100.0),
 ('0940232766', 'B0006QZITQ', 100.0),
 ('B009F3PUP8', 'B014RL77M6', 100.0),
 ('1512310565', 'B00YDWQFYQ', 100.0),
 ('B001C04WA6', 'B001C06RPO', 100.0),
 ('B001E5CJIG', 'B001E762R8', 100.0),
 ('B001BLTHBA', 'B001BLUZ4S', 100.0),
 ('0062079670', 'B00IRC8CD0', 100.0),
 ('1502971429', 'B00OW9RSPK', 100.0),
 ('B000002GB6', 'B000002GB7', 100.0),
 ('B001F7D0OA', 'B001F7FAUC', 100.0),
 ('B0091JK4KC', 'B00DVQ7DRS', 100.0),
 ('6303391915', 'B00009NH9Z', 100.0),
 ('0595240577', 'B004I6EKJ4', 100.0)]

In [None]:
products_relevance.saveAsTextFile('/amazon-reviews-analysis/')

In [None]:
products_relevance.take(10)

In [None]:

def get_product_info(product_id):
    row = meta.select('title').where(col('product_id') == product_id).rdd.collect()
    if len(row) == 0:
        return ''
    return row[0]['title']
