# Spark SQL Examples

Run the code cells below. This is the same code from the previous screencast.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum

import datetime

import numpy as np
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt

In [2]:
# Obtain the SparkSession for this notebook: getOrCreate() reuses an already
# active session if there is one, otherwise it starts a new one under the
# application name given below.
spark = (
    SparkSession.builder
    .appName("Data wrangling with Spark SQL")
    .getOrCreate()
)

In [5]:
# Location of the log file, relative to the notebook's working directory.
path = "data/sparkify_log_small.json"
# Read the JSON file into a Spark DataFrame (the schema is printed in a later cell).
sdf = spark.read.json(path)

In [6]:
# Bring a single row back to the driver as a pandas DataFrame to preview the columns.
sdf.limit(1).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Showaddywaddy,Logged In,Kenneth,M,112,Matthews,232.93342,paid,"Charlotte-Concord-Gastonia, NC-SC",PUT,NextSong,1509380319284,5132,Christmas Tears Will Fall,200,1513720872284,"""Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537....",1046


In [7]:
# Print each column's name, type and nullability.
sdf.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



# Create a View and Run Queries

The code below creates a temporary view against which you can run SQL queries.

In [8]:
# Expose the DataFrame to spark.sql() under the name "logs"; an existing
# temporary view with the same name is replaced.
sdf.createOrReplaceTempView("logs")

In [9]:
# Run a SQL query against the "logs" view and return the first two rows as a pandas DataFrame.
spark.sql("SELECT * FROM logs LIMIT 2").toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Showaddywaddy,Logged In,Kenneth,M,112,Matthews,232.93342,paid,"Charlotte-Concord-Gastonia, NC-SC",PUT,NextSong,1509380319284,5132,Christmas Tears Will Fall,200,1513720872284,"""Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537....",1046
1,Lily Allen,Logged In,Elizabeth,F,7,Chase,195.23873,free,"Shreveport-Bossier City, LA",PUT,NextSong,1512718541284,5027,Cheryl Tweedy,200,1513720878284,"""Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537....",1000


In [11]:
# A triple-quoted string lets the SQL be laid out over several lines;
# this selects two rows from the "logs" view.
spark.sql("""
    SELECT * 
    FROM logs 
    LIMIT 2
""").toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Showaddywaddy,Logged In,Kenneth,M,112,Matthews,232.93342,paid,"Charlotte-Concord-Gastonia, NC-SC",PUT,NextSong,1509380319284,5132,Christmas Tears Will Fall,200,1513720872284,"""Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537....",1046
1,Lily Allen,Logged In,Elizabeth,F,7,Chase,195.23873,free,"Shreveport-Bossier City, LA",PUT,NextSong,1512718541284,5027,Cheryl Tweedy,200,1513720878284,"""Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537....",1000


In [12]:
# Count every row in the "logs" view. An aggregate without GROUP BY always
# produces exactly one row, so no LIMIT clause is needed.
spark.sql("""
    SELECT count(*)
    FROM logs
""").toPandas()

Unnamed: 0,count(1)
0,10000


In [13]:
# List every logged event for a single user. userId is a string column in
# this dataset, hence the quoted literal. The standard SQL equality operator
# `=` is used; Spark also accepts `==`, but `=` is portable across SQL engines.
spark.sql("""
    SELECT userID, firstname, page, song
    FROM logs
    WHERE userID = '1046'
""").toPandas()

Unnamed: 0,userID,firstname,page,song
0,1046,Kenneth,NextSong,Christmas Tears Will Fall
1,1046,Kenneth,NextSong,Be Wary Of A Woman
2,1046,Kenneth,NextSong,Public Enemy No.1
3,1046,Kenneth,NextSong,Reign Of The Tyrants
4,1046,Kenneth,NextSong,Father And Son
5,1046,Kenneth,NextSong,No. 5
6,1046,Kenneth,NextSong,Seventeen
7,1046,Kenneth,Home,
8,1046,Kenneth,NextSong,War on war
9,1046,Kenneth,NextSong,Killermont Street


In [14]:
# List each distinct value of the `page` column once, in ascending order.
spark.sql("""
    SELECT DISTINCT page
    FROM logs 
    ORDER BY page ASC
""").toPandas()

Unnamed: 0,page
0,About
1,Downgrade
2,Error
3,Help
4,Home
5,Login
6,Logout
7,NextSong
8,Save Settings
9,Settings


# User-Defined Functions

In [16]:
# Register a SQL-callable UDF named get_hour that maps an epoch timestamp in
# milliseconds to its hour of day (0-23).
#
# The return type is declared explicitly: when it is omitted,
# spark.udf.register falls back to StringType and the hour comes back as
# text, which then needs a cast before it can be sorted numerically.
#
# NOTE: datetime.fromtimestamp() converts using the local timezone of the
# Python process running the UDF, so the resulting hour depends on where the
# job executes.
spark.udf.register(
    "get_hour",
    lambda x: int(datetime.datetime.fromtimestamp(x / 1000.0).hour),
    IntegerType(),
)

<function __main__.<lambda>(x)>

In [17]:
# Call the registered get_hour UDF from SQL: return one row with all original
# columns plus a derived `hour` column computed from `ts`.
spark.sql("""
    SELECT *, get_hour(ts) AS hour
    FROM logs 
    LIMIT 1
""").toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,hour
0,Showaddywaddy,Logged In,Kenneth,M,112,Matthews,232.93342,paid,"Charlotte-Concord-Gastonia, NC-SC",PUT,NextSong,1509380319284,5132,Christmas Tears Will Fall,200,1513720872284,"""Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537....",1046,22


In [18]:
# Count song plays (page = 'NextSong') per hour of day.
# - The string literal uses single quotes, the standard SQL form; double
#   quotes denote identifiers in ANSI-style SQL.
# - The explicit cast in ORDER BY keeps the ordering numeric even if
#   get_hour returns its result as a string.
# spark.sql() only defines the query here; nothing is displayed by this cell.
songs_in_hour = spark.sql("""
    SELECT get_hour(ts) AS hour, COUNT(*) AS plays_per_hour
    FROM logs
    WHERE page = 'NextSong'
    GROUP BY hour
    ORDER BY CAST(hour AS int) ASC
""")

In [19]:
# Execute the query and print the result as a text table. show() prints at
# most 20 rows by default, so later hours are cut off in the output below.
songs_in_hour.show()

+----+--------------+
|hour|plays_per_hour|
+----+--------------+
|   0|           456|
|   1|           454|
|   2|           382|
|   3|           302|
|   4|           352|
|   5|           276|
|   6|           348|
|   7|           358|
|   8|           375|
|   9|           249|
|  10|           216|
|  11|           228|
|  12|           251|
|  13|           339|
|  14|           462|
|  15|           479|
|  16|           484|
|  17|           430|
|  18|           362|
|  19|           295|
+----+--------------+
only showing top 20 rows



# Converting Results to Pandas

In [20]:
# Collect the aggregated result to the driver as a pandas DataFrame.
songs_in_hour_pd = songs_in_hour.toPandas()

In [21]:
# Display the pandas DataFrame using the notebook's rich table rendering.
songs_in_hour_pd

Unnamed: 0,hour,plays_per_hour
0,0,456
1,1,454
2,2,382
3,3,302
4,4,352
5,5,276
6,6,348
7,7,358
8,8,375
9,9,249
