In [1]:
# library imports 
import sys
from importlib import reload
#import findspark
import customHelpers as helper
#findspark.init()
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType,IntegerType, FloatType,BooleanType,DateType

reload(helper)

<module 'customHelpers' from '/Users/luisedumuller/Documents/Studies/MDS/CloudComputing/Assignment2/COMP5349_AmazonProductReviewAnalysis/notebooks/customHelpers.py'>

In [2]:
# Initialise the Spark session (getOrCreate reuses an already running session).
spark = (
    SparkSession.builder
    .appName("Amazon Product Review Analysis")
    .getOrCreate()
)

## Load Dataset 

| Column | Description | 
| :--- | :--- |
| marketplace | 2 letter country code of the marketplace where the review was written. |
| customer_id | Random identifier that can be used to aggregate reviews written by a single author. |
| review_id | The unique ID of the review. |
| product_id | The unique Product ID the review pertains to. In the multilingual dataset the reviews for the same product in different countries can be grouped by the same product_id. | 
| product_parent | Random identifier that can be used to aggregate reviews for the same product. |
| product_title | Title of the product. | 
| product_category | Broad product category that can be used to group reviews (also used to group the dataset into  coherent parts). | 
| star_rating | the 1-5 star rating of the review. | 
| helpful_votes | Number of helpful votes. | 
| total_votes | Number of total votes the review received. | 
| vine | Review was written as part of the Vine program. |
| verified_purchase | The review is on a verified purchase. |
| review_headline | The title of the review. |
| review_body | The review text. |
| review_date | The date the review was written | 


DATA FORMAT
Tab ('\t') separated text file, without quote or escape characters.
First line in each file is header; 1 line corresponds to 1 record.

In [3]:
# load the data set 
#review_data = '../data/sample_us.tsv'
# actual data load - PERFORMANCE WARNING ON LOCAL MACHINE
review_data = '../data/amazon_reviews_us_Music_v1_00.tsv'

aws_product_review_schema = StructType([
    StructField("marketplace", StringType(), True),
    StructField("customer_id", StringType(), True),
    StructField("review_id", StringType(), True),
    StructField("product_id",StringType(),True),
    StructField("product_parent",StringType(),False),
    StructField("product_title", StringType(), False),
    StructField("product_category", StringType(), False),
    StructField("star_rating", IntegerType(), False),
    StructField("helpful_votes",IntegerType(),False),
    StructField("total_votes", IntegerType(), False),
    StructField("vine",StringType(),False),
    StructField("verified_purchase", StringType(), False),
    StructField("review_headline", StringType(), False),
    StructField("review_body", StringType(), False),
    StructField("review_date",DateType(),False)])

aws_product_review_schema_limited = StructType([
    StructField("customer_id", StringType(), True),
    StructField("review_id", StringType(), True),
    StructField("product_id",StringType(),True),
    StructField("product_title", StringType(), False),
    StructField("product_category", StringType(), False),
    StructField("star_rating", IntegerType(), False),
    StructField("helpful_votes",IntegerType(),False),
    StructField("total_votes", IntegerType(), False),
    StructField("review_headline", StringType(), False),
    StructField("review_body", StringType(), False),
    StructField("review_date",DateType(),False)])

%time awsProductReview_raw_data = spark.read.csv(review_data,header=True,sep="\t",schema=aws_product_review_schema)


CPU times: user 1.97 ms, sys: 956 µs, total: 2.93 ms
Wall time: 1.89 s


In [4]:
# when testing in local machine only 
print(awsProductReview_raw_data.count())
# limit to 100,000 rows so the notebook stays runnable on a local machine
# NOTE(review): limit() is applied without an ordering, so which rows are kept
# is presumably not guaranteed to be stable between runs - confirm if it matters.
awsProductReview_raw_data = awsProductReview_raw_data.limit(100000)
print(awsProductReview_raw_data.count())

4751577
100000


In [5]:
# Discard the columns that are not used anywhere in the analysis.
unused_columns = ('vine', 'verified_purchase', 'product_parent', 'marketplace')
dfProductReview = awsProductReview_raw_data.drop(*unused_columns)

In [6]:
# Drop every row whose review_body is null and report the row count
# before and after.
rows_before_null_filter = dfProductReview.count()
print("number of rows before filter: {0}".format(rows_before_null_filter))

