In [1]:
import configparser
import os
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import IntegerType, LongType, TimestampType

In [2]:
# Load AWS credentials from dl.cfg and export them as the AWS_* environment
# variables that the Spark session cell reads back.
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list of
# files it actually parsed; fail loudly here rather than with a KeyError on 'AWS'.
if not config.read('dl.cfg'):
    raise FileNotFoundError("could not read config file 'dl.cfg'")

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']

In [3]:
# Build (or reuse) the Spark session with the hadoop-aws connector so that s3a://
# paths can be read and written.
# S3A takes static credentials from fs.s3a.access.key / fs.s3a.secret.key. The
# fs.s3a.awsAccessKeyId / fs.s3a.awsSecretAccessKey names used previously are not
# S3A properties and were ignored (credentials presumably resolved only via the
# AWS_* environment variables exported by the config cell).
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.access.key", os.environ['AWS_ACCESS_KEY_ID'])
    .config("spark.hadoop.fs.s3a.secret.key", os.environ['AWS_SECRET_ACCESS_KEY'])
    .getOrCreate()
)

In [4]:
# Source bucket for the raw JSON (ends with '/') and destination bucket for the
# parquet tables (no trailing '/'; table names are appended as '/<table>').
input_data, output_data = "s3a://udacity-dend/", "s3a://publicbucketphi"

# Song data

In [5]:
# Glob for the song metadata files; only the song_data/A/A/A prefix is read,
# i.e. a subset of the full dataset.
song_data = ''.join((input_data, 'song_data/A/A/A/*.json'))

In [6]:
# Load every song JSON file matched by the glob into a DataFrame (schema inferred).
df = spark.read.format('json').load(song_data)

In [7]:
# Expose the raw song metadata to SQL; the artists and songplays queries read this view.
df.createOrReplaceTempView("song_data_table")

# Songs dimension: the distinct combinations of the five song-level columns.
songs_table = df.select('song_id', 'title', 'artist_id', 'year', 'duration').distinct()

In [8]:
# Show the songs table schema (column names, types, nullability) to check the projection.
songs_table.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



In [9]:
# Persist the songs dimension as parquet in year/artist_id partition directories,
# replacing whatever a previous run left at that location.
(songs_table.write
    .mode('overwrite')
    .partitionBy('year', 'artist_id')
    .parquet(output_data + '/songs'))

In [10]:
# Artists dimension built from the song metadata view, renaming the artist_* columns.
# NOTE(review): the de-duplication is over all five columns, so an artist_id whose
# name/location/coordinates differ between songs yields more than one row — confirm
# whether one row per artist_id is required here.
artists_table = (
    spark.table('song_data_table')
    .selectExpr(
        'artist_id',
        'artist_name AS name',
        'artist_location AS location',
        'artist_latitude AS latitude',
        'artist_longitude AS longitude',
    )
    .distinct()
)

In [11]:
# Show the artists table schema to confirm the renamed columns and their types.
artists_table.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [12]:
# Persist the artists dimension as unpartitioned parquet, replacing any earlier output.
artists_table.write.format('parquet').mode('overwrite').save(output_data + '/artists')

# Log data

In [13]:
# Glob for the event-log JSON files, two directory levels below log_data/.
log_data = ''.join((input_data, 'log_data/*/*/*.json'))

In [14]:
# Load the event logs. This rebinds `df`; the song data stays reachable through
# the song_data_table view registered earlier.
df = spark.read.format('json').load(log_data)

In [15]:
# Keep only song-play events (page == 'NextSong').
df = df.filter(col('page') == 'NextSong')

In [16]:
# Expose the filtered song-play events to SQL.
df.createOrReplaceTempView('log_data_table')

# Users dimension. A plain DISTINCT over all columns would emit several rows for the
# same user whenever any attribute (e.g. level) differs between that user's events,
# so user_id would not be unique. Instead keep each user's most recent event
# (highest ts) and take the attributes from it.
user_table = spark.sql('''
    SELECT user_id, first_name, last_name, gender, level
    FROM (
        SELECT
            userId AS user_id,
            firstName AS first_name,
            lastName AS last_name,
            gender,
            level,
            ROW_NUMBER() OVER (PARTITION BY userId ORDER BY ts DESC) AS rn
        FROM log_data_table
    ) latest
    WHERE latest.rn = 1
''')

In [17]:
# Show the users table schema to confirm the snake_case column names.
user_table.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



In [19]:
# Persist the users dimension as unpartitioned parquet, replacing any earlier output.
user_table.write.format('parquet').mode('overwrite').save(output_data + '/users')

In [20]:
# ts is presumably epoch milliseconds (it is divided by 1000 before being handed to
# datetime.fromtimestamp). Convert it to whole epoch seconds. The explicit LongType
# matters: without a return type udf() defaults to StringType, so the column would
# hold strings and the next UDF would receive str instead of a number.
get_timestamp = udf(lambda x: None if x is None else int(x / 1000.0), LongType())
df = df.withColumn('timestamp', get_timestamp(df.ts))

