# Data Wrangling with Spark SQL

## Overview
Use Spark SQL API to gain some analytical insights on a small sparkify log dataset  


In [2]:
# Import pyspark sql libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, desc, asc
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.types import StringType, IntegerType

import datetime

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

%matplotlib inline

In [3]:
# Create a spark session
spark = SparkSession \
    .builder \
    .appName("Data wrangling with Spark SQL") \
    .getOrCreate()

In [4]:
# Create table
user_log = spark.read.json("./sparkify_log_small.json")

In [5]:
user_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



## Create a View And Run Queries

The code below creates a temporary view for various performing SQL queries.

In [6]:
# Create a temp VIEW 
user_log.createOrReplaceTempView("user_log_table")

In [7]:
spark.sql("SELECT * FROM user_log_table LIMIT 2").show()

+-------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+
|       artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page| registration|sessionId|                song|status|           ts|           userAgent|userId|
+-------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+
|Showaddywaddy|Logged In|  Kenneth|     M|          112|Matthews|232.93342| paid|Charlotte-Concord...|   PUT|NextSong|1509380319284|     5132|Christmas Tears W...|   200|1513720872284|"Mozilla/5.0 (Win...|  1046|
|   Lily Allen|Logged In|Elizabeth|     F|            7|   Chase|195.23873| free|Shreveport-Bossie...|   PUT|NextSong|1512718541284|     5027|      

In [8]:
spark.sql('''
          SELECT * 
          FROM user_log_table 
          LIMIT 2
          '''
          ).show()

+-------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+
|       artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page| registration|sessionId|                song|status|           ts|           userAgent|userId|
+-------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+
|Showaddywaddy|Logged In|  Kenneth|     M|          112|Matthews|232.93342| paid|Charlotte-Concord...|   PUT|NextSong|1509380319284|     5132|Christmas Tears W...|   200|1513720872284|"Mozilla/5.0 (Win...|  1046|
|   Lily Allen|Logged In|Elizabeth|     F|            7|   Chase|195.23873| free|Shreveport-Bossie...|   PUT|NextSong|1512718541284|     5027|      

How many entries in log_table?

In [9]:
spark.sql('''
          SELECT COUNT(*) 
          FROM user_log_table 
          '''
          ).show()

+--------+
|count(1)|
+--------+
|   10000|
+--------+



What songs are played by userId= 1046?


In [10]:
spark.sql('''
          SELECT userID, firstname, page, song
          FROM user_log_table 
          WHERE userID == '1046'
          '''
          ).collect()