dfFilteredReviews = dfProductReview.dropna(subset=["review_body"])

rows_after_null_filter = dfFilteredReviews.count()
print("number of rows after filter: {0}".format(rows_after_null_filter))

number of rows before filter: 100000
number of rows after filter: 99977


## Stage One: Overall statistics

### Produce overall summary statistics of the data set, in particular,

* the total number of reviews
* the number of unique users
* the number of unique products

In [7]:
from pyspark.sql.functions import col, count, countDistinct, when, isnull

# Overall summary: distinct reviewers, distinct products and total reviews.
dfOverallStats = dfFilteredReviews.agg(
    countDistinct("customer_id").alias("unique_customers"),
    countDistinct("product_id").alias("unique_products"),
    count(col="review_id").alias("total_reviews"),
)

#dfOverallStats.show()

In [8]:
# Count the null values of every column; a row of zeros means nothing is missing.
# `column` is the column *name*, so when() yields that name as a literal for
# each null cell and count() tallies those literals.
null_count_exprs = [
    count(when(isnull(column), column)).alias(column)
    for column in dfProductReview.columns
]
dfFilteredReviews.select(null_count_exprs).show()

+-----------+---------+----------+-------------+----------------+-----------+-------------+-----------+---------------+-----------+-----------+
|customer_id|review_id|product_id|product_title|product_category|star_rating|helpful_votes|total_votes|review_headline|review_body|review_date|
+-----------+---------+----------+-------------+----------------+-----------+-------------+-----------+---------------+-----------+-----------+
|          0|        0|         0|            0|               0|          0|            0|          0|              0|          0|          0|
+-----------+---------+----------+-------------+----------------+-----------+-------------+-----------+---------------+-----------+-----------+



### For user-review distribution, you are asked to find out:

* the largest number of reviews published by a single user
* the top 10 users ranked by the number of reviews they publish
* the median number of reviews published by a user

In [9]:
from pyspark.sql.window import Window
from pyspark.sql.functions import count
import pyspark.sql.functions as F

# Review count per customer, computed by the shared helper.
# NOTE(review): the show() calls below assume the helper returns the rows
# sorted by total_reviews in descending order - confirm in customHelpers.
dfUserReviewCounts = helper.distributionStats(
    dfRecords=dfFilteredReviews.select("customer_id", "review_id"),
    partitionBy="customer_id",
    countBy="review_id",
    returnCountName="total_reviews",
)

print("Top Reviewer:")
dfUserReviewCounts.show(1)
print("Top 10 Reviewers:")
dfUserReviewCounts.show(10)

# A relative error of 0 requests the exact quantile.
user_review_quantiles = dfUserReviewCounts.approxQuantile("total_reviews", [0.50], 0)
user_review_median = user_review_quantiles[0]
print("median number of {0} reviews published by user".format(user_review_median))

Top Reviewer:
+-----------+-------------+
|customer_id|total_reviews|
+-----------+-------------+
|   15536614|          373|
+-----------+-------------+
only showing top 1 row

Top 10 Reviewers:
+-----------+-------------+
|customer_id|total_reviews|
+-----------+-------------+
|   15536614|          373|
|   38214553|          226|
|    4276914|          222|
|    8342883|          194|
|    5291529|          190|
|   13634768|          180|
|    2112938|          126|
|   38229524|           98|
|   48046800|           97|
|   22716027|           92|
+-----------+-------------+
only showing top 10 rows

median number of 1.0 reviews published by user


### For product-review distribution, you are asked to find out:
    
* the largest number of reviews written for a single product
* the top 10 products ranked by the number of reviews they have
* the median number of reviews a product has

In [10]:
# Review count per product, computed by the shared helper.
# The input is dfFilteredReviews (rows without a review body removed) so the
# product statistics are based on the same rows as the overall and per-user
# statistics above.
# NOTE(review): the show() calls below assume the helper returns the rows
# sorted by total_reviews in descending order - confirm in customHelpers.
dfProductReviewCounts = helper.distributionStats(
    dfRecords=dfFilteredReviews.select("product_id", "review_id"),
    partitionBy="product_id",
    countBy="review_id",
    returnCountName="total_reviews",
)
print("Top Product By Review:")
dfProductReviewCounts.show(1)
print("Top 10 Products by Reviews:")
dfProductReviewCounts.show(10)

