In [None]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [None]:
# Load AWS credentials from dl.cfg and export them as environment variables.
# read_file() on an explicitly opened file is used instead of read(): read()
# silently skips a missing file, which would only surface further down as an
# opaque KeyError: 'AWS'. Opening the file raises FileNotFoundError right away.
config = configparser.ConfigParser()
with open('dl.cfg') as config_file:
    config.read_file(config_file)

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [None]:
def create_spark_session():
    """Build the SparkSession for this notebook, or return the existing one.

    The session is configured with ``spark.jars.packages`` set to the
    ``org.apache.hadoop:hadoop-aws:2.7.0`` package.

    Returns:
        The SparkSession produced by ``getOrCreate()``.
    """
    builder = SparkSession.builder.config(
        "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
    )
    return builder.getOrCreate()

In [None]:
# Create (or reuse) the Spark session that every cell below relies on.
spark = create_spark_session()
# S3 input location, currently disabled; the paths below point at local data.
#input_data = "s3a://udacity-dend/"

In [None]:
# get filepath to song data file
# The glob matches JSON files three directory levels below song_data.
# Disabled S3 variant:
#song_data_path = f'{input_data}/song_data/A/A/*/*.json'
# Plain string literal: the path has no placeholders, so no f-prefix is needed.
song_data_path = '/home/workspace/data/song_data/*/*/*/*.json'

In [None]:
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, DateType, LongType, TimestampType
# Explicit schema for the song JSON files, so Spark does not have to infer
# column types. Fields are added in the same order as before.
songdataSchema = (
    StructType()
    .add("num_songs", IntegerType())
    .add("artist_id", StringType())
    .add("artist_latitude", DoubleType())
    .add("artist_longitude", DoubleType())
    .add("artist_location", StringType())
    .add("artist_name", StringType())
    .add("song_id", StringType())
    .add("title", StringType())
    .add("duration", DoubleType())
    .add("year", IntegerType())
)

In [None]:
# read song data file
# Apply the explicit schema first, then load every file matched by the glob.
song_reader = spark.read.schema(songdataSchema)
song_df = song_reader.json(song_data_path)

In [None]:
# Sanity check: print the applied schema and preview the first 5 rows.
song_df.printSchema()
song_df.show(5)

In [None]:
# extract columns to create songs table
# Register song_df as the SQL view "song_data"; later queries in this notebook
# (row count, artists table) read from the same view.
song_df.createOrReplaceTempView("song_data")
songs_table = spark.sql("""
    SELECT song_id, title, artist_id, year, duration
    FROM song_data
""")
# Preview the first 5 rows of the projection.
songs_table.show(5)

In [None]:
# Sanity check: count the rows of song_data that have a non-null song_id
# (count(column) skips NULLs).
num = spark.sql("""
    SELECT count(song_id)
    FROM song_data
""")
num.show()

In [None]:
# Root directory under which every parquet table below is written.
# S3 destination, currently disabled:
#output_data = "s3a://nancys-data-lake-project/"
output_data = "/home/workspace/result/"

In [None]:
# write songs table to parquet files partitioned by year and artist
# Any existing output under songs/ is overwritten.
songs_writer = songs_table.write.mode('overwrite').partitionBy("year", "artist_id")
songs_writer.parquet(os.path.join(output_data, "songs/"))

In [None]:
# extract columns to create artists table
# Each song_data row carries a song together with its artist's attributes, so
# an artist with several songs would otherwise appear once per song. Keep a
# single row per artist_id so the artists table has unique keys.
artists_table = spark.sql("""
    SELECT artist_id, artist_name AS name, artist_location AS location, artist_latitude AS latitude,
    artist_longitude AS longitude
    FROM song_data
""").dropDuplicates(["artist_id"])
# Preview the first 5 de-duplicated rows.
artists_table.show(5)

In [None]:
# write artists table to parquet files
# Any existing output under artists/ is overwritten.
artists_path = os.path.join(output_data, "artists/")
artists_table.write.mode('overwrite').parquet(artists_path)

