In [1]:
# S3 location of the gzip-compressed, tab-separated review file that is handed
# to run_stage1 (which reads it with a header row and sep='\t').
dataset_file = "s3://amazon-reviews-pds/tsv/amazon_reviews_us_Music_v1_00.tsv.gz"

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType,IntegerType, DateType
from pyspark.sql import functions as F


def run_stage1(datafile):
    """Load the review file and print dataset-level summary statistics.

    Prints the total number of reviews, the number of distinct users and
    products, and for both users and products the largest review count, the
    ten largest review counts and the median review count.

    Args:
        datafile: path/URI of a tab-separated file with a header row that
            contains at least the columns ``customer_id`` and ``product_id``.

    Returns:
        A 4-tuple ``(median_rev_pub, median_rev_rec, top10_products, reviews)``:
        the median number of reviews per user, the median number of reviews
        per product, a list of up to ten ``(product_id, count)`` rows for the
        most-reviewed products, and the cached reviews DataFrame.
    """
    spark = SparkSession.builder.appName("COMP5349 Assignment 2").getOrCreate()

    spark.conf.set("spark.sql.shuffle.partitions", 20)

    # Every column is read as a string because no schema is supplied.
    reviews = spark.read.csv(datafile, header=True, sep='\t').cache()

    review_count = str(reviews.count())
    print("There are " + review_count + " reviews in total")

    # Aggregate once per key and cache: every statistic below is derived from
    # these two small frames instead of re-shuffling the full review table.
    user_counts = reviews.groupBy("customer_id").count().cache()
    product_counts = reviews.groupBy("product_id").count().cache()

    # One group per distinct key, so the group count is the distinct count.
    distinct_user = str(user_counts.count())
    print("There are " + distinct_user + " unique users in total")

    distinct_product = str(product_counts.count())
    print("There are " + distinct_product + " unique products in total")

    users_by_count = user_counts.sort("count", ascending=False)

    print("This is the largest number of reviews published by a single user: ")
    users_by_count.show(1)

    print("This is the largest number of reviews published by the top 10 users: ")
    users_by_count.show(10)

    print("This is the median number of reviews publish by a user: ")
    # relativeError=0 requests the exact quantile; the aggregated column is
    # named "count" (lower case), so refer to it by its real name.
    median_rev_pub = user_counts.approxQuantile("count", [0.5], 0)[0]
    print(median_rev_pub)

    products_by_count = product_counts.sort("count", ascending=False)

    print("This is the largest number of reviews written for a single product: ")
    products_by_count.show(1)

    print("This is the largest number of reviews received by the top 10 products: ")
    # Cached so that the rows printed by show() are the same rows returned by
    # take() even when several products tie on their review count.
    top10_products = products_by_count.limit(10).cache()
    top10_products.show()

    print("This is the median number of reviews received by a product: ")
    median_rev_rec = product_counts.approxQuantile("count", [0.5], 0)[0]
    print(median_rev_rec)

    top10_rows = top10_products.take(10)

    # The helper frames are no longer needed once the rows are on the driver.
    top10_products.unpersist()
    user_counts.unpersist()
    product_counts.unpersist()

    return median_rev_pub, median_rev_rec, top10_rows, reviews

In [3]:
# Run stage 1 on the dataset and keep its four results (the two medians, the
# top-10 product rows and the cached reviews DataFrame) for the later stages.
median_rev_pub, median_rev_rec, top10_products, reviews = run_stage1(dataset_file)

There are 4751577 reviews in total
There are 1940732 unique users in total
There are 782326 unique products in total
This is the largest number of reviews published by a single user: 
+-----------+-----+
|customer_id|count|
+-----------+-----+
|   50736950| 7168|
+-----------+-----+
only showing top 1 row

This is the largest number of reviews published by the top 10 users: 
+-----------+-----+
|customer_id|count|
+-----------+-----+
|   50736950| 7168|
|   38214553| 5412|
|   51184997| 5369|
|   18116317| 4222|
|   23267387| 4023|
|   50345651| 3793|
|   14539589| 2896|
|   15725862| 2842|
|   19380211| 2592|
|   20018062| 2568|
+-----------+-----+
only showing top 10 rows

