In [54]:
# Import findspark to initialize spark session in notebook
import findspark

findspark.init()

In [55]:
# Import pyspark packages
import pyspark
from pyspark.sql.types import StructType, DoubleType
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import col, count, explode
from pyspark.sql.functions import udf, col, lower, regexp_replace
from pyspark.ml.feature import Tokenizer, StopWordsRemover

In [56]:
# Start SparkSession
spark = SparkSession.builder.getOrCreate()

In [5]:
# Yelp Business Dataset
yelp_business_df = spark.read.format("csv").option("header", "true").option("multiline","true").load(r"C:\Users\vaibh\MSA 8050\FinalProject\yelp_business.csv")
yelp_business_df2 = yelp_business_df.select("business_id", "name", "stars", "review_count", "is_open", "categories")
yelp_business_df2.show()
yelp_business_df2.count()

+--------------------+--------------------+------------+------------+-------+--------------------+
|         business_id|                name|       stars|review_count|is_open|          categories|
+--------------------+--------------------+------------+------------+-------+--------------------+
|FYWN1wneV18bWNgQj...|"""Dental by Desi...|-111.9785992|         4.0|     22|                   1|
|He-G7vWjzVUysIKrf...|"""Stephen Szabo ...|         3.0|          11|      1|Hair Stylists;Hai...|
|KQPW8lFf1y5BT2Mxi...|"""Western Motor ...|-112.1153098|         1.5|     18|                   1|
|8DShNS-LuFqpEWIp0...|"""Sports Authori...|-111.9647254|         3.0|      9|                   0|
|PfOCPjBrlQAnz__NX...|"""Brick House Ta...|         3.5|         116|      1|American (New);Ni...|
|o9eMRCWt5PkpLDE0g...|       """Messina"""|         4.0|           5|      1| Italian;Restaurants|
|kCoE3jvEtg6UVz5SO...|    """BDJ Realty"""|  -115.26846|         4.0|      5|                   1|
|OD2hnuuTJ

174567

In [6]:
# Clean the dataset
yelp_business_df3 = yelp_business_df2.dropna(subset=["business_id", "name", "stars", "review_count", "is_open", "categories"])
yelp_business_df3.show()
yelp_business_df3.printSchema()
yelp_business_df3.count()

+--------------------+--------------------+------------+------------+-------+--------------------+
|         business_id|                name|       stars|review_count|is_open|          categories|
+--------------------+--------------------+------------+------------+-------+--------------------+
|FYWN1wneV18bWNgQj...|"""Dental by Desi...|-111.9785992|         4.0|     22|                   1|
|He-G7vWjzVUysIKrf...|"""Stephen Szabo ...|         3.0|          11|      1|Hair Stylists;Hai...|
|KQPW8lFf1y5BT2Mxi...|"""Western Motor ...|-112.1153098|         1.5|     18|                   1|
|8DShNS-LuFqpEWIp0...|"""Sports Authori...|-111.9647254|         3.0|      9|                   0|
|PfOCPjBrlQAnz__NX...|"""Brick House Ta...|         3.5|         116|      1|American (New);Ni...|
|o9eMRCWt5PkpLDE0g...|       """Messina"""|         4.0|           5|      1| Italian;Restaurants|
|kCoE3jvEtg6UVz5SO...|    """BDJ Realty"""|  -115.26846|         4.0|      5|                   1|
|OD2hnuuTJ

In [7]:
# Filtering Data
yelp_business_df4 = yelp_business_df3.filter(yelp_business_df3.stars.isin('1.0', '1.5', '2.0', '2.5', '3.0', '3.5', 
                                                                          '4.0', '4.5', '5.0'))
yelp_business_df5 = yelp_business_df4.filter(yelp_business_df4.categories.like('%Restaurants%'))

yelp_business_df5 = yelp_business_df5.orderBy('stars', ascending=True)
yelp_business_df5.show()
yelp_business_df5.count()

+--------------------+--------------------+-----+------------+-------+--------------------+
|         business_id|                name|stars|review_count|is_open|          categories|
+--------------------+--------------------+-----+------------+-------+--------------------+
|SkjO4uWLRg4NF198y...|     """Pizzaiolo"""|  1.0|           3|      1|Pizza;Gluten-Free...|
|RScKS72p6_AykRRwZ...|"""Roni's Pizzeri...|  1.0|           5|      0|   Pizza;Restaurants|
|OPdjBa5toeaVsDleg...|"""Authentic Pizz...|  1.0|           3|      0|   Restaurants;Pizza|
|kgAFhUm4sim7RPnND...|"""Papa John's Pi...|  1.0|           7|      1|   Pizza;Restaurants|
|0dXCoyvcvW3Lx1D8O...| """Le Bistro 426"""|  1.0|           3|      1| Restaurants;Italian|
|FO2SNKF9I8mvTCAvb...| """Pizza Palermo"""|  1.0|           4|      1|   Restaurants;Pizza|
|QPpptKYmU3fZMi_Js...|   """Pizza Depot"""|  1.0|           4|      1|   Restaurants;Pizza|
|kkvhQBT1Oh73sVSYZ...|    """McDonald's"""|  1.0|           8|      1|Burgers;Fa

In [97]:
# Exporting data to csv for viewing purposes
yelp_business_df5.write.csv(r"C:\Users\vaibh\MSA 8050\FinalProject\yelp_business_test6.csv", header=True)

