In [27]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

## Read config file for AWS credentials

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [3]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

In [4]:
spark = create_spark_session()
spark

# Process song data (sample)
### **EXPLORATION ONLY - DOES NOT WRITE PARQUET FILES** 

In [6]:
# Sample data
input_data = "data/data"

## Song Data

In [7]:
# get filepath to song data file
song_data = "{}/song_data/*/*/*/*.json".format(input_data)

# read song data file
print ('Reading data from S3...')
df_song = spark.read.json(song_data)

# Sample record of raw song data
df_song.show(1, vertical=True)

Reading data from S3...
-RECORD 0--------------------------------
 artist_id        | ARDR4AC1187FB371A1   
 artist_latitude  | null                 
 artist_location  |                      
 artist_longitude | null                 
 artist_name      | Montserrat Caball... 
 duration         | 511.16363            
 num_songs        | 1                    
 song_id          | SOBAYLL12A8C138AF9   
 title            | Sono andati? Fing... 
 year             | 0                    
only showing top 1 row



In [8]:
# extract columns to create songs table
songs_table = df_song.select("song_id","title","artist_id","year","duration").dropDuplicates()
# sample record of song table
songs_table.show(1, vertical=True)

-RECORD 0-----------------------
 song_id   | SOGOSOV12AF72A285E 
 title     | ¿Dónde va Chichi?  
 artist_id | ARGUVEV1187B98BA17 
 year      | 1997               
 duration  | 313.12934          
only showing top 1 row



## Artist data

In [9]:
# extract columns to create artists table
artists_table = df_song.selectExpr("artist_id", 
                          "artist_name",
                          "artist_location as location", 
                          "artist_latitude as lat", 
                          "artist_longitude as lon").dropDuplicates()
# sample record from artist table 
artists_table.show(1, vertical=True)

-RECORD 0-------------------------
 artist_id   | ARGUVEV1187B98BA17 
 artist_name | Sierra Maestra     
 location    |                    
 lat         | null               
 lon         | null               
only showing top 1 row



## Log data

In [10]:
log_data ="{}/log_data/*.json".format(input_data)

# read log data file
df_log = spark.read.json(log_data)

# sample record from raw log file
df_log.show(1, vertical=True)

-RECORD 0-----------------------------
 artist        | Harmonia             
 auth          | Logged In            
 firstName     | Ryan                 
 gender        | M                    
 itemInSession | 0                    
 lastName      | Smith                
 length        | 655.77751            
 level         | free                 
 location      | San Jose-Sunnyval... 
 method        | PUT                  
 page          | NextSong             
 registration  | 1.541016707796E12    
 sessionId     | 583                  
 song          | Sehr kosmisch        
 status        | 200                  
 ts            | 1542241826796        
 userAgent     | "Mozilla/5.0 (X11... 
 userId        | 26                   
only showing top 1 row



## Users data

In [11]:
# extract columns for users table    
users_table = df_log.filter(df_log.page == 'NextSong').selectExpr("userid",
                                                                  "firstname",
                                                                  "lastname",
                                                                  "gender",
                                                                  "level").dropDuplicates()
# sample record from users table
users_table.show(1,vertical=True)

-RECORD 0----------
 userid    | 26    
 firstname | Ryan  
 lastname  | Smith 
 gender    | M     
 level     | free  
only showing top 1 row



## Time data

In [12]:
# Set up UDFs for timeparsing 
get_datetime = udf(lambda x: datetime.fromtimestamp(x / 1000).strftime('%Y-%m-%d %H:%M:%S'))
df_log = df_log.withColumn("time_stamp", get_datetime(df_log.ts))
#hour
get_hour = udf(lambda x: datetime.fromtimestamp(x / 1000.0).hour)
df_log = df_log.withColumn("hour", get_hour(df_log.ts))
#day
get_day = udf(lambda x: datetime.fromtimestamp(x / 1000.0).day)
df_log = df_log.withColumn("day", get_day(df_log.ts))
#month
get_month = udf(lambda x: datetime.fromtimestamp(x / 1000.0).month)
df_log = df_log.withColumn("month", get_month(df_log.ts))
#year
get_year = udf(lambda x: datetime.fromtimestamp(x / 1000.0).year)
df_log = df_log.withColumn("year", get_year(df_log.ts))
# weekday
get_dow = udf(lambda x: datetime.fromtimestamp(x / 1000.0).weekday())
df_log = df_log.withColumn("dow", get_dow(df_log.ts))

In [13]:
# sample record from log data with time parsing 
df_log.show(1, vertical=True)

