In [46]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import *
from pyspark.ml.feature import PCA, StopWordsRemover, Normalizer
from pyspark.ml.linalg import Vectors
import numpy as np
import tensorflow as tf
import tensorflow_hub as hub
from pyspark.sql.types import *
from pyspark.ml.linalg import Vectors,VectorUDT
import datetime
# Wall-clock start; the last cells subtract it from end_time to report the run time.
start_time = datetime.datetime.now()

# Get the active Spark session, or create one named after the assignment.
spark = (
    SparkSession.builder
    .appName("Assignment2 - COMP5349-Stage3")
    .getOrCreate()
)

# S3 location of the gzipped, tab-separated Music review file.
awsData = "s3://amazon-reviews-pds/tsv/amazon_reviews_us_Music_v1_00.tsv.gz"

In [47]:
"""Loading data into a dataframe"""
# Tab-separated input whose first line holds the column names.
musicData = spark.read.options(header=True, sep='\t').csv(awsData)
musicData.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: string (nullable = true)
 |-- helpful_votes: string (nullable = true)
 |-- total_votes: string (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)

In [48]:
"""Selecting required columns"""
# Only these columns are used by the rest of the notebook.
required_columns = ['customer_id', 'product_id', 'product_title', 'star_rating', 'review_id', 'review_body']
requiredData = musicData.select(*required_columns)
requiredData.count()

4751577

In [49]:
"""Identifying top 10 products based on number of reviews received."""
# Count reviews per product (rows without a review_id are ignored) and keep the ten largest counts.
reviews_per_product = (
    requiredData
    .where(col("review_id").isNotNull())
    .groupBy("product_id")
    .count()
)
top_10_product_ids = reviews_per_product.orderBy(col("count").desc()).limit(10)

In [50]:
"""Picking 10th product from the top 10 products identified"""
# Ascending order puts the smallest of the ten counts first, i.e. the 10th most-reviewed product.
picked_product = top_10_product_ids.orderBy(col("count").asc()).limit(1)

In [51]:
"""Separating positive and negative reviews based on star rating"""
# The inner join restricts the reviews to the picked product.
# Ratings >= 4 count as positive and ratings <= 2 as negative; 3-star reviews fall in neither set.
picked_product_id = picked_product.select("product_id")
positive_reviews = requiredData.join(picked_product_id, "product_id", "inner").where(col("star_rating") >= 4)
negative_reviews = requiredData.join(picked_product_id, "product_id", "inner").where(col("star_rating") <= 2)

In [52]:
# Mark the positive reviews for caching and display how many there are.
positive_reviews.cache().count()

1323

In [53]:
# Mark the negative reviews for caching and display how many there are.
negative_reviews.cache().count()

415

In [54]:
"""Splitting review body into sentences based on . or ? and saving as list"""
# A sentence ends at every '.' or '?'; reviews without a body are skipped.
sentence_delimiter = r"\.|\?"

positive_review_sentences = (
    positive_reviews
    .where(col("review_body").isNotNull())
    .select("review_id", "review_body")
    .withColumn("review_body_sentences", split(col("review_body"), sentence_delimiter))
)
negative_review_sentences = (
    negative_reviews
    .where(col("review_body").isNotNull())
    .select("review_id", "review_body")
    .withColumn("review_body_sentences", split(col("review_body"), sentence_delimiter))
)

In [55]:
"""Exploding review sentences to have one to many mapping for review_id vs review sentences. Also, cleaning review sentences 
of any special characters/trim additional spaces. Later on, we split sentence into words to remove any stop words in next step"""
# Raw strings hand the backslashes to the regex engine unchanged and avoid Python's
# invalid-escape warnings. `\$` is escaped on purpose: a bare `$` is the end-of-input
# anchor, so it never removed literal dollar signs from the text.
NOISE_PATTERN = r"<br />|\s+|\$|,|!|#|@|<|>|/|&|;|:|[0-9]|-"
WHITESPACE_PATTERN = r"\s+"