In [8]:
# Yelp Review Dataset
yelp_review_df = spark.read.format("csv").option("header", "true").option("multiline","true").load(r"C:\Users\vaibh\MSA 8050\FinalProject\yelp_review.csv")
yelp_review_df2 = yelp_review_df.select("business_id", "stars", "text")
yelp_review_df2.show()
yelp_review_df2.count()

+--------------------+-----+--------------------+
|         business_id|stars|                text|
+--------------------+-----+--------------------+
|AEx2SYEUJmTxVVB18...|    5|Super simple plac...|
|VR6GpWIda3SfvPC-l...|    5|Small unassuming ...|
|CKC0-MOWMqoeWf6s-...|    5|Lester's is locat...|
|ACFtxLv8pGrrxMm6E...|    4|Love coming here....|
|s2I_Ni76bjJNK9yG6...|    4|Had their chocola...|
|8QWPlVQ6D-OExqXoa...|    5|Cycle Pub Las Veg...|
|9_CGhHMz8698M9-Pk...|    4|Who would have gu...|
|gkCorLgPyQLsptTHa...|    4|Always drove past...|
|5r6-G9C4YLbC7Ziz5...|    3|Not bad!! Love th...|
|fDF_o2JPU8BR1Gya-...|    5|Love this place!
...|
|z8oIoCT1cXz7gZP5G...|    4|This is currently...|
|XWTPNfskXoUL-Lf32...|    3|Server was a litt...|
|13nKUHH-uEUXVZylg...|    1|I thought Tidy's ...|
|RtUvSWO_UZ8V3Wpj0...|    3|Wanted to check o...|
|Aov96CM4FZAXeZvKt...|    5|This place is awe...|
|0W4lkclzZThpx3V65...|    4|a must stop when ...|
|fdnNZMk1NP7ZhL-YM...|    1|I too have been t...|


In [9]:
# Clean the dataset
yelp_review_df3 = yelp_review_df2.dropna(subset=["business_id", "stars", "text"])
yelp_review_df3 = yelp_review_df3.withColumn("label", yelp_review_df3["stars"].cast("double"))
yelp_review_df3 = yelp_review_df3.select("business_id", "label", "text")

# Filter the dataset
yelp_review_df4 = yelp_review_df3.filter(yelp_review_df3.label.isin(1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 
                                                                          4.0, 4.5, 5.0))

yelp_review_df4 = yelp_review_df4.orderBy("label", ascending=True)
yelp_review_df4.show()
yelp_review_df4.printSchema()
yelp_review_df4.count()

+--------------------+-----+--------------------+
|         business_id|label|                text|
+--------------------+-----+--------------------+
|0Rni7ocMC_Lg2UH0l...|  1.0|This place is alw...|
|YvuniBBiSs66vx9Gt...|  1.0|The saga has fini...|
|3uBrRcIhbhed1xftL...|  1.0|Horrible customer...|
|JlNeaOymdVbE6_bub...|  1.0|We always go to t...|
|0g7Pr8OWl_t_7DUeY...|  1.0|I bought a groupo...|
|fdnNZMk1NP7ZhL-YM...|  1.0|I too have been t...|
|uN2oZDJGO078ExbbV...|  1.0|"I give up on thi...|
|l1_S1mfGbEMxfT1f9...|  1.0|Terrible service ...|
|zgQHtqX0gqMw1nlBZ...|  1.0|really excited to...|
|DGkjLTH7BiHp_sMgU...|  1.0|"Worst experience...|
|hjk3ox7w1akbEuOgT...|  1.0|Food is very blan...|
|806kkDGaRCJ4lZLRc...|  1.0|after reading the...|
|OwYElCdiJ1IGKVJ4-...|  1.0|"This whole place...|
|X29BLACjBhOg_P2US...|  1.0|after reading rev...|
|aNNOpMame_VEIIZUQ...|  1.0|"Pandora charms a...|
|pLZ9oZM8c6MNbRlg0...|  1.0|"THEY SELL WRECKE...|
|-dy1d0ohs4D8qkJoe...|  1.0|No...I've had it ...|


In [None]:
# Exporting data to csv for viewing purposes
yelp_review_df4.write.csv(r"C:\Users\vaibh\MSA 8050\FinalProject\yelp_review_test1.csv", header=True)

In [10]:
# Join tables on business_id. I want only to join records matching from both tables
result_join_df = yelp_business_df5.join(yelp_review_df4, on=['business_id'], how='inner')
result_join_df.show()
result_join_df.printSchema()
result_join_df.count()

+--------------------+--------------------+-----+------------+-------+--------------------+-----+--------------------+
|         business_id|                name|stars|review_count|is_open|          categories|label|                text|
+--------------------+--------------------+-----+------------+-------+--------------------+-----+--------------------+
|--9e1ONYQuAa-CB_R...|"""Delmonico Stea...|  4.0|        1451|      1|Cajun/Creole;Stea...|  4.0|I brought my two ...|
|--9e1ONYQuAa-CB_R...|"""Delmonico Stea...|  4.0|        1451|      1|Cajun/Creole;Stea...|  5.0|Excellent steakho...|
|--9e1ONYQuAa-CB_R...|"""Delmonico Stea...|  4.0|        1451|      1|Cajun/Creole;Stea...|  2.0|I've been to this...|
|--9e1ONYQuAa-CB_R...|"""Delmonico Stea...|  4.0|        1451|      1|Cajun/Creole;Stea...|  5.0|Steak! My first t...|
|--9e1ONYQuAa-CB_R...|"""Delmonico Stea...|  4.0|        1451|      1|Cajun/Creole;Stea...|  3.0|Located in the Ve...|
|--9e1ONYQuAa-CB_R...|"""Delmonico Stea...|  4.0

