In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input directory tree and print the full path of every file,
# so the dataset paths hard-coded further down can be checked against it.
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
# Show the current working directory of the notebook session.
!pwd

In [None]:
# Print the host's memory statistics (useful when choosing the Spark driver memory).
!cat /proc/meminfo

In [None]:
# Confirm the Spark archive is present in the input dataset and show its size.
!ls -lh ../input/spark311/spark-3.0.1-bin-hadoop3.2\ \(1\).tgz

In [None]:
# List the top-level directories under /kaggle.
!ls /kaggle

In [None]:
# Unpack the Spark distribution into the current directory (-C .).
# NOTE(review): SPARK_HOME is later set to /kaggle/working/spark-3.0.1-bin-hadoop3.2,
# so this assumes the current directory is /kaggle/working -- confirm.
!tar -xzf ../input/spark311/spark-3.0.1-bin-hadoop3.2\ \(1\).tgz -C .

In [None]:
# Install findspark into the kernel's own environment (%pip rather than !pip) and
# pin it to the same version as the wheel bundled in ../input/findspark, so the
# notebook resolves the same package on every run.
%pip install findspark==2.0.1

In [None]:
# Show the files shipped in the findspark input dataset.
!ls -lh ../input/findspark/

In [None]:
# Install findspark 2.0.1 from the wheel bundled with the input data.
# %pip (instead of !pip) guarantees the package lands in the running kernel's environment.
%pip install ../input/findspark/findspark-2.0.1-py2.py3-none-any.whl

In [None]:
# Locate the java executable on PATH and follow its symlinks one step at a time,
# to see which JVM installation it resolves to.
!which java
!ls -lh /usr/bin/java
!ls -lh /etc/alternatives/java

In [None]:
import os
import sys

# Point Spark at the JVM and at the distribution unpacked into /kaggle/working.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64/"
os.environ["SPARK_HOME"] = "/kaggle/working/spark-3.0.1-bin-hadoop3.2"
spark_path = os.environ['SPARK_HOME']

# Make the bundled PySpark / py4j sources importable. Entries already on sys.path
# are skipped so that re-running this cell does not keep appending duplicates.
for sub_path in (
    "/bin",
    "/python",
    "/python/pyspark/",
    "/python/lib",
    "/python/lib/pyspark.zip",
    "/python/lib/py4j-0.10.9-src.zip",
):
    if spark_path + sub_path not in sys.path:
        sys.path.append(spark_path + sub_path)

import findspark
findspark.init()

import pyspark

# Local-mode sizing: number of worker threads and driver heap size (in GB).
number_cores = 4
memory_gb = 2

conf = (
    pyspark.SparkConf()
    .setMaster('local[{}]'.format(number_cores))
    .set('spark.driver.memory', '{}g'.format(memory_gb))
)

# getOrCreate returns the already-running context when this cell is executed again;
# calling the SparkContext constructor a second time raises because only one
# SparkContext may be active per process.
sc = pyspark.SparkContext.getOrCreate(conf=conf)

In [None]:
# Load the three Yelp JSON files as RDDs of raw text lines.
# Each line is parsed with json.loads in a later cell.
user = sc.textFile("../input/yelp-dataset/yelp_academic_dataset_user.json")
review = sc.textFile("../input/yelp-dataset/yelp_academic_dataset_review.json")
business = sc.textFile("../input/yelp-dataset/yelp_academic_dataset_business.json")

**<font size="5">The Assignment (Part 1):</font>**

Identify 100 users with highest number of ratings/fans.  

Extract the reviews of these users and combine it with the business information. Are they eating across multiple metropolitans? Is there a preference in restaurant/food style of their reviews? Can you infer the locations of these users?

**<font size="5">Step 1:</font>**

The first step was to find the top 100 users. This was done by mapping every user in the dataset to a ('user_id', 'fans') pair and then taking the 100 users with the most fans, ordered from highest to lowest.

In [None]:
from pyspark.sql import SparkSession
import json
# Build (or fetch) the SparkSession used for DataFrame access.
spark = (
    SparkSession.builder
    .appName('demo')
    .master("local")
    .enableHiveSupport()
    .getOrCreate()
)

# DataFrame views of the same three Yelp files.
# NOTE(review): uj / rj / bj are not referenced anywhere in the visible part of the
# notebook, which works on the RDDs instead -- confirm whether these reads are needed.
uj = spark.read.json("../input/yelp-dataset/yelp_academic_dataset_user.json")
rj = spark.read.json("../input/yelp-dataset/yelp_academic_dataset_review.json")
bj = spark.read.json("../input/yelp-dataset/yelp_academic_dataset_business.json")