def _explode_and_tokenize(sentences_df):
    """Turn a frame of per-review sentence lists into one row per cleaned sentence.

    The untouched sentence is kept in `review_body_sentences_joined`.
    `review_body_sentences` ends up as the list of lower-case words left after
    markup, punctuation and digits are blanked out and whitespace is collapsed.
    Sentences that are empty after cleaning are dropped.
    """
    return (
        sentences_df
        .select("review_id", "review_body_sentences", "review_body")
        .withColumn("review_body_sentences", explode("review_body_sentences"))
        .withColumn("review_body_sentences_joined", col("review_body_sentences"))
        .withColumn("review_body_sentences", regexp_replace("review_body_sentences", NOISE_PATTERN, " "))
        .withColumn("review_body_sentences", regexp_replace("review_body_sentences", WHITESPACE_PATTERN, " "))
        .withColumn("review_body_sentences", trim(col("review_body_sentences")))
        .filter(col("review_body_sentences").isNotNull())
        .filter(col("review_body_sentences") != '')
        .withColumn("review_body_sentences", lower(col("review_body_sentences")))
        .withColumn("review_body_sentences", split(trim(col("review_body_sentences")), WHITESPACE_PATTERN))
    )


# Both classes go through exactly the same cleaning steps.
positive_review_sentences = _explode_and_tokenize(positive_review_sentences)
negative_review_sentences = _explode_and_tokenize(negative_review_sentences)

In [56]:
"""Stop words remover will remove stop words. Stop words list is retreived from nltk using below:
import nltk
from nltk.corpus import stopwords
 set(stopwords.words('english'))
"""
# Words removed from every tokenised sentence. Entries such as 'br' and 'quot' are
# presumably there to catch HTML residue left by the cleaning step — TODO confirm.
stopwords_list = [
    'a', 'ah', 'br', 'about', 'quot', 'above', 'after', 'again', 'against', 'ain',
    'all', 'am', 'an', 'and', 'any', 'are', 'aren', "aren't", 'as', 'at',
    'be', 'because', 'been', 'before', 'being', 'below', 'between', 'both', 'but', 'by',
    'can', 'couldn', "couldn't", 'd', 'did', 'didn', "didn't", 'do', 'does', 'doesn',
    "doesn't", 'doing', 'don', "don't", 'down', 'during', 'each', 'few', 'for', 'from',
    'further', 'had', 'hadn', "hadn't", 'has', 'hasn', "hasn't", 'have', 'haven', "haven't",
    'having', 'he', 'her', 'here', 'hers', 'herself', 'him', 'himself', 'his', 'how',
    'i', 'if', 'in', 'into', 'is', 'isn', "isn't", 'it', "it's", 'its',
    'itself', 'just', 'll', 'm', 'ma', 'me', 'mightn', "mightn't", 'more', 'most',
    'mustn', "mustn't", 'my', 'myself', 'needn', "needn't", 'no', 'nor', 'not', 'now',
    'o', 'of', 'off', 'on', 'once', 'only', 'or', 'other', 'our', 'ours',
    'ourselves', 'out', 'over', 'own', 're', 's', 'same', 'shan', "shan't", 'she',
    "she's", 'should', "should've", 'shouldn', "shouldn't", 'so', 'some', 'such', 't', 'than',
    'that', "that'll", 'the', 'their', 'theirs', 'them', 'themselves', 'then', 'there', 'these',
    'thanks', 'they', 'this', 'those', 'through', 'to', 'too', 'under', 'until', 'up',
    've', 'very', 'was', 'wasn', "wasn't", 'we', 'were', 'weren', "weren't", 'what',
    'when', 'where', 'which', 'while', 'who', 'whom', 'why', 'will', 'with', 'won',
    "won't", 'wouldn', "wouldn't", 'y', 'you', "you'd", "you'll", "you're", "you've", 'your',
    'yours', 'yourself', 'yourselves',
]

