In [None]:
# Topic Modeling

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[4]').config("spark.executor.memory", "1g").config("spark.driver.memory", "2g").appName('spark_ml_bumble').getOrCreate()
#for cluster - change kernel to PySpark

In [2]:
bumble = spark.read.options(inferSchema = True, multiLine = True, escape = '\"').csv('bumble_google_play_reviews.csv', header=True)
bumble.show()

+--------------------+-------------------+--------------------+--------------------+-----+-------------+--------------------+--------------+--------------------+------------+
|            reviewId|           userName|           userImage|             content|score|thumbsUpCount|reviewCreatedVersion|            at|        replyContent|   repliedAt|
+--------------------+-------------------+--------------------+--------------------+-----+-------------+--------------------+--------------+--------------------+------------+
|gp:AOqpTOEhR4C-Ep...|    Mike Kulasinski|https://play-lh.g...|    Nice and smooth.|    5|           52|                null|11/29/15 23:15|                null|        null|
|gp:AOqpTOHBUyrp_I...|   Alexander Khozya|https://play-lh.g...|At last we have A...|    5|           60|               1.0.0|11/30/15 21:27|                null|        null|
|gp:AOqpTOEjLo1SOe...|             Phil T|https://play-lh.g...|Finally, an app w...|    4|            2|               1.0.1|

In [3]:
#columns_to_drop = ['userImage', 'reviewCreatedVersion']
bumble = bumble.drop(*['userImage','reviewCreatedVersion','at','repliedAt','reviewId','userName'])
bumble.show()

+--------------------+-----+-------------+--------------------+
|             content|score|thumbsUpCount|        replyContent|
+--------------------+-----+-------------+--------------------+
|    Nice and smooth.|    5|           52|                null|
|At last we have A...|    5|           60|                null|
|Finally, an app w...|    4|            2|                null|
|            Finally!|    5|           76|                null|
|       Finally here!|    5|           54|                null|
|             Finally|    5|           20|                null|
|          Works well|    5|           12|                null|
|My operating syst...|    2|            3|Hey there. Thanks...|
|       Trying it out|    5|           14|                null|
|I have the app on...|    5|           14|                null|
|Tinder was gettin...|    5|           10|                null|
|So I've begun con...|    2|            6|Hey there. Thanks...|
|It took a year to...|    1|            

In [4]:
from pyspark.sql import functions as f

reviews_respond = bumble.filter("replyContent is not NULL").withColumn('sentiment', f.when(f.col('score') >= 4, 'positive').otherwise('negative')).select('content','sentiment')
reviews_respond.show()

+--------------------+---------+
|             content|sentiment|
+--------------------+---------+
|My operating syst...| negative|
|So I've begun con...| negative|
|Pretty sure 90% o...| negative|
|I am unable to si...| negative|
|The app goes thro...| negative|
|I have a galaxy s...| negative|
|Waited all this t...| negative|
|I don't have Face...| negative|
|I can swipe right...| negative|
|It ask for my bir...| negative|
|I'm not showing a...| negative|
|Yay.  A slightly ...| negative|
|Seems like great ...| negative|
|Trying to login.....| negative|
|Like others have ...| negative|
|Great concept. Bu...| negative|
|It's 95 percent f...| negative|
|Glitchy, should h...| negative|
|            Strange.| negative|
|   90% fake profiles| negative|
+--------------------+---------+
only showing top 20 rows



In [5]:
# sql for distribution

reviews_respond.groupBy('sentiment').count().show()

+---------+-----+
|sentiment|count|
+---------+-----+
| positive| 9582|
| negative|51976|
+---------+-----+



In [6]:
reviews_norespond = bumble.filter("replyContent is NULL").withColumn('sentiment', f.when(f.col('score') >= 4, 'positive').otherwise('negative')).select('content','sentiment')
reviews_norespond.show()

+--------------------+---------+
|             content|sentiment|
+--------------------+---------+
|    Nice and smooth.| positive|
|At last we have A...| positive|
|Finally, an app w...| positive|
|            Finally!| positive|
|       Finally here!| positive|
|             Finally| positive|
|          Works well| positive|
|       Trying it out| positive|
|I have the app on...| positive|
|Tinder was gettin...| positive|
|It took a year to...| negative|
|Just downloaded a...| positive|
|Works fine but ca...| positive|
|The UI is pretty ...| positive|
|Swiping is easy b...| negative|
|The app is great,...| negative|
|Push notification...| positive|
|Lots of fake prof...| negative|
|'lotta talent on ...| positive|
|The women are too...| positive|
+--------------------+---------+
only showing top 20 rows



In [7]:
# sql for distribution

reviews_norespond.groupBy('sentiment').count().show()

+---------+-----+
|sentiment|count|
+---------+-----+
| positive|35356|
| negative| 9040|
+---------+-----+



In [8]:
import pyspark.sql.functions as fn
import pyspark.ml.feature as ft
import pyspark.ml.clustering as clus
from pyspark.ml import Pipeline

#remove html tags in the text with regular expression
reviews_respond = reviews_respond.withColumn('content', fn.regexp_replace(fn.col("content"), '<[^>]+>', ' '))
reviews_norespond = reviews_norespond.withColumn('content', fn.regexp_replace(fn.col("content"), '<[^>]+>', ' '))

#remove special characters and line breaks in the text with regular expression
reviews_respond = reviews_respond.withColumn('content', fn.regexp_replace(fn.col("content"), '([^\s\w_]|_)+', ' ')).withColumn('content', fn.regexp_replace(fn.col("content"), '[\n\r]', ' '))
reviews_norespond = reviews_norespond.withColumn('content', fn.regexp_replace(fn.col("content"), '([^\s\w_]|_)+', ' ')).withColumn('content', fn.regexp_replace(fn.col("content"), '[\n\r]', ' '))