# A relative error of 0 requests the exact quantile; the result is cast to int
# because review counts are whole numbers.
product_review_median = int(dfProductReviewCounts.approxQuantile("total_reviews", [0.50], 0)[0])
print("median number of {0} reviews per product".format(product_review_median))

Top Product By Review:
+----------+-------------+
|product_id|total_reviews|
+----------+-------------+
|B00VXGTJMU|          256|
+----------+-------------+
only showing top 1 row

Top 10 Products by Reviews:
+----------+-------------+
|product_id|total_reviews|
+----------+-------------+
|B00VXGTJMU|          256|
|B00VMRJPCE|          255|
|B00UCFVIDQ|          205|
|B00VMQK37Q|          170|
|B00ZQUP38S|          158|
|B00WE2SMKC|          139|
|B00WSOWR0M|          120|
|B00MRHANNI|          116|
|B00VTBBEL8|          115|
|B00XJJAWES|          112|
+----------+-------------+
only showing top 10 rows

median number of 1 reviews per product


## Stage Two: Filtering Unwanted Data

Filter reviews based on their length and on features of the reviewer and the product. In particular, the following reviews should be removed:

* reviews with fewer than two sentences in the review body
* reviews published by users who have published fewer reviews than the median number of reviews per user
* reviews of products that have received fewer reviews than the median number of reviews per product

NOTE: Sentence Segmentation Using: NLTK

In [12]:
# Remove reviews with less than 2 sentences in review_body.
# NOTE(review): helper.FilterSentences presumably builds the sentence-count
# condition for the given column - confirm in customHelpers.
rows_before_sentence_filter = dfFilteredReviews.count()
print("number of rows before filter: {0}".format(rows_before_sentence_filter))

# cache() returns the DataFrame it is called on, so it is chained onto the filter.
dfFilteredReviews = dfFilteredReviews.filter(helper.FilterSentences('review_body')).cache()

#dfFilteredReviews.show(1)

rows_after_sentence_filter = dfFilteredReviews.count()
print("number of rows post filter: {0}".format(rows_after_sentence_filter)) 

number of rows before filter: 99977
number of rows post filter: 28955


In [13]:
# user review filter: keep reviews whose author has at least the median
# number of reviews
rows_before_user_filter = dfFilteredReviews.count()
print("number of rows before filter: {0}".format(rows_before_user_filter))

window = Window.partitionBy("customer_id")
reviews_per_customer = count("review_id").over(window)
dfFilteredReviews = (
    dfFilteredReviews
    .withColumn("review_count", reviews_per_customer)
    .filter(col("review_count") >= user_review_median)
    .drop("review_count")
)

rows_after_user_filter = dfFilteredReviews.count()
print("number of rows post filter: {0}".format(rows_after_user_filter))

number of rows before filter: 28955
number of rows post filter: 28955


In [14]:
# product review filter: keep reviews whose product has at least the median
# number of reviews
rows_before_product_filter = dfFilteredReviews.count()
print("number of rows before filter: {0}".format(rows_before_product_filter))

window = Window.partitionBy("product_id")
reviews_per_product = count("review_id").over(window)
dfFilteredReviews = (
    dfFilteredReviews
    .withColumn("review_count", reviews_per_product)
    .filter(col("review_count") >= product_review_median)
    .drop("review_count")
)

rows_after_product_filter = dfFilteredReviews.count()
print("number of rows post filter: {0}".format(rows_after_product_filter))

number of rows before filter: 28955
number of rows post filter: 28955


In [15]:
# Compare the row count before any cleanup with the count after all filters.
rows_before_cleanup = dfProductReview.count()
print("Original Number of Rows before cleanup: {0}".format(rows_before_cleanup))
rows_after_cleanup = dfFilteredReviews.count()
print("Number of rows after all filters applied: {0}".format(rows_after_cleanup))

Original Number of Rows before cleanup: 100000
Number of rows after all filters applied: 28955


In [16]:
# Cache the fully filtered DataFrame; the sentence-median rankings and the
# export further down all read from it.
dfFilteredReviews.cache()

DataFrame[customer_id: string, review_id: string, product_id: string, product_title: string, product_category: string, star_rating: int, helpful_votes: int, total_votes: int, review_headline: string, review_body: string, review_date: date]

