In [1]:
import numpy as np
import nltk
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, FloatType,DoubleType,ArrayType,IntegerType
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, Normalizer
from pyspark.sql.functions import mean
from pyspark.ml.feature import PCA,VectorAssembler

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
3,application_1558477459387_0004,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
# Inspect the cluster's configured default parallelism through the public
# SparkContext.getConf() API rather than the private ``_conf`` attribute.
sc.getConf().get('spark.default.parallelism')

VBox()

'96'

In [3]:
# Input: gzipped, tab-separated review file for the US "Music" category.
# NOTE(review): gzip is not splittable, so Spark presumably reads this file as a
# single partition — confirm, since several row-numbering steps below assume it.
rev_data = ("s3://amazon-reviews-pds/tsv/"
            "amazon_reviews_us_Music_v1_00.tsv.gz")

VBox()

In [4]:
# NOTE(review): the kernel already created a session ("SparkSession available as
# 'spark'" in the first cell's output), so getOrCreate() returns that one and the
# executor memory-overhead setting here most likely has no effect. The key
# `spark.yarn.executor.memoryOverhead` is also the older name of
# `spark.executor.memoryOverhead` — confirm against the cluster's Spark version.
spark = (
    SparkSession.builder
    .config('spark.yarn.executor.memoryOverhead', 2048)
    .appName("Spark Text Encoder example")
    .getOrCreate()
)

# Product whose reviews are analysed (compared against the `product_id` column).
ID = 'B00006J6VG'

VBox()

In [5]:
# Sentence splitter used by splitSent (NLTK Punkt, English model).
# NOTE(review): this unpickles a resource from the local nltk_data path; newer
# NLTK releases ship Punkt as the pickle-free `punkt_tab` resource — confirm this
# still loads on the installed NLTK version.
tokenizer = nltk.data.load(resource_url='tokenizers/punkt/english.pickle')

VBox()

In [6]:
# Positive vs. Negative Reviews

# Only product ``ID`` is analysed below, so filter *before* caching: caching the
# unfiltered read would materialise the whole category file in executor storage.
# NOTE: ``records`` therefore holds only the rows of product ``ID``.
records = (spark.read.csv(rev_data, header=True, sep='\t')
           .filter(F.col('product_id') == ID)
           .cache())

def splitSent(rev_text_partition):
    '''
    Split every review in a partition into sentences.

    input:  rev_text_partition -- iterable of (review_id, review_body) rows
    output: list of (review_id + '+' + sentence_position, sentence) tuples,
            using the module-level NLTK ``tokenizer`` for sentence splitting
    '''
    return [
        (review_id + '+' + str(position), sentence)
        for review_id, review_body in rev_text_partition
        for position, sentence in enumerate(tokenizer.tokenize(review_body))
    ]

schema = StructType([StructField("review_id", StringType(), True),
                     StructField("review_body", StringType(), True)])


def _sentences_for_rating(rating_condition):
    '''
    Sentence RDD for the non-empty reviews of product ``ID`` matching a rating filter.

    input:  rating_condition -- SQL filter string on star_rating, e.g. 'star_rating>=4'
    output: cached RDD of (review_id + '+' + sentence_position, sentence) tuples
    '''
    # Cached because later cells zipWithIndex / lookup this RDD repeatedly; without
    # it every such action would re-run the NLTK sentence splitting.
    return (records
            .filter((records.product_id == ID) & (records.review_body.isNotNull()))
            .filter(rating_condition)
            .select('review_id', 'review_body')
            .rdd
            .mapPartitions(splitSent)
            .cache())


pos_rdd = _sentences_for_rating('star_rating>=4')
neg_rdd = _sentences_for_rating('star_rating<=2')

positive = (spark.createDataFrame(pos_rdd, schema=schema)
            .withColumn('index', F.monotonically_increasing_id())
            .cache())

negative = (spark.createDataFrame(neg_rdd, schema=schema)
            .withColumn('index', F.monotonically_increasing_id())
            .cache())

VBox()

