First unzip data so we can load it in pyspark.

In [11]:
!unzip data/log-data.zip -d data/log_data

Archive:  data/log-data.zip
  inflating: data/log_data/2018-11-01-events.json  
  inflating: data/log_data/2018-11-02-events.json  
  inflating: data/log_data/2018-11-03-events.json  
  inflating: data/log_data/2018-11-04-events.json  
  inflating: data/log_data/2018-11-05-events.json  
  inflating: data/log_data/2018-11-06-events.json  
  inflating: data/log_data/2018-11-07-events.json  
  inflating: data/log_data/2018-11-08-events.json  
  inflating: data/log_data/2018-11-09-events.json  
  inflating: data/log_data/2018-11-10-events.json  
  inflating: data/log_data/2018-11-11-events.json  
  inflating: data/log_data/2018-11-12-events.json  
  inflating: data/log_data/2018-11-13-events.json  
  inflating: data/log_data/2018-11-14-events.json  
  inflating: data/log_data/2018-11-15-events.json  
  inflating: data/log_data/2018-11-16-events.json  
  inflating: data/log_data/2018-11-17-events.json  
  inflating: data/log_data/2018-11-18-events.json  
  inflating: data/log_data/2018-11-1

For some reason, the song_data zip has a folder song_data while log_data.zip is just the files.

In [15]:
!unzip data/song-data.zip -d data/

Archive:  data/song-data.zip
   creating: data/song_data/
  inflating: data/song_data/.DS_Store  
   creating: data/song_data/A/
  inflating: data/song_data/A/.DS_Store  
   creating: data/song_data/A/A/
  inflating: data/song_data/A/A/.DS_Store  
   creating: data/song_data/A/A/A/
  inflating: data/song_data/A/A/A/TRAAAAW128F429D538.json  
  inflating: data/song_data/A/A/A/TRAAABD128F429CF47.json  
  inflating: data/song_data/A/A/A/TRAAADZ128F9348C2E.json  
  inflating: data/song_data/A/A/A/TRAAAEF128F4273421.json  
  inflating: data/song_data/A/A/A/TRAAAFD128F92F423A.json  
  inflating: data/song_data/A/A/A/TRAAAMO128F1481E7F.json  
  inflating: data/song_data/A/A/A/TRAAAMQ128F1460CD3.json  
  inflating: data/song_data/A/A/A/TRAAAPK128E0786D96.json  
  inflating: data/song_data/A/A/A/TRAAARJ128F9320760.json  
  inflating: data/song_data/A/A/A/TRAAAVG12903CFA543.json  
  inflating: data/song_data/A/A/A/TRAAAVO128F93133D4.json  
   creating: data/song_data/A/A/B/
  inflating: data/song

In [57]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofyear
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import TimestampType


config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config.get('AWS', 'AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY']=config.get('AWS', 'AWS_SECRET_ACCESS_KEY')


def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

In [2]:
spark = create_spark_session()

In [3]:
spark

In [67]:
# s3 path
#input_data = "s3a://udacity-dend/"
# workspace path
input_data = 'data'
output_data = "s3a://udacity-dend-spark-dwh/"

In [4]:
import time

In [24]:
# get filepath to song data file
song_data = os.path.join(input_data, 'song_data/*/*/*/*.json')

# read song data file
start = time.time()
df = spark.read.json(song_data)
end = time.time()
print('took', int(end-start), 'seconds')

took 2 seconds


In [25]:
df.head()

Row(artist_id='ARDR4AC1187FB371A1', artist_latitude=None, artist_location='', artist_longitude=None, artist_name='Montserrat Caballé;Placido Domingo;Vicente Sardinero;Judith Blegen;Sherrill Milnes;Georg Solti', duration=511.16363, num_songs=1, song_id='SOBAYLL12A8C138AF9', title='Sono andati? Fingevo di dormire', year=0)

In [26]:
df.columns

['artist_id',
 'artist_latitude',
 'artist_location',
 'artist_longitude',
 'artist_name',
 'duration',
 'num_songs',
 'song_id',
 'title',
 'year']

In [28]:
start = time.time()
# extract columns to create songs table
songs_cols = ['song_id', 'title', 'artist_id', 'year', 'duration']
songs_table = df.select(songs_cols).dropDuplicates()

# write songs table to parquet files partitioned by year and artist
songs_table.write.mode('overwrite').partitionBy('year', 'artist_id').parquet(os.path.join(output_data, 'songs'))
end = time.time()
print('took', int(end-start), 's to write songs table')

In [32]:
start = time.time()
# extract columns to create artists table
artists_cols = ['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']
artists_table = df.select(artists_cols).dropDuplicates()

# write artists table to parquet files
artists_table.write.mode('overwrite').parquet(os.path.join(output_data, 'artists'))
end = time.time()
print('took', int(end-start), 's to write artists table')

