# SIADS 516 Homework 4: Spark SQL
Version 1.0.20200221.1
### Dr. Chris Teplovs, School of Information, University of Michigan
<small><a rel="license" href="http://creativecommons.org/licenses/by-nc-sa/4.0/"><img alt="Creative Commons License" style="border-width:0" src="https://i.creativecommons.org/l/by-nc-sa/4.0/88x31.png" /></a>This work is licensed under a <a rel="license" href="http://creativecommons.org/licenses/by-nc-sa/4.0/">Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License</a>.

This homework assignment uses the Yelp Academic dataset, with which you should now be familiar.
We have created a few cells to get you started, but you're largely on your own to devise solutions to the
"real-world" questions below.

The best solutions will use `spark.sql()` calls as the preferred way to query the dataset and will also use the fewest steps. For example, to answer "How many users have more than 100 'cool' votes?", this:
```
query = """
SELECT count(*) FROM user WHERE cool > 100
"""
spark.sql(query).show()
```
is preferable to:
```
user.filter('cool > 100').count()
```
or 
```
query = """
SELECT * FROM user
"""
df = spark.sql(query)
df.filter('cool > 100').count()
```
(Note that the last example is somewhat ridiculous.)

Our usual Spark mantra:

In [107]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName('My First Spark application') \
    .getOrCreate() 

sc = spark.sparkContext

In [108]:
# Import other required packages for the questions
import numpy as np
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import *

import re
import string

Load the JSON files:

In [109]:
business = spark.read.json('data/yelp_academic/yelp_academic_dataset_business.json.gz')
checkin = spark.read.json('data/yelp_academic/yelp_academic_dataset_checkin.json.gz')
review = spark.read.json('data/yelp_academic/yelp_academic_dataset_review.json.gz')
tip = spark.read.json('data/yelp_academic/yelp_academic_dataset_tip.json.gz')
user = spark.read.json('data/yelp_academic/yelp_academic_dataset_user.json.gz')

Create temp views for the DataFrames:

In [110]:
business.createOrReplaceTempView("business")
checkin.createOrReplaceTempView("checkin")
tip.createOrReplaceTempView("tip")
review.createOrReplaceTempView("review")
user.createOrReplaceTempView("user")

## Q1. How many users have more than 500 fans?

In [61]:
# Code cell used to inspect the schemas and sample rows of the DataFrames
user.printSchema()
# business.printSchema()
# business.show()
# review.printSchema()
# review.show()
# tip.printSchema()
# tip.show()

root
 |-- average_stars: double (nullable = true)
 |-- compliment_cool: long (nullable = true)
 |-- compliment_cute: long (nullable = true)
 |-- compliment_funny: long (nullable = true)
 |-- compliment_hot: long (nullable = true)
 |-- compliment_list: long (nullable = true)
 |-- compliment_more: long (nullable = true)
 |-- compliment_note: long (nullable = true)
 |-- compliment_photos: long (nullable = true)
 |-- compliment_plain: long (nullable = true)
 |-- compliment_profile: long (nullable = true)
 |-- compliment_writer: long (nullable = true)
 |-- cool: long (nullable = true)
 |-- elite: string (nullable = true)
 |-- fans: long (nullable = true)
 |-- friends: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- yelping_since: string (nullable = true)



In [95]:
# insert your code here
res1 = spark.sql("SELECT COUNT(*) AS user_count FROM user WHERE fans > 500")
q1_ans = res1.head()[0]

In [6]:
q1_ans

185

## Q2. How many businesses from Madison, Wisconsin are represented in the dataset?

In [7]:
# insert your code here
res2 = spark.sql("SELECT COUNT(*) AS biz_count FROM business WHERE LOWER(city) = LOWER('Madison') AND state = 'WI' ")
q2_ans = res2.head()[0]

In [8]:
q2_ans

3494

## Q3: How many users have more than 500 fans?

In [None]:
# insert your code here

## Q4: Which US states are represented in the dataset?  Use full names of states (you will need to look up the list of state abbreviations if you don't know them).

In [9]:
us_states_abbr = ["AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA","HI","ID","IL","IN","IA","KS","KY","LA","ME","MD","MA","MI","MN","MS","MO","MT","NE","NV","NH","NJ","NM","NY","NC","ND","OH","OK","OR","PA","RI","SC","SD","TN","TX","UT","VT","VA","WA","WV","WI","WY"]
us_states_dict = {"AL":"Alabama","AK":"Alaska","AZ":"Arizona","AR":"Arkansas","CA":"California","CO":"Colorado","CT":"Connecticut","DE":"Delaware","FL":"Florida","GA":"Georgia","HI":"Hawaii","ID":"Idaho","IL":"Illinois","IN":"Indiana","IA":"Iowa","KS":"Kansas","KY":"Kentucky","LA":"Louisiana","ME":"Maine","MD":"Maryland","MA":"Massachusetts","MI":"Michigan","MN":"Minnesota","MS":"Mississippi","MO":"Missouri","MT":"Montana","NE":"Nebraska","NV":"Nevada","NH":"New Hampshire","NJ":"New Jersey","NM":"New Mexico","NY":"New York","NC":"North Carolina","ND":"North Dakota","OH":"Ohio","OK":"Oklahoma","OR":"Oregon","PA":"Pennsylvania","RI":"Rhode Island","SC":"South Carolina","SD":"South Dakota","TN":"Tennessee","TX":"Texas","UT":"Utah","VT":"Vermont","VA":"Virginia","WA":"Washington","WV":"West Virginia","WI":"Wisconsin","WY":"Wyoming"}

In [10]:
# insert your code here
res4 = spark.sql("SELECT DISTINCT state AS state_abbr FROM business")
res4_filtered = res4.where(col("state_abbr").isin(us_states_abbr))
res4_pd = res4_filtered.toPandas()

In [11]:
res4_pd["State_Name"] = res4_pd["state_abbr"].map(us_states_dict)
q4_ans = res4_pd["State_Name"].tolist()
q4_ans

['Arizona',
 'South Carolina',
 'New Jersey',
 'Virginia',
 'Nevada',
 'Wisconsin',
 'California',
 'Nebraska',
 'Connecticut',
 'North Carolina',
 'Vermont',
 'Illinois',
 'Washington',
 'Alabama',
 'Ohio',
 'Tennessee',
 'New Mexico',
 'Pennsylvania',
 'New York',
 'Texas',
 'Georgia',
 'Florida',
 'Alaska',
 'Arkansas',
 'Utah']
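The answer above takes three steps (a SQL query, a DataFrame filter, and a pandas `map`). In keeping with the preference for `spark.sql()`, one possible single-query version joins `business` to an inline lookup table built from `us_states_dict`. The sketch below only builds the query string in plain Python; the helper name `build_state_name_query` is hypothetical, and it assumes Spark SQL's inline-table syntax (`SELECT * FROM VALUES (...), (...) AS t(col1, col2)`) and that no state name contains a single quote.

```python
def build_state_name_query(state_names):
    """Build a Spark SQL query returning the full names of the US states in `business`.

    state_names: a dict such as us_states_dict, e.g. {"AZ": "Arizona", ...}.
    Assumption: no name contains a single quote (true for the 50 US state names).
    """
    # One ('abbr', 'full name') row per state for the inline VALUES table
    rows = ",\n        ".join(
        f"('{abbr}', '{name}')" for abbr, name in sorted(state_names.items())
    )
    # The inner join keeps only abbreviations found in the lookup table,
    # so any non-US region codes in business.state are dropped
    return f"""
SELECT DISTINCT s.state_name
FROM business AS b
INNER JOIN (
    SELECT * FROM VALUES
        {rows}
    AS t(abbr, state_name)
) AS s
ON b.state = s.abbr
ORDER BY s.state_name
"""
```

In the notebook, something like `q4_ans = [row.state_name for row in spark.sql(build_state_name_query(us_states_dict)).collect()]` would then replace the three steps; this usage has not been run against the dataset here.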

## Q5: What is the text of the funniest review?

In [25]:
# insert your code here
r5_query = """
SELECT funny AS funny_ratings, text 
FROM review 
WHERE funny = (SELECT MAX(funny) FROM review)
"""
res5 = spark.sql(r5_query)
q5_ans = res5.head()[1]

In [26]:
q5_ans

"Flew to Arizona a few months ago to try this. Disappointed that this place closed after Gordon Ramsay's return. Their food was one of the best food I had in my whole entire life!! Their caesar salad was great, sauce is very good. Their pizza was THE BEST I HAD IN MY WHOLE ENTIRE LIFE. From the garlic crust to the extremely cheesy topping, the pizza was absolutely outstanding. Wish I could come back but its now closed."

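The subquery `WHERE funny = (SELECT MAX(funny) FROM review)` returns every review that ties for the maximum, whereas `res5.head()` keeps only whichever matching row Spark returns first. The plain-Python sketch below mirrors that logic on made-up `(funny, text)` pairs; checking `res5.count()` in the notebook would show whether the funniest review is unique.

```python
# Made-up (funny, text) pairs standing in for rows of the review table
reviews = [(3, "ok"), (12, "hilarious"), (12, "also hilarious"), (7, "meh")]

# Equivalent of SELECT MAX(funny) FROM review
max_funny = max(funny for funny, _ in reviews)

# Equivalent of WHERE funny = (SELECT MAX(funny) FROM review): keeps all ties
funniest = [text for funny, text in reviews if funny == max_funny]

# Equivalent of res5.head()[1]: only the first matching row
first_only = funniest[0]

print(max_funny, funniest, first_only)
```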
## Q6: Which review(s) has/have the most words?  What do you notice about the maximum word count?

In [104]:
columns = ["Seqno","Name"]
data = [("1000", "john jones john jones john jones john jones john jones"),
    ("299", "tracey smith"),
    ("30", "amy sanders")]

df = spark.createDataFrame(data=data,schema=columns)

def text_count_split(text):
    return len(text.split()), text


+-----+------------------------------------------------------+
|Seqno|Name                                                  |
+-----+------------------------------------------------------+
|1000 |john jones john jones john jones john jones john jones|
|299  |tracey smith                                          |
|30   |amy sanders                                           |
+-----+------------------------------------------------------+



In [105]:
# insert your code here
# Three candidate word-count functions: whitespace .split(), a [\w]+ regular expression,
# and a count of whitespace tokens that are purely alphabetic once surrounding punctuation is stripped.

def text_count_split(text):
    return len(text.split())

def text_count_regex(text):
    return len(re.findall(r'[\w]+', text))

def text_count_sumstrip(text):
    return sum(i.strip(string.punctuation).isalpha() for i in text.split())


# DataFrame-API versions of the UDFs (the SQL queries below use the registered versions instead)
udf_text_count1 = F.udf(text_count_split, IntegerType())
udf_text_count2 = F.udf(text_count_regex, IntegerType())
udf_text_count3 = F.udf(text_count_sumstrip, IntegerType())

# After the first run the word count seemed to top out at 999. The column turned out to be a string:
# without an explicit return type, a Python UDF defaults to StringType, so ORDER BY sorted the counts
# lexicographically ("999" > "1056"). Declaring IntegerType() when registering the UDF for SQL fixes this.
spark.udf.register('udf_text_count1', text_count_split, IntegerType())
spark.udf.register('udf_text_count2', text_count_regex, IntegerType())
spark.udf.register('udf_text_count3', text_count_sumstrip, IntegerType())

r6_query_1 = """
SELECT udf_text_count1(text) AS word_count, text
FROM review 
ORDER BY word_count DESC
"""

r6_query_2 = """
SELECT udf_text_count2(text) AS word_count, text
FROM review 
ORDER BY word_count DESC
"""


r6_query_3 = """
SELECT udf_text_count3(text) AS word_count, text
FROM review 
ORDER BY word_count DESC
"""

res6_1 = spark.sql(r6_query_1)
res6_2 = spark.sql(r6_query_2)
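The 999 ceiling mentioned in the comment above comes from sorting strings rather than numbers. The sketch below uses made-up word counts to reproduce, in plain Python, what `ORDER BY word_count DESC` does on a StringType column versus an IntegerType one.

```python
# Hypothetical word counts, as a UDF without a declared return type would emit them (strings)
counts_as_strings = ["1056", "999", "30", "1097"]

# Lexicographic order compares character by character, so "999" beats "1097"
by_string = sorted(counts_as_strings, reverse=True)

# Numeric order, as with IntegerType()
by_number = sorted((int(c) for c in counts_as_strings), reverse=True)

print(by_string)  # ['999', '30', '1097', '1056']
print(by_number)  # [1097, 1056, 999, 30]
```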


In [111]:
res6_1.show()
res6_2.show()

+----------+--------------------+
|word_count|                text|
+----------+--------------------+
|      1056|They asked me to ...|
|      1052|I made my appoint...|
|      1051|CAUTION!!!!!! DO ...|
|      1051|Where do I even s...|
|      1049|If I could negati...|
|      1049|Should have come ...|
|      1041|This place is HOR...|
|      1041|This place was on...|
|      1034|We stayed here th...|
|      1031|i have never been...|
|      1029|So I walked into ...|
|      1029|I had the worst e...|
|      1028|NOW THAT'S A KNIF...|
|      1028|Our experience he...|
|      1027|I originally went...|
|      1026|I have lived ther...|
|      1025|The reason I'm gi...|
|      1025|If i could i woul...|
|      1025|After my horrible...|
|      1024|This is Thomas Sl...|
+----------+--------------------+
only showing top 20 rows

+----------+--------------------+
|word_count|                text|
+----------+--------------------+
|      1097|Went in Sunday 12...|
|      1083|They asked

In [108]:
# Use the .split() count: the [\w]+ regex splits contractions such as "don't" into two tokens, so it overcounts words with apostrophes.

q6_ans = res6_1.head()[1]
q6_ans

"They asked me to bring my dog back after they Killed her & hoped I had a great visit..!! They are so on organized and thoughtless to ask me to bring my dog in after the killer and they charged me twice the kill my dog.  I never finished telling how they killed my dog they kept my dog overnight because they were closing,I only went there to get her yr. shots but they were closing & said keep her overnight & we will give u the blood results in the Morning,I paid $260 for the visit & Blood they call me at 6am in the morning & tell me they won't give me the results unless I come down & give them a credit card so I just gave them my card over the phone & signed a form of any problems they can take care of it from X-Rays to pills,so now they have my card & say my dog has a temp. Of 104* news to me..!! She was perfect going in..!! So they said she had a blood infection & need to keep her overnight again to put her on IV, the next day the Vet calls & said come get your dog rt. Now for I'm the

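To see why the three counting functions disagree, they can be run on a single made-up sentence in plain Python (same logic as the functions defined for the UDFs, no Spark needed). `.split()` counts every whitespace-separated token, the `[\w]+` regex splits contractions and possessives into two tokens, and the strip-and-`isalpha()` version drops any token that still contains a non-letter, such as `didn't` or `$260`.

```python
import re
import string

def text_count_split(text):
    # Every whitespace-separated token counts as one word
    return len(text.split())

def text_count_regex(text):
    # Runs of word characters; "didn't" becomes "didn" + "t"
    return len(re.findall(r'[\w]+', text))

def text_count_sumstrip(text):
    # Strip leading/trailing punctuation, then count only purely alphabetic tokens
    return sum(tok.strip(string.punctuation).isalpha() for tok in text.split())

sample = "I didn't love it... $260 for a dog's visit?!"

counts = {
    "split": text_count_split(sample),
    "regex": text_count_regex(sample),
    "sumstrip": text_count_sumstrip(sample),
}
print(counts)  # {'split': 9, 'regex': 11, 'sumstrip': 6}
```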
## Q7: What are the names of the top 10 users who provided the most tips?

#### tip schema
root
 |-- business_id: string (nullable = true)
 |-- compliment_count: long (nullable = true)
 |-- date: string (nullable = true)
 |-- text: string (nullable = true)
 |-- user_id: string (nullable = true)

In [56]:
# insert your code here

r7_query = """
SELECT tip.user_id, user.name, COUNT(tip.user_id) AS tips_count
FROM tip 
LEFT JOIN user
ON tip.user_id = user.user_id
GROUP BY tip.user_id, user.name
ORDER BY tips_count DESC
LIMIT 10
"""

q7_ans = spark.sql(r7_query)

In [57]:
# Show the 10 users who wrote the most tips
q7_ans.show()

+--------------------+--------+----------+
|             user_id|    name|tips_count|
+--------------------+--------+----------+
|mkbx55W8B8aPLgDqe...|    Momo|      2439|
|CxDOIDnH8gp9KXzpB...|Jennifer|      1598|
|6ZC-0LfOAGwaFc5XP...|Samantha|      1509|
|0tvCcnfJnSs55iB6m...|  Daniel|      1376|
|eZfHm0qI8A_HfvXSc...|Christie|      1352|
|O8eDScRAg6ae0l9Bc...|     May|      1255|
|8DGFWco9VeBAxjqsu...|   Kurdy|      1178|
|WJKocp9RE0KatUwh3...| Anthony|      1161|
|2EuPAGalYnP7eSxPg...| Shirley|      1154|
|QPJJohtGqkMkaN0Gt...| Cherrie|      1017|
+--------------------+--------+----------+
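The query groups by `user_id` as well as `name` because names are not unique (two different users named Jennifer appear in Q8). The plain-Python sketch below mirrors the same GROUP BY, LEFT JOIN and ORDER BY ... LIMIT logic on made-up IDs using `collections.Counter`; `top_tippers` is a hypothetical helper, not part of the notebook.

```python
from collections import Counter

def top_tippers(tip_user_ids, user_names, n=10):
    """tip_user_ids: one user_id per tip; user_names: dict mapping user_id -> name."""
    # GROUP BY tip.user_id with COUNT(...)
    counts = Counter(tip_user_ids)
    # ORDER BY tips_count DESC LIMIT n; .get() mimics the LEFT JOIN (missing user -> None)
    return [(uid, user_names.get(uid), c) for uid, c in counts.most_common(n)]

tips = ["u1", "u2", "u1", "u3", "u1", "u2", "u4"]
names = {"u1": "Jennifer", "u2": "Jennifer", "u3": "Momo"}

print(top_tippers(tips, names, n=2))
```

Grouping by name alone would merge `u1` and `u2` into a single "Jennifer" with 5 tips.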



## Q8: List the names, number of reviews of businesses in Arizona and total number of reviews of the top 5 users (as determined by who has created the most number of reviews of businesses in Arizona).  Include a column that shows the percentage of reviews that are of businesses from Arizona.  The first row of the results should be:
```
+--------+--------+-----------+---------+
|    name|az_count|total_count|  percent|
+--------+--------+-----------+---------+
|    Brad|    1637|       1642|99.695496|
+--------+--------+-----------+---------+
```

### Who has created the most reviews of businesses in Arizona?

Count each user's reviews of Arizona businesses: join `review` to `business` (keeping only rows where `business.state = 'AZ'`) and to `user`, group by user ID and name, order by the count descending, and limit to 5.

#### user schema
root
 |-- average_stars: double (nullable = true)
 |-- compliment_cool: long (nullable = true)
 |-- compliment_cute: long (nullable = true)
 |-- compliment_funny: long (nullable = true)
 |-- compliment_hot: long (nullable = true)
 |-- compliment_list: long (nullable = true)
 |-- compliment_more: long (nullable = true)
 |-- compliment_note: long (nullable = true)
 |-- compliment_photos: long (nullable = true)
 |-- compliment_plain: long (nullable = true)
 |-- compliment_profile: long (nullable = true)
 |-- compliment_writer: long (nullable = true)
 |-- cool: long (nullable = true)
 |-- elite: string (nullable = true)
 |-- fans: long (nullable = true)
 |-- friends: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- yelping_since: string (nullable = true)


#### review schema
root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)

#### business schema

root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DietaryRestrictions: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string (nullable = true)
 |    |-- GoodForDancing: string (nullable = true)
 |    |-- GoodForKids: string (nullable = true)
 |    |-- GoodForMeal: string (nullable = true)
 |    |-- HairSpecializesIn: string (nullable = true)
 |    |-- HappyHour: string (nullable = true)
 |    |-- HasTV: string (nullable = true)
 |    |-- Music: string (nullable = true)
 |    |-- NoiseLevel: string (nullable = true)
 |    |-- Open24Hours: string (nullable = true)
 |    |-- OutdoorSeating: string (nullable = true)
 |    |-- RestaurantsAttire: string (nullable = true)
 |    |-- RestaurantsCounterService: string (nullable = true)
 |    |-- RestaurantsDelivery: string (nullable = true)
 |    |-- RestaurantsGoodForGroups: string (nullable = true)
 |    |-- RestaurantsPriceRange2: string (nullable = true)
 |    |-- RestaurantsReservations: string (nullable = true)
 |    |-- RestaurantsTableService: string (nullable = true)
 |    |-- RestaurantsTakeOut: string (nullable = true)
 |    |-- Smoking: string (nullable = true)
 |    |-- WheelchairAccessible: string (nullable = true)
 |    |-- WiFi: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- city: string (nullable = true)
 |-- hours: struct (nullable = true)
 |    |-- Friday: string (nullable = true)
 |    |-- Monday: string (nullable = true)
 |    |-- Saturday: string (nullable = true)
 |    |-- Sunday: string (nullable = true)
 |    |-- Thursday: string (nullable = true)
 |    |-- Tuesday: string (nullable = true)
 |    |-- Wednesday: string (nullable = true)
 |-- is_open: long (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- name: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- stars: double (nullable = true)
 |-- state: string (nullable = true)

In [111]:
# insert your code here
# Filter to Arizona with WHERE (before grouping) rather than HAVING
r8_query_1 = """
SELECT user.name AS name, COUNT(review.user_id) AS az_count, review.user_id
FROM business
INNER JOIN review
ON business.business_id = review.business_id
INNER JOIN user
ON review.user_id = user.user_id
WHERE business.state = 'AZ'
GROUP BY review.user_id, user.name
ORDER BY az_count DESC
LIMIT 5
"""

r8_query_2 = """
SELECT COUNT(review.user_id) AS total_count, review.user_id
FROM business
INNER JOIN review
ON business.business_id = review.business_id
INNER JOIN user
ON review.user_id = user.user_id
GROUP BY review.user_id
ORDER BY total_count DESC
"""

res8_1 = spark.sql(r8_query_1)
res8_2 = spark.sql(r8_query_2)
res8_3 = res8_1.join(res8_2, on=['user_id'], how='inner')

In [112]:
# Here total_count is the number of each user's reviews in the review table (all states).
q8_ans_1 = res8_3.drop('user_id', 'state')
q8_ans_1 = q8_ans_1.withColumn('percent', col('az_count') / col('total_count') * 100)
# The join does not preserve the ORDER BY of r8_query_1, so sort again before showing
q8_ans_1 = q8_ans_1.orderBy(col('az_count').desc())
q8_ans_1.show()

+--------+--------+-----------+-----------------+
|    name|az_count|total_count|          percent|
+--------+--------+-----------+-----------------+
|    Brad|    1637|       1764|92.80045351473923|
|   Karen|    1559|       1727|90.27214823393167|
|Jennifer|    1250|       1360|91.91176470588235|
|    Gabi|    1151|       1198|96.07679465776295|
|Jennifer|    1059|       1255|84.38247011952191|
+--------+--------+-----------+-----------------+




In [113]:
# Alternative solution: use user.review_count as the total. This matches the expected first row
# (Brad, 1637, 1642, 99.695...). Filtering with WHERE before grouping and computing the
# percentage in SQL answers the question in a single spark.sql() call.
r8_query_3 = """
SELECT user.name AS name,
       COUNT(review.user_id) AS az_count,
       user.review_count AS total_count,
       COUNT(review.user_id) / user.review_count * 100 AS percent
FROM business
INNER JOIN review
ON business.business_id = review.business_id
INNER JOIN user
ON review.user_id = user.user_id
WHERE business.state = 'AZ'
GROUP BY review.user_id, user.name, user.review_count
ORDER BY az_count DESC
LIMIT 5
"""

q8_ans_2 = spark.sql(r8_query_3)
q8_ans_2.show()

+--------+--------+-----------+------------------+
|    name|az_count|total_count|           percent|
+--------+--------+-----------+------------------+
|    Brad|    1637|       1642| 99.69549330085262|
|   Karen|    1559|       2340| 66.62393162393163|
|Jennifer|    1250|       1929| 64.80041472265422|
|    Gabi|    1151|       1932| 59.57556935817805|
|Jennifer|    1059|       4190|25.274463007159902|
+--------+--------+-----------+------------------+
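The two solutions differ only in the denominator: the first counts each user's rows in `review`, the second uses the `review_count` field stored on `user`, and for Brad these disagree (1764 vs. 1642). The arithmetic below, using the numbers from the outputs above, shows that only the second reproduces the expected first row (Brad, 1637, 1642, about 99.6955). The helper `az_percent` is illustrative only.

```python
def az_percent(az_count, total_count):
    # Same expression as az_count / total_count * 100 in the notebook
    return az_count / total_count * 100

brad_from_review_table = az_percent(1637, 1764)  # denominator: rows in review
brad_from_review_count = az_percent(1637, 1642)  # denominator: user.review_count

print(round(brad_from_review_table, 6))  # 92.800454
print(round(brad_from_review_count, 6))  # 99.695493
```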