In [7]:
# Number each sentence with zipWithIndex once per class and derive the lookup maps.
pos_indexed = pos_rdd.zipWithIndex()  # ((review_id, sentence), row_number)
pos_review_id = pos_indexed.map(lambda rec: (rec[1], rec[0][0]))    # row -> review_id
pos_review_body = pos_indexed.map(lambda rec: (rec[1], rec[0][1]))  # row -> sentence

neg_indexed = neg_rdd.zipWithIndex()
# NOTE: keyed the other way round (review_id -> row number), because the negative
# report looks rows up by review_id rather than by row number.
neg_review_id = neg_indexed.map(lambda rec: (rec[0][0], rec[1]))
neg_review_body = neg_indexed.map(lambda rec: (rec[1], rec[0][1]))  # row -> sentence

VBox()

In [9]:
def tfidfConvert(df, num_features=512):
    '''
    Turn each sentence into an L2-normalised TF-IDF vector.

    input:  df           -- DataFrame with ``review_id`` and ``review_body`` columns
            num_features -- size of the hashed term space (default 512)
    output: cached DataFrame with columns ``review_id``, ``review_body`` and
            ``features_norm``
    '''
    # Named word_tokenizer so it is not confused with the module-level NLTK
    # sentence ``tokenizer`` that splitSent uses.
    word_tokenizer = Tokenizer(inputCol="review_body", outputCol="words")
    words = word_tokenizer.transform(df)

    raw_tf = HashingTF(inputCol="words", outputCol="rawFeatures",
                       numFeatures=num_features).transform(words)

    # IDF weights are fitted on ``df`` alone, so vectors from separate calls
    # (e.g. positive vs. negative) are not on a shared IDF scale.
    tfidf = IDF(inputCol="rawFeatures", outputCol="features").fit(raw_tf).transform(raw_tf)

    return (Normalizer(inputCol="features", outputCol="features_norm", p=2)
            .transform(tfidf)
            .select('review_id', 'review_body', 'features_norm')
            .cache())

# NOTE(review): monotonically_increasing_id is unique but only consecutive within
# a partition; this ``index`` column is display-only here.
positive_emb = tfidfConvert(positive).withColumn('index', F.monotonically_increasing_id()).cache()
negative_emb = tfidfConvert(negative).withColumn('index', F.monotonically_increasing_id()).cache()

# Each row is one *sentence* produced by splitSent, not one whole review.
print(positive_emb.count(), 'positive review sentences:')
positive_emb.show(5)
print(negative_emb.count(), 'negative review sentences:')
negative_emb.show(5)


VBox()

