# SIADS 516: Homework 4

- **Dr. Chris Teplovs**, School of Information, University of Michigan
- **Kris Steinhoff**, School of Information, University of Michigan


In [1]:
# The AutograderHelper class provides methods used by the autograder.
from autograder_helper import AutograderHelper

In [2]:
# Autograder cell. This cell is worth 0 points.
# This cell has hidden code used to configure the autograder.

This homework assignment uses the Yelp Academic dataset, with which you should now be familiar.
We have created a few cells to get you started, but you're largely on your own to devise solutions to the
"real-world" questions below.

In this assignment, provide solutions that use spark.sql() calls to query the dataset. For example, to find the answer to "How many users have more than 100 "cool" votes?", this:
```
query = """
SELECT count(*) FROM user WHERE cool > 100
"""
spark.sql(query).show()
```
is similar to:
```
user.filter('cool > 100').show()
```
But in this assignment, use the first approach. The autograder will check for the use of `spark.sql()`.

Our usual Spark mantra:

In [3]:
import pyspark
from pyspark.sql import SparkSession
# Build the SparkSession (or reuse one that already exists) with the
# "local[*]" master and this notebook's application name.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('My First Spark application')
    .getOrCreate()
)

# Handle to the SparkContext behind the session.
sc = spark.sparkContext

22/07/14 06:11:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Load the JSON files:

In [4]:
# Read each Yelp Academic file with Spark's JSON reader into its own DataFrame.
business = spark.read.json(
    '../../assets/data/yelp_academic/yelp_academic_dataset_business.json.gz'
)
checkin = spark.read.json(
    '../../assets/data/yelp_academic/yelp_academic_dataset_checkin.json.gz'
)
review = spark.read.json(
    '../../assets/data/yelp_academic/yelp_academic_dataset_review.json.gz'
)
tip = spark.read.json(
    '../../assets/data/yelp_academic/yelp_academic_dataset_tip.json.gz'
)
user = spark.read.json(
    '../../assets/data/yelp_academic/yelp_academic_dataset_user.json.gz'
)

22/07/14 06:11:19 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
                                                                                

Create temp views for the DataFrames:

In [5]:
# Register every DataFrame as a temp view named after its variable so that
# spark.sql() queries can refer to it by name.
for _view_name, _frame in (
    ("business", business),
    ("checkin", checkin),
    ("tip", tip),
    ("review", review),
    ("user", user),
):
    _frame.createOrReplaceTempView(_view_name)

# Keep the notebook namespace free of the loop helpers.
del _view_name, _frame

---

## -- EXAMPLE PROBLEM --

Get a list of users named "Kahlil" with the number of their reviews tagged "funny".

- The result should have these columns:
  - `user_id`
  - `name`
  - `funny`
- The result rows do NOT need to be ordered

In [6]:
# Solve the problem by completing the provided function so that it
# returns the result of the Spark SQL query

def users_kahlil():
    """Return user_id, name and funny for every user whose name is "Kahlil".

    The query runs against the ``user`` temp view; the resulting Spark
    DataFrame is returned as-is (nothing is collected here).
    """
    kahlil_query = """\
        SELECT user_id, name, funny
        FROM user
        WHERE name = "Kahlil"
        """
    return spark.sql(kahlil_query)

In [7]:
# It can be helpful to look at the result with .show()
# (the call prints the DataFrame's rows as a text table, as seen below).

results = users_kahlil()
results.show()

[Stage 5:>                                                          (0 + 1) / 1]

+--------------------+------+-----+
|             user_id|  name|funny|
+--------------------+------+-----+
|HE5fZW8m7MpdLHa3H...|Kahlil|   32|
|BAX7MdujQiv_Camqi...|Kahlil|    0|
|fepcVUPERVRA16b4M...|Kahlil|    0|
|uvG9MAZF6vIVBoj24...|Kahlil|    4|
|sEQtegzBDjARGB_YM...|Kahlil|    0|
|JpOCv0TtT2nz0gv0S...|Kahlil|    0|
+--------------------+------+-----+



                                                                                

In [8]:
# This notebook provides several asserts for each problem.
#
# There are also hidden tests that are run by the autograder after submission.

# The function has to hand back a Spark DataFrame.
assert type(users_kahlil()) == pyspark.sql.dataframe.DataFrame, (
    "The return value should be a Spark DataFrame."
)