In [None]:
# Parse every raw text line into a Python dict (one JSON object per line).
user_json = user.map(json.loads)
review_json = review.map(json.loads)
business_json = business.map(json.loads)

In [None]:
# Pair each user's label ("User ID: <id>") with their fan count ...
userRatings = user_json.map(
    lambda record: ("User ID: " + record['user_id'], record['fans'])
)
# ... and pull the 100 pairs with the most fans (negated key => descending order).
uRatings = userRatings.takeOrdered(100, key=lambda pair: -pair[1])

Now that I have the top 100 users, I really only need their user IDs. Once the user IDs are in a list, I can filter all of the reviews against it.

In [None]:
# Keep only the user labels; the fan counts are no longer needed.
userIDs = [user_label for user_label, _fans in uRatings]

In [None]:
# (business_id, user label) for every review.
reviews = review_json.map(
    lambda record: (record['business_id'], "User ID: " + record['user_id'])
)
# (business_id, ("<city>, <state>", categories)) for every business.
businessInfo = business_json.map(
    lambda record: (
        record['business_id'],
        (record['city'] + ", " + record['state'], record['categories']),
    )
)

In [None]:
# Keep only the reviews written by the top users.
# The membership test runs once per review record, so look the label up in a set
# (constant time) rather than scanning the userIDs list on every record.
userIDSet = set(userIDs)
totalInfo = reviews.filter(lambda x: x[1] in userIDSet)

**<font size="5">Step 2:</font>**

The next step was to combine our filtered reviews with the business dataset. This was done by using the join function which takes in <Key1, Value1> and <Key1, Value2> and returns <Key1, (Value 1 and Value 2)>. I used the business_id from reviews and businessInfo as the key, and the values ended up being user ID, location (city,state), and categories.

In [None]:
# Join the top users' reviews with the business info on business_id.
# Each element becomes (business_id, (user label, (location, categories))).
test = totalInfo.join(businessInfo)

In [None]:
# Peek at one joined record to check its shape.
test.take(1)

Right now, the info is in the form (Business ID, (User ID, (Location, Categories))). It isn't very useful in this format, so I decided it would be best to put it in the form (User ID, (Business ID, Location, Categories)).

In [None]:
# Re-key by user: (business_id, (user, (location, categories)))
#              -> (user, (business_id, location, categories)).
# rec[1][1] is the (location, categories) pair, so unpacking it yields the last two fields.
temp = test.map(lambda rec: (rec[1][0], (rec[0], *rec[1][1])))
temp.take(1)

The next thing to do is group all reviews and business information to the respective users. As you can see, before grouping by key, the count is pretty high.

In [None]:
# Number of (user, business info) records before grouping, i.e. one per matching review.
temp.count()

In [None]:
# Gather every (business_id, location, categories) tuple under its user label;
# mapValues(list) turns each grouped iterable into a plain list.
newTemp = temp.groupByKey().mapValues(list)

In [None]:
# Number of distinct users left after grouping.
newTemp.count()

**<font size="5">Step 3:</font>**

Step 3 is broken up into two different parts: 

The first part is to create a method that grabs each individual business location and adds a count to it.

The second part, similar to the first, is to create a method that splits each business's comma-separated category string into individual categories and then adds a count to each one.

I would like to point out that these two methods wouldn't be very effective if we were exploring a large sample group as opposed to just the top 100. But, given there were only 100 users and around 13,000 reviews combined between them, it made the below methods doable.

In [None]:
from collections import Counter

def locationCounter(a):
    """Count how many of a user's reviewed businesses fall in each location.

    a: a (user, entries) pair where every entry is indexable and entry[1] is
       the location string.
    Returns a list of (location, count) tuples in first-seen order.
    """
    tally = Counter(entry[1] for entry in a[1])
    return list(tally.items())

def categoryCounter(a):
    """Count, per category, how many of a user's reviewed businesses carry it.

    a: a (user, entries) pair where every entry is indexable and entry[2] is
       either a ", "-separated category string or None.
    Returns a tuple of (category, count) pairs. A category is counted at most
    once per business.

    Businesses whose categories are None are skipped individually. Previously
    a single None made the function return None, which threw away the counts
    for every other business of that user.
    """
    tally = Counter()
    for elem in a[1]:
        categories = elem[2]
        if categories is None:
            continue
        # set() so a category repeated within one business is only counted once
        tally.update(set(categories.split(", ")))
    return tuple(tally.items())

