# Initialization

Install pyspark

In [None]:
!pip install pyspark



Import libraries

In [None]:
import pyspark
from pyspark.sql import SparkSession
import logging as l

Mount Google Drive to share data across collaborators. Initialize the data source from Google Drive.

In [None]:
from google.colab import drive

# Expose Google Drive under /content/data; the dataset paths used by the
# loading cells all start with this mount point.
drive.mount("/content/data")

Drive already mounted at /content/data; to attempt to forcibly remount, call drive.mount("/content/data", force_remount=True).


In [None]:
# Log INFO and above, formatted as "LEVEL | timestamp | message".
l.basicConfig(
    level=l.INFO,
    format="%(levelname)s | %(asctime)s | %(message)s",
)

In [None]:
# URI scheme prepended to every dataset path: "file://" for local files,
# "hdfs://" for HDFS.
FILESYSTEM = "file://"
# Hostname and namenode port (default port 9000); only meaningful for hdfs://.
HDFS_NODE = ""


Init spark session and context

In [None]:
# Start (or reuse) the Spark session. 'local[*]' is a master URL ("run locally
# on all cores"), not an application name, so it belongs in master(); passing
# it to appName() merely named the application "local[*]".
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("yelp-restaurant-analysis")
    .getOrCreate()
)
sc = spark.sparkContext
# Keep Spark's own console logging down to warnings and errors.
sc.setLogLevel("WARN")
l.info("-----SPARK SESSION STARTED-----")

INFO | 2021-12-10 23:29:55,212 | -----SPARK SESSION STARTED-----


# Data Loading and Processing

# Yelp business JSON file that contains details of 160,585 businesses collected across several cities

Read from hdfs/local data file

In [None]:
import time

# Yelp business dataset on the mounted drive.
HDFS_PATH = "/content/data/MyDrive/bdm-data/Yelp/yelp_academic_dataset_business.json"

# The namenode is only part of the URI for hdfs://; blank it otherwise.
HDFS_NODE = HDFS_NODE if "hdfs" in FILESYSTEM else ""

FILE_ACCESS = "".join([FILESYSTEM, HDFS_NODE, HDFS_PATH])

l.info(f"Accesing file: {FILE_ACCESS}")

# Load the JSON file; only the read call itself sits inside the timed window.
st_time = time.time()
df = spark.read.json(FILE_ACCESS)
end_time = time.time()

l.info("file read complete")
l.info(
    f"file details - rows: {df.count()}, columns: {len(df.columns)}"
)
df.printSchema()

In [None]:
# Report the wall-clock seconds measured around the business file read.
l.info(f"time taken to load file = {end_time - st_time}")

Filter the businesses down to restaurants located in the USA only

In [None]:
# USPS two-letter codes for the US states, DC and the associated territories.
# 'FM' (Federated States of Micronesia) replaces the former 'EM', which is not
# a USPS code.
US_STATES_LIST = [
    'AL', 'AK', 'AZ', 'AR', 'AS', 'CA', 'CO', 'CT', 'DE', 'DC',
    'FM', 'FL', 'GA', 'GU', 'HI', 'ID', 'IL', 'IN', 'IA', 'KS',
    'KY', 'LA', 'ME', 'MH', 'MD', 'MA', 'MI', 'MN', 'MS', 'MO',
    'MT', 'NE', 'NV', 'NH', 'NJ', 'NM', 'NY', 'NC', 'ND', 'MP',
    'OH', 'OK', 'OR', 'PW', 'PA', 'PR', 'RI', 'SC', 'SD', 'TN',
    'TX', 'UT', 'VT', 'VA', 'VI', 'WA', 'WV', 'WI', 'WY',
]
# Keep businesses whose categories mention "Restaurants" and whose state is one
# of the codes above.
df_r_usa = df.filter(df.categories.contains("Restaurants")).filter(df.state.isin(US_STATES_LIST))
df_r_usa.count()

Select a subset of columns (identifiers, location, rating and categories) from the filtered restaurants