-RECORD 0-----------------------------
 artist        | Harmonia             
 auth          | Logged In            
 firstName     | Ryan                 
 gender        | M                    
 itemInSession | 0                    
 lastName      | Smith                
 length        | 655.77751            
 level         | free                 
 location      | San Jose-Sunnyval... 
 method        | PUT                  
 page          | NextSong             
 registration  | 1.541016707796E12    
 sessionId     | 583                  
 song          | Sehr kosmisch        
 status        | 200                  
 ts            | 1542241826796        
 userAgent     | "Mozilla/5.0 (X11... 
 userId        | 26                   
 time_stamp    | 2018-11-15 00:30:26  
 hour          | 0                    
 day           | 15                   
 month         | 11                   
 year          | 2018                 
 dow           | 3                    
only showing top 1 row



In [14]:
# extract columns to create time table
time_table = df_log.selectExpr("ts", "time_stamp", "hour","day","month","year", "dow").dropDuplicates()
# sample record from time table
time_table.show(1, vertical=True)

-RECORD 0-------------------------
 ts         | 1542281082796       
 time_stamp | 2018-11-15 11:24:42 
 hour       | 11                  
 day        | 15                  
 month      | 11                  
 year       | 2018                
 dow        | 3                   
only showing top 1 row



## Songplays data

In [18]:
# join song and log data dataframes 
songplays = df_log.join(df_song, df_log.artist==df_song.artist_name)
# sample record from songplays data 
songplays.show(1, vertical=True)

-RECORD 0--------------------------------
 artist           | Lionel Richie        
 auth             | Logged In            
 firstName        | Aleena               
 gender           | F                    
 itemInSession    | 40                   
 lastName         | Kirby                
 length           | 265.89995            
 level            | paid                 
 location         | Waterloo-Cedar Fa... 
 method           | PUT                  
 page             | NextSong             
 registration     | 1.541022995796E12    
 sessionId        | 619                  
 song             | Lady                 
 status           | 200                  
 ts               | 1542313967796        
 userAgent        | Mozilla/5.0 (Maci... 
 userId           | 44                   
 time_stamp       | 2018-11-15 20:32:47  
 hour             | 20                   
 day              | 15                   
 month            | 11                   
 year             | 2018          

In [31]:
# extract columns from joined song and log datasets to create songplays table 
songplays_table = songplays.selectExpr("ts", "artist_id", "song_id", "sessionid","location","useragent", "userid").withColumn(
    "time_stamp", get_datetime(df_log.ts)).withColumn(
    "year", get_year(df_log.ts)).withColumn(
    "month", get_month(df_log.ts)).withColumn(
    "songplay_id", monotonically_increasing_id()).dropDuplicates()

# sample record from songplays_table
songplays_table.show(1,vertical=True)

-RECORD 0---------------------------
 ts          | 1542641764796        
 artist_id   | ARXR32B1187FB57099   
 song_id     | SOFSOCN12A8C143F5D   
 sessionid   | 724                  
 location    | San Francisco-Oak... 
 useragent   | Mozilla/5.0 (Wind... 
 userid      | 49                   
 time_stamp  | 2018-11-19 15:36:04  
 year        | 2018                 
 month       | 11                   
 songplay_id | 8                    
only showing top 1 row



## Sample queries

In [25]:
# Songplays per month
songplays_table.createOrReplaceTempView("songplays_table")
sample = spark.sql('SELECT month, count(*) as cnt  FROM songplays_table group by month order by month')
sample.show()

+-----+---+
|month|cnt|
+-----+---+
|   11| 21|
+-----+---+



In [51]:
# soung popularity by time of day
time_table.createOrReplaceTempView("time_table")
artists_table.createOrReplaceTempView("artists_table")

sample2 = spark.sql('''
SELECT t.hour, a.artist_name, COUNT(*)
FROM songplays_table s
JOIN time_table t
ON s.ts=t.ts
JOIN artists_table a
ON s.artist_id=a.artist_id
GROUP BY t.hour, a.artist_name
ORDER BY cast(t.hour as int), a.artist_name
''')
sample2.show()

+----+-----------------+--------+
|hour|      artist_name|count(1)|
+----+-----------------+--------+
|   3|Sophie B. Hawkins|       1|
|   7|      Lupe Fiasco|       1|
|   7|        Tom Petty|       1|
|   9|        Tom Petty|       1|
|  12|    Lionel Richie|       1|
|  13|      Lupe Fiasco|       1|
|  15|       Blue Rodeo|       1|
|  15|              Gob|       1|
|  15|    Lionel Richie|       1|
|  16|      Lupe Fiasco|       1|
|  17|           Trafik|       1|
|  18|      Lupe Fiasco|       1|
|  20|     Gwen Stefani|       1|
|  20|     Jimmy Wakely|       1|
|  20|      Line Renaud|       1|
|  20|    Lionel Richie|       2|
|  20|        Tom Petty|       1|
|  21|            Elena|       1|
|  22|        Tom Petty|       1|
|  23|      Lupe Fiasco|       1|
+----+-----------------+--------+

