In [1]:
from __future__ import print_function
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark import SparkContext, SparkConf
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import array, struct, split, explode, udf, col, collect_list
from  pyspark.sql import functions as f
from pyspark.sql.functions import concat, col, lit

conf = SparkConf().setMaster("local[*]").setAppName('haha')
sc = SparkContext(conf=conf)
ss = SparkSession(sc)

# load raw data

In [2]:
review = ss.read.json('Books_review.json')
meta = ss.read.json('meta_Books.json')

In [3]:
#review.count()

In [4]:
print(review.columns)
print(meta.columns)

['asin', 'helpful', 'overall', 'reviewText', 'reviewTime', 'reviewerID', 'reviewerName', 'summary', 'unixReviewTime']
['_corrupt_record', 'asin', 'brand', 'categories', 'description', 'imUrl', 'price', 'related', 'salesRank', 'title']


# for review data, we only focus on 2012, 2013, 2014

In [5]:
df2012 = review.where(review.reviewTime.contains('2012'))
df2013 = review.where(review.reviewTime.contains('2013'))
df2014 = review.where(review.reviewTime.contains('2014'))
reviews = df2012.union(df2013).union(df2014)   # 6132506

# concat reviewText and summary together  ==> textss

In [6]:
reviews = reviews.select('asin',
                        concat(col("reviewText"),lit(""),col("summary")).alias('textss'))
# here, asin is not distinct

In [7]:
#reviews.show(5)

# for meta data, we only need 4 columns 

In [8]:
meta = meta.drop_duplicates(['asin']).select('asin', 'title','categories', 'description')

In [9]:
#meta.show(5)

# review data group by asin

In [10]:
reviews_gb = reviews.groupBy("asin").agg(f.concat_ws(" ", f.collect_list('textss')).alias('textss'))

# join review data and meta data

In [11]:
df = reviews_gb.join(meta, 'asin', 'inner').cache()

In [12]:
#df.show(5)

# concat all TEXT info together

In [13]:
df = df.select('asin','title','categories',
                concat(col("textss"),lit(""),col("description")).alias('reviewText'))

In [14]:
df.select('asin').distinct().count()  # asin is distinct in this dataframe df

83946

In [15]:
#~df.show(5)

In [16]:
dff = df.na.drop(subset=["reviewText"])  # drop nulls /  from 277342 to 169873

In [17]:
from pyspark.sql.functions import monotonically_increasing_id 
dff = dff.select("*").withColumn("id", monotonically_increasing_id())

# FINALLY, get the nice table we want. 👇

In [18]:
dff.show(5)

+----------+--------------------+----------+--------------------+---+
|      asin|               title|categories|          reviewText| id|
+----------+--------------------+----------+--------------------+---+
|1481916025|Acquiring Trouble...| [[Books]]|Awesome!  Does ev...|  0|
|1481948725|The Shadow Editio...| [[Books]]|I'll say one thin...|  1|
|1482336251|         Sharp Edges| [[Books]]|There were a few ...|  2|
|1482571145|Pull: A Seaside N...| [[Books]]|Amazing!  I loved...|  3|
|1482635720|    The Kona Shuffle| [[Books]]|This was an inter...|  4|
+----------+--------------------+----------+--------------------+---+
only showing top 5 rows



In [19]:
#dff.count()

# get the meaningful words

In [20]:
tokenizer = Tokenizer(inputCol="reviewText", outputCol="words") #other languages+capital letters?
df_w_words = tokenizer.transform(dff)
remover = StopWordsRemover(inputCol="words", outputCol="mf_words")  # need to add in "book"
df_w_mfwords = remover.transform(df_w_words)
df_w_mfwords.show(3)