In [None]:
# Project the US restaurants down to identification, location, rating and
# category columns.
df_restaurants = df_r_usa.select(
    "business_id",
    "name",
    "address",
    "postal_code",
    "state",
    "city",
    "latitude",
    "longitude",
    "stars",
    "review_count",
    "categories",
)
df_restaurants.printSchema()
df_restaurants.count()


In [None]:
# Bring every restaurant's business_id to the driver as a plain Python list.
restaurant_business_ids = [
    row["business_id"]
    for row in df_restaurants.select("business_id").collect()
]
print(len(restaurant_business_ids))

43256


In [None]:
# Number of restaurants per state.
(
    df_restaurants
    .groupBy("state")
    .count()
    .show()
)

+-----+-----+
|state|count|
+-----+-----+
|   OR| 7391|
|   WA|  773|
|   OH| 4377|
|   TX| 5444|
|   GA| 6140|
|   MA|10550|
|   KS|    1|
|   CO|  865|
|   FL| 7710|
|   MN|    1|
|   VA|    1|
|   WY|    1|
|   KY|    1|
|   NH|    1|
+-----+-----+



#Yelp - Review json file that contains details of reviews collected for each business

Read from hdfs/local data file

In [None]:
# Yelp review dataset on the mounted drive.
HDFS_PATH = "/content/data/MyDrive/bdm-data/Yelp/yelp_academic_dataset_review.json"

# The namenode is only part of the URI for hdfs://; blank it otherwise.
HDFS_NODE = HDFS_NODE if "hdfs" in FILESYSTEM else ""

FILE_ACCESS = "".join([FILESYSTEM, HDFS_NODE, HDFS_PATH])

l.info(f"Accesing file: {FILE_ACCESS}")

# Load the review JSON file into a DataFrame.
df_review = spark.read.json(FILE_ACCESS)
l.info("file read complete")

INFO | 2021-12-10 10:41:40,319 | Accesing file: file:///content/data/MyDrive/bdm-data/Yelp/yelp_academic_dataset_review.json
INFO | 2021-12-10 10:43:53,156 | file read complete


In [None]:
# Print the schema of the review DataFrame.
df_review.printSchema()

Create a filter to extract reviews for restaurants only

In [None]:
# Keep only reviews whose business_id belongs to a restaurant. A left-semi join
# against the restaurant ids replaces isin() over the tens of thousands of
# collected ids, which embedded every id as a literal in the query plan.
# The explicit join condition (rather than on="business_id") leaves
# df_review's columns and their order unchanged.
restaurant_id_lookup = df_restaurants.select("business_id").hint("broadcast")
df_restaurants_review = df_review.join(
    restaurant_id_lookup,
    df_review.business_id == restaurant_id_lookup.business_id,
    "left_semi",
)

Converting to Pandas Dataframe, as Spark dataframe had performance issues

In [None]:
# Materialise all restaurant reviews on the driver as a pandas DataFrame.
df_pd = df_restaurants_review.toPandas()

Aggregate all reviews for a business into a text file named after its business id; the same file will also hold the tweets scraped for that restaurant

In [None]:
# Append every restaurant review to a per-business text file named
# <business_id>.txt, iterating the rows on the driver via toLocalIterator().
# NOTE: files are opened in append mode, so re-running this cell appends the
# same reviews again.
data_itr = df_restaurants_review.rdd.toLocalIterator()
count = 0
for row in data_itr:
    review_path = f'/content/data/MyDrive/bdm-data/review-text/{row["business_id"]}.txt'
    # `with` closes the handle even when the write raises; the explicit
    # encoding keeps non-ASCII review text independent of the platform default.
    with open(review_path, 'a', encoding='utf-8') as f:
        f.write(row["text"])
    count += 1

#Yelp - Checkin data collected for businesses

Read from hdfs/local data file