# The solution has to be written with spark.sql().
AutograderHelper.assert_function_calls(users_kahlil, ["spark.sql"])

# Collect the user_id column for the checks in the next cell.
users_kahlil_ids = [row["user_id"] for row in users_kahlil().collect()]


                                                                                

In [9]:
# Exactly six users are expected to match.
assert len(users_kahlil_ids) == 6, (
    "The result must have 6 rows."
)

# Spot-check one known user_id.
expected_user_id = "HE5fZW8m7MpdLHa3HGp1FA"
assert expected_user_id in users_kahlil_ids, (
    f'The user_id column should include "{expected_user_id}"'
)

---

## -- USERS WITH 500 FANS --

Determine how many users have more than 500 fans.

- The result should have 1 column and 1 row
- The name of the column does not matter

In [10]:
def count_users_500_fans():
    """Return a one-row, one-column DataFrame counting users with more than 500 fans.

    Fans are summed per ``user_id`` and only the groups whose total exceeds
    500 are counted.
    """
    fans_query = """SELECT count(*) FROM (SELECT user_id, sum(fans) FROM user group by user_id having sum(fans) > 500)"""
    return spark.sql(fans_query)

In [11]:
# The function has to hand back a Spark DataFrame.
assert type(count_users_500_fans()) == pyspark.sql.dataframe.DataFrame, (
    "The return value should be a Spark DataFrame."
)

# The solution has to be written with spark.sql().
AutograderHelper.assert_function_calls(count_users_500_fans, ["spark.sql"])

# First column of the first row holds the submitted count.
count_users_500_fans_submitted = count_users_500_fans().collect()[0][0]

                                                                                

In [12]:
# Guard against a known wrong answer (counting funny ratings instead of fans).
assert count_users_500_fans_submitted != 8286, (
    "That is the number of users who have more than 500 funny ratings."
)

In [13]:
# Autograder cell. This cell is worth 2 points (out of 20). This cell contains hidden tests.

## -- BUSINESS REVIEWS --

Determine how many businesses have at least 4 stars and at least 100 reviews.

- The result should have 1 column and 1 row
- The name of the column does not matter

In [14]:
def business_reviews_count():
    """Return a one-row, one-column DataFrame counting well-reviewed businesses.

    A business is counted when it has at least 4 stars and at least 100
    reviews (both bounds inclusive).
    """
    count_query = """SELECT count(business_id) FROM business WHERE stars >=4 and review_count >= 100"""
    return spark.sql(count_query)

In [15]:
# The function has to hand back a Spark DataFrame.
assert type(business_reviews_count()) == pyspark.sql.dataframe.DataFrame, (
    "The return value should be a Spark DataFrame."
)

# The solution has to be written with spark.sql().
AutograderHelper.assert_function_calls(business_reviews_count, ["spark.sql"])

# First column of the first row holds the submitted count.
business_reviews_count_submitted = business_reviews_count().collect()[0][0]


                                                                                

In [16]:
# Each check rules out one specific off-by-boundary answer.

# Both comparisons strict (> 4 stars, > 100 reviews).
assert business_reviews_count_submitted != 2814, (
    "2814 is the number of businesses with greater than 4 stars (you should include ones with 4 stars) "
    "and greater than 100 reviews (you should include ones with 100 reviews)."
)

# Review comparison strict (> 100 reviews).
assert business_reviews_count_submitted != 7397, (
    "7397 is the number of businesses with at least 4 stars and greater than 100 reviews (you should "
    "include ones with 100 reviews)."
)

# Star comparison strict (> 4 stars).
assert business_reviews_count_submitted != 2842, (
    "2842 is the number of businesses with greater than 4 stars (you should include ones with 4 stars) "
    "and at least 100 reviews."
)

In [17]:
# Autograder cell. This cell is worth 2 points (out of 20). This cell contains hidden tests.

## -- LITCHFIELD OHIO --

Get a list of businesses from Litchfield, OH. 

- The result should have these columns:
  - `business_id`
  - `name`
- The result rows do NOT need to be ordered

In [18]:
def litchfield_oh_businesses():
    """Return business_id and name for every business in Litchfield, OH.

    Both the state and the city are matched so that a Litchfield in another
    state is not included.
    """
    litchfield_query = """SELECT business_id, name FROM business WHERE state == 'OH' and city == 'Litchfield'"""
    return spark.sql(litchfield_query)