This is the median number of reviews publish by a user: 
1.0
This is the largest number of reviews written for a single product: 
+----------+-----+
|product_id|count|
+----------+-----+
|B00008OWZG| 3936|
+----------+-----+
only showing top 1 row

This is the largest number of reviews received by the top 10 produc

In [4]:
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as f
import statistics
import nltk

def run_stage2(median_rev_pub, median_rev_rec, reviews):
    """Reduce the reviews to multi-sentence ones from active users / popular products.

    A review is kept when its body has at least two sentences (per
    ``nltk.sent_tokenize``), its author has published more reviews than
    ``median_rev_pub`` and its product has received more reviews than
    ``median_rev_rec``. The function then prints the ten users and the ten
    products with the highest median sentence count among the kept reviews.

    Args:
        median_rev_pub: threshold on the number of reviews per user.
        median_rev_rec: threshold on the number of reviews per product.
        reviews: DataFrame with at least the columns ``customer_id``,
            ``product_id``, ``star_rating``, ``review_id`` and ``review_body``.

    Returns:
        The cached, filtered DataFrame, including a ``review_sentences`` column.
    """

    def count_sentences(text):
        # str() turns a missing body into the text "None" before tokenising.
        sentences = nltk.sent_tokenize(str(text))
        return len(sentences)

    def truncated_median(values):
        # int() drops the fractional part that an even-sized list can produce.
        return int(statistics.median(values))

    sentence_count_udf = f.udf(count_sentences, IntegerType())
    median_udf = f.udf(truncated_median, IntegerType())

    def show_top_medians(key_column):
        # Median sentence count per key, highest first, first ten rows shown.
        per_key = filtered_reviews.groupBy(key_column).agg(
            median_udf(f.collect_list(f.col('review_sentences'))).alias('median')
        )
        per_key.sort('median', ascending=False).show(10)

    # Attach the sentence count, trim to the needed columns, then drop
    # reviews with fewer than two sentences.
    wanted_columns = ['customer_id', 'product_id', 'star_rating', 'review_id', 'review_body', 'review_sentences']
    with_sentence_count = reviews.withColumn('review_sentences', sentence_count_udf(reviews.review_body))
    multi_sentence = with_sentence_count.select(wanted_columns).filter("review_sentences>=2")

    # Users with more published reviews than the user median.
    user_condition = "count>" + str(median_rev_pub)
    active_users = reviews.groupBy('customer_id').count().filter(user_condition).drop('count')

    # Products with more received reviews than the product median.
    product_condition = 'count>' + str(median_rev_rec)
    popular_products = reviews.groupBy('product_id').count().filter(product_condition).drop('count')

    # The inner joins act as semi-filters on both keys.
    by_user = multi_sentence.join(active_users, 'customer_id', 'inner')
    filtered_reviews = by_user.join(popular_products, 'product_id', 'inner').cache()

    print('Top 10 users ranked by median number of sentences in the reviews they have published')
    show_top_medians('customer_id')

    print('Top 10 products ranked by median number of sentences in the reviews they have received')
    show_top_medians('product_id')

    return filtered_reviews

In [5]:
# Run stage 2 with the medians and reviews DataFrame produced by stage 1; the
# returned filtered DataFrame is the input of stage 3.
reduced_reviews = run_stage2(median_rev_pub, median_rev_rec, reviews)

Top 10 users ranked by median number of sentences in the reviews they have published
+-----------+------+
|customer_id|median|
+-----------+------+
|   25628286|   234|
|   37118941|   227|
|   51865782|   226|
|   29580246|   201|
|   50595705|   191|
|   17821650|   183|
|   43879820|   180|
|   15585529|   177|
|   23717536|   157|
|   46097534|   154|
+-----------+------+
only showing top 10 rows

Top 10 products ranked by median number of sentences in the reviews they have received
+----------+------+
|product_id|median|
+----------+------+
|B00LTQ5EVY|   984|
|B009SF2GZU|   321|
|B000003G29|   267|
|B00T7TYTCK|   252|
|B000BCH5PK|   209|
|B0000C0FEW|   200|
|B00AP5M4WM|   160|
|B009SF2IRG|   157|
|B008LA8E9K|   149|
|B005ZHBBU6|   147|
+----------+------+
only showing top 10 rows



