In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
import posixpath


config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']


In [2]:
spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()

In [3]:
input_data = "s3a://udacity-dend"
output_data = "s3a://sparkify-data-processed"
song_data = posixpath.join(input_data, 'song_data/A/B/C/*.json')

## Explore Song Data

In [4]:
df = spark.read.json(song_data)

In [5]:
df

DataFrame[artist_id: string, artist_latitude: double, artist_location: string, artist_longitude: double, artist_name: string, duration: double, num_songs: bigint, song_id: string, title: string, year: bigint]

In [6]:
df.select('song_id', 'title', 'artist_id', 'year', 'duration').filter(col('song_id').isNotNull()).drop_duplicates()

DataFrame[song_id: string, title: string, artist_id: string, year: bigint, duration: double]

In [7]:
df.select('artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude').filter(col('artist_id').isNotNull()).drop_duplicates()

DataFrame[artist_id: string, artist_name: string, artist_location: string, artist_latitude: double, artist_longitude: double]

## Test the ETL Pipeline code on our sample

In [8]:
songs_table = df.select('song_id', 'title', 'artist_id', 'year', 'duration').filter(col('song_id').isNotNull()).drop_duplicates()

# write songs table to parquet files partitioned by year and artist
songs_table.write.parquet(posixpath.join(output_data, "songs/"), mode="overwrite", partitionBy=["year","artist_id"])

# extract columns to create artists table
artists_table = df.select('artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude').filter(col('artist_id').isNotNull()).drop_duplicates()

# write artists table to parquet files
artists_table.write.parquet(posixpath.join(output_data, "artists/"), mode="overwrite", partitionBy=["artist_id"])

## Explore Log Data

In [9]:
log_data =posixpath.join(input_data, 'log_data/2018/11/*.json')

In [10]:
df_log = spark.read.json(log_data)

In [11]:
df_log

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: double, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string]

In [13]:
df_log.select('ts').show()

+-------------+
|           ts|
+-------------+
|1542241826796|
|1542242481796|
|1542242741796|
|1542247071796|
|1542252577796|
|1542253449796|
|1542253460796|
|1542260074796|
|1542260277796|
|1542260935796|
|1542261224796|
|1542261356796|
|1542261662796|
|1542261713796|
|1542262057796|
|1542262233796|
|1542262434796|
|1542262456796|
|1542262679796|
|1542262728796|
+-------------+
only showing top 20 rows



In [16]:
from pyspark.sql import functions as f
from pyspark.sql.types import TimestampType

df_log = df_log.withColumn('start_time', (df_log.ts/1000).cast(dataType=TimestampType()))

In [17]:
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek

df_log.withColumn("hour",hour("start_time"))\
      .withColumn("day",dayofmonth("start_time"))\
      .withColumn("week",weekofyear("start_time"))\
      .withColumn("month",month("start_time"))\
      .withColumn("year",year("start_time"))\
      .withColumn("weekday",dayofweek("start_time"))\
      .select("start_time","hour", "day", "week", "month", "year", "weekday").drop_duplicates()

DataFrame[start_time: timestamp, hour: int, day: int, week: int, month: int, year: int, weekday: int]

## Test the ETL Pipeline code on a sample

In [18]:
# read log data file
df = df_log

# filter by actions for song plays
df = df.filter(col('page') == 'NextSong')

# extract columns for users table    
users_table = df.select("userId","firstName","lastName","gender","level").filter(col('userId').isNotNull()).drop_duplicates()

# write users table to parquet files
users_table.write.parquet(posixpath.join(output_data, "users/"), mode="overwrite")

# create timestamp column from original timestamp column
df = df.withColumn('start_time', (df.ts/1000).cast(dataType=TimestampType()))

# extract columns to create time table
time_table = df_log.withColumn("hour",hour("start_time"))\
                   .withColumn("day",dayofmonth("start_time"))\
                   .withColumn("week",weekofyear("start_time"))\
                   .withColumn("month",month("start_time"))\
                   .withColumn("year",year("start_time"))\
                   .withColumn("weekday",dayofweek("start_time"))\
                   .select("start_time","hour", "day", "week", "month", "year", "weekday").drop_duplicates()

# write time table to parquet files partitioned by year and month

In [24]:
# write time table to parquet files partitioned by year and month
time_table.write.parquet(posixpath.join(output_data, "time_table/"), mode="overwrite", partitionBy=["year", "month"])

In [25]:
# read in song data to use for songplays table
song_df =  spark.read.parquet(posixpath.join(output_data,'songs_table/'))

In [22]:
songplays_table

DataFrame[start_time: timestamp, user_id: string, level: string, song_id: string, artist_id: string, session_id: bigint, location: string, user_agent: string, songplay_id: bigint]

In [23]:
from pyspark.sql.functions import udf, col, monotonically_increasing_id
# extract columns from joined song and log datasets to create songplays table 
songplays_table = df.join(song_df, df.song == song_df.title, how='inner')\
                    .select(col("start_time"),col("userId").alias("user_id"),
                            "level","song_id","artist_id", col("sessionId").alias("session_id"), "location", col("userAgent").alias("user_agent")).drop_duplicates()
songplays_table = songplays_table.withColumn('songplay_id', monotonically_increasing_id())

# write songplays table to parquet files partitioned by year and month
songplays_table.write.parquet(posixpath.join(output_data, "songplays_table/"), mode="overwrite")