10108 positive reviews:
+----------------+--------------------+--------------------+-----+
|       review_id|         review_body|       features_norm|index|
+----------------+--------------------+--------------------+-----+
|R3R7MRNK5HPULY+0|Good Charlotte's ...|(512,[3,8,17,30,8...|    0|
| RQ9PYEGZ1N6LS+0|My daughter loves...|(512,[202,455,476...|    1|
|R1P3A5U0M98JWW+0|              thanks|    (512,[94],[1.0])|    2|
|R2TYDS7G24XRZC+0|vey good, I was c...|(512,[1,2,9,23,24...|    3|
|R14IHG9LSIZLZK+0|I love this cd, I...|(512,[6,26,35,82,...|    4|
+----------------+--------------------+--------------------+-----+
only showing top 5 rows

3303 negative reviews:
+----------------+--------------------+--------------------+-----+
|       review_id|         review_body|       features_norm|index|
+----------------+--------------------+--------------------+-----+
|R2F6WAB05QY47M+0|I would rather go...|(512,[29,47,52,55...|    0|
|R2F6WAB05QY47M+1|This is the garba...|(512,[17,22,29,44.

In [10]:
# For positive reviews: reduce the TF-IDF vectors with PCA to k=436 components
# (the original note says this keeps ~95% of the information; not re-verified here).

def positive_pca(df, k=436):
    '''
    Project ``features_norm`` onto its top ``k`` principal components.

    input:  df -- DataFrame with a ``features_norm`` vector column
            k  -- number of principal components to keep (default 436)
    output: DataFrame with columns ``pca`` (projected vector) and ``index``
            (consecutive 0..n-1 row number, in ``df``'s row order)
    '''
    model = PCA(k=k, inputCol="features_norm", outputCol="pca").fit(df)
    projected = model.transform(df).select('pca')
    # positive_cosine_dis uses ``index`` as a position in the collected point list,
    # so it must be consecutive. zipWithIndex guarantees that; the previously used
    # monotonically_increasing_id jumps between partitions.
    indexed = projected.rdd.zipWithIndex().map(lambda pair: (pair[0][0], pair[1]))
    return spark.createDataFrame(indexed, ['pca', 'index'])

def positive_cosine_dis(item):
    '''
    Cosine distances from one PCA point to every point after it in the broadcast list.

    input:  item -- (index, vector) pair; ``index`` is also the starting position
                    in ``positive_pca_broadcast.value``
    output: list of ``[(index, other_index), distance]`` where
            distance = 1 - cos(vector, other_vector)
    '''
    start = item[0]
    vec = item[1]
    points = positive_pca_broadcast.value
    vec_norm = np.linalg.norm(vec)  # loop-invariant: compute once per item
    result = []
    for other in points[start + 1:]:
        similarity = np.dot(vec, other[1]) / (vec_norm * np.linalg.norm(other[1]))
        result.append([(start, other[0]), float(1 - similarity)])
    return result

def positive_cosine_dis_each():
    '''
    Pairwise cosine distances among the positive PCA points.

    Reads the module-level ``positive_pca_result`` (columns pca, index); each point
    is expanded by positive_cosine_dis against the broadcast point list.
    output: DataFrame with columns ``point`` (the (index, other_index) pair) and
            ``distance``
    '''
    as_pairs = (positive_pca_result.rdd
                .repartition(128)
                .map(lambda row: (row[1], np.array(row[0]))))
    distances = as_pairs.flatMap(positive_cosine_dis).cache()
    return spark.createDataFrame(distances, ['point', 'distance'])


# For negative reviews: no PCA; compare the L2-normalised TF-IDF vectors directly.
# For unit-length (non-zero) vectors 1 - dot(a, b) is the cosine distance. The
# output column keeps its historical name ``sim`` even though it holds a distance.
# DoubleType keeps full precision, matching the positive-class distances
# (a FloatType result would be rounded to 32 bits before averaging).
sim_calculator = F.udf(lambda emba, embb: 1 - float(emba.dot(embb)), returnType=DoubleType())

def negative_withoutPCA(df):
    '''
    All pairwise cosine distances among the negative sentences.

    input:  df -- DataFrame with ``review_id`` and ``features_norm`` columns
    output: cached DataFrame with columns ``rev_a``, ``rev_b`` (review_ids with
            rev_a > rev_b, so each unordered pair appears once and self-pairs are
            excluded) and ``sim`` (the cosine distance)
    '''
    left = df.select(F.col('review_id').alias('rev_a'), F.col('features_norm').alias('emba'))
    right = df.select(F.col('review_id').alias('rev_b'), F.col('features_norm').alias('embb'))
    return (left.crossJoin(right)
            .filter(F.col('rev_a') > F.col('rev_b'))
            .repartition(128)
            .withColumn('sim', sim_calculator('emba', 'embb'))
            .drop('emba', 'embb')
            .cache())


VBox()

In [11]:
# Mean pairwise cosine distance within each class.

# Positive class: collect the (index, PCA vector) points on the driver and
# broadcast them so every task can compare its point with all later points.
positive_pca_result = positive_pca(positive_emb)
positive_pca_points = (positive_pca_result.rdd
                       .map(lambda row: (row[1], np.array(row[0])))
                       .collect())
positive_pca_broadcast = sc.broadcast(positive_pca_points)

positive_distance_df = positive_cosine_dis_each()
avg_positive_distance = positive_distance_df.select(mean(positive_distance_df.distance))
print('positive reviews average cosine distance:')
avg_positive_distance.show()

# Negative class: all pairs on the unreduced vectors, averaged directly in Spark.
negative_distance = negative_withoutPCA(negative_emb)
avg_negative_distance = negative_distance.select(mean(negative_distance.sim).alias('avg(distance)'))
print('negative reviews average cosine distance:')
avg_negative_distance.show()


VBox()

positive reviews average cosine distance:
+------------------+
|     avg(distance)|
+------------------+
|0.9594840008378294|
+------------------+

negative reviews average cosine distance:
+------------------+
|     avg(distance)|
+------------------+
|0.9590096414403552|
+------------------+

In [13]:
# Build an index -> distance_list mapping: for every point, all other points it is
# paired with and the distance to each.
def transform_distance_rdd(df):
    '''
    Turn pairwise ``(point, distance)`` rows into an adjacency-list RDD.

    Each input row ``((a, b), d)`` is emitted for both endpoints, and all entries
    sharing a key are gathered, giving ``(a, [(b, d), ...])``.

    groupByKey builds each list in a single pass; concatenating lists inside
    reduceByKey would copy the growing list on every merge (quadratic per key).
    '''
    both_directions = df.rdd.flatMap(
        lambda row: [(row[0][0], (row[0][1], row[1])),
                     (row[0][1], (row[0][0], row[1]))])
    return both_directions.groupByKey().mapValues(list)

def min_avg_distance(df):
    '''
    Find the point with the smallest mean distance to all others (the class centre).

    input:  df -- DataFrame with columns ``index`` and ``distance_list``
                  (list of (other_index, distance) entries)
    output: the ``index`` value of the row with the minimum mean distance
    '''
    def _mean_distance(distance_list):
        # column 1 of each entry is the distance
        return float(np.mean(np.array(distance_list)[:, 1], axis=0))

    def _closer(left, right):
        # on ties (or NaN comparisons) the right-hand row wins
        if left[1] < right[1]:
            return (left[0], left[1])
        return (right[0], right[1])

    mean_distance_udf = F.udf(_mean_distance, DoubleType())
    averaged = (df.withColumn('avg_distance', mean_distance_udf(df.distance_list))
                  .drop('distance_list'))
    return averaged.rdd.reduce(_closer)[0]

def get_positive_index(index, df, class_df, k=10):
    '''
    Return the ``k`` nearest neighbours of one point.

    input:  index    -- index of the class centre
            df       -- DataFrame with columns ``index`` and ``distance_list``
            class_df -- unused; kept so existing calls keep working
            k        -- number of neighbours to return (default 10)
    output: list of up to ``k`` neighbour indices, nearest first
            (empty when ``index`` has no recorded neighbours)
    '''
    neighbours = (df.filter(df.index == index)
                    .select('distance_list')
                    .rdd
                    .flatMap(lambda row: [[entry[0], entry[1]] for entry in row.distance_list]))
    # createDataFrame cannot infer a schema from an empty RDD
    if neighbours.isEmpty():
        return []
    nearest_rows = (spark.createDataFrame(neighbours, ['index', 'distance'])
                    .orderBy(F.asc('distance'))
                    .limit(k)
                    .select('index')
                    .collect())
    return [row['index'] for row in nearest_rows]


def get_negative_id(distance_df=None):
    '''
    Return the review_id with the smallest mean distance to all others (the centre).

    input:  distance_df -- DataFrame with ``rev_a``, ``rev_b`` and ``sim`` columns;
                           defaults to the module-level ``negative_distance``
    output: the centre's review_id
    raises: ValueError if ``distance_df`` has no rows
    '''
    if distance_df is None:
        distance_df = negative_distance
    # every pair contributes its distance to both of its endpoints
    per_review = (distance_df.select(F.col('rev_a').alias('rev_id'), 'sim')
                  .union(distance_df.select(F.col('rev_b').alias('rev_id'), 'sim'))
                  .groupby('rev_id')
                  .agg(F.avg(F.col('sim')).alias('sim_avg')))
    best = per_review.orderBy('sim_avg').head(1)
    if not best:
        raise ValueError('distance_df is empty; cannot choose a centre')
    return best[0]['rev_id']


VBox()

In [14]:
positive_distance_list_df = spark.createDataFrame(transform_distance_rdd(positive_distance_df),
                                                  ['index', 'distance_list'])
positive_centre_index = min_avg_distance(positive_distance_list_df)
positive_neighbors_index = get_positive_index(positive_centre_index, positive_distance_list_df, positive_emb)


negative_center = get_negative_id()
# For each pair that touches the centre, keep the *other* review_id. A native
# column expression avoids a Python-UDF round trip; both ids are non-null here
# because negative_withoutPCA keeps only rows with rev_a > rev_b.
other_review_id = F.when(F.col('rev_a') != negative_center, F.col('rev_a')).otherwise(F.col('rev_b'))
negative_neighbors = (negative_distance
                      .filter((F.col('rev_a') == negative_center) | (F.col('rev_b') == negative_center))
                      .withColumn('review_id', other_review_id)
                      .drop('rev_a', 'rev_b')
                      .join(negative, 'review_id')
                      .orderBy('sim')
                      .head(10))


VBox()

In [15]:
def _first(pair_rdd, key):
    '''Return the (single) value stored under ``key`` in a pair RDD.'''
    return pair_rdd.lookup(key)[0]


def _base_review_id(sentence_id):
    '''Strip the trailing '+<sentence position>' suffix added by splitSent.'''
    return sentence_id.rsplit('+', 1)[0]


print('Positive center index:', positive_centre_index)
print('center review_id:', _base_review_id(_first(pos_review_id, positive_centre_index)))
print('center sentence:', _first(pos_review_body, positive_centre_index))

print('\n-------------------------------------------------------------------------------------\n')

for rank, neighbor_index in enumerate(positive_neighbors_index, start=1):
    print(rank, 'nearest neighbor index:',
          neighbor_index, '\nreview_id:', _base_review_id(_first(pos_review_id, neighbor_index)),
          'sentence:', _first(pos_review_body, neighbor_index) + '\n')
    

VBox()

Positive center index: 9323
center review_id: R3IAOTJZNHP373
center sentence: ['Hey kids my name is courtney and i just wanted to write a review to tell you about the cd and why its really awesome...I reallyhave never experenced this much of a impact in my life threw music...Good Charlottes debuet ablum was awesome the hit songs like Little Things, Motavation Proclamation, and the Festival song have true meaning and alot of there other songs did..But the new ablum the young and the hopeless talks about how yeh they wernt Mr.Popular in school (another loser Anthem) and sometimes there were sick of hanging in there but they wanted to hold on and they want you to hold on too (hold on) they have been through alot of stuff alot of us have been threw..some people may no like them but...just give them a chance there really awesome guys ( i met them) they are nice and they dont care who you are and they want you to know that u may not be in the in crowd because your better then that..there mus

In [16]:
print('Negative center index:', neg_review_id.lookup(negative_center)[0])
print('center review_id:', negative_center.rsplit('+', 1)[0])
print('center sentence:', negative.filter(F.col('review_id') == negative_center).first()['review_body'])

print('\n-------------------------------------------------------------------------------------\n')

# Rows come from joining the neighbour distances with ``negative``; fields are read
# by name. rsplit('+', 1) removes the whole sentence-position suffix, including
# multi-digit positions such as '+12' (slicing off two characters does not).
for rank, row in enumerate(negative_neighbors, start=1):
    print(str(rank) + ' nearest neighbor index:'
          + str(row['index']) + '\nreview_id:' + row['review_id'].rsplit('+', 1)[0]
          + '\nsentence:' + row['review_body'] + '\n')
       

VBox()

Negative center index: [2511]
center review_id: R2K76T2GLNBUB
center sentence: <br />Anyways, just to emphasize what most of the other reviews say, GC isn't punk, &quot;pop-punk&quot; isn't a category, MTV has ruined mainstream music, people that say GC is &quot;punk&quot; are just blonde cheerleaders and Avril Lavigne wannabes that have sub-human mental capacity, so, can you really blame them for getting the facts mixed up?<br />Also, GC fans, quit saying this band is talented, because they are not, and I bet all of you don't know what a scale or arpeggio is even if your life depended on it.<br />If you want simple three-chord rock that's good, get any MISFITS album, because they are three-chord rock, and they are better than GC...oh, wait a minute, they are not better, they are actually good, unlike GC, the most talent lacking band ever.<br />How they have so many fans baffles me.<br />In closing, if these guys still have their homes a few years from now, there is no justice in the w