In [None]:
# Yelp check-in dataset on the mounted drive.
HDFS_PATH = "/content/data/MyDrive/bdm-data/Yelp/yelp_academic_dataset_checkin.json"

# The namenode is only part of the URI for hdfs://; blank it otherwise.
HDFS_NODE = HDFS_NODE if "hdfs" in FILESYSTEM else ""

FILE_ACCESS = "".join([FILESYSTEM, HDFS_NODE, HDFS_PATH])

l.info(f"Accesing file: {FILE_ACCESS}")

# Load the check-in JSON file and log its dimensions.
df_checkin = spark.read.json(FILE_ACCESS)
l.info("file read complete")
l.info(
    f"file details - rows: {df_checkin.count()}, columns: {len(df_checkin.columns)}"
)

INFO | 2021-12-10 01:17:52,184 | Accesing file: file:///Users/varunsapre/OneDrive/UCR-Fall21/Courses/CS226-BigData/project/walte-big-data/data/yelp_academic_dataset_checkin.json
INFO | 2021-12-10 01:17:52,896 | file read complete
INFO | 2021-12-10 01:17:53,327 | file details - rows: 138876, columns: 2


Create a filter to extract checkin data for only restaurants

In [None]:
# Keep only check-in rows whose business_id belongs to a restaurant. A
# left-semi join against the restaurant ids replaces isin() over the tens of
# thousands of collected ids, which embedded every id as a literal in the
# query plan. The explicit join condition leaves df_checkin's columns and
# their order unchanged.
restaurant_id_lookup = df_restaurants.select("business_id").hint("broadcast")
df_restaurants_checkin = df_checkin.join(
    restaurant_id_lookup,
    df_checkin.business_id == restaurant_id_lookup.business_id,
    "left_semi",
)

In [None]:
# Flatten every column value of every restaurant check-in row into one list on
# the driver.
gb = [
    value
    for checkin_row in df_restaurants_checkin.collect()
    for value in checkin_row
]


In [None]:
# Iterator over the restaurant check-in rows, obtained from the underlying RDD
# via toLocalIterator().
df_c_itr = df_restaurants_checkin.rdd.toLocalIterator()

#Data Analysis

#Twint Code Space

Download and install twint. Twint can bypass the tweet limit in place for developer accounts

In [None]:
!rm -r /content/twint/
!git clone --depth=1 https://github.com/twintproject/twint.git
!rm /content/twint/requirements.txt

In [None]:
%%writefile /content/twint/requirements.txt
aiohttp==3.7.0
aiodns
beautifulsoup4
cchardet
dataclasses
elasticsearch
pysocks
pandas>=0.23.0
aiohttp_socks<=0.4.1
schedule
geopy
fake-useragent
googletransx

Writing /content/twint/requirements.txt


In [None]:
# Install twint from the local checkout together with the packages listed in
# its requirements.txt.
!cd /content/twint && pip3 install . -r requirements.txt

#Run twint commands here

**EXPERIMENTAL**

In [None]:
# Collect all restaurant rows to the driver as a Python list.
a_1=df_restaurants.collect()


In [None]:
# "<latitude>,<longitude>,50km,<name>" for the first collected restaurant.
s = ",".join([
    str(a_1[0]['latitude']),
    str(a_1[0]['longitude']),
    "50km",
    str(a_1[0]['name']),
])
print(s)

Append the scraped tweets for each business to its text file, so that the reviews and the scraped tweets are stored in the same place.

In [None]:
import twint
import nest_asyncio

# Allow nested asyncio event loops so twint's search can run inside the notebook.
nest_asyncio.apply()

c = twint.Config()

