Upload kaggle.json to access datasets

Install pyspark

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=96558c441b4d27a73a54724e8b9aab006f718058f893bb43885f0b5f6606e1f4
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark


Install kaggle and access kaggle permissions with kaggle.json

In [None]:
!pip install kaggle
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


Import necessary libraries (Some of these may be removed for final version)

In [None]:
import pandas as pd
import numpy as np
import nltk
import pyspark.pandas as ps
import random

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer, IDF, StopWordsRemover
from pyspark.ml.feature import StringIndexer, CountVectorizer

from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfTransformer, TfidfVectorizer

from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('omw-1.4')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data] Downloading package omw-1.4 to /root/nltk_data...


True

Create Spark session

In [None]:
spark = SparkSession.builder.appName('Restaurant Data').getOrCreate()

Get restaurant data

In [None]:
!kaggle datasets download inigolopezrioboo/a-tripadvisor-dataset-for-nlp-tasks
!unzip a-tripadvisor-dataset-for-nlp-tasks.zip

Downloading a-tripadvisor-dataset-for-nlp-tasks.zip to /content
100% 591M/591M [00:04<00:00, 190MB/s]
100% 591M/591M [00:04<00:00, 127MB/s]
Archive:  a-tripadvisor-dataset-for-nlp-tasks.zip
  inflating: Barcelona_reviews.csv   
  inflating: London_reviews.csv      
  inflating: Madrid_reviews.csv      
  inflating: New_Delhi_reviews.csv   
  inflating: New_York_reviews.csv    
  inflating: Paris_reviews.csv       


Create Spark dataframe using London reviews

In [None]:
df = spark.read.csv('London_reviews.csv', inferSchema = True, header = True)

**Preparing Data**

a. Understanding Data and removing unwanted columns

b. Filtering neutral reviews

c. Assigning Positive and Negative Sentiment to Reviews based on Score (luckily this already exists in our dataset)

d. Assigning Binary Rating as Target Variable 1: Positive 0: Negative
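
Steps b to d can be sketched in plain Python as follows. This is a minimal illustration rather than the notebook's own code: the helper name `binary_label` and the sample ratings are made up, and it assumes ratings above 3 count as positive, below 3 as negative, and exactly 3 as neutral.

```python
def binary_label(rating):
    """Hypothetical helper: 1 for positive (>3), 0 for negative (<3), None for neutral (3)."""
    if rating == 3:
        return None
    return 1 if rating > 3 else 0

# Made-up ratings standing in for the rating_review column.
ratings = [5.0, 3.0, 1.0, 4.0, 2.0]

# Drop neutral reviews, then attach the binary target to what remains.
labelled = [(r, binary_label(r)) for r in ratings if binary_label(r) is not None]
print(labelled)
```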

We will call .show() at each step to check in on our progress. However, this adds to our run time since .show() is an action, so most of these calls are commented out. This data already comes with a 'sample' column that marks each review as positive or negative, which is useful for building an initial model.

This is what our initial DataFrame looks like.

In [None]:
df.show(truncate = False, n = 5)
df.count()

996848

Our first step is to drop all columns other than restaurant_name, rating_review, sample, and review_full.

In [None]:
df = df[['restaurant_name', 'rating_review', 'sample', 'review_full']]
#df.show(truncate = False, n = 5)

Next, we noticed that "rating_review" actually holds string values, so we cast it to double for later manipulation.

In [None]:
df.printSchema()
df = df.withColumn('rating_review', df.rating_review.cast('double'))
df.printSchema()

root
 |-- restaurant_name: string (nullable = true)
 |-- rating_review: string (nullable = true)
 |-- sample: string (nullable = true)
 |-- review_full: string (nullable = true)

root
 |-- restaurant_name: string (nullable = true)
 |-- rating_review: double (nullable = true)
 |-- sample: string (nullable = true)
 |-- review_full: string (nullable = true)



After taking a deeper look at our data, we realized that in several rows the entries seem to be in the wrong column (something that is clearly a review being present under "restaurant_name", for instance). This could affect our model in a negative way, so we want to remove those rows.

Additionally, we want to limit how much our model is influenced by restaurants with a very low number of reviews. Several restaurants in the data have an average rating of 5.0, which is realistically only the case because their sample size is so small.

We can address both of these problems by removing rows whose restaurant_name has only one unique review in the entire dataset.

In [None]:
# Alias the count so the sort does not depend on Spark's auto-generated column name
uniquedf = df.groupBy('restaurant_name').agg(countDistinct('review_full').alias('n_reviews')).orderBy('n_reviews', ascending = True)
#uniquedf.show(truncate = False, n = 5)
#uniquedf.count()

Looking at the whole dataset, we discovered that the first restaurant with more than one unique review was in the 275th row of our new 2,100-row DataFrame.

In [None]:
rows = uniquedf.collect()  # collect once instead of once per lookup
print(rows[273][0], ", ", rows[273][1])
print(rows[274][0], ", ", rows[274][1])

 breaded chicken strips over cooked fried in old oil..." ,  1
Chotto_Matte ,  5


We created a new DataFrame of unique restaurant names with more than one unique review by dropping the first 274 rows of uniquedf. From this, we built rest_list, a list of the names in its first column.

In [None]:
uniquedf2 = spark.createDataFrame(uniquedf.tail(uniquedf.count()-274), uniquedf.schema)
#uniquedf2.show(n = 5)
#uniquedf2.count()
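
The offset of 274 is specific to this file and would need to be rediscovered if the data changed. A threshold on the review count expresses the same rule directly. The plain-Python sketch below illustrates the idea on made-up rows; the function name `names_with_multiple_reviews` and the sample data are hypothetical. In Spark, the equivalent would presumably be a filter on the count column of uniquedf.

```python
from collections import defaultdict

def names_with_multiple_reviews(rows, min_reviews=2):
    """Return names that have at least `min_reviews` distinct reviews."""
    distinct = defaultdict(set)
    for name, review in rows:
        distinct[name].add(review)
    return sorted(n for n, reviews in distinct.items() if len(reviews) >= min_reviews)

# Made-up (restaurant_name, review_full) pairs; the last one mimics a misplaced review.
rows = [
    ('Chotto_Matte', 'great sushi'),
    ('Chotto_Matte', 'loved it'),
    ('Fumo', 'nice'),
    ('Fumo', 'nice'),  # duplicate review, counted once
    (' breaded chicken strips over cooked...', 'x'),
]
print(names_with_multiple_reviews(rows))
```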

In [None]:
restaurants = uniquedf2.select('restaurant_name').collect()
rest_list = [row.restaurant_name for row in restaurants]
print(rest_list)

['Chotto_Matte', 'Merlot_Garden_Bar_Restaurant', 'Buns_and_Buns', 'Sweetsmile_Bakery_Patisserie', 'Roxie_Steak_Earlsfield', 'Roganic', 'Fumo', 'Daisy_Green', 'RedFarm', 'Le_Vacherin', 'Tulse_Hill_Hotel', 'The_Oval_Restaurant', 'Kitchen_W8', 'Whiskey_Ginger', 'Taro', 'Cedar_Lebanese_Restaurant', 'L_amour', 'Rucoletta', 'Damascus_Bite', 'Ask_For_Janice', 'Sim_s_Steak_House', 'Red_Rose_Takeaway', 'The_Counting_House', 'Grand_Cholan_Indian_Cuisine', 'Mint_Leaves_Real_Indian_Food', 'The_London_Burger_Co', '3AKE', 'TDQ_Steaks', 'Afrik_N_Fusion', 'Supplant_Limited', 'Rossopomodoro', 'Ading_s_Kitchen', 'Kalamaki_Street_Greek', 'Di_Stefano_Coffee', 'Pasta_Plant', 'London_Secret_Garden', 'Epic_Pies', 'William_s_Bar_and_Bistro', 'Pizzeria_Pellone_London', 'Bebs_Kitchen', 'Don_Ciccio_Osteria_Siciliana', 'Nora_Cafe', 'Legare', 'The_waves_seafood_grill', 'Acoustic_Brasserie', 'Tower_Mangal', 'Rosmarino', 'Albion_s_Restaurant', 'Da_Moreno', 'Pie_Minister', 'Rotate', 'Kanada_Ya_Upper_Street', 'The_Gre

Now we can drop all rows whose restaurant_name is not in this list.

In [None]:
df = df.filter(df.restaurant_name.isin(rest_list))

Now that the nonsense rows have been removed, we can move on to our next data cleaning step by removing all rows with null values.

In [None]:
df = df.dropna()
#df.show(truncate = False, n = 5)

First Checkpoint. If anything goes wrong later, we can start over with this.

In [None]:
check_df1 = df

**Text Preprocessing**

a. Create UDFs for text processing: convert to lower case, remove punctuation and alphanumeric words, remove stop words

b. Text Lemmatization

i) Convert reviews to lower case.

In [None]:
df = check_df1.select("*", lower(col('review_full')))
df = df.drop(col('review_full'))
df = df.withColumnRenamed('lower(review_full)', 'review_full')
#df.show(truncate = False, n = 5)

ii) Remove punctuation and alphanumeric words

For this, we write a Python function that keeps only lowercase letters and spaces, then wrap it in a Spark UDF.

In [None]:
def only_letters(x): #ord(' ') == 32
  stripped = (c for c in x \
              if ord(c) == 32 \
              or 96 < ord(c) < 123) #ord('a') - ord('z') = 97-122
  return ''.join(stripped)

only_letters_udf = udf(lambda x: only_letters(x))
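
As a quick check of what this filter does, the sketch below compares the character-by-character rule with an equivalent regular expression on a made-up sentence. The name `only_letters_re` and the sample text are assumptions for illustration. Note that removed characters are deleted rather than replaced by a space, so words separated only by punctuation are merged.

```python
import re

def only_letters(x):
    # Same rule as the notebook's function: keep spaces and the letters a-z.
    return ''.join(c for c in x if c == ' ' or 'a' <= c <= 'z')

def only_letters_re(x):
    # Regex form of the same rule: delete everything that is not a-z or a space.
    return re.sub(r'[^a-z ]', '', x)

sample = 'great food!! 10/10, would go again'
print(repr(only_letters(sample)))

# Punctuation between words is deleted, not replaced, so the words run together.
print(only_letters('healthy.homemade'))
```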

In [None]:
df = df.select(col('*')
          , only_letters_udf(col('review_full')))
df = df.drop(col('review_full'))
df = df.withColumnRenamed('<lambda>(review_full)', 'review_full')
#df.show(truncate = False, n = 5)

Second checkpoint.

In [None]:
check_df2 = df

iii) Remove stopwords

Before we remove the stopwords we need to tokenize the text.

In [None]:
tokenizer = Tokenizer(inputCol = 'review_full', outputCol = 'tokens')
df = tokenizer.transform(check_df2)
df = df.drop(col('review_full'))
df = df.withColumnRenamed('tokens', 'review_full')

Once the reviews are tokenized, we can use the PySpark StopWordsRemover to remove the stopwords (as one might guess).

In [None]:
remover = StopWordsRemover(inputCol = 'review_full'
  , outputCol = 'review_clean')
df = remover.transform(df).select('*')
df = df.drop(col('review_full'))
df = df.withColumnRenamed('review_clean', 'review_full')
#df.show(truncate = False, n = 5)

Third Checkpoint.

In [None]:
check_df3 = df

iv) Lemmatize

In [None]:
lemmatizer = WordNetLemmatizer()
def lemm(x):
  lems = []
  for word in x:
    temp = lemmatizer.lemmatize(word)
    lems.append(temp)
  return ', '.join(lems)

lemm_udf = udf(lambda x: lemm(x))

In [None]:
df = check_df3.select(col('*'), lemm_udf(col('review_full')))
df = df.drop(col('review_full'))
df = df.withColumnRenamed('<lambda>(review_full)', 'review_full')
#df.show(truncate = False, n = 5)
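
One detail worth keeping in mind: lemm joins the lemmas with ', ', and the Tokenizer used later in the pipeline lowercases its input and splits on whitespace. The plain-Python sketch below approximates that tokenizer with `str.lower().split()` (an assumption for illustration, on made-up lemmas) to show that most tokens would then keep a trailing comma.

```python
lemmas = ['bar', 'man', 'one']

comma_joined = ', '.join(lemmas)  # what lemm returns
space_joined = ' '.join(lemmas)   # alternative separator, shown for comparison only

# str.lower().split() stands in for a whitespace tokenizer here.
print(comma_joined.lower().split())
print(space_joined.lower().split())
```

Since the same separator is applied to the training and testing text, the features stay consistent; the main side effect is that a word at the end of a review is treated as a different token from the same word followed by a comma.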

To create our sentiment analysis model, we will start by splitting our dataset into training data and testing data. The model will be designed using only the training data, and will then be evaluated against the testing data.

In [None]:
(train_set, test_set) = df.randomSplit([0.8, 0.2], seed = 1213)

Creating the feature pipeline for logistic regression.

Tokenizer -> TF/IDF -> label indexing (the logistic regression model itself is fit in a later cell)

In [None]:
tokenizer = Tokenizer(inputCol = 'review_full', outputCol = 'words')
hashtf = HashingTF(numFeatures = 2**16, inputCol = 'words', outputCol = 'tf')
idf = IDF(inputCol = 'tf', outputCol = 'features', minDocFreq = 5)
label_stringIdx = StringIndexer(inputCol = 'rating_review', outputCol = 'label')
pipeline = Pipeline(stages = [tokenizer, hashtf, idf, label_stringIdx])
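
To make the TF/IDF stages more concrete, here is a minimal plain-Python sketch of the idea on three made-up documents. It is not Spark's implementation: `zlib.crc32` stands in for Spark's hash function, the helper names are hypothetical, and the IDF formula log((m + 1) / (df + 1)) is the one given in the Spark MLlib documentation, with terms below the minimum document frequency weighted 0.

```python
import math
import zlib
from collections import Counter

NUM_FEATURES = 2**16  # same bucket count as the HashingTF stage above

def hashed_tf(tokens, num_features=NUM_FEATURES):
    """Count tokens per hash bucket (crc32 is a stand-in hash, not Spark's)."""
    return Counter(zlib.crc32(t.encode('utf-8')) % num_features for t in tokens)

def idf_weights(doc_tfs, min_doc_freq=0):
    """IDF per bucket: log((m + 1) / (df + 1)), or 0 below min_doc_freq."""
    n_docs = len(doc_tfs)
    doc_freq = Counter(bucket for tf in doc_tfs for bucket in tf)
    return {
        bucket: math.log((n_docs + 1) / (df + 1)) if df >= min_doc_freq else 0.0
        for bucket, df in doc_freq.items()
    }

# Made-up tokenized reviews.
docs = [['great', 'food', 'great', 'service'], ['bad', 'food'], ['great', 'view']]
tfs = [hashed_tf(d) for d in docs]
idf = idf_weights(tfs)

# TF-IDF vector per document, stored sparsely as {bucket: weight}.
tfidf = [{bucket: count * idf[bucket] for bucket, count in tf.items()} for tf in tfs]
print(tfidf[0])
```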

In [None]:
pipelineFit = pipeline.fit(train_set)

Applying the fitted pipeline to the training set and testing set.

In [None]:
train_df = pipelineFit.transform(train_set)
test_df = pipelineFit.transform(test_set)
#train_df.show(n = 5)

Fitting the logistic regression model on the training set, then predicting on the testing set.

In [None]:
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter = 100)
lrModel = lr.fit(train_df)
prediction = lrModel.transform(test_df)

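Creating a new DataFrame which shows the rating_review and the rating predicted by our model on the training set (lrModel.summary holds the training predictions). StringIndexer assigns index 0 to the most frequent label, so 5 - prediction recovers the rating only if ratings become less frequent from 5 down to 1, which appears to hold for this data.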

In [None]:
lrdf = lrModel.summary.predictions.select('restaurant_name', 'review_full', \
                                          'rating_review', 'prediction')
lrdf = lrdf.withColumn('rating_prediction', col('prediction') - 5)
lrdf = lrdf.drop('prediction')
lrdf = lrdf.withColumn('prediction', col('rating_prediction')*-1)
lrdf = lrdf.drop('rating_prediction')
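
The `5 - prediction` arithmetic relies on the order in which the labels were indexed. The plain-Python sketch below mimics frequency-descending indexing on made-up ratings (the function name and data are hypothetical) and shows a case where the arithmetic would give the wrong rating. A lookup built from the indexer's own label list avoids that assumption; in Spark, the fitted StringIndexer stage exposes its labels for this purpose.

```python
from collections import Counter

def frequency_desc_labels(values):
    """Order labels by descending frequency, ties broken alphabetically."""
    counts = Counter(values)
    return sorted(counts, key=lambda v: (-counts[v], v))

# Made-up ratings where 1.0 is more common than 3.0 and 2.0.
ratings = ['5.0'] * 6 + ['4.0'] * 4 + ['1.0'] * 3 + ['3.0'] * 2 + ['2.0'] * 1

labels = frequency_desc_labels(ratings)
index_to_rating = dict(enumerate(labels))
print(index_to_rating)

# Index 2 means '1.0' here, but 5 - 2 would claim a rating of 3.
print(index_to_rating[2], 5 - 2)
```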

In [None]:
lrdf.show(n = 20)

+--------------------+--------------------+-------------+----------+
|     restaurant_name|         review_full|rating_review|prediction|
+--------------------+--------------------+-------------+----------+
|280_Degrees_Afric...|bar, man, one, co...|          1.0|       1.0|
|280_Degrees_Afric...|called, , degree,...|          1.0|       1.0|
|280_Degrees_Afric...|get, empty, resta...|          1.0|       1.0|
|280_Degrees_Afric...|god, bad, bad, ba...|          1.0|       1.0|
|280_Degrees_Afric...|ordered, food, on...|          1.0|       1.0|
|280_Degrees_Afric...|receiving, groupo...|          1.0|       1.0|
|280_Degrees_Afric...|visited, restaura...|          1.0|       2.0|
|280_Degrees_Afric...|watson, puff, puf...|          1.0|       1.0|
|280_Degrees_Afric...|arrived, hr, took...|          2.0|       2.0|
|280_Degrees_Afric...|booked, table, , ...|          2.0|       2.0|
|280_Degrees_Afric...|food, alright, ni...|          2.0|       3.0|
|280_Degrees_Afric...|restaurant, 

We can determine how accurate our model is on the testing set by looking at the proportion of predictions that match the actual rating.

In [None]:
prediction = prediction.select('restaurant_name', 'review_full', \
                                          'rating_review', 'prediction')
prediction = prediction.withColumn('rating_prediction', col('prediction') - 5)
prediction = prediction.drop('prediction')
prediction = prediction.withColumn('prediction', col('rating_prediction')*-1)
prediction = prediction.drop('rating_prediction')

In [None]:
prediction.sample(.0001).show()

+--------------------+--------------------+-------------+----------+
|     restaurant_name|         review_full|rating_review|prediction|
+--------------------+--------------------+-------------+----------+
|Lokkanta_Meze_Bar...|highly, recommend...|          5.0|       5.0|
|R_H_Cafe_Art_Gallery|lovely, cafe, fam...|          5.0|       5.0|
|     The_Pig_Butcher|real, treat, meat...|          5.0|       5.0|
|      100_Wardour_St|bar, heart, soho,...|          5.0|       5.0|
| Al_Basha_Restaurant|friend, mine, say...|          3.0|       3.0|
|           Flat_Iron|really, good, res...|          5.0|       5.0|
|     Agra_Restaurant|recent, trip, lon...|          5.0|       5.0|
|Wright_Brothers_B...|visited, work, di...|          5.0|       5.0|
|Mr_Fogg_s_Gin_Par...|place, ginlovers,...|          5.0|       5.0|
|              Avista|went, enjoy, , co...|          1.0|       3.0|
|          Bar_Boulud|mood, french, foo...|          5.0|       4.0|
|Spice_Village_Too...|first, appea

Here we take the number of rows where the actual rating matches our predicted rating and divide by the total number of rows.

In [None]:
hits = prediction.filter(prediction[2] == \
                         prediction[3]).count()
total = prediction.count()

In [None]:
print(hits/total)

0.6855034508412211
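
An accuracy figure is easier to judge next to a simple baseline. The sketch below, on made-up (actual, predicted) pairs, computes accuracy alongside the score obtained by always predicting the most common rating. The function names and data are hypothetical, and no baseline figure for the real dataset is implied.

```python
from collections import Counter

def accuracy(pairs):
    """Share of (actual, predicted) pairs that match exactly."""
    return sum(1 for actual, predicted in pairs if actual == predicted) / len(pairs)

def majority_baseline(pairs):
    """Accuracy of always predicting the most frequent actual rating."""
    counts = Counter(actual for actual, _ in pairs)
    return counts.most_common(1)[0][1] / len(pairs)

# Made-up (rating_review, prediction) pairs.
pairs = [(5, 5), (5, 5), (5, 4), (4, 5), (3, 3), (1, 3), (5, 5), (4, 4)]

print(accuracy(pairs), majority_baseline(pairs))
```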


Creating a UDF to convert a rating_review to a word: 'Positive', 'Negative', or 'Neutral'. We then use this UDF to convert the ratings in our testing set predictions to words.

In [None]:
def sent(x):
  if x == 3:
    return 'Neutral'
  elif x > 3:
    return 'Positive'
  else:
    return 'Negative'

sent_udf = udf(lambda x: sent(x))

In [None]:
prediction = prediction.select(col('*'), sent_udf(col('rating_review')))
prediction = prediction.drop(col('rating_review'))
prediction = prediction.withColumnRenamed('<lambda>(rating_review)', 'off_sent')
#prediction.show(n = 5)

In [None]:
prediction = prediction.select(col('*'), sent_udf(col('prediction')))
prediction = prediction.drop(col('prediction'))
prediction = prediction.withColumnRenamed('<lambda>(prediction)', 'pred_sent')
#prediction.show(n = 5)

In [None]:
prediction.sample(.0001).show()

+--------------------+--------------------+--------+---------+
|     restaurant_name|         review_full|off_sent|pred_sent|
+--------------------+--------------------+--------+---------+
|Seven_Park_Place_...|went, last, satur...|Positive| Positive|
|              Aviary|aviary, rooftop, ...|Positive| Positive|
|   Galvin_at_Windows|treated, girlfrie...|Positive| Positive|
|   The_Monkey_Puzzle|great, service, b...|Positive| Positive|
|L_Escargot_Restau...|eaten, restaurant...|Negative| Positive|
|    Mr_Fogg_s_Tavern|staff, made, plac...|Positive| Positive|
|     Smith_Wollensky|first, time, ive,...|Positive| Positive|
|             Al_Duca|eaten, many, time...|Positive| Positive|
|     The_Mawson_Arms|, turned, spec, n...|Positive| Positive|
|Le_Relais_De_Veni...|eaten, several, t...|Positive| Positive|
|Muriel_s_Kitchen_...|loved, place, ret...|Positive| Positive|
|O_Neill_s_Kings_C...|always, least, on...|Positive| Positive|
+--------------------+--------------------+--------+---

Counting how many rows have an actual sentiment and a predicted sentiment that match. The 'total' number of rows remains unchanged.

In [None]:
sent_hits = prediction.filter(prediction.off_sent == \
                         prediction.pred_sent).count()

In [None]:
print(sent_hits/total)

0.8932425843357444
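
The sentiment score can never be lower than the exact-rating score: every exact match is also a sentiment match, and near misses such as 5 versus 4 now count as hits. The sketch below demonstrates this on made-up (actual, predicted) pairs, reusing the same rule as sent.

```python
def sent(x):
    # Same rule as the notebook's sent function.
    if x == 3:
        return 'Neutral'
    elif x > 3:
        return 'Positive'
    else:
        return 'Negative'

# Made-up (rating_review, prediction) pairs.
pairs = [(5, 5), (5, 4), (4, 5), (3, 3), (1, 3), (2, 1), (5, 5), (1, 4)]

exact_acc = sum(1 for a, p in pairs if a == p) / len(pairs)
sent_acc = sum(1 for a, p in pairs if sent(a) == sent(p)) / len(pairs)

print(exact_acc, sent_acc)
```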


Last step: word clouds based on positive/negative reviews

In [None]:
import matplotlib.pyplot as plt
from wordcloud import WordCloud
from collections import ChainMap
import pyspark.sql.functions as F

In [None]:
cloud_df = df.drop('restaurant_name', 'sample')

In [None]:
cloud_df.show(n = 5)

+-------------+--------------------+
|rating_review|         review_full|
+-------------+--------------------+
|          5.0|away, couple, mon...|
|          5.0|recently, ordered...|
|          5.0|came, missed, tab...|
|          5.0|first, time, coco...|
|          5.0|healthyhomemade, ...|
+-------------+--------------------+
only showing top 5 rows



In [None]:
cloud_df = cloud_df.select(col('*'), sent_udf(col('rating_review')))
cloud_df = cloud_df.drop(col('rating_review'))
cloud_df = cloud_df.withColumnRenamed('<lambda>(rating_review)', 'sent')
cloud_df.show(n = 5)

+--------------------+--------+
|         review_full|    sent|
+--------------------+--------+
|away, couple, mon...|Positive|
|recently, ordered...|Positive|
|came, missed, tab...|Positive|
|first, time, coco...|Positive|
|healthyhomemade, ...|Positive|
+--------------------+--------+
only showing top 5 rows



In [None]:
pos_df = cloud_df.filter(cloud_df.sent == 'Positive')
neg_df = cloud_df.filter(cloud_df.sent == 'Negative')

In [None]:
pos_df.show(n = 5)

+--------------------+--------+
|         review_full|    sent|
+--------------------+--------+
|away, couple, mon...|Positive|
|recently, ordered...|Positive|
|came, missed, tab...|Positive|
|first, time, coco...|Positive|
|healthyhomemade, ...|Positive|
+--------------------+--------+
only showing top 5 rows



In [None]:
neg_df.show(n = 5)

+--------------------+--------+
|         review_full|    sent|
+--------------------+--------+
|ordered, , set, m...|Negative|
|poor, service, fo...|Negative|
|partner, came, di...|Negative|
|excited, go, serv...|Negative|
|disappointed, del...|Negative|
+--------------------+--------+
only showing top 5 rows



In [None]:
wordcloud = WordCloud(background_color = 'white')

In [None]:
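counts = pos_df.select(F.explode(F.split('review_full', r',\s*')).alias('w')).groupBy('w').count()
words = {r['w']: r['count'] for r in counts.orderBy(F.desc('count')).limit(200).collect() if r['w']}  # {word: count}, top 200 (arbitrary cutoff)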

In [None]:
plt.imshow(wordcloud.generate_from_frequencies(words))

In [None]:
word_count = (
    pos_df.withColumn('review_full', F.explode(F.split(F.col('review_full'), r'\s+')))
    .withColumn('review_full', F.regexp_replace('review_full', r'[^\w]', ''))
    .groupBy('review_full')
    .count()
    .sort('count', ascending = False)
)

In [None]:
word_count.show(10)


TextBlob testing

TextBlob is a Python library for NLP. It includes a simple API for sentiment analysis, which gives us an independent score to compare against our model's predictions.

In [None]:
from textblob import TextBlob

In [None]:
def sentiment_score(review):
  return TextBlob(review).sentiment.polarity

In [None]:
sentiment_udf = udf(lambda x: sentiment_score(x), DoubleType())

In [None]:
tbdf = df.select('restaurant_name', 'rating_review', 'review_full',
               sentiment_udf('review_full').alias('sentiment_score'))

In [None]:
tbdf.groupBy('restaurant_name')\
     .agg(avg('sentiment_score').alias('avg_sentiment_score'))\
     .orderBy('avg_sentiment_score', ascending = False) \
     .show()
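
To compare TextBlob's output with the sentiment labels used earlier, the polarity score (a float between -1.0 and 1.0) has to be mapped to 'Positive', 'Negative', or 'Neutral'. The sketch below shows one way to do that. The function name and the 0.1 neutral band are assumptions chosen for illustration, not values from the notebook, and the band would need tuning against real data.

```python
def polarity_to_sentiment(polarity, neutral_band=0.1):
    """Hypothetical mapping from a polarity in [-1, 1] to a sentiment word."""
    if polarity > neutral_band:
        return 'Positive'
    if polarity < -neutral_band:
        return 'Negative'
    return 'Neutral'

# Made-up polarity scores.
for score in (0.65, 0.05, -0.4):
    print(score, polarity_to_sentiment(score))
```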

In [None]:
#Fetch the restaurant location and collect sentiment scores by location
#Classify restaurants by food items
#Compare your results with the restaurant's rating