In [19]:
# The solution has to be written with spark.sql().
AutograderHelper.assert_function_calls(litchfield_oh_businesses, ["spark.sql"])

# The function has to hand back a Spark DataFrame.
assert type(litchfield_oh_businesses()) == pyspark.sql.dataframe.DataFrame, (
    "The return value should be a Spark DataFrame."
)

# Collect the name column for the checks in the next cell.
litchfield_oh_business_names = [row["name"] for row in litchfield_oh_businesses().collect()]

                                                                                

In [20]:
# One name that must be present and one that must be absent.
assert "Tonios Pizza" in litchfield_oh_business_names, (
    "'Tonios Pizza' should appear in the result."
)
assert "Hayseed" not in litchfield_oh_business_names, (
    "'Hayseed' should not appear in the result."
)

In [21]:
# Autograder cell. This cell is worth 2 points (out of 20). This cell contains hidden tests.

## -- US STATES --

Determine which US states are represented in the data set. (The file `../../assets/data/states.csv` contains a list of US state names and abbreviations.)

- The result should have this column:
  - `state` (the full name of the state in the dataset)
- The result rows do NOT need to be ordered

In [22]:
def states_names_in_data():
    """Return the full names of the US states that appear in the business data.

    Reads the state name/abbreviation lookup from ``states.csv`` (first row is
    the header), registers it as the ``state`` temp view, and joins it to
    ``business`` on the abbreviation so that only states with at least one
    matching business remain.

    Returns:
        Spark DataFrame with a single ``state`` column holding each distinct
        full state name once; row order is unspecified.
    """
    states_lookup = spark.read.option("header", True).csv('../../assets/data/states.csv')
    states_lookup.createOrReplaceTempView("state")

    # An inner join keeps exactly the lookup rows that match a business; this
    # is what a RIGHT JOIN followed by "state.state IS NOT NULL" reduces to.
    # The NULL filter is kept so a lookup row without a name is never returned.
    query = """
    SELECT DISTINCT state.state
    FROM state
    JOIN business
    ON state.abbreviation = business.state
    WHERE state.state IS NOT NULL
    """
    return spark.sql(query)

In [23]:
# The solution has to be written with spark.sql().
AutograderHelper.assert_function_calls(states_names_in_data, ["spark.sql"])

# The function has to hand back a Spark DataFrame.
assert type(states_names_in_data()) == pyspark.sql.dataframe.DataFrame, (
    "The return value should be a Spark DataFrame."
)

# Collect the state column for the checks in the next cell.
state_names_list = [row["state"] for row in states_names_in_data().collect()]

                                                                                

In [24]:
# One state that must be present in the result ...
assert "North Carolina" in state_names_list, "North Carolina should appear in the result."
# ... and one that must be absent; the message has to agree with the `not in` check.
assert "Michigan" not in state_names_list, "Michigan should not appear in the result."

In [25]:
# Autograder cell. This cell is worth 3 points (out of 20). This cell contains hidden tests.

## -- FUNNIEST REVIEW --

Determine the text of the funniest review.

- The result should have 1 column and 1 row
- The name of the column does not matter

In [26]:
def funniest_review():
    """Return the text of the review with the highest ``funny`` count.

    Sorting by ``funny`` descending and keeping one row guarantees the
    required single-row result even when several reviews share the maximum
    (an equality test against the maximum would return every tied review).

    Returns:
        Spark DataFrame with one ``text`` column and at most one row.
    """
    query = """
    SELECT text
    FROM review
    ORDER BY funny DESC
    LIMIT 1
    """
    return spark.sql(query)

In [27]:
AutograderHelper.assert_function_calls(funniest_review, ["spark.sql"])

# The function has to hand back a Spark DataFrame.
assert type(funniest_review()) == pyspark.sql.dataframe.DataFrame, (
    "The return value should be a Spark DataFrame."
)

# Keep the first (and only expected) row for the check in the next cell.
funniest_review_first_row = funniest_review().take(1)[0]

                                                                                

In [28]:
# Compare the length of the returned text with the known length.
funniest_review_len = len(funniest_review_first_row[0])
assert funniest_review_len == 421, (
    f"Hint: the funniest review has 421 characters (found {funniest_review_len})"
)