In [6]:
from pyspark.sql.types import IntegerType, ArrayType, StringType
from pyspark.sql import functions as f
import nltk
import tensorflow as tf
import tensorflow_hub as hub
import numpy as np

def review_embed(rev_text_partition):
    """Embed all texts of one partition with the Universal Sentence Encoder.

    Written to be passed to ``RDD.mapPartitions``.

    Args:
        rev_text_partition: iterable of texts; every item is converted with
            ``str()`` before it is embedded.

    Returns:
        ``[]`` when the partition holds no texts; otherwise whatever the
        encoder returns for the whole batch (presumably one embedding row per
        input text, in input order -- TODO confirm against the module docs).
    """
    # mapPartitions supplies the elements lazily through a generator, while
    # the encoder expects the whole batch at once, so materialise it first.
    rev_text_list = [str(text) for text in rev_text_partition]

    # Check for an empty partition BEFORE loading the module: loading is
    # expensive and pointless when there is nothing to embed.
    if not rev_text_list:
        return []

    module_url = "https://tfhub.dev/google/universal-sentence-encoder/2"

    # Build in a private graph so repeated calls inside the same worker
    # process do not keep adding module copies to the global default graph.
    with tf.Graph().as_default():
        embed = hub.Module(module_url)
        with tf.Session() as session:
            session.run([tf.global_variables_initializer(), tf.tables_initializer()])
            message_embeddings = session.run(embed(rev_text_list))
    return message_embeddings

def run_stage3(reviews, product_rank, top10_products):
    """Compare positive and negative review sentences of one product.

    The reviews of the chosen product are split into a positive class
    (``star_rating>3``) and a negative class (``star_rating<3``), each review
    body is split into sentences and every sentence is embedded with
    ``review_embed``. For each class the function prints the average pairwise
    cosine distance between sentences, the "center" sentence (smallest average
    distance to the class) and the ten sentences that follow it in that
    ranking.

    Args:
        reviews: DataFrame with at least ``product_id``, ``star_rating``,
            ``review_id`` and ``review_body`` columns.
        product_rank: 0-based index into ``top10_products``.
        top10_products: sequence of rows whose first field is a product id.

    Returns:
        ``(p_reviews_rdd, n_reviews_rdd)``: cached RDDs of
        ``(review_id, sentence)`` pairs for the positive and negative class.
    """
    def nltk_tokenizer(review_body):
        return nltk.sent_tokenize(str(review_body))

    def similarity_matrix(vector):
        # Pairwise cosine distance: 1 - (a . b) / (|a| * |b|).
        d = vector @ vector.T
        norm = (vector * vector).sum(1, keepdims=True) ** .5
        return 1 - d / norm / norm.T

    def class_sentences(product_reviews, rating_filter):
        # (review_id, sentence) pairs of one class, one pair per sentence.
        tokenized = product_reviews.filter(rating_filter).withColumn(
            'tokenized_review_body', sent_tokenizer(f.col('review_body')))
        pairs = (tokenized.select('review_id', 'tokenized_review_body').rdd
                 .flatMapValues(lambda x: x)
                 .filter(lambda x: len(x[1]) > 0))
        # Cached because the RDD feeds several actions (embedding, collect,
        # stage 4) whose results are matched up by position.
        return pairs.cache()

    def class_distances(sentence_rdd):
        # mapPartitions flattens what review_embed returns, so an empty
        # partition simply contributes no rows; no extra filtering is needed
        # (and comparing an embedding array with [] is not a valid test).
        vectors = np.asarray(sentence_rdd.values().mapPartitions(review_embed).collect())
        if vectors.shape[0] == 0:
            return np.zeros((0, 0))
        return similarity_matrix(vectors)

    def average_distance(dist_matrix):
        # Mean over the unordered pairs only: k=1 leaves out the diagonal.
        count = dist_matrix.shape[0]
        if count < 2:
            return float('nan')  # no pair exists, so there is no average
        return np.triu(dist_matrix, k=1).sum() / ((count**2 - count) / 2)

    def print_center(label, dist_matrix, sentences):
        # Print the center sentence of one class and the ten that rank next.
        count = dist_matrix.shape[0]
        print()
        if count == 0:
            print('No sentences in the ' + label.lower() + ' review class')
            return
        ranking = np.argsort(dist_matrix.sum(axis=0) / count)
        print(label + ' center review_id and sentence:')
        print(sentences[ranking[0]])
        print()
        print('Top 10 ' + label.lower() + ' nearest review_ids and sentences')
        # NOTE(review): these are the sentences with the next-smallest average
        # distance to the whole class, not the ten sentences closest to the
        # center sentence itself -- confirm which definition is intended.
        for i in range(min(10, len(sentences) - 1)):
            print(sentences[ranking[i + 1]])

    # Choose a product
    product_id = top10_products[product_rank][0]

    # Sentence tokenizer function
    sent_tokenizer = f.udf(nltk_tokenizer, ArrayType(StringType()))

    # Keep only the chosen product's reviews (input frame is left untouched).
    product_reviews = reviews.filter(reviews.product_id == product_id)

    p_reviews_rdd = class_sentences(product_reviews, 'star_rating>3')
    n_reviews_rdd = class_sentences(product_reviews, 'star_rating<3')

    # Intra-Class Similarity
    p_sim_matrix = class_distances(p_reviews_rdd)
    print('Average distance between sentences in the positive review class is '
          + str(average_distance(p_sim_matrix)))

    n_sim_matrix = class_distances(n_reviews_rdd)
    print('Average distance between sentences in the negative review class is '
          + str(average_distance(n_sim_matrix)))

    # Class Center Sentences
    print_center('Positive', p_sim_matrix, p_reviews_rdd.collect())
    print_center('Negative', n_sim_matrix, n_reviews_rdd.collect())

    return p_reviews_rdd, n_reviews_rdd

  from ._conv import register_converters as _register_converters