In [None]:
# Per-user tally of review locations: (user label, [(location, count), ...]).
trythis = newTemp.map(lambda user_rec: (user_rec[0], locationCounter(user_rec)))

Here, we can see the count for each location for each user. I think this is pretty useful in finding out what metropolitan area users live in. For example, user "0G-QF457q_0Z_jKqh6xWiA" has 548 reviews in the New Orleans, LA area while the second most visited area is Nashville, TN with only 32 reviews. Based on this information, we can assume that they most likely live in New Orleans, and this method can be applied to almost all of the top users. One last observation is that some users have a large variety of locations indicating that they eat in many different cities.

In [None]:
# Show the location tallies for up to 100 users.
trythis.take(100)

Now we can see the count for all categories for each user. This is fairly useful in finding out if a user has a certain preference. An example would be user "0G-QF457q_0Z_jKqh6xWiA". Although a few categories have high counts, the "American (New)" category ranks the highest for them with seventy-one occurrences. I would say one drawback to my method is that there isn't a great way to run a takeOrdered function on the counts to see the highest-ranking categories. Additionally, there may be extraneous categories that aren't very useful, like "Restaurants" or "Food". 

In [None]:
# Per-user tally of business categories: (user label, ((category, count), ...)).
types = newTemp.map(lambda user_rec: (user_rec[0], categoryCounter(user_rec)))

In [None]:
# Show the category tallies for up to 100 users.
types.take(100)

**<font size="5">The Assignment (Part 2):</font>**

Identify one of your favorite restaurants that is available on Yelp. Search for all reviews and reviewers for this restaurant. 

Is this restaurant frequented by non-local reviewers (how do you know)?

What are the positive things about this restaurant (study higher-rated reviews)

What are the negative things about this restaurant (study lower-rated reviews)

Right off the bat, my strategy is going to be the opposite of what I did to find user info as the question is basically asking us to work backwards.

In [None]:
# (business_id, name, ("<city>, <state>", review_count)) for every business.
bizness = business_json.map(
    lambda record: (
        record['business_id'],
        record['name'],
        (record['city'] + ", " + record['state'], record['review_count']),
    )
)

In [None]:
# Peek at a few business records to check their shape.
bizness.take(5)

In [None]:
# Select the restaurant by comparing its name field (index 1) explicitly.
# The previous `'...' in x` tested membership across the whole tuple, which only
# worked because one element happened to equal the name.
biz = bizness.filter(lambda x: x[1] == 'Las Fridas Mexican Kitchen')

In [None]:
# Fetch the first matching business record once and display it;
# calling biz.take(1) a second time would rerun the same Spark job just to show it.
lasFridas = biz.take(1)
lasFridas

Now that I have my favorite restaurant's business ID, I can turn it into a list object that reviews can be filtered through.

In [None]:
# Business IDs (first field of each record) of the selected restaurant.
lf = [record[0] for record in lasFridas]


In [None]:
# User labels ("User ID: <id>") for every user.
ur = user_json.map(lambda record: "User ID: " + record['user_id'])
# (business_id, user label) for every review.
rs = review_json.map(
    lambda record: (record['business_id'], "User ID: " + record['user_id'])
)

In [None]:
# Reviews whose business_id belongs to the selected restaurant.
filteredRev = rs.filter(lambda pair: pair[0] in lf)

In [None]:
# How many reviews the selected restaurant has.
filteredRev.count()

In [None]:
# Bring every review of the restaurant to the driver.
# collect() replaces the hard-coded take(22), which was copied from the count above
# and would silently drop reviews if the dataset (or the chosen restaurant) changed.
revNames = filteredRev.collect()

In [None]:
# User labels of everyone who reviewed the restaurant.
rev = [user_label for _business_id, user_label in revNames]
rev

So the first thing we want to do is find out if the restaurant is frequented by non-locals. The way I want to do that is exactly how I solved the first part of this assignment: take the list of twenty-two users, filter the reviews dataset down to all of their individual reviews, and match those with the business dataset. Then, obtain their locations by running the combined review/business info through our previously made function. We already have the user IDs, so the rest should be fairly simple.

