# Spark SQL Quiz

This quiz uses the same dataset and questions from the Spark Data Frames Programming Quiz. For this quiz, however, use Spark SQL instead of Spark Data Frames.

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum

import datetime

import numpy as np
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt


In [4]:
spark = SparkSession \
    .builder \
    .appName("Spark SQL Quiz") \
    .getOrCreate()

In [5]:
path = "data/sparkify_log_small.json"
user_log = spark.read.json(path)

In [6]:
user_log.createOrReplaceTempView("user_log_table")

# Question 1

Which page did user id ""(empty string) NOT visit?

In [17]:
x = spark.sql('''
    SELECT DISTINCT page
    FROM user_log_table
    WHERE userId=""
    '''
    )

In [18]:
y = spark.sql('''
    SELECT DISTINCT page
    FROM user_log_table
    '''
    )

In [19]:
(y.subtract(x)).collect()

[Row(page='Submit Downgrade'),
 Row(page='Downgrade'),
 Row(page='Logout'),
 Row(page='Save Settings'),
 Row(page='Settings'),
 Row(page='NextSong'),
 Row(page='Upgrade'),
 Row(page='Error'),
 Row(page='Submit Upgrade')]

In [33]:
# SELECT distinct pages for the blank user and distinc pages for all users
# Right join the results to find pages that blank visitor did not visit
spark.sql("SELECT * \
            FROM ( \
                SELECT DISTINCT page \
                FROM user_log_table \
                WHERE userId='') AS user_pages \
            RIGHT JOIN ( \
                SELECT DISTINCT page \
                FROM user_log_table) AS all_pages \
            ON user_pages.page = all_pages.page \
            WHERE user_pages.page IS NULL").show()

+----+----------------+
|page|            page|
+----+----------------+
|null|Submit Downgrade|
|null|       Downgrade|
|null|          Logout|
|null|   Save Settings|
|null|        Settings|
|null|        NextSong|
|null|         Upgrade|
|null|           Error|
|null|  Submit Upgrade|
+----+----------------+



# Question 2 - Reflect

Why might you prefer to use SQL over data frames? Why might you prefer data frames over SQL?
no need to load whole data in SQL, calculation done in background if we want, more abstract, pandas more descriptive with more step by step process

# Question 3

How many female users do we have in the data set?

In [30]:
spark.sql('''
    SELECT COUNT(DISTINCT userID)
    FROM user_log_table
    WHERE gender="F" AND userId!=""
    GROUP BY gender
    '''
    ).show()

+----------------------+
|count(DISTINCT userID)|
+----------------------+
|                   462|
+----------------------+



# Question 4

How many songs were played from the most played artist?

In [12]:

spark.sql('''
    SELECT artist, COUNT(song)
    FROM user_log_table
    WHERE user_log_table.page="NextSong"
    GROUP BY artist
    ORDER BY COUNT(song) DESC
    LIMIT 1
    '''
    ).show()

+--------+-----------+
|  artist|count(song)|
+--------+-----------+
|Coldplay|         83|
+--------+-----------+



# Question 5 (challenge)

How many songs do users listen to on average between visiting our home page? Please round your answer to the closest integer.

In [68]:
spark.udf.register("Home_visit", lambda x: 1 if x == "Home" else 0)
#user_log_valid = user_log_valid.withColumn("Home_visit", Home_visit("page"))

<function __main__.<lambda>(x)>

In [69]:
spark.sql('''
    SELECT userId, ts, song, page, Home_visit(page) AS Home_visit
    FROM user_log_table
    WHERE  userId!="" AND (page="NextSong" OR page="Home")
    ORDER BY userId, ts
    LIMIT 30
    '''
    ).show(30)

+------+-------------+--------------------+--------+----------+
|userId|           ts|                song|    page|Home_visit|
+------+-------------+--------------------+--------+----------+
|    10|1513790894284|             Secrets|NextSong|         0|
|    10|1513828388284|             Overdue|NextSong|         0|
|   100|1513750214284|                1972|NextSong|         0|
|   100|1513750442284|             Secrets|NextSong|         0|
|   100|1513775431284|                null|    Home|         1|
|   100|1513775556284|Don't It Make My ...|NextSong|         0|
|   100|1513775710284|Clouds (Of Color ...|NextSong|         0|
|   100|1513776194284|                null|    Home|         1|
|   100|1513776308284|                0010|NextSong|         0|
|   100|1513839673284|                null|    Home|         1|
|  1000|1513720878284|       Cheryl Tweedy|NextSong|         0|
|  1003|1513749501284|                null|    Home|         1|
|  1003|1513749516284|         The Hipst

In [70]:
is_home_table = spark.sql('''
    SELECT userId, ts, song, page, Home_visit(page) AS Home_visit
    FROM user_log_table
    WHERE  userId!="" AND (page="NextSong" OR page="Home")
    ORDER BY userId, ts
    '''
    )

In [71]:
is_home_table.createOrReplaceTempView("is_home_table")

In [72]:
cum_sum = spark.sql("SELECT *, SUM(Home_visit) OVER \
    (PARTITION BY userID ORDER BY ts DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS period \
    FROM is_home_table")


In [73]:
cum_sum.createOrReplaceTempView("period_table")

In [74]:
spark.sql('''
    SELECT *
    FROM period_table
    '''
    ).show()

+------+-------------+--------------------+--------+----------+------+
|userId|           ts|                song|    page|Home_visit|period|
+------+-------------+--------------------+--------+----------+------+
|  1436|1513783259284| Throw It In The Bag|NextSong|         0|   0.0|
|  1436|1513782858284|Atom Bomb (Atomix 2)|NextSong|         0|   0.0|
|  2088|1513805972284|                null|    Home|         1|   1.0|
|  2088|1513805859284|          Back To Me|NextSong|         0|   1.0|
|  2088|1513805494284|Keep On Hoping [F...|NextSong|         0|   1.0|
|  2088|1513805065284|              Shanti|NextSong|         0|   1.0|
|  2088|1513804786284|   Rest Of Your Life|NextSong|         0|   1.0|
|  2088|1513804555284|Inside The Fire (...|NextSong|         0|   1.0|
|  2088|1513804196284|             Siechen|NextSong|         0|   1.0|
|  2088|1513803967284|            Spectrum|NextSong|         0|   1.0|
|  2088|1513803820284|Wait And Bleed (A...|NextSong|         0|   1.0|
|  208

In [75]:
# cusum.filter((cusum.page == 'NextSong')) \
#     .groupBy('userID', 'period') \
#     .agg({'period':'count'}) \
#     .agg({'count(period)':'avg'}).show()

spark.sql('''
    SELECT COUNT(*) AS count_results
    FROM period_table
    WHERE page="NextSong"
    GROUP BY userId, period
    '''
    ).show()

+-------------+
|count_results|
+-------------+
|            2|
|           13|
|           19|
|           15|
|            4|
|           17|
|            1|
|            3|
|            3|
|           16|
|            4|
|           11|
|            9|
|           17|
|            3|
|            4|
|            1|
|            1|
|            1|
|            2|
+-------------+
only showing top 20 rows



In [76]:
spark.sql('''
    SELECT AVG(count_results) FROM
    (SELECT COUNT(*) AS count_results
    FROM period_table
    GROUP BY userId, period, page HAVING page = 'NextSong') AS counts
    '''
    ).show()

+------------------+
|avg(count_results)|
+------------------+
| 6.898347107438017|
+------------------+

