# Data Wrangling with Spark SQL

This quiz uses the same dataset and most of the same questions as the earlier "Quiz - Data Wrangling with Data Frames Jupyter Notebook." This time, however, the answers are written in Spark SQL instead of Spark Data Frames.

In [3]:
# 1) import libraries you might need
from pyspark.sql import SparkSession
# from pyspark.sql.functions import isnan, count, when, col, desc, udf, col, sort_array, asc, avg
# from pyspark.sql.functions import sum as Fsum
# from pyspark.sql.window import Window
# from pyspark.sql.types import IntegerType

In [4]:
# 2) instantiate a Spark session

spark = SparkSession \
    .builder \
    .appName("Spark SQL Quiz") \
    .getOrCreate()

# 3) read in the data set located at the path "data/sparkify_log_small.json"
user_log = spark.read.json("data/sparkify_log_small.json")

# 4) create a view to use with your SQL queries
user_log.createOrReplaceTempView("log_table")

# Question 1

Which page did the user with an empty-string id ("") NOT visit?

In [5]:
user_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [8]:
# SELECT the distinct pages for the blank user and the distinct pages for all users
# RIGHT JOIN the results to find the pages that the blank user did not visit

spark.sql("SELECT * \
            FROM ( \
                SELECT DISTINCT page \
                FROM log_table \
                WHERE userId = '') AS user_pages \
            RIGHT JOIN ( \
                SELECT DISTINCT page \
                FROM log_table) AS all_pages \
            ON user_pages.page = all_pages.page \
            WHERE user_pages.page IS NULL").show()

+----+----------------+
|page|            page|
+----+----------------+
|null|Submit Downgrade|
|null|       Downgrade|
|null|          Logout|
|null|   Save Settings|
|null|        Settings|
|null|        NextSong|
|null|         Upgrade|
|null|           Error|
|null|  Submit Upgrade|
+----+----------------+
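
The right join above is an anti-join pattern: keep the rows of `all_pages` that have no match in `user_pages`. The same question can also be phrased as a set difference with `EXCEPT`, which Spark SQL supports as well. The sketch below is not the notebook's method; it uses Python's built-in `sqlite3` module and a handful of made-up rows (both are assumptions for illustration) so that the logic can be checked without a Spark session.

```python
import sqlite3

# Hypothetical toy rows (userId, page); not taken from sparkify_log_small.json.
rows = [
    ("", "Home"),
    ("", "Login"),
    ("", "About"),
    ("42", "Home"),
    ("42", "NextSong"),
    ("42", "Settings"),
    ("7", "NextSong"),
    ("7", "Logout"),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE log_table (userId TEXT, page TEXT)")
conn.executemany("INSERT INTO log_table VALUES (?, ?)", rows)

# All pages in the log, minus the pages the blank user visited.
query = """
    SELECT DISTINCT page FROM log_table
    EXCEPT
    SELECT DISTINCT page FROM log_table WHERE userId = ''
"""
not_visited = sorted(page for (page,) in conn.execute(query))
print(not_visited)
```

A side benefit of this form is that the result has a single `page` column, rather than the two `page` columns produced by `SELECT *` over the join.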



# Question 2 - Reflect

Why might you prefer to use SQL over data frames? Why might you prefer data frames over SQL?

Answer:

Both Spark SQL and Spark Data Frames are part of the Spark SQL library. Hence, they both use the Spark SQL Catalyst Optimizer to optimize queries.

You might prefer SQL over data frames because the syntax can be clearer, especially for teams already experienced in SQL.

Spark data frames give you more control. You can break down your queries into smaller steps, which can make debugging easier. You can also cache intermediate results or repartition intermediate results.

# Question 3

How many female users do we have in the data set?

In [9]:
# count the distinct userIds among the rows where gender is 'F'

spark.sql("SELECT COUNT(DISTINCT userId) \
            FROM log_table \
            WHERE gender = 'F'").show()

+----------------------+
|count(DISTINCT userId)|
+----------------------+
|                   462|
+----------------------+
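
The `DISTINCT` matters here because the log holds one row per event, so a single user appears many times. A minimal sketch of the difference, again using `sqlite3` and made-up rows as stand-ins (assumptions, not the quiz data):

```python
import sqlite3

# Hypothetical toy rows (userId, gender): user "1" has three events, user "2" has one.
rows = [
    ("1", "F"),
    ("1", "F"),
    ("1", "F"),
    ("2", "F"),
    ("3", "M"),
    ("3", "M"),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE log_table (userId TEXT, gender TEXT)")
conn.executemany("INSERT INTO log_table VALUES (?, ?)", rows)

# COUNT(*) counts log rows (events); COUNT(DISTINCT userId) counts users.
event_count = conn.execute(
    "SELECT COUNT(*) FROM log_table WHERE gender = 'F'"
).fetchone()[0]
user_count = conn.execute(
    "SELECT COUNT(DISTINCT userId) FROM log_table WHERE gender = 'F'"
).fetchone()[0]
print(event_count, user_count)
```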



# Question 4

How many songs were played from the most played artist?

In [13]:
# count the plays per artist and keep only the artist with the most plays

spark.sql("SELECT Artist, count(Artist) As plays \
            FROM log_table \
            GROUP BY Artist \
            ORDER BY plays DESC \
            LIMIT 1").show()


+--------+-----+
|  Artist|plays|
+--------+-----+
|Coldplay|   83|
+--------+-----+



# Question 5 (challenge)

How many songs do users listen to on average between visiting our home page? Please round your answer to the closest integer.

In [10]:
# keep only the Home and NextSong events, flagging Home visits with 1 and songs with 0
is_home = spark.sql("SELECT userID, page, ts, CASE WHEN page = 'Home' THEN 1 ELSE 0 END as is_home FROM log_table \
                WHERE (page = 'NextSong') or (page = 'Home') \
                ")

# keep the results in a new view
is_home.createOrReplaceTempView("is_home_table")

spark.sql("SELECT * FROM is_home_table").show()

+------+--------+-------------+-------+
|userID|    page|           ts|is_home|
+------+--------+-------------+-------+
|  1046|NextSong|1513720872284|      0|
|  1000|NextSong|1513720878284|      0|
|  2219|NextSong|1513720881284|      0|
|  2373|NextSong|1513720905284|      0|
|  1747|    Home|1513720913284|      1|
|  1162|NextSong|1513720955284|      0|
|  1061|NextSong|1513720959284|      0|
|   748|    Home|1513720959284|      1|
|   597|    Home|1513720980284|      1|
|  1806|NextSong|1513720983284|      0|
|   748|NextSong|1513720993284|      0|
|  1176|NextSong|1513721031284|      0|
|  2164|NextSong|1513721045284|      0|
|  2146|NextSong|1513721058284|      0|
|  2219|NextSong|1513721077284|      0|
|  1176|    Home|1513721088284|      1|
|  2904|NextSong|1513721095284|      0|
|   597|NextSong|1513721097284|      0|
|   226|NextSong|1513721104284|      0|
|  1046|NextSong|1513721104284|      0|
+------+--------+-------------+-------+
only showing top 20 rows



In [11]:
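# find the cumulative sum over the is_home column, per user and from newest to oldest event,
# so that all songs played between two Home visits share the same period number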
cumulative_sum = spark.sql("SELECT *, SUM(is_home) OVER \
    (PARTITION BY userID ORDER BY ts DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS period \
    FROM is_home_table")

# keep the results in a view
cumulative_sum.createOrReplaceTempView("period_table")

spark.sql("SELECT * FROM period_table").show()

+------+--------+-------------+-------+------+
|userID|    page|           ts|is_home|period|
+------+--------+-------------+-------+------+
|  1436|NextSong|1513783259284|      0|     0|
|  1436|NextSong|1513782858284|      0|     0|
|  2088|    Home|1513805972284|      1|     1|
|  2088|NextSong|1513805859284|      0|     1|
|  2088|NextSong|1513805494284|      0|     1|
|  2088|NextSong|1513805065284|      0|     1|
|  2088|NextSong|1513804786284|      0|     1|
|  2088|NextSong|1513804555284|      0|     1|
|  2088|NextSong|1513804196284|      0|     1|
|  2088|NextSong|1513803967284|      0|     1|
|  2088|NextSong|1513803820284|      0|     1|
|  2088|NextSong|1513803651284|      0|     1|
|  2088|NextSong|1513803413284|      0|     1|
|  2088|NextSong|1513803254284|      0|     1|
|  2088|NextSong|1513803057284|      0|     1|
|  2088|NextSong|1513802824284|      0|     1|
|  2162|NextSong|1513781246284|      0|     0|
|  2162|NextSong|1513781065284|      0|     0|
|  2162|NextS

In [12]:
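# count the NextSong rows in each (userID, period) group, then average those counts;
# round the result to the closest integer for the final answer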

spark.sql("SELECT AVG(count_results) FROM \
         (SELECT COUNT(*) AS count_results FROM period_table \
        GROUP BY userID, period, page HAVING page = 'NextSong') AS counts").show()

+------------------+
|avg(count_results)|
+------------------+
| 6.898347107438017|
+------------------+
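
The three steps above (flag Home visits, take a cumulative sum per user, then average the NextSong counts per period) can be traced by hand on a tiny log. The sketch below chains them as common table expressions and runs them with Python's built-in `sqlite3` module on made-up events for a single hypothetical user `u1`. The data, the use of SQLite (version 3.25 or later, for window functions), and the `WHERE` filter in place of `HAVING` are all assumptions for illustration, not the notebook's code.

```python
import sqlite3

# Hypothetical events (userID, page, ts) for one user; not the quiz data.
rows = [
    ("u1", "Home", 1),
    ("u1", "NextSong", 2),
    ("u1", "NextSong", 3),
    ("u1", "Home", 4),
    ("u1", "NextSong", 5),
    ("u1", "NextSong", 6),
    ("u1", "NextSong", 7),
    ("u1", "NextSong", 8),
    ("u1", "Home", 9),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE log_table (userID TEXT, page TEXT, ts INTEGER)")
conn.executemany("INSERT INTO log_table VALUES (?, ?, ?)", rows)

query = """
    WITH is_home_table AS (
        -- step 1: flag Home visits, keep only Home and NextSong events
        SELECT userID, page, ts,
               CASE WHEN page = 'Home' THEN 1 ELSE 0 END AS is_home
        FROM log_table
        WHERE page IN ('NextSong', 'Home')
    ),
    period_table AS (
        -- step 2: cumulative sum of the flag, newest event first
        SELECT *,
               SUM(is_home) OVER (
                   PARTITION BY userID ORDER BY ts DESC
                   ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
               ) AS period
        FROM is_home_table
    )
    -- step 3: songs per (userID, period), then the average of those counts
    SELECT AVG(count_results)
    FROM (
        SELECT COUNT(*) AS count_results
        FROM period_table
        WHERE page = 'NextSong'
        GROUP BY userID, period
    ) AS counts
"""
avg_songs = conn.execute(query).fetchone()[0]
print(avg_songs, round(avg_songs))
```

In this toy log the songs at `ts` 5 to 8 fall in period 1 (four songs) and the songs at `ts` 2 and 3 fall in period 2 (two songs), so the average is 3.0.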