W0522 08:58:57.214910 140312044205888 __init__.py:56] Some hub symbols are not available because TensorFlow version is less than 1.14


In [7]:
# Run stage 3 on the stage-2 output. product_rank=1 is a 0-based index, so the
# second row of top10_products is analysed. The two returned RDDs hold the
# (review_id, sentence) pairs of the positive and negative class.
p_reviews_rdd, n_reviews_rdd = run_stage3(reduced_reviews, 1, top10_products)

Average distance between sentences in the positive review class is 0.6927572444366153
Average distance between sentences in the negative review class is 0.7296461383763737

Positive center review_id and sentence:
('RWDRCB1IYL9QF', 'Every song is spectacular.')

Top 10 positive nearest review_ids and sentences
('R3M0H21UQDESQG', 'He definitely makes each song as a masterpiece.')
('R2QFBOHT45UJ61', 'EVERY SONG IS AMAZING!')
('R1XGIGOYF9XJ4B', 'THIS song along with most of the others is wonderful.')
('R1C3Z0KW5J23KK', 'The entire selection of songs is phenomenal!')
('R20RQCH1HEPG6T', "This CD is a fantastic mix of ballads that show off Clay Aiken's powerful voice and some fun, up-tempo pop songs.")
('R1WDBTFIBQ603C', 'Every song is awesome!!')
('RAPCBKXCGY1EE', "The songs on this CD are all beautiful and all showcase Clay's magnificent voice.")
('R2W807APUY0UND', 'Every song is great!')
('R2ZWUWVHROP1XL', 'The Way is another one of my favorite songs...')
('R3O8ATGEQC1MN9', 'This Album is 

In [8]:
from pyspark.sql import Row
from pyspark.sql.types import IntegerType, ArrayType, StringType
from pyspark.sql import functions as f
from pyspark.ml.feature import Word2Vec
import numpy as np

def run_stage4(positive_reviews, negative_reviews):
    """Repeat the class-similarity analysis with Word2Vec sentence vectors.

    For each class a separate Word2Vec model (100 dimensions, ``minCount=0``)
    is fitted on the class's own sentences, each sentence being split on
    single spaces after removing ``.`` characters. The function prints the
    average pairwise cosine distance per class, then for each class the
    "center" sentence (smallest average distance to the class) and the ten
    sentences that follow it in that ranking.

    Args:
        positive_reviews: RDD of ``(review_id, sentence)`` pairs.
        negative_reviews: RDD of ``(review_id, sentence)`` pairs.

    Returns:
        None; all results are printed.
    """
    def similarity_matrix(vector):
        # Pairwise cosine distance: 1 - (a . b) / (|a| * |b|).
        d = vector @ vector.T
        norm = (vector * vector).sum(1, keepdims=True) ** .5
        return 1 - d / norm / norm.T

    def average_distance(dist_matrix):
        # Mean over the unordered pairs only: k=1 leaves out the diagonal.
        count = dist_matrix.shape[0]
        if count < 2:
            return float('nan')  # no pair exists, so there is no average
        return np.triu(dist_matrix, k=1).sum() / ((count**2 - count) / 2)

    def class_distances(sentence_rdd, sentence_count):
        # Fit Word2Vec on one class and return its pairwise distance matrix.
        if sentence_count == 0:
            # Nothing to fit a model on (and toDF cannot infer a schema).
            return np.zeros((0, 0))
        tokens_df = (sentence_rdd.values()
                     .map(lambda x: x.replace('.', '').split(" "))
                     .map(to_row)
                     .toDF())
        model = word2Vec.fit(tokens_df)
        # Each transformed row is (review_sentences, result); keep the vector.
        vectors = np.asarray(
            [vector for _words, vector in model.transform(tokens_df).collect()])
        return similarity_matrix(vectors)

    def print_center(label, dist_matrix, sentences):
        # Print the center sentence of one class and the ten that rank next.
        count = dist_matrix.shape[0]
        print()
        if count == 0:
            print('No sentences in the ' + label.lower() + ' review class')
            return
        ranking = np.argsort(dist_matrix.sum(axis=0) / count)
        print(label + ' center review_id and sentence:')
        print(sentences[ranking[0]])
        print()
        print('Top 10 ' + label.lower() + ' nearest review_ids and sentences')
        # NOTE(review): these are the sentences with the next-smallest average
        # distance to the whole class, not the ten sentences closest to the
        # center sentence itself -- confirm which definition is intended.
        for i in range(min(10, len(sentences) - 1)):
            print(sentences[ranking[i + 1]])

    # Row factory for the single-column token DataFrame (named so that loop
    # variables cannot shadow it).
    to_row = Row('review_sentences')

    word2Vec = Word2Vec(vectorSize=100, minCount=0, inputCol="review_sentences", outputCol="result")

    # Use the RDDs that were passed in, not notebook-level globals, so the
    # printed sentences always belong to the data that was analysed.
    p_reviews = positive_reviews.collect()
    n_reviews = negative_reviews.collect()

    # Intra-Class Similarity
    p_sim_matrix = class_distances(positive_reviews, len(p_reviews))
    print('Average distance between sentences in the positive review class is '
          + str(average_distance(p_sim_matrix)))

    n_sim_matrix = class_distances(negative_reviews, len(n_reviews))
    print('Average distance between sentences in the negative review class is '
          + str(average_distance(n_sim_matrix)))

    # Class Center Sentences
    print_center('Positive', p_sim_matrix, p_reviews)
    print_center('Negative', n_sim_matrix, n_reviews)

In [9]:
# Run stage 4 on the positive and negative (review_id, sentence) RDDs that
# stage 3 returned.
run_stage4(p_reviews_rdd, n_reviews_rdd)

Average distance between sentences in the positive review class is 0.6769896046767968
Average distance between sentences in the negative review class is 0.23678240710413384

Positive center review_id and sentence:
('R25G8UWZF7PHBF', '<br /> <br />HOWEVER....this cd is mostly boring and the songs aren\'t too special....as other reviewers have said, the problem with this cd is that all the songs sound pretty much the same....and IT DOES NOT show off Clay\'s voice at all....the songs are poorly written and boring....I am really disappointed by Clay\'s vocal performance as well because it\'s just not good....not enough power and the quality of his voice is quite...rough....nothing like the beautiful voice that you hear in \\\\"Solitaire\\\\" and his Christmas Album.... <br /> <br />One thing though, is that this cd really grows on you...after listening to this cd a few times, I started to like it more cuz the song seems to be a bit addictive, ahaha....overall~not bad, pretty good....but it