# Reads the word lists in review_body_sentences and writes the surviving words to `filtered`.
removerSW = StopWordsRemover(
    stopWords=stopwords_list,
    inputCol="review_body_sentences",
    outputCol="filtered",
)
positive_review_sentences_new = removerSW.transform(positive_review_sentences)
negative_review_sentences_new = removerSW.transform(negative_review_sentences)

In [57]:
"""removing any sentences with less than two words in them"""
# `size > 2` keeps only sentences with at least three words left after stop-word removal.
positive_review_sentences_new = positive_review_sentences_new.where(size(col("filtered")) > 2).cache()
positive_review_sentences_new.count()

8667

In [58]:
# `size > 2` keeps only sentences with at least three words left after stop-word removal.
negative_review_sentences_new = negative_review_sentences_new.where(size(col("filtered")) > 2).cache()
negative_review_sentences_new.count()

3048

In [59]:
"""Joining back the words splitted into sentences back as tensorflow hub works on sentences as well"""
def join_strings(list_of_strings):
    """Concatenate the given strings into one string, separated by single spaces."""
    return " ".join(list_of_strings)
    
# Spark wrapper around join_strings; it produces a string column.
join_strings_udf = udf(join_strings, StringType())

# filtered_joined holds the stop-word-free words of each sentence as one string.
filtered_words = col("filtered")
positive_review_sentences_new = positive_review_sentences_new.withColumn(
    "filtered_joined", join_strings_udf(filtered_words)
)
negative_review_sentences_new = negative_review_sentences_new.withColumn(
    "filtered_joined", join_strings_udf(filtered_words)
)

In [60]:
"""Adding index column to the dataframe"""
def _prepend_position(row_and_position):
    """Flatten a (Row, position) pair into a tuple that starts with the position."""
    row, position = row_and_position
    return (
        position,
        row["review_id"],
        row["review_body_sentences"],
        row["review_body_sentences_joined"],
        row["filtered"],
        row["filtered_joined"],
    )


# zipWithIndex numbers the rows 0..N-1 in RDD order. Later cells use this index as a
# position into a collected numpy array, so it must be contiguous. zipWithUniqueId
# only guarantees uniqueness; with a single partition both methods give the same ids.
positive_review_sentences_main = positive_review_sentences_new.rdd.zipWithIndex().map(_prepend_position).toDF()
negative_review_sentences_main = negative_review_sentences_new.rdd.zipWithIndex().map(_prepend_position).toDF()

In [61]:
"""Forming main dataframe and renaming columns"""
# Replace the positional tuple names (_1 .. _6) with meaningful column names, in the same order.
main_column_names = [
    "index",
    "review_id",
    "review_body_sentences",
    "review_body_sentences_joined",
    "filtered",
    "filtered_joined",
]
positive_review_sentences_main = positive_review_sentences_main.toDF(*main_column_names)
negative_review_sentences_main = negative_review_sentences_main.toDF(*main_column_names)

In [62]:
"""Defining function to work on partition to convert sentences into vectors."""
def review_embed(rev_text_partition, module_url="https://tfhub.dev/google/universal-sentence-encoder/2"):
    """Embed every sentence of one RDD partition with a TensorFlow Hub text module.

    Args:
        rev_text_partition: iterable of sentence strings, as passed in by mapPartitions.
        module_url: TF-Hub module handle to load. Defaults to the universal sentence
            encoder version used so far.

    Returns:
        The embeddings produced by the module for the partition's sentences (the value
        returned by session.run), or an empty list when the partition has no sentences.
    """
    rev_text_list = list(rev_text_partition)
    if not rev_text_list:
        # Nothing to embed: skip loading the module and running a session.
        return []
    # Build into a private graph so that repeated calls in the same Python worker do
    # not keep adding copies of the module to the process-wide default graph.
    with tf.Graph().as_default():
        embed = hub.Module(module_url)
        embeddings_op = embed(rev_text_list)
        with tf.Session() as session:
            session.run([tf.global_variables_initializer(), tf.tables_initializer()])
            message_embeddings = session.run(embeddings_op)
    return message_embeddings