In [None]:
# get filepath to log data file
# Disabled S3 variants:
#log_data = '{input_data}/log_data/'
#log_data = f'{input_data}/log_data/2018/11/2018-11-08-events.json'
# Plain string literal: the path has no placeholders, so no f-prefix is needed.
log_data_path = '/home/workspace/data/log_data/'

In [None]:
# Explicit schema for the log JSON files, so Spark does not have to infer
# column types. Fields are added in the same order as before.
logdataSchema = (
    StructType()
    .add("artist", StringType())
    .add("auth", StringType())
    .add("firstName", StringType())
    .add("gender", StringType())
    .add("itemInSession", IntegerType())
    .add("lastName", StringType())
    .add("length", DoubleType())
    .add("level", StringType())
    .add("location", StringType())
    .add("method", StringType())
    .add("page", StringType())
    .add("registration", DoubleType())
    .add("sessionId", IntegerType())
    .add("song", StringType())
    .add("status", IntegerType())
    .add("ts", LongType())
    .add("userAgent", StringType())
    .add("userId", StringType())
)

In [None]:
# read log data file
# Apply the explicit schema first, then load the JSON under log_data_path.
log_reader = spark.read.schema(logdataSchema)
logdf = log_reader.json(log_data_path)

In [None]:
# Sanity check: print the applied schema and preview the first 5 log rows.
logdf.printSchema()
logdf.show(5)

In [None]:
# filter by actions for song plays
# Only rows whose page is 'NextSong' are kept; logdf is rebound to the result.
logdf = logdf.filter("page == 'NextSong'")
# Expose the filtered rows to SQL as the view "log_data".
logdf.createOrReplaceTempView("log_data")

In [None]:
# Sanity check: show the largest ts value in the log_data view.
tempdf = spark.sql("""
    SELECT max(s2.ts)
    FROM log_data s2 
""")
tempdf.show(5)

In [None]:
# extract columns for users table
# For every non-null userId, keep only the row(s) whose ts equals that user's
# maximum ts (correlated subquery), so the selected attributes -- including
# level -- come from the user's most recent row in log_data.
# DISTINCT collapses identical rows that share that maximum ts.
users_table = spark.sql("""
    SELECT DISTINCT s1.userId AS user_id, 
        s1.firstName AS first_name, 
        s1.lastName AS last_name, 
        s1.gender, 
        s1.level
    FROM log_data s1
    WHERE s1.userId IS NOT NULL 
    AND s1.ts = (SELECT max(s2.ts)
                        FROM log_data s2
                        WHERE s1.userId = s2.userId)    
""")

In [None]:
# Preview the first 10 rows of the users table.
users_table.show(10)

In [None]:
# write users table to parquet files
# Any existing output under users/ is overwritten.
users_path = os.path.join(output_data, "users/")
users_table.write.mode('overwrite').parquet(users_path)

In [None]:
# create timestamp column from original timestamp column
# ts is divided by 1000 before datetime.fromtimestamp(), i.e. it is treated as
# milliseconds since the epoch. A Python UDF receives None for SQL NULLs, and
# None / 1000.0 raises TypeError and fails the whole job, so NULL is passed
# through as NULL instead.
get_timestamp = udf(
    lambda ms: datetime.fromtimestamp(ms / 1000.0) if ms is not None else None,
    TimestampType(),
)
# Adds the TimestampType column start_time; logdf is rebound to the result.
logdf = logdf.withColumn('start_time', get_timestamp('ts'))

In [None]:
# Preview 5 rows to check the new start_time column.
logdf.show(5)

In [None]:
# Confirm that start_time was added with a timestamp type.
logdf.printSchema()

