In [4]:
import json
import pandas as pd
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col,lower,current_date, expr
import pyspark.sql.functions as F
from pyspark.sql.functions import col, count, desc
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, Word2Vec
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType
from langdetect import detect
from pyspark.sql.types import StringType
import langid
import spacy
from spacy_langdetect import LanguageDetector

In [5]:
# Obtain the notebook's SparkSession: reuse the active one if it exists, otherwise create it.
spark = SparkSession.builder.getOrCreate()

23/12/14 17:58:36 WARN Utils: Your hostname, CelinedeMacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.100.25 instead (on interface en0)
23/12/14 17:58:36 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/14 17:58:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# 1. Data Cleaning

## a. Business Dataset

In [6]:
# Load the Yelp business JSON file into a Spark DataFrame.
business = spark.read.json(
    '/Users/celine/Desktop/5430nlp/group/yelp_dataset/yelp_academic_dataset_business.json'
)

# Report how many business records were loaded (count() triggers a full scan).
num_rows = business.count()
print("Number of rows in business: {}".format(num_rows))

23/12/14 17:58:41 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


Number of rows in business: 150346


In [7]:
# Preview the first five business rows to inspect the columns.
business.show(5)

+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+-------+----------+------------+--------------------+-----------+------------+-----+-----+
|             address|          attributes|         business_id|          categories|         city|               hours|is_open|  latitude|   longitude|                name|postal_code|review_count|stars|state|
+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+-------+----------+------------+--------------------+-----------+------------+-----+-----+
|1616 Chapala St, ...|{NULL, NULL, NULL...|Pns2l4eNsfO8kk83d...|Doctors, Traditio...|Santa Barbara|                NULL|      0|34.4266787|-119.7111968|Abby Rappoport, L...|      93101|           7|  5.0|   CA|
|87 Grasso Plaza S...|{NULL, NULL, NULL...|mpf3x-BjTdTEA3yCZ...|Shipping Centers,...|       Affton|{8:0-18:30, 0:0-0...|      1| 38.551126|  -90.335695|    

In [8]:
# Discard every business row that has a null in any top-level column.
business_without_na = business.dropna()
num_rows_without_na = business_without_na.count()
print("Number of rows after dropping null values: {}".format(num_rows_without_na))

# Collapse rows that are exact duplicates across all columns.
business_clean = business_without_na.drop_duplicates()
num_rows_clean = business_clean.count()
print("Number of rows after dropping duplicates: {}".format(num_rows_clean))

                                                                                

Number of rows after dropping null values: 117618




Number of rows after dropping duplicates: 117618


                                                                                

In [9]:
# Keep only businesses flagged as open (is_open == 1).
business_open = business_clean.where(col('is_open') == 1)
num_rows_business_open = business_open.count()
print("Number of rows for open business: {}".format(num_rows_business_open))

# Keep businesses whose category string mentions 'Restaurants' or 'Food'.
# This is substring matching, so any category that merely contains 'Food' also qualifies.
categories_col = col('categories')
restaurants = business_open.where(
    categories_col.contains('Restaurants') | categories_col.contains('Food')
)
num_rows_restaurants = restaurants.count()
print("Number of rows for restaurants: {}".format(num_rows_restaurants))

                                                                                

Number of rows for open business: 94976
Number of rows for restaurants: 39448


In [10]:
# Per city, count the rows that carry a non-null business_id.
# No de-duplication of ids and no review_count total is computed here.
cityBusinessCountDF = (
    restaurants
    .groupBy("city")
    .agg(F.count("business_id").alias("business_count"))
)

# Rank cities from the most to the fewest businesses.
sortedCityBusinessCountDF = cityBusinessCountDF.sort(F.col("business_count").desc())

# Display the five highest-ranked cities.
sortedCityBusinessCountDF.show(n=5)

+------------+--------------+
|        city|business_count|
+------------+--------------+
|Philadelphia|          3729|
|       Tampa|          2266|
|Indianapolis|          2083|
|      Tucson|          1889|
|   Nashville|          1829|
+------------+--------------+
only showing top 5 rows



In [11]:
# Target city: the one with the most businesses in the ranking above.
# The comparison is an exact, case-sensitive match on the city string.
restaurants_in_Philadelphia = restaurants.where(F.col("city") == "Philadelphia")

In [12]:
# Preview the first five rows kept for the target city.
restaurants_in_Philadelphia.show(5)

+--------------------+--------------------+--------------------+--------------------+------------+--------------------+-------+-------------+--------------+--------------------+-----------+------------+-----+-----+
|             address|          attributes|         business_id|          categories|        city|               hours|is_open|     latitude|     longitude|                name|postal_code|review_count|stars|state|
+--------------------+--------------------+--------------------+--------------------+------------+--------------------+-------+-------------+--------------+--------------------+-----------+------------+-----+-----+
|6153 Ridge Ave, U...|{NULL, NULL, NULL...|F6hASqb_Z5ASXz0-p...|Flowers & Gifts, ...|Philadelphia|{8:0-18:0, 8:0-18...|      1|    40.035169|    -75.216409| Edible Arrangements|      19128|          10|  3.5|   PA|
|226 W Rittenhouse...|{NULL, NULL, NULL...|K9gaM9XdGqGWsQMdj...|Coffee & Tea, Foo...|Philadelphia|{0:0-0:0, 0:0-0:0...|      1|39.9492424097

## b. Review Dataset

#### 1. Read data

In [13]:
# Load the Yelp review JSON file into a Spark DataFrame.
review = spark.read.json(
    '/Users/celine/Desktop/5430nlp/group/yelp_dataset/yelp_academic_dataset_review.json'
)

# Report how many review records were loaded (count() triggers a full scan).
num_rows = review.count()
print("Number of rows in reviews: {}".format(num_rows))

23/12/14 17:58:48 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors

Number of rows in reviews: 6990280


                                                                                

In [14]:
# Preview the first five review rows to inspect the columns.
review.show(5)

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|XQfwVwDr-v0ZS3_Cb...|   0|2018-07-07 22:09:11|    0|KU_O5udG6zpxOg-Vc...|  3.0|If you decide to ...|     0|mh_-eMZ6K5RLWhZyI...|
|7ATYjTIgM3jUlt4UM...|   1|2012-01-03 15:28:18|    0|BiTunyQ73aT9WBnpR...|  5.0|I've taken a lot ...|     1|OyoGAe7OKpv6SyGZT...|
|YjUWPpI6HXG530lwP...|   0|2014-02-05 20:30:30|    0|saUsX_uimxRlCVr67...|  3.0|Family diner. Had...|     0|8g_iMtfSiwikVnbP2...|
|kxX2SOes4o-D3ZQBk...|   1|2015-01-04 00:01:03|    0|AqPFMleE6RsU23_au...|  5.0|Wow!  Yummy, diff...|     1|_7bHUi9Uuf5__HHc_...|
|e4Vwtrqf-wpJfwesg...|   1|2017-01-14 20:54:15|    0|Sx8TMOWLNuJBWer-0...|  4.0|Cute inter

#### 2. Drop null & duplicate values

In [15]:
# Discard every review row that has a null in any top-level column.
review_without_na = review.dropna()
num_rows_review_without_na = review_without_na.count()
print("Number of rows after dropping null values: {}".format(num_rows_review_without_na))

# Collapse rows that are exact duplicates across all columns.
review_clean = review_without_na.drop_duplicates()
num_rows_review_clean = review_clean.count()
print("Number of rows after dropping duplicates: {}".format(num_rows_review_clean))


                                                                                

Number of rows after dropping null values: 6990280


23/12/14 17:58:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/12/14 17:58:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/12/14 17:58:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/12/14 17:58:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/12/14 17:58:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/12/14 17:58:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/12/14 17:58:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/12/14 17:58:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/12/14 17:58:59 WARN RowBasedKeyValueBatch: Calling spill() on

Number of rows after dropping duplicates: 6990280


                                                                                

#### 3. Rename a column to avoid ambiguity

In [16]:
# Rename the review-level rating so it cannot be confused with the business-level
# `stars` column once the two datasets are joined.
# Build on `review_clean` (not the raw `review`) so the null/duplicate filtering
# done in the cleaning step is actually carried into the rest of the pipeline.
review_final = review_clean.withColumnRenamed('stars', 'customer_review_stars')
review_final.show(5)

+--------------------+----+-------------------+-----+--------------------+---------------------+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|customer_review_stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+---------------------+--------------------+------+--------------------+
|XQfwVwDr-v0ZS3_Cb...|   0|2018-07-07 22:09:11|    0|KU_O5udG6zpxOg-Vc...|                  3.0|If you decide to ...|     0|mh_-eMZ6K5RLWhZyI...|
|7ATYjTIgM3jUlt4UM...|   1|2012-01-03 15:28:18|    0|BiTunyQ73aT9WBnpR...|                  5.0|I've taken a lot ...|     1|OyoGAe7OKpv6SyGZT...|
|YjUWPpI6HXG530lwP...|   0|2014-02-05 20:30:30|    0|saUsX_uimxRlCVr67...|                  3.0|Family diner. Had...|     0|8g_iMtfSiwikVnbP2...|
|kxX2SOes4o-D3ZQBk...|   1|2015-01-04 00:01:03|    0|AqPFMleE6RsU23_au...|                  5.0|Wow!  Yummy, diff...|     1|

#### 4. Observe the timespan for our review dataset

In [17]:
# Compute the latest and earliest review dates in a single aggregation.
date_stats = review_final.agg(
    F.max("date").alias("max_date"),
    F.min("date").alias("min_date"),
)

# Print both bounds without truncating the values.
date_stats.show(truncate=False)



+-------------------+-------------------+
|max_date           |min_date           |
+-------------------+-------------------+
|2022-01-19 19:48:45|2005-02-16 03:23:22|
+-------------------+-------------------+



                                                                                

#### 5. Select reviews made in the most recent 3 years

In [18]:
# Cast 'date' to DateType so it can be compared against a date cutoff.
review_final = review_final.withColumn("date", F.col("date").cast("date"))

# Cutoff = three years before 2022-01-19, the dataset's latest review date.
cutoff_date = expr("date '2022-01-19'") - expr("INTERVAL 3 YEARS")

# Keep reviews dated on or after the cutoff.
recent_3_years_df = review_final.where(col("date") >= cutoff_date)

recent_3_years_df.count()

                                                                                

2068468

In [19]:
# Confirm the filtered window by recomputing the latest and earliest review dates.
date_stats2 = recent_3_years_df.agg(
    F.max("date").alias("max_date"),
    F.min("date").alias("min_date"),
)

# Print both bounds without truncating the values.
date_stats2.show(truncate=False)



+----------+----------+
|max_date  |min_date  |
+----------+----------+
|2022-01-19|2019-01-19|
+----------+----------+



                                                                                

## c. Join Two Datasets

#### 1. Combine Datasets

In [21]:
# Attach each recent review to its Philadelphia business; the inner join keeps
# only business_ids present on both sides.
df = restaurants_in_Philadelphia.join(
    recent_3_years_df,
    on='business_id',
    how='inner',
)
print("Number of rows after merge: {}".format(df.count()))



Number of rows after merge: 135433


                                                                                

In [22]:
# Preview the first five joined business/review rows.
df.show(5)



+--------------------+---------------+--------------------+--------------------+------------+--------------------+-------+----------+-----------+------------------+-----------+------------+-----+-----+----+----------+-----+--------------------+---------------------+--------------------+------+--------------------+
|         business_id|        address|          attributes|          categories|        city|               hours|is_open|  latitude|  longitude|              name|postal_code|review_count|stars|state|cool|      date|funny|           review_id|customer_review_stars|                text|useful|             user_id|
+--------------------+---------------+--------------------+--------------------+------------+--------------------+-------+----------+-----------+------------------+-----------+------------+-----+-----+----+----------+-----+--------------------+---------------------+--------------------+------+--------------------+
|RI33oswGDkIsc0fuQ...| 2654 S 10th St|{NULL, NULL, u

                                                                                

## d. English Only

In [23]:
def detect_language(text):
    """Return the language code langid predicts for ``text``.

    Returns None when ``text`` is None (a null Spark value), so a single null
    review does not raise inside the UDF and fail the whole job. Such rows are
    then excluded by the ``== 'en'`` filter below.
    """
    if text is None:
        return None
    lang, _ = langid.classify(text)
    return lang

# Wrap the Python function as a Spark UDF that returns a string column.
language_detector = udf(detect_language, StringType())

# Apply the detect_language function to the 'text' column
df = df.withColumn('language', language_detector(F.col('text')))

# Filter rows where the language is English ('en')
english_df = df.filter(F.col('language') == 'en')

english_df.count()

                                                                                

135230

## e. Businesses with more than 10 reviews

In [31]:
# Number of English reviews per business.
counts_df = english_df.groupBy('business_id').count()

# Businesses with strictly more than 10 English reviews.
filtered_counts_df = counts_df.where(counts_df['count'] > 10)

# Keep only the reviews of those businesses; the join also appends the 'count' column.
more_than_10_df = english_df.join(filtered_counts_df, on='business_id')
more_than_10_df.show()


[Stage 82:=>(22 + 8) / 40][Stage 84:>   (0 + 0) / 2][Stage 85:>   (0 + 0) / 2]0]

In [10]:
# Export the filtered reviews as a single JSON Lines file.
# `more_than_10_df` is a Spark DataFrame, which has no pandas-style
# `to_json(path, orient=..., lines=...)` method, so collect it to pandas first.
# NOTE(review): toPandas() pulls all rows onto the driver, and nested struct
# columns (e.g. attributes, hours) are converted on the way — verify their
# JSON shape in the output file.
more_than_10_df.toPandas().to_json(
    '/Users/celine/Desktop/5430nlp/group/English_Only_More_than_10_reviews.json',
    orient='records',
    lines=True,
)