In [63]:
"""Extracting only review sentences field and converting into string. We already took sentences with more than 2 words, 
so there wont be any nulls. Ensuring we have only 1 partition"""
# One plain Python string per sentence; the partition count is displayed as a check.
positive_joined_column = positive_review_sentences_main.select("filtered_joined")
positive_review_sentences_rdd = positive_joined_column.rdd.map(lambda row: str(row[0])).cache()
positive_review_sentences_rdd.getNumPartitions()

1

In [64]:
# One plain Python string per sentence; the partition count is displayed as a check.
negative_joined_column = negative_review_sentences_main.select("filtered_joined")
negative_review_sentences_rdd = negative_joined_column.rdd.map(lambda row: str(row[0])).cache()
negative_review_sentences_rdd.getNumPartitions()

1

In [65]:
"""Applying function on the rdd formed"""
def _embed_and_cache(sentences_rdd):
    """Apply review_embed to each partition of sentences_rdd and mark the result for caching."""
    return sentences_rdd.mapPartitions(review_embed).cache()


positive_review_embed = _embed_and_cache(positive_review_sentences_rdd)
negative_review_embed = _embed_and_cache(negative_review_sentences_rdd)

In [66]:
def _indexed_embedding(embedding_and_position):
    """Turn an (embedding, position) pair into (position, dense ML vector)."""
    embedding, position = embedding_and_position
    return (position, Vectors.dense(embedding.tolist()))


# zipWithIndex numbers the embeddings 0..N-1 in RDD order. Later cells use `index` as a
# position into a collected numpy array, so it must be contiguous. zipWithUniqueId
# only guarantees uniqueness; with a single partition both methods give the same ids.
positive_review_embed_vectors_df = positive_review_embed.zipWithIndex().map(_indexed_embedding).toDF(["index", "features"])
negative_review_embed_vectors_df = negative_review_embed.zipWithIndex().map(_indexed_embedding).toDF(["index", "features"])

In [67]:
"""Implementing PCA to convert into 2 dimensional vector"""
# One estimator configuration, fitted separately on each class.
pca = PCA(k=2, inputCol="features", outputCol="pca_features")

model = pca.fit(positive_review_embed_vectors_df)
model_n = pca.fit(negative_review_embed_vectors_df)

# Each class is projected with the model fitted on that class.
pca_result_stg = model.transform(positive_review_embed_vectors_df)
pca_result_stg_n = model_n.transform(negative_review_embed_vectors_df)

In [68]:
"""Setting up normaliser to allow taking dot product for cosine similarity"""
# p=2.0 scales each PCA vector by its L2 norm; the result goes to features_norm.
normalizer = Normalizer(p=2.0, inputCol="pca_features", outputCol="features_norm")

pca_result_stg = normalizer.transform(pca_result_stg)
pca_result_stg_n = normalizer.transform(pca_result_stg_n)

In [69]:
"""Collecting all positive normalised vectors into numpy array and reshaping to have each row to contain 
vector from corresponding review sentence """
# Bring the normalised vectors to the driver and lay them out as one 2-component row per sentence.
positive_norm_rows = pca_result_stg.select("features_norm").collect()
all_vectors_array = np.array(positive_norm_rows).reshape((len(positive_norm_rows), 2))
all_vectors_array.shape

(8667, 2)

In [70]:
"""Similarly for negative"""
# Bring the normalised vectors to the driver and lay them out as one 2-component row per sentence.
negative_norm_rows = pca_result_stg_n.select("features_norm").collect()
all_vectors_array_n = np.array(negative_norm_rows).reshape((len(negative_norm_rows), 2))
all_vectors_array_n.shape

(3048, 2)

In [71]:
"""Making join with pca vectors with main dataframe based on index to get relevant fields required"""
# `index` ties every sentence row to its PCA / normalised vector.
pca_result = positive_review_sentences_main.join(pca_result_stg, on="index", how="inner")
pca_result_n = negative_review_sentences_main.join(pca_result_stg_n, on="index", how="inner")