In [None]:
# All reviews (for any business) written by the restaurant's reviewers.
# The check runs once per review record, so use a set for constant-time lookups
# instead of scanning the rev list each time.
revSet = set(rev)
loc = reviews.filter(lambda x: x[1] in revSet)

In [None]:
# Attach business info to each of those reviews, joining on business_id:
# (business_id, (user label, (location, categories))).
loc1 = loc.join(businessInfo)

In [None]:
# Re-key by user: (business_id, (user, (location, categories)))
#              -> (user, (business_id, location, categories)).
# rec[1][1] is the (location, categories) pair, so unpacking it yields the last two fields.
loc2 = loc1.map(lambda rec: (rec[1][0], (rec[0], *rec[1][1])))
loc2.take(1)

In [None]:
# Gather each reviewer's (business_id, location, categories) tuples into one list.
loc3 = loc2.groupByKey().mapValues(list)

In [None]:
# Number of distinct reviewers after grouping.
loc3.count()

Twenty-two users, perfect! On the right track - again, simply just following our previous steps for part one of the assignment.

In [None]:
# Per-reviewer tally of review locations: (user label, [(location, count), ...]).
loc4 = loc3.map(lambda user_rec: (user_rec[0], locationCounter(user_rec)))

In [None]:
# Show the location tallies for every reviewer of the restaurant.
# collect() avoids hard-coding the reviewer count (22) observed in an earlier run.
loc4.collect()

As expected, most of the reviewers live locally, with most of them being from Philadelphia, PA and Ambler, PA. I doubt non-locals would be going to a hole-in-the-wall Mexican restaurant in a shady shopping center. Regardless, the food is fantastic. 

Now, the final steps are to study the high and low reviews. While there are only twenty-two reviews, and it would be easy just to go over them manually, for the sake of automation I will use the previously unused "stars" field to pick out the highest-rated and lowest-rated reviews.

In [None]:
# (business_id, stars, review text) for every review.
review_stuff = review_json.map(
    lambda record: (record['business_id'], record['stars'], record['text'])
)

In [None]:
# Keep only the reviews of the selected restaurant.
allReviews = review_stuff.filter(lambda triple: triple[0] in lf)

First things first, let's grab the five highest-rated reviews:

In [None]:
# Five reviews with the most stars (negated key => descending order).
highReviews = allReviews.takeOrdered(5, key=lambda triple: -triple[1])

Now, let's grab the five lowest-rated reviews:

In [None]:
# Five reviews with the fewest stars (ascending order).
lowReviews = allReviews.takeOrdered(5, key=lambda triple: triple[1])

The function below was copied from https://stackoverflow.com/a/63819393. It basically uses TextBlob to find all Positive, Negative, and Neutral words in a given set. I thought it would be interesting to see how it works when the top and bottom reviews are fed into it. I did comment out the neutral column as that one is not as important as the other two. 

In [None]:
from textblob import TextBlob


def word_polarity(test_subset):
    """Print the strongly positive and strongly negative words in ``test_subset``.

    Every word is scored on its own with TextBlob. A polarity of 0.5 or more
    puts the word in the positive list, -0.5 or less in the negative list, and
    anything in between in the neutral list. Only the positive and negative
    lists are printed; nothing is returned.
    """
    positives, neutrals, negatives = [], [], []

    for word in test_subset:
        polarity = TextBlob(word).sentiment.polarity
        if polarity >= 0.5:
            bucket = positives
        elif polarity <= -0.5:
            bucket = negatives
        else:
            bucket = neutrals
        bucket.append(word)

    # Neutral words are collected but deliberately not printed.
    print('Positive :', positives)
    print('Negative :', negatives)

In [None]:
from collections import Counter
# Merge the review texts (third field of each record) into one string per group.
# Join with a space: plain concatenation glued the last word of one review to the
# first word of the next, producing bogus tokens once the text is split into words.
badReviews = " ".join(i[2] for i in lowReviews)
goodReviews = " ".join(i[2] for i in highReviews)

**Low reviews:**

In [None]:
# Split the low-rated review text on whitespace and print its positive/negative words.
word_polarity(badReviews.split())

**High reviews:**

In [None]:
# Split the high-rated review text on whitespace and print its positive/negative words.
word_polarity(goodReviews.split())

When analyzing the positive and negative words, it is interesting to see that even the bottom five reviews are mostly positive. There were only two negative words overall. It is nice to see that most of the reviews were praising this place.