In [None]:
# create datetime column from original timestamp column
# Formats ts (treated as epoch milliseconds) as a 'YYYY-MM-DD HH:MM:SS' string;
# sub-second precision is dropped by the format. A Python UDF receives None for
# SQL NULLs, and None / 1000.0 raises TypeError, so NULL is passed through.
get_datetime = udf(
    lambda ms: (
        datetime.fromtimestamp(ms / 1000.0).strftime('%Y-%m-%d %H:%M:%S')
        if ms is not None
        else None
    ),
    StringType(),
)
# Adds the string column date_time; logdf is rebound to the result.
logdf = logdf.withColumn('date_time', get_datetime('ts'))

In [None]:
# Preview 5 rows to check the new date_time column.
logdf.show(5)

In [None]:
# Confirm that date_time was added as a string column.
logdf.printSchema()

In [None]:
# extract columns to create time table
# Re-register the view so that it includes the start_time and date_time columns
# added to logdf after the view was first created.
logdf.createOrReplaceTempView("log_data")
# Inside the SELECT list, start_time in the function calls resolves to logdf's
# timestamp column, not to the alias given to date_time on the first line.
# DISTINCT keeps one row per timestamp: several log rows can share the same
# date_time, and without it the time table would contain duplicate rows.
time_table = spark.sql("""
    SELECT DISTINCT date_time AS start_time,
        hour(start_time) AS hour,
        day(start_time) AS day,
        weekofyear(start_time) AS week,
        month(start_time) AS month,
        year(start_time) AS year,
        weekday(start_time) AS weekday
    FROM log_data
""")
# Preview the first 5 rows.
time_table.show(5)

In [None]:
# write time table to parquet files partitioned by year and month
# Any existing output under time/ is overwritten.
time_writer = time_table.write.mode('overwrite').partitionBy("year", "month")
time_writer.parquet(os.path.join(output_data, "time/"))

In [None]:
# Pair each log row with the song row that has the same title, artist name and
# duration. Inner join: log rows without a matching song are dropped.
songplays_df = song_df.join(
    logdf,
    on=(
        (song_df.title == logdf.song)
        & (song_df.artist_name == logdf.artist)
        & (song_df.duration == logdf.length)
    ),
    how="inner",
)

In [None]:
from pyspark.sql.functions import monotonically_increasing_id

In [None]:
# Add a generated songplay_id column as the row identifier.
# NOTE(review): per the Spark docs, monotonically_increasing_id() yields unique,
# increasing 64-bit ids that are not guaranteed to be consecutive -- confirm
# that is acceptable for this key.
songplays_df = songplays_df.withColumn('songplay_id', monotonically_increasing_id())

In [None]:
# Sanity check: print the joined schema and show a single row.
songplays_df.printSchema()
songplays_df.show(1)

In [None]:
# Expose the joined rows to SQL as the view "songplays".
songplays_df.createOrReplaceTempView("songplays")

In [None]:
# extract columns from joined song and log datasets to create songplays table
# start_time is output as the date_time string; month() and year() are computed
# from the joined view's start_time timestamp column (the alias is not visible
# inside the same SELECT list). Rows with a NULL in any selected source column
# are excluded by the WHERE clause.
songplays_table  = spark.sql("""
    SELECT 
        songplay_id,
        date_time AS start_time, 
        userId AS user_id, 
        level, 
        song_id, 
        artist_id, 
        sessionId AS session_id, 
        location, 
        userAgent AS user_agent,
        month(start_time) AS month, 
        year(start_time) AS year
    FROM songplays
    WHERE userId IS NOT NULL
        AND level IS NOT NULL
        AND sessionId IS NOT NULL
        AND location IS NOT NULL
        AND userAgent IS NOT NULL                                
        AND date_time IS NOT NULL
        AND song_id IS NOT NULL
        AND artist_id IS NOT NULL
""")

In [None]:
# Preview the first 5 rows of the songplays table.
songplays_table.show(5)

In [None]:
# write songplays table to parquet files partitioned by year and month
# Any existing output under songplays/ is overwritten.
songplays_writer = songplays_table.write.mode('overwrite').partitionBy("year", "month")
songplays_writer.parquet(os.path.join(output_data, "songplays/"))