In [72]:
"""Defining function to take intra class similarity. It takes dot product between normalised features with the array formed 
in previous steps. We exclude the vector itself from the main array as we aim to find similarity between other vectors. This 
is achieved by droping the column at index of the row feature as took transpose for dot product.Once dot product is done, 
we take 1-dot product to get cosine distance and take average using numpy.mean"""
def _mean_distance_to_others(features_vec, index, all_vectors):
    """Mean of (1 - dot product) between features_vec and every row of all_vectors except row `index`.

    Raises IndexError when `index` is not a valid row position. Depending on the
    numpy version, np.delete would otherwise ignore the bad index and silently
    average the vector's distance to itself into the result.
    """
    all_vectors = np.asarray(all_vectors)
    row_count = all_vectors.shape[0]
    if not -row_count <= index < row_count:
        raise IndexError("index %d is out of range for %d vectors" % (index, row_count))
    current_vec = np.array(features_vec)
    # Drop the sentence's own row so it is only compared with the other sentences.
    others = np.delete(all_vectors, index, axis=0)
    # The mean is nan when there are no other vectors.
    return float(np.mean(1 - np.dot(others, current_vec)))


def intra_class_similarity(features_vec, index, all_vectors=None):
    """Average cosine distance from a positive sentence vector to all other positive vectors.

    Args:
        features_vec: the sentence's normalised vector.
        index: row position of that sentence in the vector array.
        all_vectors: array with one normalised vector per row; defaults to the
            module-level `all_vectors_array`.

    Returns:
        The average distance as a Python float.
    """
    if all_vectors is None:
        all_vectors = all_vectors_array
    return _mean_distance_to_others(features_vec, index, all_vectors)


def intra_class_similarity_n(features_vec, index, all_vectors=None):
    """Average cosine distance from a negative sentence vector to all other negative vectors.

    Same contract as intra_class_similarity, except that `all_vectors` defaults to
    the module-level `all_vectors_array_n`.
    """
    if all_vectors is None:
        all_vectors = all_vectors_array_n
    return _mean_distance_to_others(features_vec, index, all_vectors)


# Spark wrappers around the two average-distance functions; both yield a float column.
intra_class_similarity_udf = udf(f=intra_class_similarity, returnType=FloatType())
intra_class_similarity_n_udf = udf(f=intra_class_similarity_n, returnType=FloatType())

In [73]:
"""Applying UDF defined in previous step for both negative and positive cases"""
# avg_distance is computed per row from the row's normalised vector and its index.
pca_result_withIndex = pca_result.withColumn(
    "avg_distance",
    intra_class_similarity_udf(col("features_norm"), col("index")),
)
pca_result_withIndex_n = pca_result_n.withColumn(
    "avg_distance",
    intra_class_similarity_n_udf(col("features_norm"), col("index")),
)

In [74]:
# Keep the columns needed downstream and order by ascending average distance.
report_columns = [
    "index",
    "review_id",
    "pca_features",
    "review_body_sentences_joined",
    "filtered",
    "features_norm",
    "avg_distance",
]
pca_result_upd = pca_result_withIndex.select(*report_columns).orderBy("avg_distance")
pca_result_upd_n = pca_result_withIndex_n.select(*report_columns).orderBy("avg_distance")

In [75]:
#pca_result_upd.agg({"avg_distance": "max"}).collect()[0]

In [76]:
#positive_center=pca_result_upd.limit(1)
"""Extracting case center by sorting in ascending based on average distance"""
# The frames are sorted by ascending avg_distance, so the first row is the class centre.
center_fields = [
    "index",
    "review_id",
    "features_norm",
    "avg_distance",
    col("review_body_sentences_joined").alias("center_sentence_text"),
]
positive_center = pca_result_upd.select(*center_fields).limit(1)
negative_center = pca_result_upd_n.select(*center_fields).limit(1)

