# NLP - Finding Topics of Restaurants

## Setting up

In [None]:
#Python Stuff
import pandas as pd
from functools import reduce
import matplotlib.pyplot as plt

#PySpark Stuff
from pyspark.sql.functions import isnan, when, count, col, avg
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, BooleanType, DateType, FloatType

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession \
  .builder \
  .master('yarn') \
  .appName('spark-bigquery-demo') \
  .getOrCreate()

In [10]:
bucket = "big-data-yelp"
spark.conf.set('temporaryGcsBucket', bucket)

## Data Preparation

### Loading Business

In [19]:
table = 'red-formula-339716:gfds.yelp_business_basicdata'
df_b = spark.read.format('bigquery').option('table', table).load()

#### Flitering Restaurants and Food

We will be doing restuarants in Ohio only for this project.

In [12]:
df_r = df_b.filter(df_b.state.like("OH")).filter(df_b.categories.like("%Restaurants%")).drop('int64_field_0', 'categories','review_count', 'address','city','postal_code', 'name','latitude', 'longitude', 'stars', 'is_open')

In [13]:
del df_b

### Loading Reviews

In [14]:
table_re = 'red-formula-339716:gfds.yelp_review'
df_re = spark.read.format('bigquery').option('table', table_re).load()

In [15]:
df_re = df_re.drop('cool','funny','useful', 'date', 'review_id', 'compliment_count')

### Joining and Group Reviews by Businesses

In [16]:
dfj = df_r.join(df_re, ['business_id'], "inner").drop('user_id')

In [10]:
# Checking for null values
dfj.select([count(when(isnan(c), c)).alias(c) for c in dfj.columns]).show()

                                                                                

+-----------+-----+-----+----+
|business_id|state|stars|text|
+-----------+-----+-----+----+
|          0|    0|    0|   0|
+-----------+-----+-----+----+



In [11]:
del df_re, df_r

In [18]:
dff = dfj.groupby("business_id").agg(F.concat_ws(", ", F.collect_list(dfj.text)).alias('text'))
dff.show(3)



+--------------------+--------------------+
|         business_id|                text|
+--------------------+--------------------+
|--_nBudPOb1lNRgKf...|I would have neve...|
|-QOl03c2B22yi_On0...|Came in traveling...|
|-Qos8tv2j6lj9uMxV...|Hot Tots

   So m...|
+--------------------+--------------------+
only showing top 3 rows



                                                                                

In [14]:
dff.count()

                                                                                

4377

## NLP 

In [16]:
# Run these to use nltk on GCP
#! pip install --upgrade nltk
#! python -m nltk.downloader punkt
#import nltk
#nltk.download('stopwords')

Got insporation from: Distributed Topic Modelling using Spark NLP and Spark MLLib(LDA)   
https://medium.com/analytics-vidhya/distributed-topic-modelling-using-spark-nlp-and-spark-mllib-lda-6db3f06a4da3

In [23]:
# Import Spark NLP
import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline
from pyspark.ml import Pipeline

In [24]:
document_assembler = DocumentAssembler() \
    .setInputCol("text") \
    .setOutputCol("document") \
    .setCleanupMode("shrink")

In [25]:
tokenizer = Tokenizer() \
  .setInputCols(["document"]) \
  .setOutputCol("token")

In [26]:
# clean unwanted characters and garbage
normalizer = Normalizer() \
    .setInputCols(["token"]) \
    .setOutputCol("normalized")

In [28]:
from nltk.corpus import stopwords
eng_stopwords = stopwords.words('english')
eng_stopwords.extend(['pass', 'way', 'go', 'one', 'on', 'would', 'going', 'dd', 'yeah', 'lady', 'although', 'place', 'area', 'another', 'goes', 'got', 'im', 'even', 'getting', 'that', 'dai', 'maybe', 'wonder', 'hope', 'went', 'want', 'door', 'still', 'could', 'include', 'hope', 'make', 'theyr', 'back', 'almost', 'anything', 'across', 'need', 'thing', 'sure', 'ago', 'case', 'man', 'ivy', 'get', 'on', 'came', 'come', 'Harvard', 'ran', 'u', 'made', 'part', 'nothing', 'wont'])
# remove stopwords
stopwords_cleaner = StopWordsCleaner()\
      .setInputCols("normalized")\
      .setOutputCol("cleanTokens")\
      .setStopWords(eng_stopwords) \
      .setCaseSensitive(False)

In [29]:
# stem the words to bring them to the root form.
stemmer = Stemmer() \
    .setInputCols(["cleanTokens"]) \
    .setOutputCol("stem")

In [30]:
finisher = Finisher() \
    .setInputCols(["stem"]) \
    .setOutputCols(["tokens"]) \
    .setOutputAsArray(True) \
    .setCleanAnnotations(False)

In [31]:
nlp_pipeline = Pipeline(stages=[document_assembler, \
                                tokenizer,          \
                                normalizer,         \
                                stopwords_cleaner,  \
                                stemmer,            \ 
                                finisher])

