In [5]:
import requests
import json
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import os
import time
import re
import datetime
import seaborn as sns
from IPython.display import display


import findspark
findspark.init()

import pyspark
import  pyspark.sql.functions as F
from pyspark.sql import SQLContext, SparkSession

from wordcloud import WordCloud

In [6]:
# Start (or reuse) the Spark session used by every later cell.
# The spark-nlp package is requested through spark.jars.packages when the session is created.
spark = (
    SparkSession.builder
    .appName("Spark NLP")
    .config("spark.driver.memory", "16G")
    .config('spark.executor.memory', '8G')
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:3.3.0")
    .getOrCreate()
)
# Bare expression so the notebook displays the session summary.
spark

In [14]:
# Load the filtered restaurant attributes (one JSON record per line, malformed
# records tolerated via PERMISSIVE mode) and report how many rows were read.
rest_attrs_file = '../data/input/rest_attrs_filtered.json/'
df_rest_attrs = (
    spark.read
    .options(multiLine="false", mode="PERMISSIVE")
    .json(rest_attrs_file)
)
rest_count = df_rest_attrs.count()
print('Total Restaurants = ', rest_count)

Total Restaurants =  60715


In [18]:
# Read every entry found under the review directory as parquet into one DataFrame.
op_path = '../../rest_type classification/data/input/review_files/'
op_files = []
for entry in os.listdir(op_path):
    # op_path already ends with '/', so plain concatenation yields a valid path.
    op_files.append(op_path + entry)
df_reviews = spark.read.parquet(*op_files)
print('Total Number of Reviews = ', df_reviews.count())

Total Number of Raw Reviews =  19567851


In [95]:
def clean_reviews(df_rest_attributes, df_rest_reviews):
    """Deduplicate reviews, drop short ones and attach restaurant attributes.

    Args:
        df_rest_attributes: Spark DataFrame providing at least the columns
            place_id, rest_type and rest_summary.
        df_rest_reviews: Spark DataFrame providing at least the columns
            place_id, review_id, review and rating.

    Returns:
        Spark DataFrame with columns place_id, rest_type, rest_summary, review
        and rating: one row per unique (place_id, review_id) review whose text
        is longer than 30 characters and whose place_id exists in
        df_rest_attributes (inner join).
    """
    unique_reviews = df_rest_reviews.dropDuplicates(['place_id', 'review_id'])

    # Keep only reviews with more than 30 characters of text.
    long_reviews = (
        unique_reviews
        .filter(F.length('review') > 30)
        .select('place_id', 'review', 'rating')
    )

    rest_info = df_rest_attributes.select('place_id', 'rest_type', 'rest_summary')
    return rest_info.join(long_reviews, 'place_id', 'inner')

In [338]:
def tokenize(df_rest_reviews):
    """Normalise review text and split it into stop-word-free tokens.

    The review text is lower-cased, HTML-like tags (``<...>``) and every
    character that is not an ASCII letter or whitespace are replaced by a
    space, the result is split on whitespace, and stop words plus empty
    tokens are removed.

    Stop words are the union of a hand-curated list of uninformative review
    vocabulary, the distinct ``rest_type`` values found in ``df_rest_reviews``
    and Spark's default stop-word list.

    Args:
        df_rest_reviews: Spark DataFrame providing at least the columns
            place_id, rest_type, rating and review.

    Returns:
        Spark DataFrame with columns place_id, rest_type, rating and
        words_clean (array of remaining tokens).
    """
    from pyspark.ml.feature import Tokenizer, StopWordsRemover

    # Take the restaurant types from the frame being processed rather than from a
    # notebook-level variable, so the function only depends on its argument.
    # Null/empty types are skipped: they are not usable as stop words.
    rest_types = [row.rest_type
                  for row in df_rest_reviews.select("rest_type").distinct().collect()
                  if row.rest_type]
    stopwordList = ["good","food","yet", "place","translated", 'tasty', 'amazing', 'one', 'especially', 'definitely',
                    'best', 'really','excellent', 'love', 'restaurant', 'awesome','coming','think', 'though',
                    'perfect','taste', 'eat', 've', 'got', 'location', 'first', 'time','go', 'back','yummy','liked',
                    'know', 'everything', 'need', 'came', 'come','loved', 'enjoy', 'well','better','make', 'sure', 'want',
                    'try', 'meal','thing', 'much', 'll', 'say','even','probably', 'must', 'tasted', 'visit', 'wow','ask',
                    'never', 're', 'd', 'ask', 'asked','went', 'visit', 'person', 'people', 'absolutely','look', 'looked',
                    'friend', 'wife','went','made', 'ok','ate', 'eating', 'eat','wasn', 'didn', 'm', 'way','left','use',
                    'actually', "google", 'great', 'delicious', 'like' , 'lot', 'still', 'thank','won', 'nothing','see',
                    'gave', 'guy', 'cook', 'last', 'top', 'used','enjoyed', 'least', 'little', 'thought', 'guess','tried',
                    'return', 'tried','told','tell','point','okay', 'instead', 'ordering', 'anything','every', 'seem',
                    'something', 'husband', 'leave', 'right', 'second', 'call', 'served','couldn','waiter', 'waitress',
                    'bad', 'give', 'awful','disappoint', 'disappointing','usually', 'pretty','awful','let','sorry',
                    'said', 'maybe', 'someone', 'table', 'dont', 'done', 'table','worst', 'attitude','plate', 'maybe',
                    'server', 'wanted','unfortunately', 'horrible', 'menu', 'open', 'two','things', 'around', 'inside',
                    'another', 'item', 'bit', 'called', 'everyone', 'given', 'walked', 'understand', 'us','seems', 'find',
                    'put', 'alway','disappointed', 'u', 'put', 'literally', 'going' , 'ordered', 'like', 'either',
                   'brought', 'feel', 'serve', 'saw', 'time','honestly', 'friends']  + rest_types
    # Add Spark's built-in default stop words on top of the custom ones.
    stopwordList.extend(StopWordsRemover().getStopWords())

    # Lower-case once, then blank out HTML-like tags and any non-letter characters.
    review_no_html = F.regexp_replace(F.lower(F.col('review')), "<.*?>", " ")
    review_letters_only = F.regexp_replace(review_no_html, "[^a-zA-Z\\s]", " ")
    df_reviews_normalised = df_rest_reviews.select(
        'place_id', 'rest_type', 'rating', review_letters_only.alias('review'))

    tokenizer = Tokenizer(inputCol="review", outputCol="review_tokens")
    df_reviews_tokens = tokenizer.transform(df_reviews_normalised)\
        .select('place_id', 'rest_type', 'rating', 'review_tokens')

    remover = StopWordsRemover(inputCol='review_tokens', outputCol='words_clean', stopWords=stopwordList)
    df_reviews_clean = remover.transform(df_reviews_tokens)\
        .select('place_id', 'rest_type', 'rating', 'words_clean')

    # Runs of spaces left by the replacements above produce empty tokens; drop them.
    df_clean_reviews = df_reviews_clean.withColumn(
        "words_clean", F.expr("filter(words_clean, elem -> elem != '')"))

    return df_clean_reviews