In [77]:
"""Using center identified, we apply the function similar to previous one, to identify distance and sort in ascending order. 
The first element is considered to be iself as vector will have 0 distance(least) with itself. """
def _nearest_positions(features_vec, index, all_vectors, k=10):
    """Row positions of the k vectors closest to features_vec, nearest first, never including `index`."""
    current_vec = np.array(features_vec)
    distances = 1 - np.dot(np.asarray(all_vectors), current_vec)
    # A stable sort keeps tied distances in row order, so the result is deterministic.
    ranked = np.argsort(distances, kind="mergesort")
    # Exclude the centre by position rather than assuming it sorts first: another
    # vector at the same distance could otherwise take its place and be dropped.
    return ranked[ranked != index][:k].tolist()


def ten_most_similar(features_vec, index, all_vectors=None, k=10):
    """Positions of the positive sentences nearest to the given centre vector.

    Args:
        features_vec: normalised vector of the centre sentence.
        index: row position of the centre itself; it is left out of the result.
        all_vectors: array with one normalised vector per row; defaults to the
            module-level `all_vectors_array`.
        k: maximum number of neighbours to return (10 by default).

    Returns:
        A list of at most k integer row positions, ordered by ascending cosine distance.
    """
    if all_vectors is None:
        all_vectors = all_vectors_array
    return _nearest_positions(features_vec, index, all_vectors, k)


def ten_most_similar_n(features_vec, index, all_vectors=None, k=10):
    """Positions of the negative sentences nearest to the given centre vector.

    Same contract as ten_most_similar, except that `all_vectors` defaults to the
    module-level `all_vectors_array_n`.
    """
    if all_vectors is None:
        all_vectors = all_vectors_array_n
    return _nearest_positions(features_vec, index, all_vectors, k)

# Both UDFs return a list of integer row indices.
neighbour_index_type = ArrayType(IntegerType())
ten_most_similar_udf = udf(ten_most_similar, neighbour_index_type)
ten_most_similar_n_udf = udf(ten_most_similar_n, neighbour_index_type)

# Replace the centre's own index with one row per neighbour index.
ten_most_similar_df = (
    positive_center
    .withColumn("10_most_similar_index", ten_most_similar_udf(col("features_norm"), col("index")))
    .drop("index")
    .withColumn("index", explode("10_most_similar_index"))
)
ten_most_similar_n_df = (
    negative_center
    .withColumn("10_most_similar_index", ten_most_similar_n_udf(col("features_norm"), col("index")))
    .drop("index")
    .withColumn("index", explode("10_most_similar_index"))
)

In [78]:
"""Based on index identified in previous step, we make a join with main dataframe and extract results in required format."""
# Centre columns are renamed so they do not clash with the neighbour columns brought in by the join.
center_projection = [
    col("review_id").alias("center_review_id"),
    col("avg_distance").alias("center_avg_dist"),
    col("index"),
    col("center_sentence_text"),
]
final_result = ten_most_similar_df.select(*center_projection).join(pca_result_upd, on="index", how="inner")
final_result_n = ten_most_similar_n_df.select(*center_projection).join(pca_result_upd_n, on="index", how="inner")
#final_result.show()

In [79]:
# Output layout: centre id and text, followed by each neighbour's review id and sentence.
result_columns = [
    "center_review_id",
    "center_sentence_text",
    col("review_id").alias("10_nearestneighbour_rv_id"),
    col("review_body_sentences_joined").alias("corresponding_sentence"),
]
positive_final = final_result.select(*result_columns)
negative_final = final_result_n.select(*result_columns)
#df.write.parquet("s3a://bucket-name/shri/test.parquet",mode="overwrite")

In [80]:
"""Result for positive case. It includes center review id, center sentence text, 
10 nearest neighbours review id and their corresponding sentences"""
# Default show(): long cell values are cut short with "..." in the printed table.
positive_final.show()