+----------+--------------------+----------+--------------------+---+--------------------+--------------------+
|      asin|               title|categories|          reviewText| id|               words|            mf_words|
+----------+--------------------+----------+--------------------+---+--------------------+--------------------+
|1481916025|Acquiring Trouble...| [[Books]]|Awesome!  Does ev...|  0|[awesome!, , does...|[awesome!, , ever...|
|1481948725|The Shadow Editio...| [[Books]]|I'll say one thin...|  1|[i'll, say, one, ...|[say, one, thing,...|
|1482336251|         Sharp Edges| [[Books]]|There were a few ...|  2|[there, were, a, ...|[twist, &turns;, ...|
+----------+--------------------+----------+--------------------+---+--------------------+--------------------+
only showing top 3 rows



# tfidf

In [21]:
hashingTF = HashingTF(inputCol="mf_words", outputCol="tf", numFeatures=200) #need to know how to choose the numFeatures
tf = hashingTF.transform(df_w_mfwords)  # reviews_w_feature  == tf
idf = IDF(inputCol="tf", outputCol="tfidf").fit(tf)
tfidf = idf.transform(tf)
tfidf.show(3)

+----------+--------------------+----------+--------------------+---+--------------------+--------------------+--------------------+--------------------+
|      asin|               title|categories|          reviewText| id|               words|            mf_words|                  tf|               tfidf|
+----------+--------------------+----------+--------------------+---+--------------------+--------------------+--------------------+--------------------+
|1481916025|Acquiring Trouble...| [[Books]]|Awesome!  Does ev...|  0|[awesome!, , does...|[awesome!, , ever...|(200,[0,1,2,3,4,5...|(200,[0,1,2,3,4,5...|
|1481948725|The Shadow Editio...| [[Books]]|I'll say one thin...|  1|[i'll, say, one, ...|[say, one, thing,...|(200,[0,1,2,3,4,5...|(200,[0,1,2,3,4,5...|
|1482336251|         Sharp Edges| [[Books]]|There were a few ...|  2|[there, were, a, ...|[twist, &turns;, ...|(200,[0,1,2,3,4,5...|(200,[0,1,2,3,4,5...|
+----------+--------------------+----------+--------------------+---+-------

In [22]:
# tfidf.select('tfidf')

In [23]:
feature_vecs = tfidf.select('tfidf').rdd.map(lambda x: x[0]).collect()  

In [24]:
#full_matrix=[]
#for i in range(len(feature_vecs)):
#    fea_vec=[]
#    for i in range(len(feature_vecs[0])):
#        temp = feature_vecs[0][i]
#        fea_vec.append(temp)
#    full_matrix.append(fea_vec)


In [25]:
#full_matrix

## Clustering

In [26]:
#Features_matrix_rdd = sc.parallelize(full_matrix)

In [27]:
#Features_matrix_rdd.take(1)

In [28]:
#Features_matrix_df = ss.createDataFrame(Features_matrix_rdd)

In [29]:
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors

normalizer = Normalizer(inputCol="features", outputCol="normFeatures")
l2NormData = normalizer.transform(feature_vecs)

from pyspark.ml.clustering import KMeans, LDA, LDAModel

kmeans = KMeans().setK(10).setMaxIter(20)
km_model = kmeans.fit(l2NormData)
clustersTable = km_model.transform(l2NormData)

AttributeError: 'list' object has no attribute '_jdf'

In [None]:
centers = km_model.clusterCenters()

In [None]:
clustersTable.groupBy("prediction").count().show()

In [None]:
lda = LDA(k=2, seed=1, optimizer="em")
model = lda.fit(l2NormData)
model.describeTopics().show()

In [None]:
model.topicsMatrix()

In [None]:
lda_path = temp_path + "/lda"
lda.save(lda_path)
sameLDA = LDA.load(lda_path)
distributed_model_path = temp_path + "/lda_distributed_model"
model.save(distributed_model_path)
sameModel = DistributedLDAModel.load(distributed_model_path)
local_model_path = temp_path + "/lda_local_model"
localModel.save(local_model_path)
sameLocalModel = LocalLDAModel.load(local_model_path)