In [1]:
import pandas as pd
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.master('local').appName('Recommender').getOrCreate()

In [3]:
df_books = spark.read.json("metaBooks.json")

In [4]:
df_books = df_books.select("asin","title","description")

In [5]:
df_books.show()

+----------+--------------------+--------------------+
|      asin|               title|         description|
+----------+--------------------+--------------------+
|0001048791|The Crucible: Per...|                null|
|0001048775|Measure for Measu...|William Shakespea...|
|0001048236|The Sherlock Holm...|&#34;One thing is...|
|0000401048|The rogue of publ...|                null|
|0001019880|Classic Soul Winn...|                null|
|0001048813|Archer Christmas ...|                null|
|0001148427| Sonatas - For Piano|                null|
|0001057170|Classic Connolly ...|[Editor's Note: T...|
|0001047566|       Hand in Glove|                null|
|0001053396|War Poems: An Ant...|Writing poetry ha...|
|0000913154|The Way Things Wo...|                null|
|0001072986|As You Like it: C...|William Shakespea...|
|0001053744| Pearl and Sir Orfeo|While many reader...|
|0001055178|      Master Georgie|Beryl Bainbridge ...|
|0001064487|Celebremos Su Gloria|                null|
|000104233

In [6]:
df_books = df_books.fillna("",subset=['title'])

In [7]:
df_books = df_books.fillna("",subset=['description'])

In [8]:
df_books.show()

+----------+--------------------+--------------------+
|      asin|               title|         description|
+----------+--------------------+--------------------+
|0001048791|The Crucible: Per...|                    |
|0001048775|Measure for Measu...|William Shakespea...|
|0001048236|The Sherlock Holm...|&#34;One thing is...|
|0000401048|The rogue of publ...|                    |
|0001019880|Classic Soul Winn...|                    |
|0001048813|Archer Christmas ...|                    |
|0001148427| Sonatas - For Piano|                    |
|0001057170|Classic Connolly ...|[Editor's Note: T...|
|0001047566|       Hand in Glove|                    |
|0001053396|War Poems: An Ant...|Writing poetry ha...|
|0000913154|The Way Things Wo...|                    |
|0001072986|As You Like it: C...|William Shakespea...|
|0001053744| Pearl and Sir Orfeo|While many reader...|
|0001055178|      Master Georgie|Beryl Bainbridge ...|
|0001064487|Celebremos Su Gloria|                    |
|000104233

In [9]:
from pyspark.sql import functions as f

In [10]:
df_books = df_books.withColumn("combined",f.concat(f.col('title'),f.lit(' '), f.col('description')))

In [11]:
df_books.show()

+----------+--------------------+--------------------+--------------------+
|      asin|               title|         description|            combined|
+----------+--------------------+--------------------+--------------------+
|0001048791|The Crucible: Per...|                    |The Crucible: Per...|
|0001048775|Measure for Measu...|William Shakespea...|Measure for Measu...|
|0001048236|The Sherlock Holm...|&#34;One thing is...|The Sherlock Holm...|
|0000401048|The rogue of publ...|                    |The rogue of publ...|
|0001019880|Classic Soul Winn...|                    |Classic Soul Winn...|
|0001048813|Archer Christmas ...|                    |Archer Christmas ...|
|0001148427| Sonatas - For Piano|                    |Sonatas - For Piano |
|0001057170|Classic Connolly ...|[Editor's Note: T...|Classic Connolly ...|
|0001047566|       Hand in Glove|                    |      Hand in Glove |
|0001053396|War Poems: An Ant...|Writing poetry ha...|War Poems: An Ant...|
|0000913154|

In [12]:
from nltk.corpus import stopwords
import re as re
from pyspark.ml.feature import CountVectorizer , IDF

In [13]:
df_books_rdd = df_books.rdd

In [14]:
book_text = df_books_rdd.map(lambda x: x['combined']).filter(lambda y: y is not None)

In [15]:
StopWords = stopwords.words("english")

In [16]:
tokens = book_text                                                   \
    .map( lambda document: document.strip().lower())               \
    .map( lambda document: re.split(" ", document))          \
    .map( lambda word: [x for x in word if x.isalpha()])           \
    .map( lambda word: [x for x in word if len(x) > 3] )           \
    .map( lambda word: [x for x in word if x not in StopWords])    \
    .zipWithIndex()

In [17]:
df_txts = spark.createDataFrame(tokens, ["list_of_words",'index'])

In [18]:
df_txts.show()

+--------------------+-----+
|       list_of_words|index|
+--------------------+-----+
|[performed, stuar...|    0|
|[measure, complet...|    1|
|[sherlock, holmes...|    2|
|[rogue, confessio...|    3|
|[classic, soul, t...|    4|
|[archer, christma...|    5|
|    [sonatas, piano]|    6|
|[classic, connoll...|    7|
|       [hand, glove]|    8|
|[anthology, poetr...|    9|
|[things, illustra...|   10|
|[like, complete, ...|   11|
|[pearl, orfeo, ma...|   12|
|[master, georgie,...|   13|
|[celebremos, gloria]|   14|
|[complete, unabri...|   15|
|[laurel, hardy, b...|   16|
|[classic, connoll...|   17|
|[guess, guess, co...|   18|
|       [foot, piper]|   19|
+--------------------+-----+
only showing top 20 rows



In [19]:
cv = CountVectorizer(inputCol="list_of_words", outputCol="raw_features", vocabSize=5000, minDF=10.0)
cvmodel = cv.fit(df_txts)
result_cv = cvmodel.transform(df_txts)


In [None]:
# IDF
idf = IDF(inputCol="list_of_words", outputCol="features")
idfModel = idf.fit(result_cv)
result_tfidf = idfModel.transform(result_cv)

In [19]:
result_cv.show()

+--------------------+-----+--------------------+
|       list_of_words|index|        raw_features|
+--------------------+-----+--------------------+
|[performed, stuar...|    0|(5000,[1338,2461,...|
|[measure, complet...|    1|(5000,[14,124,453...|
|[sherlock, holmes...|    2|(5000,[0,8,13,19,...|
|[rogue, confessio...|    3|(5000,[1145,4182]...|
|[classic, soul, t...|    4|(5000,[191,364,79...|
|[archer, christma...|    5|(5000,[697,2773],...|
|    [sonatas, piano]|    6| (5000,[1488],[1.0])|
|[classic, connoll...|    7|(5000,[4,5,13,25,...|
|       [hand, glove]|    8|  (5000,[818],[1.0])|
|[anthology, poetr...|    9|(5000,[14,15,23,4...|
|[things, illustra...|   10|(5000,[179,300,35...|
|[like, complete, ...|   11|(5000,[14,20,124,...|
|[pearl, orfeo, ma...|   12|(5000,[6,13,15,30...|
|[master, georgie,...|   13|(5000,[4,5,8,10,1...|
|[celebremos, gloria]|   14|        (5000,[],[])|
|[complete, unabri...|   15|(5000,[14,124,453...|
|[laurel, hardy, b...|   16|(5000,[0,463],[1....|


IllegalArgumentException: 'requirement failed: Column list_of_words must be of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually array<string>.'

In [20]:
result_tfidf.show()

+--------------------+-----+--------------------+--------------------+
|       list_of_words|index|        raw_features|            features|
+--------------------+-----+--------------------+--------------------+
|[performed, stuar...|    0|(5000,[1338,2461,...|(5000,[1338,2461,...|
|[measure, complet...|    1|(5000,[14,124,453...|(5000,[14,124,453...|
|[sherlock, holmes...|    2|(5000,[0,8,13,19,...|(5000,[0,8,13,19,...|
|[rogue, confessio...|    3|(5000,[1145,4182]...|(5000,[1145,4182]...|
|[classic, soul, t...|    4|(5000,[191,364,79...|(5000,[191,364,79...|
|[archer, christma...|    5|(5000,[697,2773],...|(5000,[697,2773],...|
|    [sonatas, piano]|    6| (5000,[1488],[1.0])|(5000,[1488],[5.9...|
|[classic, connoll...|    7|(5000,[4,5,13,25,...|(5000,[4,5,13,25,...|
|       [hand, glove]|    8|  (5000,[818],[1.0])|(5000,[818],[5.25...|
|[anthology, poetr...|    9|(5000,[14,15,23,4...|(5000,[14,15,23,4...|
|[things, illustra...|   10|(5000,[179,300,35...|(5000,[179,300,35...|
|[like

In [21]:
result_tfidf_rdd = result_tfidf.select('index','features').rdd

In [22]:
from pyspark.mllib.linalg import Vector, Vectors
from pyspark.mllib.clustering import LDA, LDAModel

In [23]:
num_topics = 10
max_iterations = 100

In [None]:
lda_model = LDA.train(result_tfidf_rdd.mapValues(Vectors.fromML).map(list), k=num_topics, maxIterations=max_iterations)

In [None]:
wordNumbers = 5  
topicIndices = sc.parallelize(lda_model.describeTopics(maxTermsPerTopic = wordNumbers))

In [None]:
from pyspark.ml.clustering import KMeans, KMeansModel

kmeans = KMeans() \
          .setK(3) \
          .setFeaturesCol("indexedFeatures")\
          .setPredictionCol("cluster")

# Chain indexer and tree in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, kmeans])

model = pipeline.fit(transformed)

cluster = model.transform(transformed)

In [26]:
import nltk
nltk.download('wordnet')

[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\nandi\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


True