+----------------+--------------------+-------------------------+----------------------+
|center_review_id|center_sentence_text|10_nearestneighbour_rv_id|corresponding_sentence|
+----------------+--------------------+-------------------------+----------------------+
|   RLMNZTH4LDUAN|  My personal fav...|           R221HHEXXYR0O5|   Don't believe th...|
|   RLMNZTH4LDUAN|  My personal fav...|           R3Q0GV24AEX6EO|    These are two d...|
|   RLMNZTH4LDUAN|  My personal fav...|           R33LA8393ELKP9|  I love Good Charl...|
|   RLMNZTH4LDUAN|  My personal fav...|           R3L848KDDKFSGU|   to make it clear...|
|   RLMNZTH4LDUAN|  My personal fav...|           R1YX5THOWN73FR|   Geez Good Charlo...|
|   RLMNZTH4LDUAN|  My personal fav...|           R1QUKBXCN1EQSO|  It is so aweseome...|
|   RLMNZTH4LDUAN|  My personal fav...|           R1LAMC7N0Y96VI|   My bloody Valent...|
|   RLMNZTH4LDUAN|  My personal fav...|           R3UJ8LR0RK48OU|   The melodies are...|
|   RLMNZTH4LDUAN|  M

In [81]:
"""Result for negative case. It includes center review id, center sentence text, 
10 nearest neighbours review id and their corresponding sentences"""
# Default show(): long cell values are cut short with "..." in the printed table.
negative_final.show()

+----------------+--------------------+-------------------------+----------------------+
|center_review_id|center_sentence_text|10_nearestneighbour_rv_id|corresponding_sentence|
+----------------+--------------------+-------------------------+----------------------+
|  R3MGK1ZXH61TIS| now if you want ...|           R246WGK1KFGIFO|   I mean the music...|
|  R3MGK1ZXH61TIS| now if you want ...|            RLKKDH4ZQTSR2|  i bought this alb...|
|  R3MGK1ZXH61TIS| now if you want ...|           R2Z6QVD6K0ILZ0|   You can save a t...|
|  R3MGK1ZXH61TIS| now if you want ...|            RRBM6IZ04NH41|   If one remark wo...|
|  R3MGK1ZXH61TIS| now if you want ...|           R377WFXFM7DCVY|   <br /> <br />Don...|
|  R3MGK1ZXH61TIS| now if you want ...|           R1O38JNU5QXPHT|    Good Charlotte ...|
|  R3MGK1ZXH61TIS| now if you want ...|           R3UX3SV2OQVRKX|   Terrible, just b...|
|  R3MGK1ZXH61TIS| now if you want ...|           R2WZV9V9DDU210|   This review is a...|
|  R3MGK1ZXH61TIS| no

In [82]:
# Wall-clock finish; the following cell reports end_time - start_time.
end_time=datetime.datetime.now()

In [83]:
# Total run time from the first cell to the end-of-run timestamp.
elapsed = end_time - start_time
print("Execution time:", elapsed)

Execution time: 0:02:35.047162

In [84]:
# truncate=False prints the centre sentence and its vector in full.
positive_center.show(truncate=False)

+-----+-------------+----------------------------------------+------------+------------------------------------------------+
|index|review_id    |features_norm                           |avg_distance|center_sentence_text                            |
+-----+-------------+----------------------------------------+------------+------------------------------------------------+
|5152 |RLMNZTH4LDUAN|[0.9921261081421591,0.12524290615716627]|0.3372212   |  My personal favorites are tracks 1,2,3,4,and 8|
+-----+-------------+----------------------------------------+------------+------------------------------------------------+

In [85]:
# truncate=False prints the centre sentence and its vector in full.
negative_center.show(truncate=False)

+-----+--------------+---------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|index|review_id     |features_norm                          |avg_distance|center_sentence_text                                                                                                                                                                                                                                                                                                     |
+-----+--------------+---------------------------------------+------------+-------------------------------------------------------------------------------------------------------------------------------------------------