In [1]:
### Import all the necessary packages
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from datetime import datetime
from pyspark.sql.functions import col, max as max_
from pyspark.sql.functions import *
from time import time
import os
import configparser
import pandas as pd
import numpy as np
# Visualization
import matplotlib.pyplot as plt
import seaborn as sns

In [2]:
### Read config file
config = configparser.ConfigParser()
config.read('dl.cfg')
# Retrieve value using config['KEYWORD']['SUBKEYWORD']
os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

In [3]:
#create a spark session 
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()
spark.conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
### The package `org.apache.hadoop:hadoop-aws:2.7.0` allows you to connect aws S3.

In [4]:
#input_path = 'output'
# location of all the tables
input_path ='s3a://apache-spark/output'
tables = ['songs', 'artists', 'users', 'time', 'songplays']

In [5]:
#read all tables from S3 file storage 
song_df = spark.read.parquet("s3a://apache-spark/output/songs")
song_df.createOrReplaceTempView('songs')

users_df = spark.read.parquet("s3a://apache-spark/output/users")
users_df.createOrReplaceTempView('users')

artists_df = spark.read.parquet("s3a://apache-spark/output/artists")
artists_df.createOrReplaceTempView('artists')

songplays_df = spark.read.parquet("s3a://apache-spark/output/songplays")
songplays_df.createOrReplaceTempView('songplays')

time_df = spark.read.parquet("s3a://apache-spark/output/time")
time_df.createOrReplaceTempView('time_table')

In [6]:
#Set higher spark.sql.broadcastTimeout to increase timeout of Spark Session 
#This happens because Spark tries to do Broadcast Hash Join and one of the DataFrames is very large, so sending it consumes much time.
#go to this link for futher reference: https://stackoverflow.com/questions/41123846/why-does-join-fail-with-java-util-concurrent-timeoutexception-futures-timed-ou
spark = SparkSession.builder\
                     .config("spark.sql.broadcastTimeout","36000")\
                     .getOrCreate()

In [29]:
#query one
query_df = spark.sql("""
SELECT t.hour as Hour, COUNT(*) as PlayCount
FROM songplays sp
JOIN time_table t ON sp.start_time=t.start_time
GROUP BY t.hour
ORDER BY t.hour
""").show()

+----+---------+
|Hour|PlayCount|
+----+---------+
|   0|      155|
|   1|      154|
|   2|      117|
|   3|      109|
|   4|      136|
|   5|      162|
|   6|      183|
|   7|      179|
|   8|      207|
|   9|      270|
|  10|      312|
|  11|      336|
|  12|      308|
|  13|      324|
|  14|      432|
|  15|      477|
|  16|      542|
|  17|      494|
|  18|      498|
|  19|      367|
+----+---------+
only showing top 20 rows



In [26]:
#query two
query_df = spark.sql("""
SELECT COUNT(*)
FROM users
""").show()

+--------+
|count(1)|
+--------+
|      96|
+--------+



In [7]:
#query three 
query_df = spark.sql("""SELECT songplays.location, songplays.level, songs.title,songs.duration, time_table.hour \
	From songplays Left join songs on songplays.song_id = songs.song_id\
  LEFT JOIN time_table ON songplays.start_time= time_table.start_time 
  Order BY time_table.hour""").show()

+--------------------+-----+-----+--------+----+
|            location|level|title|duration|hour|
+--------------------+-----+-----+--------+----+
|San Jose-Sunnyval...| free| null|    null|   0|
|New York-Newark-J...| paid| null|    null|   0|
|San Francisco-Oak...| paid| null|    null|   0|
|Waterloo-Cedar Fa...| paid| null|    null|   0|
|Lansing-East Lans...| paid| null|    null|   0|
|New York-Newark-J...| paid| null|    null|   0|
|    Marinette, WI-MI| paid| null|    null|   0|
|    Marinette, WI-MI| paid| null|    null|   0|
|        Richmond, VA| free| null|    null|   0|
|Lansing-East Lans...| paid| null|    null|   0|
|Chicago-Napervill...| paid| null|    null|   0|
|Portland-South Po...| paid| null|    null|   0|
|Lansing-East Lans...| paid| null|    null|   0|
|Lake Havasu City-...| paid| null|    null|   0|
|    Marinette, WI-MI| paid| null|    null|   0|
|    Marinette, WI-MI| paid| null|    null|   0|
|    Marinette, WI-MI| paid| null|    null|   0|
|Lansing-East Lans..

In [12]:
#query four 
#the number of songplays from different level group 
result_df = spark.sql("""SELECT level, COUNT(*) Count 
FROM songplays 
GROUP BY level 
ORDER BY Count DESC LIMIT 10""").show()

+-----+-----+
|level|Count|
+-----+-----+
| paid| 5591|
| free| 1229|
+-----+-----+



In [21]:
#query five 
#top ten users in the dataframe 
query = """
SELECT songplays.user_id, users.gender, users.level, songplays.location, COUNT(*) total 
FROM songplays 
JOIN users  
ON songplays.user_id = users.userId
GROUP BY songplays.user_id, users.gender, users.level, songplays.location
ORDER BY total DESC 
LIMIT 10
"""
top_users_df = spark.sql(query).toPandas()

In [22]:
# display the result as pandas dataframe 
top_users_df.style.background_gradient(cmap = 'Greens', subset = ['total'])

Unnamed: 0,user_id,gender,level,location,total
0,49,F,free,"San Francisco-Oakland-Hayward, CA",689
1,80,F,paid,"Portland-South Portland, ME",665
2,97,F,paid,"Lansing-East Lansing, MI",557
3,15,F,paid,"Chicago-Naperville-Elgin, IL-IN-WI",463
4,44,F,paid,"Waterloo-Cedar Falls, IA",397
5,29,F,free,"Atlanta-Sandy Springs-Roswell, GA",346
6,24,F,paid,"Lake Havasu City-Kingman, AZ",321
7,73,M,paid,"Tampa-St. Petersburg-Clearwater, FL",289
8,88,M,free,"Sacramento--Roseville--Arden-Arcade, CA",270
9,36,M,paid,"Janesville-Beloit, WI",248