#### After filtering out the above, find out:

* top 10 users ranked by median number of sentences in the reviews they have published
* top 10 products ranked by median number of sentences in the reviews they have received

In [17]:
# top 10 users ranked by median number of sentences in the reviews they have published
# Top 10 customers ranked by the median number of sentences in their reviews.
dfTop10UsersBySents = helper.getTopBySentMedian(
    dfRecords=dfFilteredReviews,
    partitionBy="customer_id",
    textCol="review_body",
    medianColName="median_sents",
    n=10,
)
dfTop10UsersBySents.show()

+-----------+------------+
|customer_id|median_sents|
+-----------+------------+
|   49745257|         241|
|   49754397|         239|
|   14678937|         155|
|   22434772|         127|
|   20894201|         103|
|   25007515|          87|
|    1459729|          82|
|   51979520|          82|
|   49758023|          79|
|   34564717|          77|
+-----------+------------+



In [18]:
# top 10 products ranked by median number of sentences in the reviews they have received
# Top 10 products ranked by the median number of sentences in their reviews.
dfTop10ProductsBySents = helper.getTopBySentMedian(
    dfRecords=dfFilteredReviews,
    partitionBy="product_id",
    textCol="review_body",
    medianColName="median_sents",
    n=10,
)

dfTop10ProductsBySents.show(10)

+----------+------------+
|product_id|median_sents|
+----------+------------+
|B003ZUCXP2|         217|
|B010SOIC5C|         194|
|B00Q9H9GBM|         151|
|B003UUQ7OK|         150|
|B000007WPE|         148|
|B00FY3X5GO|         146|
|B00FG1EVUS|         145|
|B003XD03DU|         134|
|B012IV1E62|         134|
|B00GG3JEU2|         134|
+----------+------------+



In [19]:
# save the cleaned and filtered dataframe to file system 
# coalesce(1) reduces the data to a single partition before writing; the output
# is tab-separated text with a header row and replaces any existing "../output".

#dfFilteredReviews.coalesce(1).write.format("parquet") \
#    .option("header", "true").saveAsTable('filteredReviews',mode="overwrite")
dfFilteredReviews.coalesce(1).write.csv("../output",mode="overwrite",header=True,sep="\t")

## Stage 3 Similarity analysis with Sentence Embedding

Perform similarity analysis on the review sentences. The analysis involves segmenting each review body into sentences and encoding each sentence as a vector, so that the distance between any pair of sentences can be computed.

### Positive vs. Negative Reviews

* pick a product from the top 10 products in stage 1
* Create a positive and negative class of reviews using the rating 
    - Positive Class - rate >=4 
    - Negative Class - rate <= 2
    - for each review, extracting the review body part and segment it into multiple sentences.
    - encode the sentences using the Google Universal Sentence Encoder

In [4]:
from pyspark.sql.functions import col, count, countDistinct, when, isnull
from pyspark.sql.window import Window
from pyspark.sql.functions import count
import pyspark.sql.functions as F

# Reload the filtered reviews from the tab-separated part files under ../output,
# using the reduced schema.
filtered_data = "../output/part-*.csv"
dfBaseDataset = spark.read.csv(
    path=filtered_data,
    schema=aws_product_review_schema_limited,
    sep="\t",
    header=True,
)

In [5]:
# product from top 10 by review number 
base_product_id = "B00MIA0KGY"

# Shared condition: the row belongs to the selected product.
is_base_product = col("product_id") == base_product_id

dfSelectedProduct = dfBaseDataset.where(is_base_product)
# Positive class: rating of 4 or more; negative class: rating of 2 or less.
dfPositiveClass = dfBaseDataset.where(is_base_product & (col("star_rating") >= 4))
dfNegativeClass = dfBaseDataset.where(is_base_product & (col("star_rating") <= 2))

In [6]:
# Number of rows in the reloaded, filtered data set.
dfBaseDataset.count()

329468

In [7]:
# Report how many reviews the selected product has overall and per class.
selected_review_count = dfSelectedProduct.count()
print("number of reviews from {0}: {1}".format(base_product_id, selected_review_count))
positive_review_count = dfPositiveClass.count()
print("number of positives reviews from {0}: {1}".format(base_product_id,positive_review_count))
negative_review_count = dfNegativeClass.count()
print("number of negatives reviews from {0}: {1}".format(base_product_id,negative_review_count))