# Search tweets for the first (up to) five collected restaurants and append
# them to the same per-business text file that holds the reviews.
for restaurant in a_1[:5]:
    c.Search = f"{restaurant['name']}"
    c.Near = f"{restaurant['city']}"
    c.Limit = 100
    c.Pandas = True
    c.Hide_output = True
    twint.run.Search(c)

    Tweets_df = twint.storage.panda.Tweets_df
    try:
        temp_df = Tweets_df[['id', 'tweet']]
    except KeyError as e:
        # The result frame lacks the expected columns: skip this restaurant
        # instead of writing a stale (or undefined) temp_df.
        print(f"'{c.Search}' error - {e}")
        continue

    if temp_df.empty:
        # Nothing to append; avoid creating an empty file.
        continue

    tweets_path = f"/content/data/MyDrive/bdm-data/review-text/{restaurant['business_id']}.txt"
    # One open per business; `with` closes the handle even if a write fails.
    with open(tweets_path, 'a', encoding='utf-8') as f:
        # Iterate the column directly: iterrows() yields (index, Series)
        # tuples, so indexing its items with "tweet" raises TypeError.
        for tweet in temp_df['tweet']:
            f.write(tweet)
    

#Ranking Algorithm

Read from hdfs/local data file

In [None]:
# Restaurant table on the mounted drive (CSV).
HDFS_PATH = "/content/data/MyDrive/bdm-data/Yelp/restaurants_with_checkin_senti.csv"

# The namenode is only part of the URI for hdfs://; blank it otherwise.
HDFS_NODE = HDFS_NODE if "hdfs" in FILESYSTEM else ""

FILE_ACCESS = "".join([FILESYSTEM, HDFS_NODE, HDFS_PATH])

l.info(f"Accesing file: {FILE_ACCESS}")

# Load the CSV file, treating its first line as the header row.
df_rankingDataFrame = spark.read.option("header", "true").csv(FILE_ACCESS)
l.info("file read complete")
l.info(
    f"file details - rows: {df_rankingDataFrame.count()}, columns: {len(df_rankingDataFrame.columns)}"
)
df_rankingDataFrame.printSchema()

Define the list of major cities (nine in the list below), obtained by grouping the table by city and sorting by number of businesses; it is intended for filtering rows to these cities.

In [None]:
# Cities with the most businesses, in the order they were listed.
cities = [
    "Portland",
    "Austin",
    "Atlanta",
    "Orlando",
    "Boston",
    "Columbus",
    "Kissimmee",
    "Cambridge",
    "Boulder",
]

Compute values for the dataframe

In [None]:
from pyspark.sql.functions import when
from pyspark.sql.functions import col

# Aggregate statistics (sum, max, min) of checkin and review_count, taken over
# the rows where both values are present. They feed the normalisations used
# for the ranking score.
sumNormalizationCheckinSum = sumNormalizationReviewCountSum = 0
maxNormalizationCheckinMax = maxNormalizationReviewCountMax = 0
# The minimums start unset: seeding them with 0 (like the maximums) would pin
# them at 0 for any non-negative data.
minMaxScalingCheckinMin = minMaxScalngReviewCountMin = None
count = 0

for row in df_rankingDataFrame.rdd.toLocalIterator():
    # Only rows carrying both values as strings are used; presumably a missing
    # CSV field arrives as None - TODO confirm against the data.
    if not (isinstance(row["checkin"], str) and isinstance(row["review_count"], str)):
        continue

    checkin = int(row["checkin"])
    review_count = int(row["review_count"])

    sumNormalizationCheckinSum += checkin
    maxNormalizationCheckinMax = max(maxNormalizationCheckinMax, checkin)
    minMaxScalingCheckinMin = (
        checkin if minMaxScalingCheckinMin is None
        else min(minMaxScalingCheckinMin, checkin)
    )

    sumNormalizationReviewCountSum += review_count
    maxNormalizationReviewCountMax = max(maxNormalizationReviewCountMax, review_count)
    minMaxScalngReviewCountMin = (
        review_count if minMaxScalngReviewCountMin is None
        else min(minMaxScalngReviewCountMin, review_count)
    )
    count += 1

# With no usable rows, fall back to 0 so the names stay numeric for later cells.
if minMaxScalingCheckinMin is None:
    minMaxScalingCheckinMin = 0
