In [2]:
from pathlib import Path

import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types

# Create spark context and session

In [3]:
conf = (
    SparkConf()
    .setMaster("local[*]")
    .setAppName("Day_5_Quizz")
)

sc = (
    SparkContext(conf=conf)
    .getOrCreate()
)

spark = (
    SparkSession.builder
    .config(conf=sc.getConf())
    .getOrCreate()
)

23/09/28 15:08:11 WARN Utils: Your hostname, vinces-MBP.local resolves to a loopback address: 127.0.0.1; using 192.168.1.5 instead (on interface en0)
23/09/28 15:08:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/28 15:08:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
filepaths = [
    str(p) for p in 
    Path("./day_5_data").rglob("amazon_us_reviews-train-00004-of-00005.parquet") 
]
df = spark.read.parquet(*filepaths)

                                                                                

In [5]:
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: long (nullable = true)
 |-- verified_purchase: long (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



In [68]:
df.select(['product_category', 'helpful_votes']).show(2)

+----------------+-------------+
|product_category|helpful_votes|
+----------------+-------------+
|         Apparel|           17|
|         Apparel|            0|
+----------------+-------------+
only showing top 2 rows



In [10]:
rdd = df.select('product_id', 'review_body').rdd

# Top words associated with each product
- The words must not be in the top words across all products (e.g., words like "the")
- Need to find top 1k words from all products first

In [6]:
# Top 1k words from all products
def gen_words(row):
    '''Generate words from a review body str'''
    for word in row['review_body'].split(" "):
        yield (word, 1)

rdd_top1kWords = (
    rdd
    .flatMap(gen_words)
    .reduceByKey(lambda x,y: x+y)
    .sortBy(lambda row: row[1], ascending=False)
    .zipWithIndex()
    .filter(lambda row: row[1] < 1000) # This way we can get top 1k words in RDD form
    .map(lambda row: row[0])
)

                                                                                

In [7]:
rdd_top1kWords.take(2)

[('the', 985246), ('I', 865286)]

In [8]:
# All words from each product ID
def gen_word_product(row):
    '''Generate tuples of word, product ID
    from a row
    '''
    for word in row['review_body'].split(" "):
        yield (word, (row['product_id'], 1))

# Output = [ ('the','PID0',1), ... ]
rdd_word_product = (
    rdd
    .flatMap(gen_word_product)
)

In [31]:
from heapq import nlargest

# Exclude words from the top 1k
# Transform a row to (('PIB0', 'the'), 1)
# Transform a row to ('PIB0', 'the', 1)
rdd_product_word_non1k = (
    rdd_word_product
    .leftOuterJoin(rdd_top1kWords)
    .filter(lambda row: row[1][1] == None)
    .map(lambda row: ((row[1][0][0], row[0]), row[1][0][1]))
    .reduceByKey(lambda x,y: x+y)
    .map(lambda row: (row[0][0], row[0][1], row[1]))
)

topNWords_product = (
    rdd_product_word_non1k
    .groupBy(lambda row: row[0])
    .flatMap(lambda row: nlargest(5, row[1], key=lambda tup: tup[2]))
    .collect()
)

                                                                                

In [41]:
import pandas as pd

topNWords_prod_df = pd.DataFrame(topNWords_product, columns=['PID','word','count'])
topNWords_prod_df.head(2)

In [49]:
(
    topNWords_prod_df
    .groupby('PID', as_index=False)
    .size()
    .sort_values('size', ascending=True)
)

Unnamed: 0,PID,size
18772,B0007CKRTA,1
142326,B001CGGXQ6,1
115863,B0012OAMFG,1
142329,B001CGH1VM,1
5574,B00012UPAW,1
...,...,...
61002,B000JRBBF2,5
61003,B000JRBG6G,5
61004,B000JRBJ8Q,5
60983,B000JR3NY4,5


In [48]:
topNWords_prod_df[topNWords_prod_df['PID'] == 'B000JR3NY4']

Unnamed: 0,PID,word,count
255430,B000JR3NY4,upgrade,1
255431,B000JR3NY4,Dec.,1
255432,B000JR3NY4,reason.,1
255433,B000JR3NY4,shipping.,1
255434,B000JR3NY4,later,1


Finding top words associated with each product seems to produce a very "sparse" result (i.e., no insights)

# Top words associated with each product category
- The words must not be in the top words across all products (e.g., words like "the")
- Need to find top 1k words from all products first

In [62]:
# All words from each product cat
def gen_word_cat(row):
    '''Generate tuples of word, product cat
    from a row
    '''
    for word in row['review_body'].split(" "):
        yield (word, (row['product_category'], 1))

rdd = df.select('product_category', 'review_body').rdd

# Output = [ ('the','cat0',1), ... ]
rdd_word_cat = (
    rdd
    .flatMap(gen_word_cat)
)

In [63]:
from heapq import nlargest

# Exclude words from the top 1k
# Transform a row to (('cat0', 'the'), 1)
# Transform a row to ('cat0', 'the', 1)
rdd_cat_word_non1k = (
    rdd_word_cat
    .leftOuterJoin(rdd_top1kWords)
    .filter(lambda row: row[1][1] == None)
    .map(lambda row: ((row[1][0][0], row[0]), row[1][0][1]))
    .reduceByKey(lambda x,y: x+y)
    .map(lambda row: (row[0][0], row[0][1], row[1]))
)

topNWords_cat = (
    rdd_cat_word_non1k
    .groupBy(lambda row: row[0])
    .flatMap(lambda row: nlargest(5, row[1], key=lambda tup: tup[2]))
    .collect()
)

                                                                                

In [66]:
import pandas as pd

topNWords_cat_df = pd.DataFrame(topNWords_cat, columns=['product_cat','word','count'])
topNWords_cat_df.head(2)

Unnamed: 0,product_cat,word,count
0,Apparel,amazing,2517
1,Apparel,standard,2516


In [67]:
(
    topNWords_cat_df
    .groupby('product_cat', as_index=False)
    .size()
    .sort_values('size', ascending=True)
)

Unnamed: 0,product_cat,size
0,Apparel,5


# Irrelevant reviews
- For each product ID, create a list of random irrelevant reviews
- Irrelevant = not in the list of all reviews of that product

Not sure if we can use both RDD and DF for this

In [26]:
import random

from pyspark.sql.types import *

In [21]:
# Create a list of ALL review IDs for reference
ref_reviews = df.select(['review_id']).collect()
ref_reviews = [ row['review_id'] for row in ref_reviews ]
ref_reviews_brdcst = sc.broadcast(ref_reviews)
ref_reviews_brdcst

                                                                                

In [42]:
# Use the RDD method

def list_combine_rdd(x):
    return [x]

def list_merge_rdd(x,y):
    x.append(y)
    return x

def list_extend_rdd(x,y):
    x.extend(y)
    return x

prod_review_rdd = df.select(['product_id','review_id']).rdd

# Collect all reviews of each product
prod_allReviews_rdd = (
    prod_review_rdd
    .map(lambda row: (row['product_id'], row['review_id']))
    .combineByKey(list_combine_rdd, list_merge_rdd, list_extend_rdd)
)

def sample_irr_rv_rdd(row, seed=1):
    '''Samples irrelevant reviews

    A row must look like this: `('PID0', ['RV0','RV1'])`

    Map this to a RDD
    '''
    sample_space = list(set(ref_reviews_brdcst.value).difference(set(row[1])))
    random.seed(seed)
    return (
        row[0],
        row[1],
        random.sample(sample_space, min(len(sample_space),5))
    )

product_bothReviews_rdd = (
    prod_allReviews_rdd
    .map(sample_irr_rv_rdd)
)

In [43]:
product_bothReviews_rdd.take(1)

                                                                                

[('B001LRM76Q',
  ['RWBRDXHX1B3Y8'],
  ['R2YTD5HINM6J1D',
   'R2XTXZ925N98XY',
   'R3JBN1H7DDWVQ3',
   'R2MFXZERMWGEL8',
   'R38ETWSLKWLISM']),
 ('B001LRM6BW',
  ['R2O3SBNPLMN9MI'],
  ['R2YTD5HINM6J1D',
   'R2XTXZ925N98XY',
   'R3JBN1H7DDWVQ3',
   'R11300M2GHKAR5',
   'R38ETWSLKWLISM'])]

In [55]:
(
    product_bothReviews_rdd
    .toDF(['product_id','review_id_arr','irr_review_id_arr'])
    .printSchema()
)

root
 |-- product_id: string (nullable = true)
 |-- review_id_arr: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- irr_review_id_arr: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [51]:
# Use the DF method

product_allReviews_df = (
    df
    .groupby(['product_id'])
    .agg(F.collect_list('review_id').alias('review_id_arr'))
)

def sample_irr_rv_df(array, seed=1):
    '''Samples irrelevant reviews

    Map this to a DF column
    '''
    sample_space = list(set(ref_reviews_brdcst.value).difference(set(array)))
    random.seed(seed)
    return random.sample(sample_space, min(len(sample_space),5))

# Register UDF
sample_irr_rv_udf = F.udf(sample_irr_rv_df, returnType=ArrayType(StringType()))

product_bothReviews_df = (
    product_allReviews_df
    .withColumn('irr_review_id_arr', sample_irr_rv_udf(F.col('review_id_arr')))
)

In [52]:
product_bothReviews_df.show(2)

[Stage 30:>                                                         (0 + 8) / 8]

+----------+--------------------+--------------------+
|product_id|       review_id_arr|   irr_review_id_arr|
+----------+--------------------+--------------------+
|0000032034|[R2THEEBLJRYPRH, ...|[R13LG32TBZYYBD, ...|
|1465014578|[RIB8YVLWG5B5I, R...|[R12AB1XYK66Z93, ...|
+----------+--------------------+--------------------+
only showing top 2 rows



                                                                                

In [56]:
product_bothReviews_df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- review_id_arr: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- irr_review_id_arr: array (nullable = true)
 |    |-- element: string (containsNull = true)