In [339]:
def spark_prep(df_rest_attributes, df_rest_reviews):
    """Run the full review preparation: clean_reviews followed by tokenize.

    Args:
        df_rest_attributes: restaurant-attribute DataFrame passed to clean_reviews.
        df_rest_reviews: raw review DataFrame passed to clean_reviews.

    Returns:
        The DataFrame returned by tokenize for the cleaned, joined reviews.
    """
    return tokenize(clean_reviews(df_rest_attributes, df_rest_reviews))

In [333]:
# Build the tokenised review frame and mark it for caching, since the
# word-cloud cell queries it repeatedly (twice per restaurant type).
df_clean_reviews = spark_prep(
    df_rest_attributes=df_rest_attrs,
    df_rest_reviews=df_reviews,
)
df_clean_reviews.persist()

In [354]:
# Render and save one positive and one negative TF-IDF word cloud per restaurant type.
# TfidfVectorizer is used below but is not imported in any visible cell, so import it here.
from sklearn.feature_extraction.text import TfidfVectorizer

# The Verdana path only exists on Windows; with None, WordCloud falls back to its own default font.
font_path = r'C:\Windows\Fonts\Verdana.ttf'
if not os.path.exists(font_path):
    font_path = None

# Reviews rated exactly 3 belong to neither group.
sentiment_filters = [('positive', F.col('rating') > 3), ('negative', F.col('rating') < 3)]

rest_types = [row.rest_type for row in df_clean_reviews.select('rest_type').distinct().collect()]
for cuisine in rest_types:
    if cuisine is None:
        # A null rest_type never matches an equality filter, so there is nothing to plot.
        continue

    for sent, rating_filter in sentiment_filters:
        # Column expressions instead of a string-formatted SQL filter, so restaurant
        # types containing quote characters cannot break the predicate.
        rows = (df_clean_reviews
                .where((F.col('rest_type') == cuisine) & rating_filter)
                .select('words_clean')
                .collect())
        rev_corpus = [" ".join(row.words_clean) for row in rows]
        if not rev_corpus:
            print('Skipping {} ({}): no reviews'.format(cuisine, sent))
            continue

        vectorizer = TfidfVectorizer(stop_words='english', ngram_range = (1,2), min_df = .01)
        try:
            X = vectorizer.fit_transform(rev_corpus)
        except ValueError as err:
            # scikit-learn raises ValueError when no terms survive (empty vocabulary /
            # min_df pruning); skip this cloud instead of aborting the whole loop.
            print('Skipping {} ({}): {}'.format(cuisine, sent, err))
            continue

        # Sum each term's TF-IDF weight over all reviews directly on the sparse matrix;
        # densifying it would need n_reviews x n_terms floats of driver memory.
        term_weights = np.asarray(X.sum(axis=0)).ravel()
        frequencies = dict(zip(vectorizer.get_feature_names_out(), term_weights))

        wordcloud = WordCloud(font_path = font_path,
                            background_color = 'white',
                            width = 1200,max_words = 100,
                            height = 1000,
                            collocation_threshold = 20
                            ).generate_from_frequencies(frequencies)

        os.makedirs('../data/output/wordclouds/{0}/'.format(cuisine),exist_ok=True)
        op_fname = ('../data/output/wordclouds/{0}/{1}_{2}.png'.format(cuisine,cuisine,sent))

        fig, ax = plt.subplots(figsize = (12,10))
        ax.imshow(wordcloud)
        ax.axis('off')
        # The stray trailing double quote of the original title has been removed.
        ax.set_title('Wordcloud of Cuisine: {} , Sentiment: {}'.format(cuisine,sent))
        fig.savefig(op_fname)
        # Close explicitly: many figures are created in this loop.
        plt.close(fig)
        