In [29]:
# Autograder cell. This cell is worth 2 points (out of 20). This cell contains hidden tests.

## -- REVIEW WORD COUNT -- 

Find the 10 reviews with the largest word counts.

- Use a UDF to determine review word counts using the Python `.split()` method on the string object.
- The result should have this column:
  - `word_count`

In [30]:
# Print the review DataFrame's columns and types before writing the query.
review.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)



In [31]:
# def text_split(x):
#     return len(x.split())

# from pyspark.sql.functions import udf
# from pyspark.sql.types import IntegerType
# word_count = udf(lambda z: text_split(z), IntegerType())

# spark.udf.register('word_count_len',word_count)

# query = """
# SELECT word_count_len(text) AS word_count 
#    FROM review 
#    ORDER BY word_count DESC
#    LIMIT 10
# """
# result = spark.sql(query)

In [32]:
def reviews_top_10_word_counts():
    """Return the 10 largest review word counts, largest first.

    A Python UDF, registered with Spark SQL as ``word_count_len``, counts the
    whitespace-separated tokens of each review's ``text`` with ``str.split()``.
    The UDF is (re-)registered on every call, so the function does not depend
    on earlier notebook cells having registered it.

    Returns:
        Spark DataFrame with a single ``word_count`` column and up to 10 rows,
        sorted from highest to lowest.
    """
    from pyspark.sql.functions import udf
    from pyspark.sql.types import IntegerType

    def text_split(text):
        """Count whitespace-separated words; a missing (None) review counts as 0."""
        # `text` is nullable in the review schema and None has no .split(),
        # so a NULL review must not be allowed to crash the whole query.
        if text is None:
            return 0
        return len(text.split())

    word_count = udf(text_split, IntegerType())
    spark.udf.register('word_count_len', word_count)

    query = """
    SELECT word_count_len(text) AS word_count
    FROM review
    ORDER BY word_count DESC
    LIMIT 10
    """
    return spark.sql(query)

In [33]:
# The solution has to be written with spark.sql().
AutograderHelper.assert_function_calls(reviews_top_10_word_counts, ["spark.sql"])

# The function has to hand back a Spark DataFrame.
assert type(reviews_top_10_word_counts()) == pyspark.sql.dataframe.DataFrame, (
    "The return value should be a Spark DataFrame."
)

# Keep the top row for the check in the next cell.
reviews_top_10_word_counts_first_row = reviews_top_10_word_counts().take(1)[0]

                                                                                

In [34]:
# The longest review is expected to contain 1056 words.
assert reviews_top_10_word_counts_first_row["word_count"] == 1056, (
    "The first word_count should be 1056"
)

In [35]:
# Autograder cell. This cell is worth 3 points (out of 20). This cell contains hidden tests.

## -- MOST TIPS --

Determine the names of the top 100 users who provided the most tips.

- The result should have these columns:
  - `name`
  - `tip_count`
- The result should be sorted by highest-to-lowest `tip_count`; in the case of `tip_count` ties, the results should be sorted alphabetically by name. For example (this is fake data):
  ```
  +--------+---------+
  |    name|tip_count|
  +--------+---------+
  | Weifong|      167|
  |   Alice|       42|
  |     Bob|       42|
  |   Jamal|        3|
  +--------+---------+
  ```

In [36]:
#business.printSchema() # tip, user, business, checkin, review

In [37]:
def users_top_100_tip_count():
    """Return the names and tip counts of the 100 users who wrote the most tips.

    Tips are joined to their authors and counted per ``user_id`` (not per
    name), so different users who share a name stay separate rows.

    The ORDER BY and LIMIT sit on the outermost query: an ordering applied
    only inside a subquery is not guaranteed to survive into the final result.
    An inner join is used so tips without a matching user do not collapse
    into a NULL-name row.

    Returns:
        Spark DataFrame with columns ``name`` and ``tip_count``, sorted by
        ``tip_count`` descending and then ``name`` ascending, up to 100 rows.
    """
    query = """
    SELECT user.name AS name, count(*) AS tip_count
    FROM tip
    JOIN user
    ON tip.user_id = user.user_id
    GROUP BY user.user_id, user.name
    ORDER BY tip_count DESC, name ASC
    LIMIT 100
    """
    return spark.sql(query)