[Row(userID='1046', firstname='Kenneth', page='NextSong', song='Christmas Tears Will Fall'),
 Row(userID='1046', firstname='Kenneth', page='NextSong', song='Be Wary Of A Woman'),
 Row(userID='1046', firstname='Kenneth', page='NextSong', song='Public Enemy No.1'),
 Row(userID='1046', firstname='Kenneth', page='NextSong', song='Reign Of The Tyrants'),
 Row(userID='1046', firstname='Kenneth', page='NextSong', song='Father And Son'),
 Row(userID='1046', firstname='Kenneth', page='NextSong', song='No. 5'),
 Row(userID='1046', firstname='Kenneth', page='NextSong', song='Seventeen'),
 Row(userID='1046', firstname='Kenneth', page='Home', song=None),
 Row(userID='1046', firstname='Kenneth', page='NextSong', song='War on war'),
 Row(userID='1046', firstname='Kenneth', page='NextSong', song='Killermont Street'),
 Row(userID='1046', firstname='Kenneth', page='NextSong', song='Black & Blue'),
 Row(userID='1046', firstname='Kenneth', page='Logout', song=None),
 Row(userID='1046', firstname='Kenneth'

Which pages are visited ?

In [11]:
spark.sql('''
          SELECT DISTINCT page
          FROM user_log_table 
          ORDER BY page ASC
          '''
          ).show()

+----------------+
|            page|
+----------------+
|           About|
|       Downgrade|
|           Error|
|            Help|
|            Home|
|           Login|
|          Logout|
|        NextSong|
|   Save Settings|
|        Settings|
|Submit Downgrade|
|  Submit Upgrade|
|         Upgrade|
+----------------+



Which pages did user "" NOT visit?

In [13]:
# SELECT distinct pages for the blank user and distinc pages for all users
# Right join the results to find pages that blank visitor did not visit

spark.sql("""SELECT * 
            FROM ( 
                SELECT DISTINCT page 
                FROM user_log_table 
                WHERE userId='') AS user_pagesblank 
            RIGHT JOIN ( 
                SELECT DISTINCT page 
                FROM user_log_table) AS all_pages 
            ON user_pagesblank.page = all_pages.page 
            WHERE user_pagesblank.page IS NULL""").show()

+----+----------------+
|page|            page|
+----+----------------+
|null|Submit Downgrade|
|null|       Downgrade|
|null|          Logout|
|null|   Save Settings|
|null|        Settings|
|null|        NextSong|
|null|         Upgrade|
|null|           Error|
|null|  Submit Upgrade|
+----+----------------+



How many female users do we have in the dataset?

In [15]:
# TODO: write your code to answer question 3

spark.sql("""
            SELECT COUNT(DISTINCT userId) as total_female
            FROM user_log_table
            WHERE gender="F"
            """).show()

+------------+
|total_female|
+------------+
|         462|
+------------+



How many songs were played from the most played artist?

In [17]:
spark.sql("""
            SELECT artist, 
                    count(song)
            FROM user_log_table
            GROUP BY artist
            ORDER BY 2 DESC
            LIMIT 1
            """).show()

+--------+-----------+
|  artist|count(song)|
+--------+-----------+
|Coldplay|         83|
+--------+-----------+



How many songs do users listen to on average between visiting our home page? Please round your answer to the closest integer.

In [20]:
# SELECT CASE for HOMEpages
ishome = spark.sql('''
                    SELECT userId, page, ts, 
                            CASE WHEN page = 'Home' THEN 1 ELSE 0 END AS is_home
                    FROM user_log_table
                    WHERE (page='NextSong') OR (page='Home')
                    ''')

# create new view
ishome.createOrReplaceTempView('is_home_table')

In [21]:
# Find cum sum over is_home column
cum_sum = spark.sql('''
                    SELECT *,
                            SUM(is_home) OVER
                            (PARTITION BY userID
                            ORDER BY ts DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS period
                    FROM is_home_table
                    ''')

# create new view
cum_sum.createOrReplaceTempView('period_table')

In [22]:
# find the average count for next song

spark.sql('''
            SELECT AVG(count_results)
            FROM (
                 SELECT COUNT(*) AS count_results 
                 FROM period_table
                 GROUP BY userId, period, page
                 HAVING page='NextSong') AS counts 
            ''').show()

+------------------+
|avg(count_results)|
+------------------+
| 6.898347107438017|
+------------------+



### User Defined Functions

In [11]:
# define a function to get hour
spark.udf.register("get_hour", lambda x: int(datetime.datetime.fromtimestamp(x / 1000.0).hour))

<function __main__.<lambda>(x)>

Example using hour function

In [13]:
spark.sql('''
          SELECT *, get_hour(ts) AS hour
          FROM user_log_table 
          LIMIT 1
          '''
          ).collect()

[Row(artist='Showaddywaddy', auth='Logged In', firstName='Kenneth', gender='M', itemInSession=112, lastName='Matthews', length=232.93342, level='paid', location='Charlotte-Concord-Gastonia, NC-SC', method='PUT', page='NextSong', registration=1509380319284, sessionId=5132, song='Christmas Tears Will Fall', status=200, ts=1513720872284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='1046', hour='14')]

How many songs were played every hour?

In [15]:
songs_in_hour = spark.sql('''
          SELECT get_hour(ts) AS hour, COUNT(*) as plays_per_hour
          FROM user_log_table
          WHERE page = "NextSong"
          GROUP BY hour
          ORDER BY cast(hour as int) ASC
          '''
          )

In [16]:
songs_in_hour.show()

+----+--------------+
|hour|plays_per_hour|
+----+--------------+
|   0|           375|
|   1|           249|
|   2|           216|
|   3|           228|
|   4|           251|
|   5|           339|
|   6|           462|
|   7|           479|
|   8|           484|
|   9|           430|
|  10|           362|
|  11|           295|
|  12|           257|
|  13|           248|
|  14|           369|
|  15|           375|
|  16|           456|
|  17|           454|
|  18|           382|
|  19|           302|
+----+--------------+
only showing top 20 rows



### Using Pandas API

In [20]:
songs_in_hour_pd = songs_in_hour.toPandas()
pd.DataFrame(songs_in_hour_pd).head()

Unnamed: 0,hour,plays_per_hour
0,0,375
1,1,249
2,2,216
3,3,228
4,4,251