In [32]:
nlp_model = nlp_pipeline.fit(dff)
processed_df  = nlp_model.transform(dff)
tokens_df1 = processed_df.select('business_id','tokens')

In [35]:
tokens_df1.show(10)

[Stage 18:>                                                         (0 + 1) / 1]

+--------------------+--------------------+
|         business_id|              tokens|
+--------------------+--------------------+
|--_nBudPOb1lNRgKf...|[food, great, car...|
|-QOl03c2B22yi_On0...|[amaz, veget, sou...|
|-Qos8tv2j6lj9uMxV...|[excit, try, glad...|
|-RpXYkc-4WDM4iJhg...|[mean, usual, don...|
|-YsHP7zcVcCWZ4wye...|[great, good, foo...|
|-h7leD9mwrnV0JDa3...|[fridai, slow, te...|
|-kgm5IIE54ncW6Ssa...|[whoa, stuff, ama...|
|-mkYw8bj6B9CDqkm6...|[cool, spot, sunb...|
|01bpUEQYIRXAfDThD...|[pleasant, atmosp...|
|023THgSmMx1Q_FSUn...|[soda, size, paye...|
+--------------------+--------------------+
only showing top 10 rows



                                                                                

## CountVectorizer

In [37]:
from pyspark.ml.feature import CountVectorizer

In [38]:
cv = CountVectorizer(inputCol="tokens", outputCol="features", vocabSize=100, minDF=3.0)
cv_model = cv.fit(tokens_df1)
vectorized_tokens = cv_model.transform(tokens_df1)

## LDA

In [41]:
from pyspark.ml.clustering import LDA
num_topics = 7
lda = LDA(k=num_topics, maxIter=10)
model = lda.fit(vectorized_tokens)
ll = model.logLikelihood(vectorized_tokens)
lp = model.logPerplexity(vectorized_tokens)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))



The lower bound on the log likelihood of the entire corpus: -21224873.612972133
The upper bound on perplexity: 4.375077270399759


                                                                                

In [42]:
# extract vocabulary from CountVectorizer
vocab = cv_model.vocabulary
topics = model.describeTopics()   
topics_rdd = topics.rdd
topics_words = topics_rdd\
       .map(lambda row: row['termIndices'])\
       .map(lambda idx_list: [vocab[idx] for idx in idx_list])\
       .collect()
for idx, topic in enumerate(topics_words):
    print("topic: {}".format(idx))
    print("*"*25)
    for word in topic:
       print(word)
    print("*"*25)



topic: 0
*************************
beer
good
great
burger
food
bar
time
like
servic
order
*************************
topic: 1
*************************
food
chicken
good
flavor
restaur
try
great
best
love
columbu
*************************
topic: 2
*************************
restaur
servic
food
u
order
great
good
drink
tabl
dinner
*************************
topic: 3
*************************
order
food
like
time
good
great
locat
servic
realli
dont
*************************
topic: 4
*************************
pizza
order
good
great
time
like
food
servic
sauc
chees
*************************
topic: 5
*************************
food
good
order
great
servic
time
like
restaur
realli
chicken
*************************
topic: 6
*************************
sandwich
salad
good
side
chees
great
love
order
delici
lunch
*************************


                                                                                

In [48]:
from pyspark.sql.functions import udf
def topic_prediction(my_document):
    input1 = [my_document]
   # processed_string = nlp_model.transform(string_input)
    transformed_string = cv_model.transform(input1)    
    return lda_model.transform(transformed_string)
topic_prediction_udf = udf(topic_prediction, IntegerType())

In [49]:
topic_predictions = model.transform(vectorized_tokens)

In [50]:
topic_predictions.show(3)

[Stage 99:>                                                         (0 + 1) / 1]

+--------------------+--------------------+--------------------+--------------------+
|         business_id|              tokens|            features|   topicDistribution|
+--------------------+--------------------+--------------------+--------------------+
|--_nBudPOb1lNRgKf...|[quesadilla, fren...|(100,[0,1,2,3,4,5...|[4.58359064709114...|
|-QOl03c2B22yi_On0...|[love, alwai, sto...|(100,[0,1,2,3,4,5...|[8.58646743685032...|
|-Qos8tv2j6lj9uMxV...|[famili, great, e...|(100,[0,1,2,3,4,5...|[0.01066670297647...|
+--------------------+--------------------+--------------------+--------------------+
only showing top 3 rows



                                                                                

In [51]:
max_index = F.udf(lambda x: x.tolist().index(max(x)), IntegerType())
topic_predictions = topic_predictions.withColumn("topicID", max_index("topicDistribution"))

In [54]:
topic_predictions.groupBy('topicID').count().show()



+-------+-----+
|topicID|count|
+-------+-----+
|      1|  648|
|      6|  139|
|      3| 1512|
|      5|  790|
|      4|  559|
|      2|  288|
|      0|  441|
+-------+-----+



                                                                                

In [53]:
b_topics = topic_predictions.select('business_id', 'topicID')

### Saving to BigQuery

In [55]:
table = 'red-formula-339716:gfds.b_id_with_topic'
b_topics.write.format('bigquery') \
  .option('table', table) \
  .save()

                                                                                