In [11]:
result_join_df.select('label').distinct().show()

# Choose label and text for topic modeling
final_result_df = result_join_df.select('text', 'label')
final_result_df.show(5)
final_result_df.count()

+-----+
|label|
+-----+
|  5.0|
|  4.0|
|  3.0|
|  1.0|
|  2.0|
+-----+

+--------------------+-----+
|                text|label|
+--------------------+-----+
|Super simple plac...|  5.0|
|Small unassuming ...|  5.0|
|Lester's is locat...|  5.0|
|Love coming here....|  4.0|
|Had their chocola...|  4.0|
+--------------------+-----+
only showing top 5 rows



In [12]:
# Now split dataset depending on label
one_star_df = final_result_df.filter(final_result_df.label.isin(1.0))
# one_star_df.show(5)
# one_star_df.count()

two_star_df = final_result_df.filter(final_result_df.label.isin(2.0))
#two_star_df.show(5)
# two_star_df.count()

three_star_df = final_result_df.filter(final_result_df.label.isin(3.0))
#three_star_df.show(5)
# three_star_df.count()

four_star_df = final_result_df.filter(final_result_df.label.isin(4.0))
#four_star_df.show(5)
# four_star_df.count()

five_star_df = final_result_df.filter(final_result_df.label.isin(5.0))
#five_star_df.show(5)
# five_star_df.count()

In [51]:
# Text Processing Packages
from nltk.corpus import stopwords
from bs4 import BeautifulSoup
import re as re
from pyspark.ml.feature import CountVectorizer , IDF
from pyspark.ml.feature import RegexTokenizer
from nltk.stem.snowball import SnowballStemmer
from pyspark.sql.functions import monotonically_increasing_id as mi
from wordcloud import WordCloud,STOPWORDS
import matplotlib.pyplot as plt

# Model Building packages
from pyspark.mllib.linalg import Vector, Vectors
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.ml.clustering import LDA
from pyspark.sql.types import StringType, ArrayType

In [31]:
# Normalize and tokenize text
def normalize_func(df):
    extract_Text = udf(
        lambda d: BeautifulSoup(d, "lxml").get_text(strip=False), StringType())

    remove_Punc = udf(
        lambda s: re.sub(r'[^a-zA-Z0-9]|[0-9]', r' ', s).strip().lower(), StringType())

    # Remove HTML, puncuations, etc...
    normalized_df = df.withColumn("text", remove_Punc(extract_Text(df.text)))
    # one_star_normalized_df.show(5)

    # Tokenize
    tokenizer = RegexTokenizer(inputCol="text", outputCol="words",
                               gaps=True, pattern=r'\s+', minTokenLength=2)
    
    star_tokens_df = tokenizer.transform(normalized_df)
    
    # one_star_tokens_df.show(10)
    return star_tokens_df

In [22]:
one_star_df1 = normalize_func(one_star_df)
one_star_df1.show(5)

two_star_df1 = normalize_func(two_star_df)
two_star_df1.show(5)

three_star_df1 = normalize_func(three_star_df)
three_star_df1.show(5)

four_star_df1 = normalize_func(four_star_df)
four_star_df1.show(5)

five_star_df1 = normalize_func(five_star_df)
five_star_df1.show(5)