In [38]:
# The solution has to be written with spark.sql().
AutograderHelper.assert_function_calls(users_top_100_tip_count, ["spark.sql"])

# The function has to hand back a Spark DataFrame.
assert type(users_top_100_tip_count()) == pyspark.sql.dataframe.DataFrame, (
    "The return value should be a Spark DataFrame."
)

# Keep the top row for the checks in the next cell.
users_top_100_tip_count_first_row = users_top_100_tip_count().take(1)[0]

                                                                                

In [39]:
# The most prolific tipper and their tip count are known in advance.
assert users_top_100_tip_count_first_row["name"] == "Momo", (
    "The first name should be Momo"
)
assert users_top_100_tip_count_first_row["tip_count"] == 2439, (
    "The first tip_count should be 2439"
)

In [40]:
# Autograder cell. This cell is worth 3 points (out of 20). This cell contains hidden tests.

## -- ARIZONA SUMMARY -- 

For the top 10 users, ranked by how many reviews they have written of businesses in Arizona ('AZ'), list each user's name, their number of reviews of Arizona businesses, and their total number of reviews. Include a column that shows the percentage of each user's reviews that are of businesses in Arizona.

- The result should have these columns:
  - `name`
  - `az_count`
  - `total_count`
  - `percent` (this will only be checked to within 0.01)
- The result should be sorted by highest-to-lowest `az_count`; in the case of `az_count` ties, the results should be sorted by highest-to-lowest `percent`.



The first row of the results should be:
```
+--------+--------+-----------+---------+
|    name|az_count|total_count|  percent|
+--------+--------+-----------+---------+
|    Brad|    1637|       1642|99.695496|
+--------+--------+-----------+---------+
```

In [41]:
def arizona_summary():
    """Summarise the 10 users who wrote the most reviews of Arizona businesses.

    The inner query counts, per user, the reviews whose business is in state
    'AZ'. Those counts are joined to ``user`` to pick up the name and the
    overall ``review_count``, aggregated per user, and the share of Arizona
    reviews is expressed as a percentage rounded to 6 decimals.

    The ORDER BY and LIMIT sit on the outermost query so the row order of the
    returned DataFrame is the requested one, including the ``percent``
    tie-breaker for equal ``az_count`` values. Inner joins are used so that
    only users with Arizona reviews are aggregated.

    Returns:
        Spark DataFrame with columns ``name``, ``az_count``, ``total_count``
        and ``percent``, sorted by ``az_count`` descending and then
        ``percent`` descending, up to 10 rows.
    """
    query = """
    SELECT
        user.name AS name,
        sum(az.az_reviews) AS az_count,
        sum(user.review_count) AS total_count,
        round(100 * (sum(az.az_reviews) / sum(user.review_count)), 6) AS percent
    FROM (
        SELECT review.user_id AS user_id, count(review.review_id) AS az_reviews
        FROM review
        JOIN business
        ON review.business_id = business.business_id
        WHERE business.state = 'AZ'
        GROUP BY review.user_id
    ) AS az
    JOIN user
    ON user.user_id = az.user_id
    GROUP BY az.user_id, user.name
    ORDER BY az_count DESC, percent DESC
    LIMIT 10
    """
    return spark.sql(query)

In [42]:
# The solution has to be written with spark.sql().
AutograderHelper.assert_function_calls(arizona_summary, ["spark.sql"])

# The function has to hand back a Spark DataFrame.
assert type(arizona_summary()) == pyspark.sql.dataframe.DataFrame, (
    "The return value should be a Spark DataFrame."
)

# Keep the top row for the checks in the next cell.
arizona_summary_first_row = arizona_summary().take(1)[0]

                                                                                

In [43]:
# The first row must match the expected row shown in the problem statement.
assert arizona_summary_first_row["name"] == "Brad", (
    "The first name should be Brad"
)
assert arizona_summary_first_row["az_count"] == 1637, (
    "The first az_count should be 1637"
)
assert arizona_summary_first_row["total_count"] == 1642, (
    "The first total_count should be 1642"
)

# The percentage is compared after rounding to two decimals.
assert round(arizona_summary_first_row["percent"], 2) == 99.70, (
    f"The first percent should be about 99.70 (checking to "
    f"nearest 0.01, found {arizona_summary_first_row['percent']})"
)

In [44]:
# Autograder cell. This cell is worth 3 points (out of 20). This cell contains hidden tests.