reviews_respond.take(1)

[Row(content='My operating system has autorotation of images disabled yet in gallery view images are rotating anyhow  Please fix ', sentiment='negative')]

In [9]:
# reviews with responses

tokenizer = ft.RegexTokenizer(inputCol='content', outputCol='review_tok', pattern='\s+|[,.\"/!]')
tokenizer.transform(reviews_respond).select('review_tok').take(1)

stopwordList = ['bumble','app','even','like','m','ve','k','re','h','n','ll'] 
stopwordList.extend(ft.StopWordsRemover().getStopWords())
stopwordList = list(set(stopwordList))
stopwords = ft.StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol='review_stop', stopWords=stopwordList)
stopwords.transform(tokenizer.transform(reviews_respond)).select('review_stop').take(1)

#tf-idf vector
tf = ft.CountVectorizer(inputCol=stopwords.getOutputCol(), outputCol='review_tf')
tokenized = stopwords.transform(tokenizer.transform(reviews_respond))
tf.fit(tokenized).transform(tokenized).select('review_tf').take(1)

lda = clus.LDA(k=10, optimizer='online', maxIter=10, featuresCol=tf.getOutputCol())

pipeline = Pipeline(stages=[tokenizer,stopwords,tf,lda])
pipeline_model = pipeline.fit(reviews_respond)

vectorized_model = pipeline_model.stages[2]
topic_model = pipeline_model.stages[3]
vocab = vectorized_model.vocabulary
topic_words_list = topic_model.describeTopics(10)                                                                                             
topic_words_rdd = topic_words_list.rdd
topics_words = topic_words_rdd.map(lambda row: row['termIndices']).map(lambda idx_list: [vocab[idx] for idx in idx_list]).collect()

for idx, topic in enumerate(topics_words):
    print("topic: {}".format(idx))
    print("*"*25)
    for word in topic:
       print(word)
    print("*"*25)



topic: 0
*************************
good
time
facebook
waste
get
one
worst
use
match
account
*************************
topic: 1
*************************
y
time
que
people
doesn
profile
get
use
women
swipe
*************************
topic: 2
*************************
people
trash
get
matches
back
one
account
best
men
first
*************************
topic: 3
*************************
money
dating
time
back
never
waste
without
free
first
use
*************************
topic: 4
*************************
people
match
message
one
day
matches
pay
bad
good
want
*************************
topic: 5
*************************
matches
get
one
first
message
people
never
day
de
women
*************************
topic: 6
*************************
matches
people
time
meh
money
get
one
see
make
dating
*************************
topic: 7
*************************
message
match
shite
matches
women
account
many
people
entertaining
know
*************************
topic: 8
*************************
pay
matches
mone

In [10]:
# reviews without responses 

tokenizer = ft.RegexTokenizer(inputCol='content', outputCol='review_tok', pattern='\s+|[,.\"/!]')
tokenizer.transform(reviews_norespond).select('review_tok').take(1)

stopwordList = ['bumble','app','even','like','m','ve','k','re','h','n','ll'] 
stopwordList.extend(ft.StopWordsRemover().getStopWords())
stopwordList = list(set(stopwordList))
stopwords = ft.StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol='review_stop', stopWords=stopwordList)
stopwords.transform(tokenizer.transform(reviews_norespond)).select('review_stop').take(1)

#tf-idf vector
tf = ft.CountVectorizer(inputCol=stopwords.getOutputCol(), outputCol='review_tf')
tokenized = stopwords.transform(tokenizer.transform(reviews_norespond))
tf.fit(tokenized).transform(tokenized).select('review_tf').take(1)

# 10 topics
lda = clus.LDA(k=10, optimizer='online', maxIter=10, featuresCol=tf.getOutputCol())

pipeline = Pipeline(stages=[tokenizer,stopwords,tf,lda])
pipeline_model = pipeline.fit(reviews_norespond)

vectorized_model = pipeline_model.stages[2]
topic_model = pipeline_model.stages[3]
vocab = vectorized_model.vocabulary
topic_words_list = topic_model.describeTopics(10)
topic_words_rdd = topic_words_list.rdd
topics_words = topic_words_rdd.map(lambda row: row['termIndices']).map(lambda idx_list: [vocab[idx] for idx in idx_list]).collect()

for idx, topic in enumerate(topics_words):
    print("topic: {}".format(idx))
    print("*"*25)
    for word in topic:
       print(word)
    print("*"*25)

topic: 0
*************************
nice
ok
dating
amazing
use
easy
people
good
really
best
*************************
topic: 1
*************************
piece
matches
use
mast
classy
people
pretty
g
easy
revealing
*************************
topic: 2
*************************
tinder
people
better
get
women
first
time
matches
dating
match
*************************
topic: 3
*************************
fantastic
top
way
ich
und
4
guys
similar
day
actually
*************************
topic: 4
*************************
best
dating
works
get
hai
eh
wow
people
great
awsome
*************************
topic: 5
*************************
solid
use
matter
easy
lives
best
many
black
woman
great
*************************
topic: 6
*************************
o
la
que
de
people
fun
aight
superb
match
great
*************************
topic: 7
*************************
gr8
fabulous
greatest
issue
one
amendment
2nd
profiles
workin
terrific
*************************
topic: 8
*************************
nahi
great
c
si