# Data Wrangling with Spark SQL Quiz

This quiz uses the same dataset and most of the same questions as the earlier "Quiz - Data Wrangling with Data Frames Jupyter Notebook." For this quiz, however, use Spark SQL instead of Spark Data Frames.

In [1]:
from pyspark.sql import SparkSession

# TODOS: 
# 1) import any other libraries you might need
# 2) instantiate a Spark session 
spark_session = SparkSession \
    .builder \
    .appName("Data wrangling with spark SQL quiz") \
    .getOrCreate()
# 3) read in the data set located at the path "data/sparkify_log_small.json"
sparkify_log = spark_session.read.json("data/sparkify_log_small.json")
# 4) create a view to use with your SQL queries
sparkify_log.createOrReplaceTempView("spark_log_table")
# 5) write code to answer the quiz questions 

# Question 1

Which page did user id "" (empty string) NOT visit?

In [2]:
# TODO: write your code to answer question 1
spark_session.sql('''
WITH null_user AS (
        SELECT DISTINCT page
        FROM spark_log_table
        WHERE userId = ""
                ),
    no_null_user AS(
        SELECT DISTINCT page
        FROM spark_log_table
        WHERE userId != ""
        )
    SELECT * 
    FROM no_null_user
    LEFT JOIN null_user ON null_user.page = no_null_user.page
    WHERE null_user.page IS NULL
''').show()

+----------------+----+
|            page|page|
+----------------+----+
|Submit Downgrade|null|
|       Downgrade|null|
|          Logout|null|
|   Save Settings|null|
|        Settings|null|
|        NextSong|null|
|         Upgrade|null|
|           Error|null|
|  Submit Upgrade|null|
+----------------+----+
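
The `LEFT JOIN ... WHERE null_user.page IS NULL` pattern above is an anti-join: it keeps the pages seen for identified users that have no match among the pages seen for the empty-string user. As a rough illustration of the same logic outside Spark, here is a minimal pure-Python sketch using a set difference. The rows in `toy_log` and the helper `pages_not_visited_by_blank` are hypothetical and are not taken from `sparkify_log_small.json`.

```python
# Hypothetical (userId, page) rows, only for illustrating the anti-join logic.
toy_log = [
    ("", "Home"),
    ("", "Login"),
    ("", "About"),
    ("10", "Home"),
    ("10", "NextSong"),
    ("100", "Settings"),
    ("100", "About"),
]


def pages_not_visited_by_blank(rows):
    """Return pages visited by identified users but never by userId ''."""
    blank_pages = {page for user_id, page in rows if user_id == ""}
    other_pages = {page for user_id, page in rows if user_id != ""}
    # Set difference plays the role of LEFT JOIN ... WHERE right side IS NULL.
    return sorted(other_pages - blank_pages)


not_visited = pages_not_visited_by_blank(toy_log)
print(not_visited)
```

On this toy input the sketch keeps only `NextSong` and `Settings`, because `Home` and `About` also appear for the empty-string user.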



# Question 2 - Reflect

Why might you prefer to use SQL over data frames? Why might you prefer data frames over SQL?

# Question 3

How many female users do we have in the data set?

In [3]:
# TODO: write your code to answer question 3
spark_session.sql('''
    SELECT count( DISTINCT userId) 
    FROM spark_log_table
    WHERE gender == "F"
''').show()


+----------------------+
|count(DISTINCT userId)|
+----------------------+
|                   462|
+----------------------+



# Question 4

How many songs were played by the most played artist?

In [4]:
spark_session.sql(''' 
    DROP TABLE IF EXISTS numsongs_table''')

DataFrame[]

In [5]:
# TODO: write your code to answer question 4
spark_session.sql('''
    WITH numsongs_table AS(
        SELECT artist, count(song) as song_listened
        FROM spark_log_table
        WHERE artist IS NOT NULL
        GROUP BY artist
        ),
    max_songs AS(
        -- A global aggregate over the whole table needs no GROUP BY
        SELECT max(song_listened) as max_num_songs
        FROM numsongs_table
    )
    SELECT artist, song_listened
    FROM numsongs_table
    JOIN max_songs ON max_songs.max_num_songs = numsongs_table.song_listened
''').show()

+--------+-------------+
|  artist|song_listened|
+--------+-------------+
|Coldplay|           83|
+--------+-------------+
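
One property of joining on the maximum count, as the query above does, is that it returns every artist tied for first place, whereas `ORDER BY ... LIMIT 1` would return only one of them. The following is a small pure-Python sketch of that count-then-filter-on-max idea. The list `toy_plays` and the helper `most_played` are hypothetical and only stand in for the `artist` column.

```python
from collections import Counter

# Hypothetical artist values, one entry per played song (not the real data set).
toy_plays = ["Coldplay", "Muse", "Coldplay", "Muse", "Train"]


def most_played(artists):
    """Return every (artist, play_count) pair that reaches the maximum count."""
    counts = Counter(artists)            # GROUP BY artist, count(song)
    top = max(counts.values())           # SELECT max(song_listened)
    # Keep all artists whose count equals the maximum, like the JOIN on max_songs.
    return sorted((artist, n) for artist, n in counts.items() if n == top)


top_artists = most_played(toy_plays)
print(top_artists)
```

With this toy input both `Coldplay` and `Muse` are returned, since each has two plays.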



In [6]:
spark_session.sql('''
    SELECT artist, count(song) AS song_listened
    FROM spark_log_table
    GROUP BY artist
    ORDER BY song_listened DESC
    ''').show()

+--------------------+-------------+
|              artist|song_listened|
+--------------------+-------------+
|            Coldplay|           83|
|       Kings Of Leon|           69|
|Florence + The Ma...|           52|
|               Björk|           46|
|       Dwight Yoakam|           45|
|       Justin Bieber|           43|
|      The Black Keys|           40|
|         OneRepublic|           37|
|                Muse|           36|
|        Jack Johnson|           36|
|           Radiohead|           31|
|        Taylor Swift|           29|
|Barry Tuckwell/Ac...|           28|
|          Lily Allen|           28|
|               Train|           28|
|           Daft Punk|           27|
|           Metallica|           27|
|          Nickelback|           27|
|          Kanye West|           26|
|Red Hot Chili Pep...|           24|
+--------------------+-------------+
only showing top 20 rows



# Question 5 (challenge)

How many songs do users listen to on average between visiting our home page? Please round your answer to the closest integer.

In [7]:
spark_session.sql('''
    WITH flag_table AS(
        SELECT userId, ts, page, CAST(page == "Home" AS int) AS is_home
        FROM spark_log_table
        WHERE (page == "Home" OR page == "NextSong") AND (userId != "")        
        ORDER BY userId, ts ASC
        )
    SELECT * FROM flag_table
    ''').show()
                


+------+-------------+--------+-------+
|userId|           ts|    page|is_home|
+------+-------------+--------+-------+
|    10|1513790894284|NextSong|      0|
|    10|1513828388284|NextSong|      0|
|   100|1513750214284|NextSong|      0|
|   100|1513750442284|NextSong|      0|
|   100|1513775431284|    Home|      1|
|   100|1513775556284|NextSong|      0|
|   100|1513775710284|NextSong|      0|
|   100|1513776194284|    Home|      1|
|   100|1513776308284|NextSong|      0|
|   100|1513839673284|    Home|      1|
|  1000|1513720878284|NextSong|      0|
|  1003|1513749501284|    Home|      1|
|  1003|1513749516284|NextSong|      0|
|  1003|1513749525284|    Home|      1|
|  1005|1513782278284|NextSong|      0|
|  1006|1513773548284|NextSong|      0|
|  1006|1513773777284|NextSong|      0|
|  1006|1513774019284|NextSong|      0|
|  1017|1513817806284|NextSong|      0|
|  1017|1513818023284|NextSong|      0|
+------+-------------+--------+-------+
only showing top 20 rows



In [17]:
# TODO: write your code to answer question 5
spark_session.sql('''
    WITH flag_table AS(
        SELECT userId, ts, page, CAST(page == "Home" AS int) AS is_home
        FROM spark_log_table
        WHERE (page == "Home" OR page == "NextSong") AND (userId != "")        
        ORDER BY userId, ts ASC
    ),
    flag_period AS(
        SELECT userId, ts, is_home, 
        sum(is_home) OVER (PARTITION BY userId ORDER BY ts ASC ROWS UNBOUNDED PRECEDING) AS period
        FROM flag_table
        ORDER BY userId, ts
    )
    SELECT * FROM flag_period
    
''').show()

+------+-------------+-------+------+
|userId|           ts|is_home|period|
+------+-------------+-------+------+
|    10|1513790894284|      0|     0|
|    10|1513828388284|      0|     0|
|   100|1513750214284|      0|     0|
|   100|1513750442284|      0|     0|
|   100|1513775431284|      1|     1|
|   100|1513775556284|      0|     1|
|   100|1513775710284|      0|     1|
|   100|1513776194284|      1|     2|
|   100|1513776308284|      0|     2|
|   100|1513839673284|      1|     3|
|  1000|1513720878284|      0|     0|
|  1003|1513749501284|      1|     1|
|  1003|1513749516284|      0|     1|
|  1003|1513749525284|      1|     2|
|  1005|1513782278284|      0|     0|
|  1006|1513773548284|      0|     0|
|  1006|1513773777284|      0|     0|
|  1006|1513774019284|      0|     0|
|  1017|1513817806284|      0|     0|
|  1017|1513818023284|      0|     0|
+------+-------------+-------+------+
only showing top 20 rows
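
The `period` column is a running sum of `is_home` within each user, ordered by `ts`, so it increases by one at every `Home` visit. As a minimal sketch of that window computation in plain Python, the example below applies `itertools.accumulate` to the `is_home` values shown for `userId` 100 in the output above. The variable names are illustrative only.

```python
from itertools import accumulate

# is_home flags for userId 100 in ts order, as shown in the flag_period output.
is_home_user_100 = [0, 0, 1, 0, 0, 1, 0, 1]

# Running sum: each row gets the number of Home visits seen so far,
# mirroring sum(is_home) OVER (PARTITION BY userId ORDER BY ts ROWS UNBOUNDED PRECEDING).
period_user_100 = list(accumulate(is_home_user_100))
print(period_user_100)
```

The resulting values `[0, 0, 1, 1, 1, 2, 2, 3]` agree with the `period` column listed for that user.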



In [20]:
# TODO: write your code to answer question 5
spark_session.sql('''
    WITH flag_table AS(
        SELECT userId, ts, page, CAST(page == "Home" AS int) AS is_home
        FROM spark_log_table
        WHERE (page == "Home" OR page == "NextSong") AND (userId != "")        
        ORDER BY userId, ts ASC
    ),
    flag_period AS(
        SELECT userId, ts, is_home, 
        sum(is_home) OVER (PARTITION BY userId ORDER BY ts ASC ROWS UNBOUNDED PRECEDING) AS period
        FROM flag_table
        ORDER BY userId, ts
    )
    SELECT userId, period, count(ts)
    FROM flag_period
    WHERE is_home == 0
    GROUP BY userId, period
    ORDER BY userId, period
    
''').show()

+------+------+---------+
|userId|period|count(ts)|
+------+------+---------+
|    10|     0|        2|
|   100|     0|        2|
|   100|     1|        2|
|   100|     2|        1|
|  1000|     0|        1|
|  1003|     1|        1|
|  1005|     0|        1|
|  1006|     0|        3|
|  1017|     0|        6|
|  1017|     1|        3|
|  1017|     2|        1|
|  1019|     2|        3|
|  1019|     3|        3|
|  1019|     5|       16|
|  1020|     1|        4|
|  1022|     0|        2|
|  1025|     0|        4|
|  1030|     0|        1|
|  1035|     0|       15|
|  1035|     1|        6|
+------+------+---------+
only showing top 20 rows



In [22]:
# TODO: write your code to answer question 5
spark_session.sql('''
    WITH flag_table AS(
        SELECT userId, ts, page, CAST(page == "Home" AS int) AS is_home
        FROM spark_log_table
        WHERE (page == "Home" OR page == "NextSong") AND (userId != "")        
        ORDER BY userId, ts ASC
    ),
    flag_period AS(
        SELECT userId, ts, is_home, 
        sum(is_home) OVER (PARTITION BY userId ORDER BY ts ASC ROWS UNBOUNDED PRECEDING) AS period
        FROM flag_table
        ORDER BY userId, ts
    ),
    nb_songs_per_period AS(
    SELECT userId, period, count(ts) AS nb_songs
    FROM flag_period
    WHERE is_home == 0
    GROUP BY userId, period
    ORDER BY userId, period
    )
    SELECT avg(nb_songs)
    FROM nb_songs_per_period
    
''').show()

+------------------+
|     avg(nb_songs)|
+------------------+
|6.9558333333333335|
+------------------+
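
The question asks for the answer rounded to the closest integer, which the query output leaves unrounded. The sketch below, in plain Python, walks through the last two steps on a small scale: counting songs per period for `userId` 100 (using the `period` and `is_home` values from the `flag_period` output earlier in the notebook) and then rounding the average reported above. The variable names are illustrative, and the per-user average is not the quiz answer.

```python
from collections import Counter
from statistics import mean

# (period, is_home) pairs for userId 100, taken from the flag_period output.
rows_user_100 = [(0, 0), (0, 0), (1, 1), (1, 0), (1, 0), (2, 1), (2, 0), (3, 1)]

# Count only song rows (is_home == 0) per period, like WHERE is_home == 0 GROUP BY period.
songs_per_period = Counter(period for period, is_home in rows_user_100 if is_home == 0)

# Average songs per period for this single user, for illustration only.
avg_user_100 = mean(list(songs_per_period.values()))

# Round the data-set-wide average printed by the final query.
rounded_answer = round(6.9558333333333335)
print(dict(songs_per_period), avg_user_100, rounded_answer)
```

The per-period counts for this user match the rows shown for `userId` 100 in the grouped output, and rounding the overall average gives 7.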

