In [2]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format


config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS_CREDENTIALS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS_CREDENTIALS']['AWS_SECRET_ACCESS_KEY']


def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

In [None]:
config = configparser.ConfigParser()
config.read('./dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS_CREDENTIALS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS_CREDENTIALS']['AWS_SECRET_ACCESS_KEY']


In [140]:
spark = create_spark_session()

# Artist

In [40]:
df_song = spark.read.json('./data/song_data/*/*/*/*.json')

In [141]:
df_song.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [151]:
songs_table = df_song.select(['song_id', 'title', 'duration',
                            'year', 'artist_name', 'artist_id']).dropDuplicates()

In [153]:
songs_table.write.parquet('songs_table.parquet', mode='overwrite')

# Log

In [165]:
df_log = spark.read.json('./data/log-data/*.json')

In [166]:
df_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [167]:
df_log_songs = df_log.where(df_log.page == 'NextSong').toPandas()

In [168]:
from datetime import datetime

In [170]:
import pandas as pd
pd.to_datetime(df_log_songs['ts'])

0      1970-01-01 00:25:42.241826796
1      1970-01-01 00:25:42.242481796
2      1970-01-01 00:25:42.242741796
3      1970-01-01 00:25:42.253449796
4      1970-01-01 00:25:42.260935796
                    ...             
6815   1970-01-01 00:25:41.107734796
6816   1970-01-01 00:25:41.108520796
6817   1970-01-01 00:25:41.109125796
6818   1970-01-01 00:25:41.109325796
6819   1970-01-01 00:25:41.110994796
Name: ts, Length: 6820, dtype: datetime64[ns]

In [171]:
get_timestamp = udf(lambda x: x/1000)

In [172]:
df_log_ts = df_log.withColumn('startTimeStamp', get_timestamp('ts'))

In [173]:
df_log_ts.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- startTimeStamp: string (nullable = true)



In [174]:
from pyspark.sql.types import IntegerType, TimestampType
toDateTime = udf(lambda x: datetime.fromtimestamp(x), TimestampType())


df_log_datetime = df_log_ts.withColumn('startDateTime', toDateTime('startTimestamp'))

In [175]:
df_log_datetime.toPandas().head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,startTimeStamp,startDateTime
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,1542241826.796,2018-11-14 22:30:26.796
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,1542242481.796,2018-11-14 22:41:21.796
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,1542242741.796,2018-11-14 22:45:41.796
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9,1542247071.796,2018-11-14 23:57:51.796
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12,1542252577.796,2018-11-15 01:29:37.796


In [176]:
date_table = df_log_datetime.select(['startTimestamp', 'startDateTime']).dropDuplicates()

In [177]:
date_table = date_table.select([col('startTimestamp').alias('timestamp'), col('startDateTime').alias('DateTime'), 
                            year('startDateTime').alias('year'), month('startDateTime').alias('month')])

In [178]:
date_table.toPandas()

Unnamed: 0,timestamp,DateTime,year,month
0,1.542793987796E9,2018-11-21 07:53:07.796,2018,11
1,1.542825241796E9,2018-11-21 16:34:01.796,2018,11
2,1.542826610796E9,2018-11-21 16:56:50.796,2018,11
3,1.542210687796E9,2018-11-14 13:51:27.796,2018,11
4,1.543369456796E9,2018-11-27 23:44:16.796,2018,11
...,...,...,...,...
8018,1.542469913796E9,2018-11-17 13:51:53.796,2018,11
8019,1.542470926796E9,2018-11-17 14:08:46.796,2018,11
8020,1.541287022796E9,2018-11-03 20:17:02.796,2018,11
8021,1.543155430796E9,2018-11-25 12:17:10.796,2018,11


In [131]:
df_songs = spark.read.parquet('./songs_table.parquet')

In [132]:
df_songs.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- year: long (nullable = true)
 |-- artist_id: string (nullable = true)



In [180]:
df_log_ts.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- startTimeStamp: string (nullable = true)



In [156]:
df_songs = spark.read.parquet('./songs_table.parquet')

In [158]:
df_songs.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- year: long (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- artist_id: string (nullable = true)



In [182]:
df_log_datetime.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- startTimeStamp: string (nullable = true)
 |-- startDateTime: timestamp (nullable = true)



In [188]:
df_songplays = df_log_datetime.join(df_songs, (df_log_datetime.artist == df_songs.artist_name) 
                                    & (df_log_datetime.song == df_songs.title) 
                                    & (df_log_datetime.length == df_songs.duration), 'left_outer').select(
                                        col('startTimeStamp').alias('starttimestamp'),
                                        col('userId').alias('user_id'),
                                        df_log_datetime.level,
                                        df_songs.song_id,
                                        df_songs.artist_id,
                                        col('sessionId').alias('session_id'),
                                        df_log_datetime.location,
                                        col('useragent').alias('user_agent')
                                    )

In [189]:
df_songplays.printSchema()

root
 |-- starttimestamp: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)