if minMaxScalngReviewCountMin is None:
    minMaxScalngReviewCountMin = 0

print(minMaxScalingCheckinMin)
print(sumNormalizationCheckinSum)
print(maxNormalizationCheckinMax)
print(sumNormalizationReviewCountSum)
print(maxNormalizationReviewCountMax)
print(minMaxScalngReviewCountMin)
print(count)

Compute the final score for each row for different normalization used.

In [None]:
import math  # no longer used by this cell; kept in case later cells rely on it
from pyspark.sql import functions as F

# Score every row with column expressions, evaluated by Spark in one pass.
# This replaces a driver-side loop that called
# withColumn(name, when(business_id == <row id>, value)) once per row: without
# an otherwise(), each call overwrote the column with nulls for all other
# rows, so only the last processed business kept a score, and every iteration
# added three more projections to the query plan.


def _ratio(numerator, denominator):
    """Return the column numerator / denominator, or a constant 0.0 column when denominator is 0."""
    if denominator == 0:
        return F.lit(0.0)
    return numerator / F.lit(float(denominator))


def _log_or_zero(rank):
    """Return log(rank) where rank > 0, and 0.0 otherwise (including null ranks)."""
    return F.when(rank > 0, F.log(rank)).otherwise(F.lit(0.0))


checkin_value = F.col("checkin").cast("double")
review_count_value = F.col("review_count").cast("double")
# Rows lacking either value get a score of 0.
has_both_values = F.col("checkin").isNotNull() & F.col("review_count").isNotNull()

# Min-max scaling divides by the full range (max - min).
checkin_range = maxNormalizationCheckinMax - minMaxScalingCheckinMin
review_count_range = maxNormalizationReviewCountMax - minMaxScalngReviewCountMin

max_norm_checkin_rank = _ratio(checkin_value, maxNormalizationCheckinMax)
sum_norm_checkin_rank = _ratio(checkin_value, sumNormalizationCheckinSum)
min_max_checkin_rank = _ratio(
    checkin_value - F.lit(float(minMaxScalingCheckinMin)), checkin_range
)

# The review-count ranks are derived from review_count (not from checkin).
max_norm_review_count_rank = _ratio(review_count_value, maxNormalizationReviewCountMax)
sum_norm_review_count_rank = _ratio(review_count_value, sumNormalizationReviewCountSum)
min_max_review_count_rank = _ratio(
    review_count_value - F.lit(float(minMaxScalngReviewCountMin)), review_count_range
)

# Final score per normalisation: log(review-count rank) + log(checkin rank),
# where a non-positive rank contributes 0.
final_scores = {
    "rankMaxNormalizationScore":
        _log_or_zero(max_norm_review_count_rank) + _log_or_zero(max_norm_checkin_rank),
    "rankSumNormaizationScore":
        _log_or_zero(sum_norm_review_count_rank) + _log_or_zero(sum_norm_checkin_rank),
    "rankMinMaxScalinReviewCountScore":
        _log_or_zero(min_max_review_count_rank) + _log_or_zero(min_max_checkin_rank),
}

# Set to True to also add the individual rank columns.
INCLUDE_INDIVIDUAL_RANKS = False
individual_ranks = {
    "rankMaxNormalizationCheckin": max_norm_checkin_rank,
    "rankSumNormaizationCheckin": sum_norm_checkin_rank,
    "rankMinMaxScalinCheckin": min_max_checkin_rank,
    "rankMaxNormalizationReviewCount": max_norm_review_count_rank,
    "rankSumNormaizationReviewCount": sum_norm_review_count_rank,
    "rankMinMaxScalinReviewCount": min_max_review_count_rank,
}

new_columns = dict(individual_ranks) if INCLUDE_INDIVIDUAL_RANKS else {}
new_columns.update(final_scores)

for column_name, value in new_columns.items():
    df_rankingDataFrame = df_rankingDataFrame.withColumn(
        column_name, F.when(has_both_values, value).otherwise(F.lit(0.0))
    )
    