+--------------------+-----+--------------------+
|                text|label|               words|
+--------------------+-----+--------------------+
|food is very blan...|  1.0|[food, is, very, ...|
|if you have not y...|  1.0|[if, you, have, n...|
|worse customer se...|  1.0|[worse, customer,...|
|after reading the...|  1.0|[after, reading, ...|
|came here for a f...|  1.0|[came, here, for,...|
+--------------------+-----+--------------------+
only showing top 5 rows

+--------------------+-----+--------------------+
|                text|label|               words|
+--------------------+-----+--------------------+
|a few years ago  ...|  2.0|[few, years, ago,...|
|atmosphere for th...|  2.0|[atmosphere, for,...|
|decided to give t...|  2.0|[decided, to, giv...|
|you can see that ...|  2.0|[you, can, see, t...|
|i think i may hav...|  2.0|[think, may, have...|
+--------------------+-----+--------------------+
only showing top 5 rows

+--------------------+-----+--------------------+


In [32]:
# Stopwords and Stemming - One Star Ratings
def sw_and_stemming(df):
    english_stopwords = set(stopwords.words('english'))
    len_english_stopwords = len(english_stopwords)
    english_stopwords_ratio = udf(lambda z: len(set(z).intersection(english_stopwords)) / len_english_stopwords)

    german_stopwords = set(stopwords.words('german'))
    len_german_stopwords = len(german_stopwords)
    german_stopwords_ratio = udf(lambda z: len(set(z).intersection(german_stopwords)) / len_german_stopwords)

    star_tokens_df_eng = df.withColumn("ratio_eng", english_stopwords_ratio(df['words']))
        .withColumn("ratio_ger", german_stopwords_ratio(df['words']))
        .withColumn("Eng", col('ratio_eng') > col('ratio_ger')).filter('Eng')
    
    return star_tokens_df_eng

In [24]:
one_star_df2 = sw_and_stemming(one_star_df1)
one_star_df2.show(5)

two_star_df2 = sw_and_stemming(two_star_df1)
two_star_df2.show(5)

three_star_df2 = sw_and_stemming(three_star_df1)
three_star_df2.show(5)

four_star_df2 = sw_and_stemming(four_star_df1)
four_star_df2.show(5)

five_star_df2 = sw_and_stemming(five_star_df1)
five_star_df2.show(5)

+--------------------+-----+--------------------+-------------------+--------------------+----+
|                text|label|               words|          ratio_eng|           ratio_ger| Eng|
+--------------------+-----+--------------------+-------------------+--------------------+----+
|food is very blan...|  1.0|[food, is, very, ...| 0.0670391061452514|0.004310344827586207|true|
|if you have not y...|  1.0|[if, you, have, n...|0.13966480446927373|0.004310344827586207|true|
|worse customer se...|  1.0|[worse, customer,...| 0.0670391061452514|0.004310344827586207|true|
|after reading the...|  1.0|[after, reading, ...| 0.1787709497206704| 0.01293103448275862|true|
|came here for a f...|  1.0|[came, here, for,...| 0.2346368715083799|0.017241379310344827|true|
+--------------------+-----+--------------------+-------------------+--------------------+----+
only showing top 5 rows

+--------------------+-----+--------------------+-------------------+--------------------+----+
|              

In [38]:
# Remove stopwords
def sw_and_stemming_removed(df):
    more_stopwords = ['', 'com', 'de', 'eu', 'cf', 'pm', 'like', 'one', 'using', 'new', 'also',
         'really', 'need', 'caption', 'since', 'change', 'young', 'align', 'width',
         'attachment', 'number', 'know', 'two', 'use', 'see', 'get', 'first', 'good',
         'next', 'well', 'day', 'way', 'fruit', 'different', 'let', 'lot', 'would',
         'already', 'set', 'user', 'even', 'might', 'many', 'different', 'crazy',
         'may', 'could', 'still', 'probably', 'make', 'write', 'used', 'written',
         'go', 'us', 'yes', 'seen', 'behind', 'much', 'makes', 'via', 'based',
         'choose', 'presented', 'away', 'hence', 'wants', 'please', 'add',
         'something', 'conclusion', 'able', 'describe', 'thing', 'likely',
         'lots', 'sense', 'higher', 'every', 'right', 'sure', 'quite', 'without']
    
    tokenizer = RegexTokenizer(inputCol="text", outputCol="words", gaps=True, pattern=r'\s+', minTokenLength=2)

    stopword_Remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="filtered")
    stopword_Remover.setStopWords(stopword_Remover.getStopWords() + more_stopwords)

    final_tokens_df = stopword_Remover.transform(df)

    # Stemming
    stem_words = SnowballStemmer("english", ignore_stopwords=False)
    udf_Stemmed = udf(lambda l: [stem_words.stem(s) for s in l], ArrayType(StringType()))

    new_final_tokens_df = final_tokens_df.withColumn("filteredStemmed", udf_Stemmed(final_tokens_df["filtered"]))
    
    return new_final_tokens_df

In [39]:
one_star_df3 = sw_and_stemming_removed(one_star_df2)
one_star_df3.show(5)

two_star_df3 = sw_and_stemming_removed(two_star_df2)
two_star_df3.show(5)

three_star_df3 = sw_and_stemming_removed(three_star_df2)
three_star_df3.show(5)

four_star_df3 = sw_and_stemming_removed(four_star_df2)
four_star_df3.show(5)

five_star_df3 = sw_and_stemming_removed(five_star_df2)
five_star_df3.show(5)

+--------------------+-----+--------------------+-------------------+--------------------+----+--------------------+--------------------+
|                text|label|               words|          ratio_eng|           ratio_ger| Eng|            filtered|     filteredStemmed|
+--------------------+-----+--------------------+-------------------+--------------------+----+--------------------+--------------------+
|food is very blan...|  1.0|[food, is, very, ...| 0.0670391061452514|0.004310344827586207|true|[food, bland, aut...|[food, bland, aut...|
|if you have not y...|  1.0|[if, you, have, n...|0.13966480446927373|0.004310344827586207|true|[yet, tried, wasa...|[yet, tri, wasabi...|
|worse customer se...|  1.0|[worse, customer,...| 0.0670391061452514|0.004310344827586207|true|[worse, customer,...|[wors, custom, se...|
|after reading the...|  1.0|[after, reading, ...| 0.1787709497206704| 0.01293103448275862|true|[reading, reviews...|[read, review, ye...|
|came here for a f...|  1.0|[came,

In [58]:
# Add ID column
id=mi()
one_star_df3 = one_star_df3.withColumn('ID', id)
two_star_df3 = two_star_df3.withColumn('ID', id)
three_star_df3 = three_star_df3.withColumn('ID', id)
four_star_df3 = four_star_df3.withColumn('ID', id)
five_star_df3 = five_star_df3.withColumn('ID', id)

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:49470)
Traceback (most recent call last):
  File "C:\Users\vaibh\Spark\python\lib\py4j-0.10.8.1-src.zip\py4j\java_gateway.py", line 958, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\vaibh\Spark\python\lib\py4j-0.10.8.1-src.zip\py4j\java_gateway.py", line 1096, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:49470)

In [189]:
# Create feature vectors
def create_feature_vectors(df):
    cv = CountVectorizer(inputCol="filteredStemmed", outputCol="features",
                         vocabSize=50000,
                         minTF=2, # of words that must appear in a document
                         minDF=4) # of documents a word must appear show up in

    cv_Model = cv.fit(df)

    count_Vectors = (cv_Model.transform(df).select("ID", "features").cache())
    
    return count_Vectors

In [200]:
one_star_df4 = create_feature_vectors(one_star_df3)

two_star_df4 = create_feature_vectors(two_star_df3)

three_star_df4 = create_feature_vectors(three_star_df3)

four_star_df4 = create_feature_vectors(four_star_df3)

five_star_df4 = create_feature_vectors(five_star_df3)


In [167]:
# Train LDA Model - One Star Rating
train_data_df, test_data_df = one_star_df4.randomSplit([0.8, 0.2], 1)
# print('Training and testing documents: ', df_training.count(), df_testing.count())

numTopics = 20 # number of topics

lda = LDA(k = numTopics, seed = 1, optimizer="online", optimizeDocConcentration=True, maxIter = 50, 
          learningDecay = 0.51,  learningOffset = 64.0, subsamplingRate = 0.05)

lda_Model = lda.fit(train_data_df)
log_prex_test, log_prex_train = lda_Model.logPerplexity(test_data_df), lda_Model.logPerplexity(train_data_df)
print("Perplexity of Test Dataset and Train Dataset: " + str(log_prex_train) + ',' + str(log_prex_test))

# Print topics and top-weighted terms
topics = lda_Model.describeTopics(maxTermsPerTopic=10)
vocabArray = cv_Model.vocabulary

ListOfIndexToWords = udf(lambda z: list([vocabArray[w] for w in z]))
FormatNumbers = udf(lambda y: ["{:1.4f}".format(x) for x in y])

topics.select(ListOfIndexToWords(topics.termIndices).alias('words')).show(truncate=False, n=numTopics)
topics.select(FormatNumbers(topics.termWeights).alias('weights_of_words')).show(truncate=False, n=numTopics)

Perplexity on testing and training data: 6.1589394814669784,6.4188039262149035
+-----------------------------------------------------------------------------+
|words                                                                        |
+-----------------------------------------------------------------------------+
|[food, place, restaur, better, bad, review, great, qualiti, servic, got]     |
|[wait, tabl, minut, drink, ask, restaur, seat, came, hour, server]           |
|[room, wine, glass, dirti, door, clean, park, hotel, car, floor]             |
|[price, shrimp, worst, ice, dinner, pho, cream, game, pretti, cours]         |
|[salad, never, sushi, came, didn, bad, roll, cold, horribl, food]            |
|[servic, custom, server, bartend, ever, manag, tip, bill, pay, bar]          |
|[lunch, breakfast, store, lobster, card, coupon, cake, stop, pie, red]       |
|[burger, eat, burrito, friend, fri, free, wasn, fresh, salti, bun]           |
|[meal, menu, potato, noth, special, smal

In [168]:
# One Star Rating Topic Results

from pyspark.sql.types import StructType, FloatType
maxTermsPerTopic = 5

# Print the topics, showing the top-weighted terms for each topic.
topics = lda_Model.describeTopics(maxTermsPerTopic=maxTermsPerTopic)
vocabArray = cv_Model.vocabulary
numTopics = topics.count()

ListOfIndexToWords = udf(lambda z: list([vocabArray[w] for w in z]), ArrayType(StringType()))
FormatNumbers = udf(lambda y: [float("{:1.4f}".format(x)) for x in y], ArrayType(FloatType()))

top_topics = topics.select((topics.topic + 1).alias('topic'),
                          ListOfIndexToWords(topics.termIndices).alias('words'),
                          FormatNumbers(topics.termWeights).alias('weights_of_words'))

top_topics.show(truncate=False, n=numTopics)

print('Topics:', numTopics, 'Vocabulary:', len(vocabArray))

+-----+----------------------------------------+----------------------------------------+
|topic|words                                   |weights                                 |
+-----+----------------------------------------+----------------------------------------+
|1    |[food, place, restaur, better, bad]     |[0.3604, 0.1662, 0.0218, 0.0168, 0.0153]|
|2    |[wait, tabl, minut, drink, ask]         |[0.0706, 0.0636, 0.056, 0.042, 0.0306]  |
|3    |[room, wine, glass, dirti, door]        |[0.0943, 0.0658, 0.0604, 0.0585, 0.0532]|
|4    |[price, shrimp, worst, ice, dinner]     |[0.1064, 0.0694, 0.0555, 0.0424, 0.0333]|
|5    |[salad, never, sushi, came, didn]       |[0.0361, 0.0289, 0.0272, 0.0247, 0.0234]|
|6    |[servic, custom, server, bartend, ever] |[0.1379, 0.0617, 0.0252, 0.0203, 0.02]  |
|7    |[lunch, breakfast, store, lobster, card]|[0.1083, 0.0783, 0.0451, 0.0428, 0.0394]|
|8    |[burger, eat, burrito, friend, fri]     |[0.1578, 0.1376, 0.0456, 0.0424, 0.0376]|
|9    |[me

In [191]:
# Train LDA Model - Two Star Rating
train_data_df, test_data_df = two_star_df4.randomSplit([0.8, 0.2], 1)
# print('Training and testing documents: ', df_training.count(), df_testing.count())

numTopics = 20 # number of topics

lda = LDA(k = numTopics, seed = 1, optimizer="online", optimizeDocConcentration=True, maxIter = 50, 
          learningDecay = 0.51,  learningOffset = 64.0, subsamplingRate = 0.05)

lda_Model = lda.fit(train_data_df)
log_prex_test, log_prex_train = lda_Model.logPerplexity(test_data_df), lda_Model.logPerplexity(train_data_df)
print("Perplexity of Test Dataset and Train Dataset: " + str(log_prex_train) + ',' + str(log_prex_test))

# Print topics and top-weighted terms
topics = lda_Model.describeTopics(maxTermsPerTopic=10)
vocabArray = cv_Model.vocabulary

ListOfIndexToWords = udf(lambda z: list([vocabArray[w] for w in z]))
FormatNumbers = udf(lambda y: ["{:1.4f}".format(x) for x in y])

topics.select(ListOfIndexToWords(topics.termIndices).alias('words')).show(truncate=False, n=numTopics)
topics.select(FormatNumbers(topics.termWeights).alias('weights_of_words')).show(truncate=False, n=numTopics)

Perplexity on testing and training data: 6.301070797376387,6.571717312813163
+---------------------------------------------------------------------------------+
|words                                                                            |
+---------------------------------------------------------------------------------+
|[food, servic, place, eat, nice, time, order, hour, manag, told]                 |
|[order, wait, minut, came, chicken, custom, tabl, want, back, time]              |
|[yesterday, train, pictur, hole, spici, sound, adult, post, strong, warm]        |
|[tast, best, yet, cheap, excit, restaur, disgust, pretti, instead, boyfriend]    |
|[place, owner, take, anyth, order, ve, anoth, come, chees, never]                |
|[sit, plate, experi, think, sauc, today, whole, salad, deliv, menus]             |
|[place, disappoint, left, go, long, guy, min, minut, full, honest]               |
|[friend, bad, brought, door, thing, bland, finish, forev, busi, hour]            |

In [192]:
# Two Star Rating Topic Results

from pyspark.sql.types import StructType, FloatType
maxTermsPerTopic = 5

# Print the topics, showing the top-weighted terms for each topic.
topics = lda_Model.describeTopics(maxTermsPerTopic=maxTermsPerTopic)
vocabArray = cv_Model.vocabulary
numTopics = topics.count()

ListOfIndexToWords = udf(lambda z: list([vocabArray[w] for w in z]), ArrayType(StringType()))
FormatNumbers = udf(lambda y: [float("{:1.4f}".format(x)) for x in y], ArrayType(FloatType()))

top_topics = topics.select((topics.topic + 1).alias('topic'),
                          ListOfIndexToWords(topics.termIndices).alias('words'),
                          FormatNumbers(topics.termWeights).alias('weights_of_words'))

top_topics.show(truncate=False, n=numTopics)

print('Topics:', numTopics, 'Vocabulary:', len(vocabArray))

+-----+---------------------------------------+----------------------------------------+
|topic|words                                  |weights                                 |
+-----+---------------------------------------+----------------------------------------+
|1    |[food, servic, place, eat, nice]       |[0.3036, 0.0662, 0.0552, 0.0226, 0.0217]|
|2    |[order, wait, minut, came, chicken]    |[0.0775, 0.0454, 0.0404, 0.0352, 0.0331]|
|3    |[yesterday, train, pictur, hole, spici]|[0.057, 0.0564, 0.0328, 0.0292, 0.0268] |
|4    |[tast, best, yet, cheap, excit]        |[0.0678, 0.0595, 0.0309, 0.0305, 0.0234]|
|5    |[place, owner, take, anyth, order]     |[0.0958, 0.0316, 0.0278, 0.0268, 0.0249]|
|6    |[sit, plate, experi, think, sauc]      |[0.08, 0.0591, 0.0512, 0.0502, 0.0451]  |
|7    |[place, disappoint, left, go, long]    |[0.0628, 0.0461, 0.0439, 0.0237, 0.0217]|
|8    |[friend, bad, brought, door, thing]    |[0.1282, 0.0851, 0.0377, 0.0313, 0.0305]|
|9    |[leav, someon,

In [195]:
# Train LDA Model - Three Star Rating
train_data_df, test_data_df = three_star_df4.randomSplit([0.8, 0.2], 1)
# print('Training and testing documents: ', df_training.count(), df_testing.count())

numTopics = 20 # number of topics

lda = LDA(k = numTopics, seed = 1, optimizer="online", optimizeDocConcentration=True, maxIter = 50, 
          learningDecay = 0.51,  learningOffset = 64.0, subsamplingRate = 0.05)

lda_Model = lda.fit(train_data_df)
log_prex_test, log_prex_train = lda_Model.logPerplexity(test_data_df), lda_Model.logPerplexity(train_data_df)
print("Perplexity of Test Dataset and Train Dataset: " + str(log_prex_train) + ',' + str(log_prex_test))

# Print topics and top-weighted terms
topics = lda_Model.describeTopics(maxTermsPerTopic=10)
vocabArray = cv_Model.vocabulary

ListOfIndexToWords = udf(lambda z: list([vocabArray[w] for w in z]))
FormatNumbers = udf(lambda y: ["{:1.4f}".format(x) for x in y])

topics.select(ListOfIndexToWords(topics.termIndices).alias('words')).show(truncate=False, n=numTopics)
topics.select(FormatNumbers(topics.termWeights).alias('weights_of_words')).show(truncate=False, n=numTopics)

Perplexity on testing and training data: 6.364765340907587,6.618695934516439
+---------------------------------------------------------------------------+
|words                                                                      |
+---------------------------------------------------------------------------+
|[place, food, time, went, bad, servic, bar, drink, better, never]          |
|[spot, put, heard, sausag, nobodi, known, sister, contain, ranch, block]   |
|[said, ok, establish, overal, anyth, child, whatev, mile, flavorless, hope]|
|[got, peopl, eat, sit, experi, place, horribl, thought, disgust, think]    |
|[ever, back, eat, sat, small, ve, three, dirti, call, terribl]             |
|[order, servic, visit, night, chicken, fresh, told, pork, locat, tell]     |
|[cook, hour, hostess, plate, insid, past, save, alway, none, son]          |
|[worst, fri, group, order, given, place, dollar, open, disappoint, greasi] |
|[place, anyth, expect, re, everyth, late, want, ask, els, order]

In [196]:
# Three Star Rating Topic Results

from pyspark.sql.types import StructType, FloatType
maxTermsPerTopic = 5

# Print the topics, showing the top-weighted terms for each topic.
topics = lda_Model.describeTopics(maxTermsPerTopic=maxTermsPerTopic)
vocabArray = cv_Model.vocabulary
numTopics = topics.count()

ListOfIndexToWords = udf(lambda z: list([vocabArray[w] for w in z]), ArrayType(StringType()))
FormatNumbers = udf(lambda y: [float("{:1.4f}".format(x)) for x in y], ArrayType(FloatType()))

top_topics = topics.select((topics.topic + 1).alias('topic'),
                          ListOfIndexToWords(topics.termIndices).alias('words'),
                          FormatNumbers(topics.termWeights).alias('weights_of_words'))

top_topics.show(truncate=False, n=numTopics)

print('Topics:', numTopics, 'Vocabulary:', len(vocabArray))

+-----+--------------------------------------+----------------------------------------+
|topic|words                                 |weights                                 |
+-----+--------------------------------------+----------------------------------------+
|1    |[place, food, time, went, bad]        |[0.0472, 0.0465, 0.0352, 0.0348, 0.0347]|
|2    |[spot, put, heard, sausag, nobodi]    |[0.0495, 0.0472, 0.0464, 0.0449, 0.039] |
|3    |[said, ok, establish, overal, anyth]  |[0.1838, 0.1782, 0.035, 0.0299, 0.0174] |
|4    |[got, peopl, eat, sit, experi]        |[0.0928, 0.053, 0.0328, 0.0316, 0.0292] |
|5    |[ever, back, eat, sat, small]         |[0.1272, 0.0814, 0.0753, 0.0174, 0.0172]|
|6    |[order, servic, visit, night, chicken]|[0.2107, 0.1191, 0.0533, 0.0373, 0.0361]|
|7    |[cook, hour, hostess, plate, insid]   |[0.1046, 0.0837, 0.0305, 0.0293, 0.0283]|
|8    |[worst, fri, group, order, given]     |[0.1491, 0.0325, 0.0241, 0.0188, 0.0185]|
|9    |[place, anyth, expect, re

In [198]:
# Train LDA Model - Four Star Rating
train_data_df, test_data_df = Four_star_df4.randomSplit([0.8, 0.2], 1)
# print('Training and testing documents: ', df_training.count(), df_testing.count())

numTopics = 20 # number of topics

lda = LDA(k = numTopics, seed = 1, optimizer="online", optimizeDocConcentration=True, maxIter = 50, 
          learningDecay = 0.51,  learningOffset = 64.0, subsamplingRate = 0.05)

lda_Model = lda.fit(train_data_df)
log_prex_test, log_prex_train = lda_Model.logPerplexity(test_data_df), lda_Model.logPerplexity(train_data_df)
print("Perplexity of Test Dataset and Train Dataset: " + str(log_prex_train) + ',' + str(log_prex_test))

# Print topics and top-weighted terms
topics = lda_Model.describeTopics(maxTermsPerTopic=10)
vocabArray = cv_Model.vocabulary

ListOfIndexToWords = udf(lambda z: list([vocabArray[w] for w in z]))
FormatNumbers = udf(lambda y: ["{:1.4f}".format(x) for x in y])

topics.select(ListOfIndexToWords(topics.termIndices).alias('words')).show(truncate=False, n=numTopics)
topics.select(FormatNumbers(topics.termWeights).alias('weights_of_words')).show(truncate=False, n=numTopics)

Perplexity on testing and training data: 6.374076570454552,6.571016625296677
+----------------------------------------------------------------------------------+
|words                                                                             |
+----------------------------------------------------------------------------------+
|[never, bad, food, order, end, back, manag, wrong, say, want]                     |
|[got, minut, beef, open, eat, final, long, size, time, give]                      |
|[acknowledg, avail, spinach, guarante, fourth, btw, chain, freezer, child, turnov]|
|[went, time, didn, peopl, give, hour, never, told, call, ok]                      |
|[place, order, look, line, experi, night, peopl, wasn, ever, burrito]             |
|[food, wait, pizza, chicken, back, sat, famili, check, waitress, servic]          |
|[drink, husband, hand, noodl, mean, serv, needless, worker, toronto, lobster]     |
|[owner, money, tip, came, eat, saw, told, everyth, salt, need]          

In [199]:
# Four Star Rating Topic Results

from pyspark.sql.types import StructType, FloatType
maxTermsPerTopic = 5

# Print the topics, showing the top-weighted terms for each topic.
topics = lda_Model.describeTopics(maxTermsPerTopic=maxTermsPerTopic)
vocabArray = cv_Model.vocabulary
numTopics = topics.count()

ListOfIndexToWords = udf(lambda z: list([vocabArray[w] for w in z]), ArrayType(StringType()))
FormatNumbers = udf(lambda y: [float("{:1.4f}".format(x)) for x in y], ArrayType(FloatType()))

top_topics = topics.select((topics.topic + 1).alias('topic'),
                          ListOfIndexToWords(topics.termIndices).alias('words'),
                          FormatNumbers(topics.termWeights).alias('weights_of_words'))

top_topics.show(truncate=False, n=numTopics)

print('Topics:', numTopics, 'Vocabulary:', len(vocabArray))

+-----+----------------------------------------------+----------------------------------------+
|topic|words                                         |weights                                 |
+-----+----------------------------------------------+----------------------------------------+
|1    |[never, bad, food, order, end]                |[0.0598, 0.0559, 0.0535, 0.0503, 0.0436]|
|2    |[got, minut, beef, open, eat]                 |[0.1339, 0.0651, 0.0298, 0.0264, 0.0261]|
|3    |[acknowledg, avail, spinach, guarante, fourth]|[0.0527, 0.0242, 0.0218, 0.019, 0.0174] |
|4    |[went, time, didn, peopl, give]               |[0.0326, 0.0306, 0.0216, 0.019, 0.0181] |
|5    |[place, order, look, line, experi]            |[0.1851, 0.1679, 0.0796, 0.0319, 0.029] |
|6    |[food, wait, pizza, chicken, back]            |[0.2774, 0.0614, 0.0329, 0.0264, 0.0164]|
|7    |[drink, husband, hand, noodl, mean]           |[0.2014, 0.0978, 0.0705, 0.0289, 0.0267]|
|8    |[owner, money, tip, came, eat]   

In [201]:
# Train LDA Model - Five Star Rating
train_data_df, test_data_df = five_star_df4.randomSplit([0.8, 0.2], 1)
# print('Training and testing documents: ', df_training.count(), df_testing.count())

numTopics = 20 # number of topics

lda = LDA(k = numTopics, seed = 1, optimizer="online", optimizeDocConcentration=True, maxIter = 50, 
          learningDecay = 0.51,  learningOffset = 64.0, subsamplingRate = 0.05)

lda_Model = lda.fit(train_data_df)
log_prex_test, log_prex_train = lda_Model.logPerplexity(test_data_df), lda_Model.logPerplexity(train_data_df)
print("Perplexity of Test Dataset and Train Dataset: " + str(log_prex_train) + ',' + str(log_prex_test))

# Print topics and top-weighted terms
topics = lda_Model.describeTopics(maxTermsPerTopic=10)
vocabArray = cv_Model.vocabulary

ListOfIndexToWords = udf(lambda z: list([vocabArray[w] for w in z]))
FormatNumbers = udf(lambda y: ["{:1.4f}".format(x) for x in y])

topics.select(ListOfIndexToWords(topics.termIndices).alias('words')).show(truncate=False, n=numTopics)
topics.select(FormatNumbers(topics.termWeights).alias('weights_of_words')).show(truncate=False, n=numTopics)

Perplexity on testing and training data: 6.237907148346874,6.451044403333594
+-------------------------------------------------------------------------------+
|words                                                                          |
+-------------------------------------------------------------------------------+
|[food, fish, bad, got, time, wast, told, say, find, credit]                    |
|[server, didn, waitress, chees, put, hour, mind, last, eaten, save]            |
|[particular, argu, figur, stupid, agre, laugh, along, king, throw, worker]     |
|[locat, walk, dinner, ever, took, want, chicken, around, happen, poor]         |
|[restaur, said, fri, night, actual, eat, tasteless, made, sorri, given]        |
|[place, friend, seem, els, time, want, dish, water, return, check]             |
|[meal, expens, menu, least, ten, game, dine, feel, believ, overal]             |
|[order, tabl, serv, hope, never, dinner, felt, warm, eat, min]                 |
|[look, sauc, seat, s

In [202]:
# Five Star Rating Topic Results

from pyspark.sql.types import StructType, FloatType
maxTermsPerTopic = 5

# Print the topics, showing the top-weighted terms for each topic.
topics = lda_Model.describeTopics(maxTermsPerTopic=maxTermsPerTopic)
vocabArray = cv_Model.vocabulary
numTopics = topics.count()

ListOfIndexToWords = udf(lambda z: list([vocabArray[w] for w in z]), ArrayType(StringType()))
FormatNumbers = udf(lambda y: [float("{:1.4f}".format(x)) for x in y], ArrayType(FloatType()))

top_topics = topics.select((topics.topic + 1).alias('topic'),
                          ListOfIndexToWords(topics.termIndices).alias('words'),
                          FormatNumbers(topics.termWeights).alias('weights_of_words'))

top_topics.show(truncate=False, n=numTopics)

print('Topics:', numTopics, 'Vocabulary:', len(vocabArray))

+-----+---------------------------------------+----------------------------------------+
|topic|words                                  |weights                                 |
+-----+---------------------------------------+----------------------------------------+
|1    |[food, fish, bad, got, time]           |[0.4218, 0.0311, 0.028, 0.025, 0.0244]  |
|2    |[server, didn, waitress, chees, put]   |[0.2102, 0.1339, 0.0912, 0.0479, 0.0239]|
|3    |[particular, argu, figur, stupid, agre]|[0.1112, 0.0588, 0.0569, 0.0342, 0.0334]|
|4    |[locat, walk, dinner, ever, took]      |[0.0322, 0.0279, 0.0279, 0.027, 0.021]  |
|5    |[restaur, said, fri, night, actual]    |[0.1256, 0.12, 0.1063, 0.0643, 0.0381]  |
|6    |[place, friend, seem, els, time]       |[0.6169, 0.0467, 0.0328, 0.0268, 0.0246]|
|7    |[meal, expens, menu, least, ten]       |[0.075, 0.0424, 0.0375, 0.0298, 0.0218] |
|8    |[order, tabl, serv, hope, never]       |[0.5132, 0.1139, 0.0479, 0.0194, 0.0133]|
|9    |[look, sauc, s