number of reviews from B00MIA0KGY: 638
number of positives reviews from B00MIA0KGY: 549
number of negatives reviews from B00MIA0KGY: 50


### Extract the sentences - similar to flatMap

In [17]:
# For each positive review, split the review body into sentences and emit
# one (review_id, sentence) row per sentence; null sentences are dropped.
dfPosSents = (
    dfPositiveClass
    .select("review_id", "review_body")
    .withColumn("sentences", helper.GenerateSentences("review_body"))
    .select("review_id", F.explode_outer("sentences").alias("sentence"))
    .na.drop(subset=["sentence"])
)

positive_sentence_count = dfPosSents.count()
print("number of sentences from positive reviews: {0}".format(positive_sentence_count))

number of sentences from positive reviews: 3018


In [18]:
# For each negative review, split the review body into sentences and emit
# one (review_id, sentence) row per sentence; null sentences are dropped.
dfNegSents = (
    dfNegativeClass
    .select("review_id", "review_body")
    .withColumn("sentences", helper.GenerateSentences("review_body"))
    .select("review_id", F.explode_outer("sentences").alias("sentence"))
    .na.drop(subset=["sentence"])
)

negative_sentence_count = dfNegSents.count()
print("number of sentences from negative reviews: {0}".format(negative_sentence_count))

dfNegSents.show()

number of sentences from negative reviews: 314
+--------------+--------------------+
|     review_id|            sentence|
+--------------+--------------------+
|R2P2KVK3GRJBHP|     Love her voice!|
|R2P2KVK3GRJBHP|Few surprises, sh...|
|R2P2KVK3GRJBHP|Pairing with a pa...|
|R2P2KVK3GRJBHP|Her son has a bea...|
|R1HY9W9AU5S4WB|   So disappointed!!|
|R1HY9W9AU5S4WB|!why did she do it??|
|R1HY9W9AU5S4WB|                   ?|
| RT90E78LWTMTC|Only song I liked...|
| RT90E78LWTMTC|     It has feeling.|
| RT90E78LWTMTC|In all the others...|
|R1TXTNSLRNGDYI|If you think this...|
|R1TXTNSLRNGDYI|Mostly  it's the ...|
|R1TXTNSLRNGDYI|Give the guys a c...|
| R8WGLIPJJR975|       Love Barbara!|
| R8WGLIPJJR975|Own practically e...|
| R8WGLIPJJR975|         Hated this!|
| R8WGLIPJJR975|could not even li...|
| R8WGLIPJJR975|Her singing, for ...|
| R8WGLIPJJR975|her partners, wit...|
| R8WGLIPJJR975|A number of the a...|
+--------------+--------------------+
only showing top 20 rows



### Encoding the sentences - google universal encoder 

In [24]:
import tensorflow as tf
import tensorflow_hub as hub
import numpy as np
from pyspark.sql.types import StructType, StructField, StringType,IntegerType, FloatType,BooleanType,DateType,ArrayType

reload(helper)


<module 'customHelpers' from '/Users/luisedumuller/Documents/Studies/MDS/CloudComputing/Assignment2/COMP5349_AmazonProductReviewAnalysis/notebooks/customHelpers.py'>

In [102]:
### old Luis version ### --------------------------------------------------------------

# Processing the data for embedding: first step selecting the sentence column from the 
# data frames sentences. not sure why it doesn't work if don't use '.limit()' even if
# dataframe is short 

#pos_rev_text = dfPosSents.limit(10000).select('sentence')
#second: convert the dataframe to rdd pipeline
#pos_rev_clean_text_rdd = pos_rev_text.rdd.map(lambda row: str(row[0])) \
#                    .filter(lambda data: data is not None).cache()

#neg_rev_text = dfNegSents.limit(10000).select('sentence')
#second: convert the dataframe to rdd pipeline
#neg_rev_clean_text_rdd = neg_rev_text.rdd.map(lambda row: str(row[0])) \
#                    .filter(lambda data: data is not None).cache()

### old Luis version ### --------------------------------------------------------------



## Nagib version ### ------------------------------------------------------------------ 

##### bug with tensorflow if no limit is set
# bypass by setting df count as limit 