# Epoch seconds -> timestamp. datetime.fromtimestamp interprets the value in the
# Python worker's local time zone. Nulls pass through instead of raising.
get_datetime = udf(lambda x: None if x is None else datetime.fromtimestamp(x), TimestampType())
df = df.withColumn('date', get_datetime(df.timestamp))

# Re-register the view so the time-table and songplays SQL can see the new
# `timestamp` and `date` columns.
df.createOrReplaceTempView('log_data_table')

In [21]:
# Calendar-part UDFs used from SQL on the `date` timestamp column.
# Each one declares IntegerType. Without a return type, register() defaults to
# StringType, and every value would round-trip through a string before the
# query's CAST. Each also returns NULL for a NULL timestamp instead of raising.
spark.udf.register("get_hour", lambda x: None if x is None else x.hour, IntegerType())
spark.udf.register("get_day", lambda x: None if x is None else x.day, IntegerType())
# strftime('%W'): Monday-first week of the year (00-53), not the ISO week number.
spark.udf.register("get_week", lambda x: None if x is None else int(x.strftime('%W')), IntegerType())
spark.udf.register("get_year", lambda x: None if x is None else x.year, IntegerType())
spark.udf.register("get_month", lambda x: None if x is None else x.month, IntegerType())
# datetime.weekday(): Monday == 0 ... Sunday == 6.
# The trailing ';' keeps the register() return value out of the cell output.
spark.udf.register("get_weekday", lambda x: None if x is None else x.weekday(), IntegerType());

<function __main__.<lambda>(x)>

In [22]:
# Time dimension: one row per distinct event timestamp, split into calendar parts
# by the registered get_* UDFs, with every part CAST to int.
time_table = (
    spark.table('log_data_table')
    .selectExpr(
        'date AS start_time',
        'CAST(get_hour(date) AS int) AS hour',
        'CAST(get_day(date) AS int) AS day',
        'CAST(get_week(date) AS int) AS week',
        'CAST(get_month(date) AS int) AS month',
        'CAST(get_year(date) AS int) AS year',
        'CAST(get_weekday(date) AS int) AS weekday',
    )
    .distinct()
)

In [23]:
# Show the time table schema to confirm start_time is a timestamp and the parts are ints.
time_table.printSchema()

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [24]:
# Persist the time dimension as parquet in year/month partition directories,
# replacing any earlier output.
(time_table.write
    .mode('overwrite')
    .partitionBy('year', 'month')
    .format('parquet')
    .save(output_data + '/time'))

In [25]:
# Despite its name, song_df holds the log events: every column of log_data_table
# plus int `month` and `year` columns, which become the songplays partition keys.
song_df = spark.table('log_data_table').selectExpr(
    '*',
    'CAST(get_month(date) AS int) AS month',
    'CAST(get_year(date) AS int) AS year',
)

In [30]:
# Re-register the log view so it includes song_df's month/year columns.
song_df.createOrReplaceTempView('log_data_table')

# Songplays fact table: match each song-play event to a song in the song metadata.
# Joining on title alone pairs an event with every song of that title, even by a
# different artist, so the artist name must match as well.
# NOTE(review): assumes the log events carry an `artist` column (it is not shown in
# this notebook's printed schemas) — confirm against the log_data JSON.
songplays_table = spark.sql('''
    SELECT DISTINCT
        l.date AS start_time,
        l.userId AS user_id,
        l.level,
        s.song_id,
        s.artist_id,
        l.sessionId AS session_id,
        l.location,
        l.userAgent,
        l.month,
        l.year
    FROM log_data_table l
    JOIN song_data_table s
    ON l.song = s.title
    AND l.artist = s.artist_name
''')

In [31]:
# Show the songplays schema before the songplay_id column is attached.
songplays_table.printSchema()

root
 |-- start_time: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)



In [32]:
# Attach a sequential songplay_id. zipWithIndex pairs each Row with its 0-based
# position; toDF() turns those (Row, index) pairs into a struct column `_1` and a
# long column `_2`. Drop any songplay_id from a previous run of this cell first
# (a no-op if absent) so re-running does not produce a second songplay_id column.
df_tmp = songplays_table.drop('songplay_id').rdd.zipWithIndex().toDF()
songplays_table = df_tmp.select(col("_1.*"), col("_2").alias('songplay_id'))

# Preview a few rows only; toPandas() on the whole table collects it all to the driver.
songplays_table.limit(5).toPandas()

Unnamed: 0,start_time,user_id,level,song_id,artist_id,session_id,location,userAgent,month,year,songplay_id
0,2018-11-15 16:19:05,97,paid,SOBLFFE12AF72AA5BA,ARJNIUY12298900C91,605,"Lansing-East Lansing, MI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",11,2018,0


In [33]:
# Persist the songplays fact table as parquet in year/month partition directories,
# replacing any earlier output.
(songplays_table.write
    .mode('overwrite')
    .partitionBy('year', 'month')
    .format('parquet')
    .save(output_data + '/songplays'))