took 424 s to write artists table


In [33]:
# get filepath to log data file
# on s3, paths are log_data/year/month/date.json
# in workspace, path is logdata/date.json
# log_data = os.path.join(input_data, 'log_data/*/*/*.json')
log_data = os.path.join(input_data, 'log_data/*.json')

# read log data file
start = time.time()
df = spark.read.json(log_data)
end = time.time()
print('took', int(end-start), 'seconds')

took 0 seconds


In [34]:
df.head()

Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26')

In [35]:
df.columns

['artist',
 'auth',
 'firstName',
 'gender',
 'itemInSession',
 'lastName',
 'length',
 'level',
 'location',
 'method',
 'page',
 'registration',
 'sessionId',
 'song',
 'status',
 'ts',
 'userAgent',
 'userId']

In [40]:
df.dtypes

[('artist', 'string'),
 ('auth', 'string'),
 ('firstName', 'string'),
 ('gender', 'string'),
 ('itemInSession', 'bigint'),
 ('lastName', 'string'),
 ('length', 'double'),
 ('level', 'string'),
 ('location', 'string'),
 ('method', 'string'),
 ('page', 'string'),
 ('registration', 'double'),
 ('sessionId', 'bigint'),
 ('song', 'string'),
 ('status', 'bigint'),
 ('ts', 'bigint'),
 ('userAgent', 'string'),
 ('userId', 'string')]

In [37]:
# filter by actions for song plays
df = df.filter(df['page'] == 'NextSong')

start = time.time()
# extract columns for users table
users_cols = ['userId', 'firstName', 'lastName', 'gender', 'level']
users_table = df.select(users_cols).dropDuplicates()

# write users table to parquet files
users_table.write.mode('overwrite').parquet(os.path.join(output_data, 'users'))
end = time.time()
print('took', int(end-start), 's to write users table')

took 549 s to write users table


In [41]:
# create timestamp column from original timestamp column
# ts is timestamp in ms
# this is not necessary since we don't use it
# get_timestamp = udf(lambda x: x / 1000, TimestampType())
# df = df.withColumn('ts_s', get_timestamp('ts'))

In [53]:
# create datetime column from original timestamp column
# timestamp is is ms
get_datetime = udf(lambda x: datetime.fromtimestamp(x / 1000), TimestampType())
df = df.withColumn('start_time', get_datetime('ts'))

In [54]:
# extract columns to create time table

time_table = df.select("start_time").dropDuplicates() \
                        .withColumn("hour", hour(col("start_time"))) \
                        .withColumn("day", dayofmonth(col("start_time"))) \
                        .withColumn("week", weekofyear(col("start_time"))) \
                        .withColumn("month", month(col("start_time"))) \
                        .withColumn("year", year(col("start_time"))) \
                        .withColumn("weekday", date_format(col("start_time"), 'E'))

In [55]:
time_table.head()

Row(start_time=datetime.datetime(2018, 11, 21, 6, 18, 12, 796000), hour=6, day=21, week=47, month=11, year=2018, weekday='Wed')

In [56]:
start = time.time()
# write time table to parquet files partitioned by year and month
time_table.write.mode('overwrite').partitionBy('year', 'month').parquet(os.path.join(output_data, 'time'))
end = time.time()

print('took', int(end-start), 's to write time table')

took 1683 s to write time table


In [63]:
# read in song data to use for songplays table
songs_df = spark.read.parquet(os.path.join(output_data, 'songs/*/*/*'))

# read in artist data
artists_df = spark.read.parquet(output_data + 'artists/*')

# extract columns from joined song and log datasets to create songplays table
# columns desired: songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
# we are joining the songs and artists data with logs to get the song_id and artist_id from the songs data
# need to join on song title and artist name

# first join songs and logs dfs on song title
songs_logs_df = df.join(songs_df, (df.song == songs_df.title))
# next join that df with artists on artist name
artists_songs_logs_df = songs_logs_df.join(artists_df, (songs_logs_df.artist == artists_df.artist_name))
songplay_cols = ['start_time', 'userId', 'level', 'song_id', 'artist_id', 'sessionId', 'location', 'userAgent']
# calculate year and month from start_time -- probably faster than a join on the time table
songplays_table = artists_songs_logs_df.select(songplay_cols) \
                    .withColumn('songplay_id', monotonically_increasing_id()) \
                    .withColumn("month", month(col("start_time"))) \
                    .withColumn("year", year(col("start_time")))

# write songplays table to parquet files partitioned by year and month
start = time.time()
songplays_table.write.mode('overwrite').partitionBy('year', 'month').parquet(os.path.join(output_data, 'songplays'))
end = time.time()
print('took', int(end-start), 's to write songplays table')

took 168 s to write songplays table