# Both classes are vectorised in this cell so that dfNegSentsVectorised and
# dfPosSentsVectorised exist after a single top-to-bottom run; later cells
# read both of them.

# get the negative embeddings + dense vectors 
rddNegTemp = dfNegSents.limit(dfNegSents.count()).select("review_id","sentence").rdd.map(list) \
            .mapPartitions(helper.vectorizeSents).cache()
dfNegSentsVectorised = spark.createDataFrame(rddNegTemp, helper.VECTOR_SCHEMA)

#dfNegSentsVectorised.show()

# get the positive embeddings + dense vectors 
rddTemp = dfPosSents.limit(dfPosSents.count()).select("review_id","sentence").rdd.map(list) \
            .mapPartitions(helper.vectorizeSents).cache()
dfPosSentsVectorised = spark.createDataFrame(rddTemp, helper.VECTOR_SCHEMA)

dfPosSentsVectorised.show()

+--------------+--------------------+
|     review_id|             vectors|
+--------------+--------------------+
|R33AKM6TMGP62U|[-0.0016616370994...|
|R33AKM6TMGP62U|[0.00872488133609...|
|R33AKM6TMGP62U|[-0.0347822681069...|
|R338L3ESXHT0XJ|[0.01993116922676...|
|R338L3ESXHT0XJ|[-0.0055921743623...|
|R338L3ESXHT0XJ|[0.02325326018035...|
|R338L3ESXHT0XJ|[-0.0637154728174...|
|R338L3ESXHT0XJ|[-0.0039158728905...|
|R338L3ESXHT0XJ|[0.02630591951310...|
|R338L3ESXHT0XJ|[-0.0169928558170...|
|R1YIG5CA2CR3FC|[-0.0058144275099...|
|R1YIG5CA2CR3FC|[0.01800153031945...|
|R1YIG5CA2CR3FC|[-0.0625827088952...|
|R23WXSCWPOBERM|[0.00281153293326...|
|R23WXSCWPOBERM|[0.01098206080496...|
|R23WXSCWPOBERM|[1.21090859465766...|
|R337RW8HCJLL7H|[0.02948248200118...|
|R337RW8HCJLL7H|[-0.0196939110755...|
|R337RW8HCJLL7H|[0.06208934262394...|
|R337RW8HCJLL7H|[-0.0235339216887...|
+--------------+--------------------+
only showing top 20 rows



In [28]:
# Print the size of the vectorised negative class and one sample row.
#print("Total embedded and vectorised positive sentences {0}".format(dfPosSentsVectorised.count()))
#print("Pos vector sample:")
#dfPosSentsVectorised.show(1)

negative_vector_count = dfNegSentsVectorised.count()
print("Total embedded and vectorised negative sentences {0}".format(negative_vector_count))
print("Neg vector sample:")
dfNegSentsVectorised.show(1)

Total embedded and vectorised negative sentences 314
Neg vector sample:
+--------------+--------------------+
|     review_id|             vectors|
+--------------+--------------------+
|R2P2KVK3GRJBHP|[-0.0506365746259...|
+--------------+--------------------+
only showing top 1 row



In [51]:
# embedding old version - luis

## function from lab 9
#def review_embed(rev_text_partition):
#    module_url = "https://tfhub.dev/google/universal-sentence-encoder/2" #@param ["https://tfhub.dev/google/universal-sentence-encoder/2", "https://tfhub.dev/google/universal-sentence-encoder-large/3"]
#    embed = hub.Module(module_url)
#    rev_text_list = [text for text in rev_text_partition]
#    with tf.Session() as session:
#        session.run([tf.global_variables_initializer(), tf.tables_initializer()])
#        message_embeddings = session.run(embed(rev_text_list))
    
#    return message_embeddings

In [52]:
# getting the embedding from sentences
#positive_review_embedding = pos_rev_clean_text_rdd.mapPartitions(review_embed).cache()
#negative_review_embedding = neg_rev_clean_text_rdd.mapPartitions(review_embed).cache()

### Intra-Class Similarity

We want to find out if sentences in the same category are closely related with each other. The closeness is measured by average distance between points in the class. In our case, point refers to the sentence encoding and pair-wise distance is measured by Cosine distance. Cosine distance is computed as “1 − CosineSimilarity”. It has a value between 0 and 2.


In [103]:
from pyspark.ml.feature import Normalizer

# Scale every sentence vector to unit length; p=2.0 (the L2 norm) is the
# Normalizer default and is spelled out here for clarity.
normalizer = Normalizer(p=2.0, inputCol="vectors", outputCol="normFeatures")

# Negative class
l2NegNormData = normalizer.transform(dfNegSentsVectorised)
print("Normalized using L2 norm")
l2NegNormData.show(5)

# Positive class
l2PosNormData = normalizer.transform(dfPosSentsVectorised)

Normalized using L2 norm
+--------------+--------------------+--------------------+
|     review_id|             vectors|        normFeatures|
+--------------+--------------------+--------------------+
|R2P2KVK3GRJBHP|[-0.0506365746259...|[-0.0506365802085...|
|R2P2KVK3GRJBHP|[-0.0580368600785...|[-0.0580368585287...|
|R2P2KVK3GRJBHP|[0.03794875741004...|[0.03794875709855...|
|R2P2KVK3GRJBHP|[0.03115262463688...|[0.03115262578360...|
|R1HY9W9AU5S4WB|[0.01509214658290...|[0.01509214560504...|
+--------------+--------------------+--------------------+
only showing top 5 rows



In [104]:
# Attach a sequential index to every (review_id, normFeatures) row; each RDD
# element becomes ((review_id, normFeatures), index).
negFeatureRows = l2NegNormData.select("review_id", "normFeatures").rdd
masterNegMapData = negFeatureRows.zipWithIndex().cache()

posFeatureRows = l2PosNormData.select("review_id", "normFeatures").rdd
masterPosMapData = posFeatureRows.zipWithIndex().cache()

In [113]:
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
#mat = IndexedRowMatrix(masterMapData.map(lambda row: IndexedRow(row.review_id, row.normFeatures.toArray()))) \
#            .toBlockMatrix()

def calc_distance(row):
    """Average cosine distance held in one row of a cosine-similarity matrix.

    Args:
        row: an object exposing ``index`` (the row id) and ``vector`` (the
            similarities of that row to every column). ``vector`` only needs
            a ``toArray()`` method returning a numeric array.

    Returns:
        tuple: ``(index, mean of (1 - similarity) over the whole row)``.
    """
    # Convert to an array before subtracting: the subtraction then no longer
    # depends on the vector type implementing arithmetic operators, and the
    # conversion happens only once.
    distances = 1 - row.vector.toArray()
    # The mean is taken over every entry of the row.
    # NOTE(review): when the row comes from M * M^T this presumably includes
    # the row's similarity to itself - confirm whether it should be excluded.
    return (row.index, distances.sum() / len(distances))

#matNeg = IndexedRowMatrix(masterNegMapData.map(lambda row: IndexedRow(row[1], (row[0][1]).toArray()))).toBlockMatrix()
#dotNeg = matNeg.multiply(matNeg.transpose())
#negCosines = dotNeg.toIndexedRowMatrix()
#avgNegDistances = negCosines.rows.map(lambda row: calc_distance(row))

# Each element of masterPosMapData is ((review_id, normFeatures), index);
# the index becomes the matrix row id and normFeatures the row content.
posIndexedRows = masterPosMapData.map(
    lambda indexed: IndexedRow(indexed[1], (indexed[0][1]).toArray())
)
matPos = IndexedRowMatrix(posIndexedRows).toBlockMatrix()
# Multiply the matrix by its own transpose: entry (i, j) is the dot product
# of sentence vectors i and j.
dotPos = matPos.multiply(matPos.transpose())
posCosines = dotPos.toIndexedRowMatrix()
avgPosDistances = posCosines.rows.map(calc_distance)



In [114]:
%time x = avgPosDistances.collect()

soma = 0
count = 0
for i in x :
    soma += i[1]
    count += 1
print(soma/count)

test = posCosines.rows.map(lambda row: row)
ccc = test.collect()
len(ccc)

CPU times: user 3.94 ms, sys: 1.79 ms, total: 5.73 ms
Wall time: 1.68 s
0.6784501847286866


3018

In [53]:
from pyspark.ml.linalg import Vectors
reload(helper)
# create an rdd with PySparkVector and zip it with an index.
neg_vect_index = negative_review_embedding.map(Vectors.dense).zipWithIndex()

#crete a cartesian combination of the vectors, index pair 
#eg: ((vect0,0), (vect1,1)), ((vect1,1), (vect2,2)), etc... 
neg_distances = neg_vect_index.cartesian(neg_vect_index) \
                       .map(helper.CosineDistance) \
                       .filter(lambda x: x[2] != 0.0)

%time neg_result = neg_distances.collect()
neg_avg = sum(i[2] for i in neg_result)/len(neg_result)

print("Average cosine distance between positive reviews: {0}".format(neg_avg))

CPU times: user 110 ms, sys: 241 ms, total: 351 ms
Wall time: 30.6 s
Average cosine distance between positive reviews: 0.7156308870893071


In [31]:
pos_vect_index = positive_review_embedding.map(Vectors.dense).zipWithIndex()

pos_distances = pos_vect_index.cartesian(pos_vect_index) \
                       .map(helper.CosineDistance) \
                       .filter(lambda x: x[2] != 0.0)

%time pos_result = pos_distances.collect()
pos_avg = sum(i[2] for i in pos_result)/len(pos_result)

print("Average cosine distance between positive reviews: {0}".format(pos_avg))

CPU times: user 7.38 ms, sys: 5.05 ms, total: 12.4 ms
Wall time: 24.2 s
Average cosine distance between positive reviews: 0.6657311997824882


### Class Center Sentences

Find the class center and its 10 closest neighbours for the positive and the negative class respectively. We define the class center as the point that has the smallest average distance to the other points in the class. Again, a point here refers to a sentence encoding, and pair-wise distance is measured by cosine distance.
The result should show the text of the center sentence, the review id it belongs to and its 10 closest neighbouring sentences text and their respective review id.

In [64]:
from pyspark.sql.functions import count, avg

# Schema of the pairwise-distance records: origin index, destination index
# and the distance between the two sentences. All fields are nullable.
aws_distances_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("id_origin", IntegerType()),
        ("id_dest", IntegerType()),
        ("distances", FloatType()),
    )
])

dfNegDistances = spark.createDataFrame(neg_distances, schema=aws_distances_schema)
#dfPosDistances = spark.createDataFrame(pos_distances,aws_distances_schema)

In [65]:
#dfPosCentre = dfPosDistances.groupBy("id_origin").agg(avg("distances")) \
#                       .orderBy("avg(distances)") \
#                       .limit(1)
#posCenterid  = dfPosCentre.collect()[0][0]

#print("Center sentence from Positive class with minimum average distance")
#dfPosCentre.show()

#dfPosClosest = dfPosDistances.where(col("id_origin") == posCenterid) \
#                             .orderBy("distances") \
#                             .limit(10)

#print("Ten closest sentences from positive class centre:")

#dfPosClosest.show()

NameError: name 'dfPosDistances' is not defined

In [66]:
# Class centre: the origin sentence with the smallest average distance to the
# other sentences of the negative class.
dfNegCentre = (
    dfNegDistances
    .groupBy("id_origin")
    .agg(avg("distances"))
    .orderBy("avg(distances)")
    .limit(1)
)

# First column of the single remaining row is the centre's sentence index.
negCenterid = dfNegCentre.collect()[0][0]

print("Center sentence from Negative class with minimum average distance")
dfNegCentre.show()

# The ten destinations with the smallest distance from the centre.
dfNegClosest = (
    dfNegDistances
    .where(col("id_origin") == negCenterid)
    .orderBy("distances")
    .limit(10)
)

print("Ten closest sentences from negative class centre:")

dfNegClosest.show()

Center sentence from Negative class with minimum average distance
+---------+------------------+
|id_origin|    avg(distances)|
+---------+------------------+
|      101|0.5902185768555528|
+---------+------------------+

Ten closest sentences from negative class centre:
+---------+-------+----------+
|id_origin|id_dest| distances|
+---------+-------+----------+
|      101|     37|0.18140683|
|      101|     60|0.21687192|
|      101|      7|0.25256255|
|      101|    137|0.25421256|
|      101|     91|0.25971147|
|      101|    221|0.26143548|
|      101|      3|0.27037904|
|      101|    311|0.27135503|
|      101|    180|0.27161464|
|      101|    182| 0.2756444|
+---------+-------+----------+

