### Download Kaggle Dataset in Google Drive
Reference: https://medium.com/analytics-vidhya/how-to-fetch-kaggle-datasets-into-google-colab-ea682569851a

Your Kaggle API token (`kaggle.json`) must be placed in your Google Drive at /MyDrive/Kaggle.

Project Name: **Yelp Data Visualization and Analysis with
Jupyter & Spark** 

Author: YING YAO, YING YU

To run the code properly, please follow the widget instructions. A window will pop up asking for permission to access your Google Drive; please authorize it.

In [None]:
from google.colab import drive
import os
# Mount Google Drive under /gdrive; Colab asks for authorization the first time.
drive.mount(mountpoint='/gdrive')

In [None]:
# Show the current working directory ('/content' is the Colab virtual machine).
# print() is needed: a bare expression that is not last in a cell is never displayed.
print(os.getcwd())
# Tell the Kaggle CLI which directory holds kaggle.json.
os.environ['KAGGLE_CONFIG_DIR'] = '/gdrive/MyDrive/Kaggle'

In [None]:
!pip install kaggle
# Navigate into Drive where you want to store your Kaggle data
if not os.path.exists('/gdrive/My Drive/Kaggle'):
  os.mkdir('/gdrive/My Drive/Kaggle')

**Next step:** download your Kaggle API token (`kaggle.json`) and save it in `/gdrive/MyDrive/Kaggle`.

In [None]:
os.chdir('/gdrive/My Drive/Kaggle')
# Paste and run the copied API command, the data will download to the current directory
!kaggle datasets download -d yelp-dataset/yelp-dataset --force
# Check contents of directory, you should see the .zip file for the competition in your Drive
os.listdir()

### Copy and Unzip the Data in the VM

In [None]:
# Complete path to storage location of the .zip file of data

zip_path = '/gdrive/MyDrive/Kaggle/yelp-dataset.zip'
vm_path = '/content'
# Check current directory (be sure you're in the directory where Colab operates: '/content')
os.chdir(vm_path)
# Copy the .zip file into the present directory
!cp '{zip_path}' .
# Unzip quietly 
!unzip -q 'yelp-dataset.zip'
# View the unzipped contents in the virtual machine
os.listdir()

### Install Spark

In [None]:
%%time
!apt-get update -qq > /dev/null
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz # effective on 2021/07/30
!tar xf spark-2.4.8-bin-hadoop2.7.tgz
!pip install -q findspark

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.8-bin-hadoop2.7"

import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

# conf = SparkConf().setMaster("local[*]").setAppName("MyTest")
# sc = SparkContext(conf=conf)
# spark = SparkSession(sc)
spark = SparkSession.builder \
.appName('app_name') \
.master('local[*]') \
.config('spark.sql.execution.arrow.pyspark.enabled', True) \
.config('spark.sql.session.timeZone', 'UTC') \
.config('spark.driver.memory','512G') \
.config('spark.ui.showConsoleProgress', True) \
.config('spark.sql.repl.eagerEval.enabled', True) \
.getOrCreate()


### Initial Data Preprocessing

In [None]:
import numpy as np 
import pandas as pd
import time
import json
from collections import Counter
import pyspark.sql.functions as f
import matplotlib.pyplot as plt
import plotly.offline as py
import plotly.express as px
import plotly.graph_objs as go
from plotly.offline import iplot, init_notebook_mode
# Using plotly + cufflinks in offline mode
import cufflinks
import ipywidgets as widgets
from ipywidgets import interact, interact_manual
from wordcloud import WordCloud

In [None]:
%%time
df_business = spark.read.json("/content/yelp_academic_dataset_business.json").cache()
#df_checkin = spark.read.json("/content/yelp_academic_dataset_checkin.json").cache()
df_review = spark.read.json("/content/yelp_academic_dataset_review.json").cache()
# df_tip = spark.read.json("/content/yelp_academic_dataset_tip.json").cache()
# df_user = spark.read.json("/content/yelp_academic_dataset_user.json").cache()

In [None]:
# Drop businesses lacking any key field, then drop duplicate listings
# (same name at the same coordinates).
key_cols = ['name', 'address', 'categories', 'stars']
df_business_1 = df_business.na.drop(subset=key_cols)
n_complete = df_business_1.count()
n_raw = df_business.count()
print('% of missing values are {:.2f} %'.format((1 - n_complete / n_raw) * 100))

df_business_f = df_business_1.dropDuplicates(['name', 'latitude', 'longitude'])
n_dupes = n_complete - df_business_f.count()
print('Removed {} duplicated business stores'.format(n_dupes))
# TODO: normalize category typos, e.g. "resturants" vs "restaurant".

In [None]:
# Number of (cleaned) businesses per state, largest first.
bus_per_state = (df_business_f.groupBy('state').count()
                 .orderBy('count', ascending=False)
                 .toPandas())

# Bar chart of business count by state, with a logarithmic y-axis.
# (The unused px.data.tips() sample dataset load was removed.)
fig = px.bar(x=bus_per_state['state'], y=bus_per_state['count'],
             labels={'x': 'state', 'y': 'count'},
             log_y=True, title="Business count per state")
fig.show()


In [None]:
# Count how often each category/subcategory appears. `categories` is a ', '-separated
# string; rows whose split result is null are skipped.
rdd_cat = (df_business_f.select(f.split(df_business_f['categories'], ', '))
           .rdd.map(lambda row: row[0])
           .filter(lambda cats: cats is not None)
           .flatMap(lambda cats: cats)
           .map(lambda cat: (cat, 1))
           .reduceByKey(lambda a, b: a + b)
           .sortBy(lambda kv: kv[1], ascending=False)
           .cache())  # reused below and by the interactive-map cells
# Run the job once and reuse the result, instead of count() + collect() + collect().
cat_counts = rdd_cat.collect()
print('Total number of categories/subcategories are {}'.format(len(cat_counts)))
print('List categories/subcategories: {}'.format(cat_counts))

# Word cloud of the 150 most frequent categories (cat_counts is sorted by count).
d = dict(cat_counts[:150])
wordcloud = WordCloud(
    background_color='white',
    width=1800,
    height=1500,
).generate_from_frequencies(d)
plt.figure(figsize=(20, 10), facecolor='k')
plt.imshow(wordcloud)
plt.axis('off')  # hide the x and y axes
plt.show()

### Interactive Map Demo
References: https://plotly.com/python/figurewidget-app/

https://towardsdatascience.com/interactive-controls-for-jupyter-notebooks-f5c94829aee6

In [None]:
# Dropdown options for the interactive map: every distinct state, and the 100 most
# frequent categories. rdd_cat is sorted by count, so take(100) returns the top ones
# without collecting the whole RDD to the driver.
dist_state = df_business_f.select('state').distinct().rdd.map(lambda row: row[0]).collect()
top_cat = rdd_cat.map(lambda kv: kv[0]).take(100)

In [None]:
@interact_manual
def interactive_store_map(state=dist_state, stars=np.arange(0, 5.1, 1), categories=top_cat):
    """Map businesses in `state` rated at least `stars` that belong to `categories`.

    The defaults only define the widget choices offered by `interact_manual`; the
    function runs when the widget's button is pressed. It prints the top 10 matches
    (by stars, then review_count) and plots them on an OpenStreetMap scatter map
    (color = stars, marker size = review_count).
    """
    # Match the category exactly against the ', '-separated list (the same split that
    # produced top_cat), so e.g. 'Bars' does not also match 'Cocktail Bars'.
    category_list = f.split(df_business_f['categories'], ', ')
    df = df_business_f.filter((df_business_f['stars'] >= stars) &
                              (df_business_f['state'] == state) &
                              f.array_contains(category_list, categories))
    pdf = df.toPandas()
    # Check the filtered result (not the whole table) before building an empty map.
    if pdf.empty:
        print('No store found based on the criteria')
        return
    fig = px.scatter_mapbox(pdf, lat="latitude", lon="longitude",
                            hover_data=["name", "address", "postal_code"],
                            color="stars", size="review_count",
                            color_continuous_scale=px.colors.sequential.Rainbow,
                            size_max=40, zoom=9, height=1000)  # zoom level: https://wiki.openstreetmap.org/wiki/Zoom_levels
    fig.update_layout(mapbox_style="open-street-map")
    fig.update_layout(margin={"r": 0, "t": 0, "l": 0, "b": 0})
    # TODO: add city UI based on state
    # TODO: add auto zoom-in on map based on long&lat
    # Top 10 by stars, then by number of reviews.
    display('Top 10 recommendations are as below')
    # show() prints the table itself and returns None, so it is not wrapped in display().
    (df.sort(df.stars.desc(), df.review_count.desc())
       .drop('attributes', 'business_id', 'latitude', 'longitude')
       .show(10))
    py.iplot(fig, filename='Yelp Dataset Visualization')

### Performance comparison

#### Business set (118 MB)

Pyspark

In [None]:
# PySpark timing on the business file.
# Spark is lazy: read.json() scans the file to infer the schema, but .cache() only marks
# the DataFrame -- it is materialized by the first count() in "remove missing", so that
# step's timing includes filling the cache.
start_time = time.time()
df_business = spark.read.json("/content/yelp_academic_dataset_business.json").cache()
print('Time elapsed is {:.2f} second'.format(time.time() - start_time))

# remove missing
start_time = time.time()
key_cols = ['name', 'address', 'categories', 'stars']
df_business_1 = df_business.na.drop(subset=key_cols)
print('% of missing values are {:.2f} %'.format((1 - df_business_1.count() / df_business.count()) * 100))
print('Time elapsed is {:.2f} second'.format(time.time() - start_time))

# remove duplicates
start_time = time.time()
df_business_f = df_business_1.dropDuplicates(['name', 'latitude', 'longitude'])
print('Removed {} duplicated business stores'.format(df_business_1.count() - df_business_f.count()))
print('Time elapsed is {:.2f} second'.format(time.time() - start_time))

# calculate categories
start_time = time.time()
rdd_cat = (df_business_f.select(f.split(df_business_f['categories'], ', '))
           .rdd.flatMap(lambda row: row)
           .filter(lambda cats: cats is not None)
           .flatMap(lambda cats: cats)
           .map(lambda cat: (cat, 1))
           .reduceByKey(lambda a, b: a + b)
           .sortBy(lambda kv: kv[1], ascending=False))
# One Spark job instead of count() followed by collect(), which ran the pipeline twice
# and inflated the PySpark timing compared with the pandas version.
cat_counts = rdd_cat.collect()
print('Total number of categories/subcategories are {}'.format(len(cat_counts)))
print('List categories/subcategories: {}'.format(cat_counts))
print('Time elapsed is {:.2f} second'.format(time.time() - start_time))

start_time = time.time()
# create interactive plot: dropdown options are every distinct state and the 100 most
# frequent categories (rdd_cat is sorted by count, so take() returns the top ones
# without collecting the whole RDD).
dist_state = df_business_f.select('state').distinct().rdd.map(lambda row: row[0]).collect()
top_cat = rdd_cat.map(lambda kv: kv[0]).take(100)

# `stars` must stay a list: ipywidgets turns a list into a dropdown but a tuple into a slider.
@interact_manual
def interactive_store_map(state=dist_state, stars=[0, 1, 2, 3, 4, 5], categories=top_cat):
    """Map businesses in `state` with at least `stars` stars whose categories contain `categories`.

    Runs when the widget's button is pressed. Prints the top 10 matches (by stars, then
    review_count) and plots them on an OpenStreetMap scatter map
    (color = stars, marker size = review_count).
    """
    df = df_business_f.filter((df_business_f['stars'] >= stars) &
                              (df_business_f['state'] == state) &
                              df_business_f['categories'].contains(categories))
    pdf = df.toPandas()
    # Stop before building an empty map.
    if pdf.empty:
        print('No store found based on the criteria')
        return
    fig = px.scatter_mapbox(pdf, lat="latitude", lon="longitude",
                            hover_data=["name", "address", "postal_code"],
                            color="stars", size="review_count",
                            color_continuous_scale=px.colors.sequential.Rainbow,
                            size_max=40, zoom=9, height=1000)  # zoom level: https://wiki.openstreetmap.org/wiki/Zoom_levels
    fig.update_layout(mapbox_style="open-street-map")
    fig.update_layout(margin={"r": 0, "t": 0, "l": 0, "b": 0})
    # TODO: add city UI based on state
    # TODO: add auto zoom-in on map based on long&lat
    # Top 10 by stars, then by number of reviews.
    display('Top 10 recommendations are as below')
    # show() prints the table itself and returns None, so it is not wrapped in display().
    (df.sort(df.stars.desc(), df.review_count.desc())
       .drop('attributes', 'business_id', 'latitude', 'longitude')
       .show(10))
    py.iplot(fig, filename='Yelp Dataset Visualization')

# Note: this measures building the options and the widget; the query itself runs only
# when the widget's button is pressed.
print('Time elapsed is {:.2f} second'.format(time.time() - start_time))

Pandas

In [None]:
# pandas timing on the business file, mirroring the PySpark steps above.
start_time = time.time()
df_business = pd.read_json("/content/yelp_academic_dataset_business.json", lines=True)
print('Time elapsed is {:.2f} second'.format(time.time() - start_time))

# remove missing: drop rows lacking any key field
start_time = time.time()
key_cols = ['name', 'address', 'categories', 'stars']
df_business_1 = df_business.dropna(subset=key_cols)
missing_ratio = 1 - len(df_business_1) / len(df_business)
print('% of missing values are {:.2f} %'.format(missing_ratio * 100))
print('Time elapsed is {:.2f} second'.format(time.time() - start_time))

# remove duplicates: same name at the same coordinates
start_time = time.time()
df_business_f = df_business_1.drop_duplicates(subset=['name', 'latitude', 'longitude'])
n_dropped = len(df_business_1) - len(df_business_f)
print('Removed {} duplicated business stores'.format(n_dropped))
print('Time elapsed is {:.2f} second'.format(time.time() - start_time))

# calculate categories: split every ', '-separated string and tally each entry
start_time = time.time()
category_lists = df_business_f['categories'].dropna().str.split(', ')
dict_cat = Counter(cat for cats in category_lists for cat in cats)
# most_common() sorts by count, descending, keeping first-seen order among ties.
sorted_dict_cat = dict_cat.most_common()
print('Total number of categories/subcategories are {}'.format(len(sorted_dict_cat)))
print('Dictionary of categories/subcategories: {}'.format(sorted_dict_cat))
print('Time elapsed is {:.2f} second'.format(time.time() - start_time))

# create interactive plot (pandas version)
start_time = time.time()
dist_state = df_business_f['state'].unique()
top_cat = [cat for cat, _ in sorted_dict_cat[:100]]

# `stars` must stay a list: ipywidgets turns a list into a dropdown but a tuple into a slider.
@interact_manual
def interactive_store_map(state=dist_state, stars=[0, 1, 2, 3, 4, 5], categories=top_cat):
    """Map businesses in `state` with at least `stars` stars whose categories contain `categories`.

    Runs when the widget's button is pressed. Displays the top 10 matches (by stars,
    then review_count) and plots them on an OpenStreetMap scatter map
    (color = stars, marker size = review_count).
    """
    # regex=False: category names are literal text (str.contains defaults to regex),
    # which also matches the literal Column.contains used in the PySpark version.
    df = df_business_f[(df_business_f['stars'] >= stars) &
                       (df_business_f['state'] == state) &
                       (df_business_f['categories'].str.contains(categories, regex=False))]
    # Stop before building an empty map.
    if len(df) == 0:
        print('No store found based on the criteria')
        return
    fig = px.scatter_mapbox(df, lat="latitude", lon="longitude",
                            hover_data=["name", "address", "postal_code"],
                            color="stars", size="review_count",
                            color_continuous_scale=px.colors.sequential.Rainbow,
                            size_max=40, zoom=9, height=1000)  # zoom level: https://wiki.openstreetmap.org/wiki/Zoom_levels
    fig.update_layout(mapbox_style="open-street-map")
    fig.update_layout(margin={"r": 0, "t": 0, "l": 0, "b": 0})
    # TODO: add city UI based on state
    # TODO: add auto zoom-in on map based on long&lat
    # Top 10 by stars, then by number of reviews.
    display('Top 10 recommendations are as below')
    display(df.sort_values(by=['stars', 'review_count'], ascending=False)
              .drop(columns=['attributes', 'business_id', 'latitude', 'longitude'])[:10])
    py.iplot(fig, filename='Yelp Dataset Visualization')
print('Time elapsed is {} second'.format(time.time() - start_time))


#### Review set (6.5 GB)

Pyspark

In [None]:
# Time loading the review file into a cached Spark DataFrame, then preview it.
start_time = time.time()
df_review = spark.read.json("/content/yelp_academic_dataset_review.json").cache()
print('Time elapsed is {:.2f} second'.format(time.time() - start_time))
df_review.show(5)

# Time dropping rows in which every column is null.
start_time = time.time()
df_review_1 = df_review.na.drop(how="all")
kept_fraction = df_review_1.count() / df_review.count()
print('% of missing values are {:.2f} %'.format((1 - kept_fraction) * 100))
print('Time elapsed is {:.2f} second'.format(time.time() - start_time))

# Time removing rows that are exact duplicates across all columns.
start_time = time.time()
df_review_f = df_review_1.dropDuplicates()
n_removed = df_review_1.count() - df_review_f.count()
print('Removed {} duplicated review stores'.format(n_removed))
print('Time elapsed is {:.2f} second'.format(time.time() - start_time))

In [None]:
# # calculate categories
# start_time = time.time()
# rdd_cat = df_review_f.select(f.split(df_review_f['text'], ', ')).rdd.flatMap(lambda x: x).filter(lambda x: x is not None).flatMap(lambda x: x).map(lambda x: (x,1)).reduceByKey(lambda x,y: x+y).sortBy(lambda x: x[1], ascending=False)
# print('Total number of words in review are {}'.format(rdd_cat.count()))
# print('Top 1000 frequent words in review are {}'.format(rdd_cat.take(1000)))
# print('Time elapsed is {:.2f} second'.format(time.time() - start_time))

Pandas

In [None]:
# Time loading the full review file with pandas (one JSON object per line).
# NOTE: this rebinds df_review, the Spark DataFrame used by the word-cloud and PMI
# cells below, to a pandas DataFrame; re-run the PySpark review cell before those.
start_time = time.time()
df_review = pd.read_json("/content/yelp_academic_dataset_review.json", lines=True)
elapsed = time.time() - start_time
print('Time elapsed is {:.2f} second'.format(elapsed))

In [None]:
# !!! This cell crashed Colab by running out of RAM; it needs a larger machine.
# remove missing -- on the review data (the original mistakenly cleaned df_business).
# how='all' matches df_review.na.drop("all") in the PySpark version being compared.
start_time = time.time()
df_review_1 = df_review.dropna(how='all')
print('% of missing values are {:.2f} %'.format((1 - len(df_review_1) / len(df_review)) * 100))
print('Time elapsed is {:.2f} second'.format(time.time() - start_time))

Word Cloud for five star reviews

In [None]:
# NOTE: df_review must be the Spark DataFrame from the PySpark review cell; the pandas
# review cell rebinds the same name to a pandas DataFrame.
five_star_text = df_review.filter(df_review.stars >= 5.0).select('text').rdd

# Review-length distribution: number of five-star reviews per length (whitespace-split
# words). The null check is on the text field -- a Row is never None, its text may be.
five_star_word_count = (five_star_text
                        .filter(lambda row: row[0] is not None)
                        .map(lambda row: (len(row[0].split()), 1))
                        .reduceByKey(lambda a, b: a + b)
                        .sortBy(lambda kv: kv[1], ascending=False)
                        .collect())
x_only, y_only = zip(*five_star_word_count)
print(x_only)
# Bar chart: number of five-star reviews for each review length
import plotly.express as px
fig = px.bar(x=x_only, y=y_only, labels={'x': 'five star review length', 'y': 'count'})
fig.show()

# (nltk / word_tokenize are not used by the code below.)
import nltk
nltk.download('punkt')
import matplotlib.pyplot as plt
from wordcloud import WordCloud

from nltk import word_tokenize

# Very common words left out of the frequency chart and the word cloud.
STOP_WORDS = {
    'i', 'the', 'and', 'he', 'she', 'a', 'an', 'to', 'was', 'of', 'is', 'in', 'for',
    'with', 'my', 'on', 'it', 'they', 'that', 'we', 'at', 're', 'this', 'you', 'have',
}

# (word, count) for lower-cased words seen more than 10 times in five-star reviews,
# excluding STOP_WORDS (one set lookup instead of 25 chained filters).
five_star_analysis = (five_star_text
                      .filter(lambda row: row[0] is not None)
                      .flatMap(lambda row: row[0].split())
                      .map(lambda word: (word.lower(), 1))
                      .reduceByKey(lambda a, b: a + b)
                      .filter(lambda kv: kv[1] > 10 and kv[0] not in STOP_WORDS)
                      .collect())

# Bar chart of the 150 most frequent words
sorted_by_second = sorted(five_star_analysis, key=lambda tup: -tup[1])[:150]
x_only, y_only = zip(*sorted_by_second)
print(x_only)
fig = px.bar(x=x_only, y=y_only, labels={'x': 'word', 'y': 'count'})
fig.show()

# Word cloud sized by word frequency
rdd_cat_df = dict(five_star_analysis)
wordcloud = WordCloud(
    background_color='white',
    width=1800,
    height=1500,
).generate_from_frequencies(rdd_cat_df)
plt.figure(figsize=(20, 10), facecolor='k')
plt.imshow(wordcloud)
plt.axis('off')  # hide the x and y axes
plt.show()


Word Cloud for one-star reviews

In [None]:
# NOTE: df_review must be the Spark DataFrame from the PySpark review cell; the pandas
# review cell rebinds the same name to a pandas DataFrame.
one_star_text = df_review.filter(df_review.stars == 1.0).select('text').rdd

# Review-length distribution: number of one-star reviews per length (whitespace-split
# words). The null check is on the text field -- a Row is never None, its text may be.
one_star_word_count = (one_star_text
                       .filter(lambda row: row[0] is not None)
                       .map(lambda row: (len(row[0].split()), 1))
                       .reduceByKey(lambda a, b: a + b)
                       .sortBy(lambda kv: kv[1], ascending=False)
                       .collect())
x_only, y_only = zip(*one_star_word_count)
print(x_only)
# Bar chart: number of one-star reviews for each review length
import plotly.express as px
fig = px.bar(x=x_only, y=y_only, labels={'x': 'one star review length', 'y': 'count'})
fig.show()

# (nltk / word_tokenize are not used by the code below.)
import nltk
nltk.download('punkt')
import matplotlib.pyplot as plt
from wordcloud import WordCloud

from nltk import word_tokenize

# Very common words left out of the frequency chart and the word cloud.
STOP_WORDS = {
    'i', 'the', 'and', 'he', 'she', 'a', 'an', 'to', 'was', 'of', 'is', 'in', 'for',
    'with', 'my', 'on', 'it', 'they', 'that', 'we', 'at', 're', 'this', 'you', 'have',
}

# (word, count) for lower-cased words seen more than 10 times in one-star reviews,
# excluding STOP_WORDS (one set lookup instead of 25 chained filters).
one_star_analysis = (one_star_text
                     .filter(lambda row: row[0] is not None)
                     .flatMap(lambda row: row[0].split())
                     .map(lambda word: (word.lower(), 1))
                     .reduceByKey(lambda a, b: a + b)
                     .filter(lambda kv: kv[1] > 10 and kv[0] not in STOP_WORDS)
                     .collect())

# Bar chart of the 150 most frequent words
sorted_by_second = sorted(one_star_analysis, key=lambda tup: -tup[1])[:150]
x_only, y_only = zip(*sorted_by_second)
print(x_only)
fig = px.bar(x=x_only, y=y_only, labels={'x': 'word', 'y': 'count'})
# Fixed y-axis range; bars above 1,000,000 are clipped.
fig.update_layout(yaxis_range=[0, 1000000])

fig.show()

# Word cloud sized by word frequency -- built from the ONE-star counts (the original
# cell mistakenly reused five_star_analysis here).
rdd_cat_df = dict(one_star_analysis)
wordcloud = WordCloud(
    background_color='white',
    width=1800,
    height=1500,
).generate_from_frequencies(rdd_cat_df)
plt.figure(figsize=(20, 10), facecolor='k')
plt.imshow(wordcloud)
plt.axis('off')  # hide the x and y axes
plt.show()

PMI Analysis for five star reviews

Reference: https://student.cs.uwaterloo.ca/~cs451/assignments.html

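$\mathrm{PMI}(a, b) = \log_{10} \dfrac{p(a, b)}{p(a)\, p(b)}$, where $p(a)$ is the fraction of reviews containing token $a$, and $p(a, b)$ is the fraction containing both $a$ and $b$.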


In [None]:
# We follow the idea of the CS451 homework on calculating PMI.
# PMI analysis for five-star reviews: single tokens.
# Each five-star review is one "line", tokenized on whitespace. Null texts are skipped
# (the Row itself is never None; its text field may be).
top50 = (df_review.filter(df_review.stars >= 5.0).select('text')
         .rdd.filter(lambda row: row[0] is not None)
         .map(lambda row: row[0].split())
         .cache())
# PMI probabilities are per line: p(a) = (#reviews containing a) / (#reviews), so the
# denominator is the number of reviews, not the number of distinct words.
total = top50.count()
print('Number of five-star reviews: {}'.format(total))

# (p(word), #reviews containing word, word), most common first; set() makes each word
# count at most once per review.
word = (top50.flatMap(lambda tokens: set(tokens))
        .map(lambda w: (w, 1))
        .reduceByKey(lambda a, b: a + b)
        .map(lambda x: (x[1] / total, x[1], x[0]))
        .sortBy(lambda x: -x[1]))
word.take(50)

In [None]:
# PMI analysis for five-star reviews: token pairs.
# Uses `top50` (one whitespace-tokenized five-star review per element) from the cell above.
import math
import collections
import itertools

# Only pairs that co-occur in at least `threshold` reviews are scored.
threshold = 6000
# PMI is computed per line (review): N = number of reviews.
count_line_total = top50.count()

# Number of reviews containing each word (a word counts once per review, matching how
# pairs are counted below). A pair can reach the threshold only if both of its words
# do, so rarer words are dropped here -- they can never appear in the result.
word = (top50.flatMap(lambda tokens: set(tokens))
        .map(lambda w: (w, 1))
        .reduceByKey(lambda a, b: a + b)
        .filter(lambda x: x[1] >= threshold)
        .sortBy(lambda x: -x[1])
        .collect())
# Broadcast the lookup table once instead of shipping a list inside every task closure.
word_line_counts = spark.sparkContext.broadcast(dict(word))

def helper(input_v):
    """Return the number of five-star reviews containing token ``input_v``.

    O(1) dict lookup. Only tokens found in at least ``threshold`` reviews are present;
    any other token raises KeyError.
    """
    return word_line_counts.value[input_v]

token_pair = top50
# For each ordered pair (a, b) of distinct frequent words, count the reviews containing
# both. Pre-filtering to frequent words leaves the thresholded result unchanged while
# generating far fewer permutations per review.
token_pair_all = (token_pair
                  .map(lambda tokens: [t for t in set(tokens) if t in word_line_counts.value])
                  .flatMap(lambda tokens: itertools.permutations(tokens, 2))
                  .map(lambda pair: (pair, 1))
                  .reduceByKey(lambda a, b: a + b)
                  .cache())
# PMI(a, b) = log10(p(a, b) / (p(a) p(b))) = log10(N * n(a, b) / (n(a) * n(b))).
# Output: ((a, b), PMI, n(a, b), n(a), n(b)).
token_pair_all = (token_pair_all
                  .filter(lambda x: x[1] >= threshold)
                  .map(lambda x: (x[0],
                                  math.log10((count_line_total * x[1]) / (helper(x[0][0]) * helper(x[0][1]))),
                                  x[1], helper(x[0][0]), helper(x[0][1]))))
token_pair_all.take(50)