# Data Wrangling with Spark SQL Quiz

This quiz uses the same dataset and most of the same questions as the earlier "Quiz - Data Wrangling with Data Frames Jupyter Notebook." For this quiz, however, use Spark SQL instead of Spark Data Frames.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum

import datetime

import numpy as np
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt

# TODOS: 
# 1) import any other libraries you might need
# 2) instantiate a Spark session 
# 3) read in the data set located at the path "data/sparkify_log_small.json"
# 4) create a view to use with your SQL queries
# 5) write code to answer the quiz questions 

In [2]:
spark = SparkSession \
    .builder \
    .appName("Data wrangling with Spark SQL") \
    .getOrCreate()

In [3]:
path = "data/sparkify_log_small.json"
user_log = spark.read.json(path)

In [4]:
user_log.take(1)

[Row(artist='Showaddywaddy', auth='Logged In', firstName='Kenneth', gender='M', itemInSession=112, lastName='Matthews', length=232.93342, level='paid', location='Charlotte-Concord-Gastonia, NC-SC', method='PUT', page='NextSong', registration=1509380319284, sessionId=5132, song='Christmas Tears Will Fall', status=200, ts=1513720872284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='1046')]

In [5]:
user_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [6]:
user_log.createOrReplaceTempView("user_log_table")

# Question 1

Which pages did the user with id "" (empty string) NOT visit?

In [7]:
# TODO: write your code to answer question 1
spark.sql("SELECT distinct page from user_log_table where page not in (SELECT page from user_log_table where userId == '')").show()

+----------------+
|            page|
+----------------+
|Submit Downgrade|
|       Downgrade|
|          Logout|
|   Save Settings|
|        Settings|
|        NextSong|
|         Upgrade|
|           Error|
|  Submit Upgrade|
+----------------+
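
The `NOT IN` subquery above can also be written as a set difference. The sketch below is not the notebook's code: it uses Python's built-in `sqlite3` module as a stand-in for a Spark session so that it runs without Spark, and the rows are invented for illustration. It checks that `NOT IN` and `EXCEPT` agree on this toy table; Spark SQL also supports `EXCEPT`. One caveat: if the subquery ever returned a NULL `page`, the `NOT IN` form would match no rows at all.

```python
import sqlite3

# In-memory stand-in for the user_log_table view (toy rows, not the real data).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user_log_table (userId TEXT, page TEXT)")
rows = [
    ("", "Home"), ("", "Login"), ("", "About"),
    ("1046", "Home"), ("1046", "NextSong"), ("1046", "Logout"),
    ("2088", "NextSong"), ("2088", "Settings"),
]
conn.executemany("INSERT INTO user_log_table VALUES (?, ?)", rows)

# Same shape as the notebook query: pages absent from the empty-id user's pages.
not_in = conn.execute("""
    SELECT DISTINCT page FROM user_log_table
    WHERE page NOT IN (SELECT page FROM user_log_table WHERE userId = '')
    ORDER BY page
""").fetchall()

# Equivalent set difference; EXCEPT already removes duplicates.
except_q = conn.execute("""
    SELECT page FROM user_log_table
    EXCEPT
    SELECT page FROM user_log_table WHERE userId = ''
    ORDER BY page
""").fetchall()

not_visited = [r[0] for r in not_in]
print(not_visited)
```

Both queries list the pages that only identified users reached in the toy table.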



# Question 2 - Reflect

Why might you prefer to use SQL over data frames? Why might you prefer data frames over SQL?

# Question 3

How many female users do we have in the data set?

In [8]:
# TODO: write your code to answer question 3
spark.sql("SELECT count(distinct userId) from user_log_table where gender = 'F'").show()

+----------------------+
|count(DISTINCT userId)|
+----------------------+
|                   462|
+----------------------+



# Question 4

How many songs were played from the most played artist?

In [9]:
# TODO: write your code to answer question 4
spark.sql("SELECT artist, count(artist) AS n FROM user_log_table GROUP BY artist ORDER BY n DESC LIMIT 1").show()


+--------+---+
|  artist|  n|
+--------+---+
|Coldplay| 83|
+--------+---+
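
The query counts `artist` rather than `*`, and that choice matters when some rows have no artist. The sketch below assumes that rows for non-song pages carry a NULL `artist`, which the nullable `artist` column allows but the notebook does not show. It uses Python's built-in `sqlite3` as a stand-in for Spark SQL, with invented rows. `COUNT(artist)` skips NULLs, while `COUNT(*)` counts every row in the group.

```python
import sqlite3

# Toy stand-in for user_log_table; None becomes SQL NULL.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user_log_table (artist TEXT, page TEXT)")
conn.executemany("INSERT INTO user_log_table VALUES (?, ?)", [
    ("Coldplay", "NextSong"),
    ("Coldplay", "NextSong"),
    ("Kings Of Leon", "NextSong"),
    (None, "Home"),
    (None, "Home"),
    (None, "Logout"),
])

# COUNT(artist) ignores NULLs, so the NULL group gets a count of 0.
top = conn.execute("""
    SELECT artist, COUNT(artist) AS n
    FROM user_log_table
    GROUP BY artist
    ORDER BY n DESC
    LIMIT 1
""").fetchone()

# COUNT(*) counts rows, so the NULL group (3 rows) would come out on top.
top_star = conn.execute("""
    SELECT artist, COUNT(*) AS n
    FROM user_log_table
    GROUP BY artist
    ORDER BY n DESC
    LIMIT 1
""").fetchone()

print(top, top_star)
```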



# Question 5 (challenge)

How many songs do users listen to on average between visiting our home page? Please round your answer to the closest integer.

In [10]:
# TODO: write your code to answer question 5
is_home = spark.sql("""
          SELECT distinct userId, page, ts,
          case when page == 'Home' then 1 else 0 end as homevisit
          from user_log_table
          where page in ('Home', 'NextSong')
          """)  # no .show() here: show() returns None, and is_home must stay a DataFrame


In [11]:
is_home.createOrReplaceTempView("is_home_table")

# find the cumulative sum over the homevisit column
# The frame ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW covers every row from the start of the partition up to and including the current row.
# Because rows are ordered by ts descending, period counts the Home visits at or after each row's timestamp.
cumsum = spark.sql("""
                   SELECT *,
                   sum(homevisit) over (
                   partition by userId order by ts desc ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as period
                   from is_home_table
                   """)
cumsum.show()

+------+--------+-------------+---------+------+
|userId|    page|           ts|homevisit|period|
+------+--------+-------------+---------+------+
|  1436|NextSong|1513783259284|        0|     0|
|  1436|NextSong|1513782858284|        0|     0|
|  2088|    Home|1513805972284|        1|     1|
|  2088|NextSong|1513805859284|        0|     1|
|  2088|NextSong|1513805494284|        0|     1|
|  2088|NextSong|1513805065284|        0|     1|
|  2088|NextSong|1513804786284|        0|     1|
|  2088|NextSong|1513804555284|        0|     1|
|  2088|NextSong|1513804196284|        0|     1|
|  2088|NextSong|1513803967284|        0|     1|
|  2088|NextSong|1513803820284|        0|     1|
|  2088|NextSong|1513803651284|        0|     1|
|  2088|NextSong|1513803413284|        0|     1|
|  2088|NextSong|1513803254284|        0|     1|
|  2088|NextSong|1513803057284|        0|     1|
|  2088|NextSong|1513802824284|        0|     1|
|  2162|NextSong|1513781246284|        0|     0|
|  2162|NextSong|151
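
The running sum in the cell above can be tried on a handful of rows. This is a sketch, not the notebook's code: it runs the same window clause in Python's built-in `sqlite3` (which needs SQLite 3.25 or newer for window functions) instead of Spark, and the users `a` and `b` with small integer timestamps are invented. Each Home visit raises `period` by one for that row and for every earlier row of the same user, and the count restarts for each `userId` partition.

```python
import sqlite3

# Toy stand-in for is_home_table (invented rows).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE is_home_table (userId TEXT, page TEXT, ts INTEGER, homevisit INTEGER)"
)
rows = [
    ("a", "NextSong", 1, 0),
    ("a", "NextSong", 2, 0),
    ("a", "Home", 3, 1),
    ("a", "NextSong", 4, 0),
    ("a", "Home", 5, 1),
    ("a", "NextSong", 6, 0),
    ("b", "Home", 1, 1),
    ("b", "NextSong", 2, 0),
    ("b", "NextSong", 3, 0),
]
conn.executemany("INSERT INTO is_home_table VALUES (?, ?, ?, ?)", rows)

# Same window definition as the Spark SQL query: per user, newest row first,
# summing homevisit from the start of the partition to the current row.
periods = conn.execute("""
    SELECT userId, ts, page,
           SUM(homevisit) OVER (
               PARTITION BY userId ORDER BY ts DESC
               ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
           ) AS period
    FROM is_home_table
    ORDER BY userId, ts DESC
""").fetchall()

for row in periods:
    print(row)
```

Songs that share a `period` value for a user were all played after the same Home visit and before the next one.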

In [12]:
# count the NextSong rows in each period for each user
cumsum.createOrReplaceTempView("period_table")

total_songs = spark.sql("SELECT userId, period, page, COUNT(*) AS count_results FROM period_table \
WHERE page = 'NextSong' GROUP BY userId, period, page ORDER BY 1, 2")

total_songs.show()

+------+------+--------+-------------+
|userId|period|    page|count_results|
+------+------+--------+-------------+
|    10|     0|NextSong|            2|
|   100|     1|NextSong|            1|
|   100|     2|NextSong|            2|
|   100|     3|NextSong|            2|
|  1000|     0|NextSong|            1|
|  1003|     1|NextSong|            1|
|  1005|     0|NextSong|            1|
|  1006|     0|NextSong|            3|
|  1017|     0|NextSong|            1|
|  1017|     1|NextSong|            3|
|  1017|     2|NextSong|            6|
|  1019|     0|NextSong|           16|
|  1019|     2|NextSong|            3|
|  1019|     3|NextSong|            3|
|  1020|     0|NextSong|            4|
|  1022|     0|NextSong|            2|
|  1025|     1|NextSong|            4|
|  1030|     0|NextSong|            1|
|  1035|     0|NextSong|           31|
|  1035|     1|NextSong|           40|
+------+------+--------+-------------+
only showing top 20 rows



In [13]:
# average number of songs per period (the 6.94 in the output below rounds to 7)
total_songs.createOrReplaceTempView("total_songs_table")

spark.sql("SELECT avg(count_results) FROM total_songs_table").show()

+------------------+
|avg(count_results)|
+------------------+
| 6.938487115544472|
+------------------+
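
The three steps of Question 5 can also be chained in a single query with common table expressions, with the rounding done in SQL. The sketch below is one possible arrangement, not the notebook's solution: it uses Python's built-in `sqlite3` (SQLite 3.25 or newer) as a stand-in for Spark, and the rows are invented. Spark SQL also accepts `WITH` and `ROUND`.

```python
import sqlite3

# Toy stand-in for user_log_table (invented rows, small integer timestamps).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user_log_table (userId TEXT, page TEXT, ts INTEGER)")
conn.executemany("INSERT INTO user_log_table VALUES (?, ?, ?)", [
    ("a", "NextSong", 1), ("a", "NextSong", 2), ("a", "Home", 3),
    ("a", "NextSong", 4), ("a", "Home", 5), ("a", "NextSong", 6),
    ("a", "About", 7),  # dropped by the page filter below
    ("b", "Home", 1), ("b", "NextSong", 2), ("b", "NextSong", 3),
    ("b", "NextSong", 4),
])

avg_songs, rounded = conn.execute("""
    WITH period_table AS (
        -- flag Home visits and take their running sum, newest row first
        SELECT userId, page,
               SUM(CASE WHEN page = 'Home' THEN 1 ELSE 0 END) OVER (
                   PARTITION BY userId ORDER BY ts DESC
                   ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
               ) AS period
        FROM user_log_table
        WHERE page IN ('Home', 'NextSong')
    ),
    total_songs_table AS (
        -- songs per user and period
        SELECT userId, period, COUNT(*) AS count_results
        FROM period_table
        WHERE page = 'NextSong'
        GROUP BY userId, period
    )
    SELECT AVG(count_results), ROUND(AVG(count_results))
    FROM total_songs_table
""").fetchone()

print(avg_songs, rounded)
```

In the toy data the per-period song counts are 1, 1, 2 and 3, so the average is 1.75 and the rounded result is 2.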

