In [None]:
import configparser
import os
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, monotonically_increasing_id
from pyspark.sql.types import TimestampType

In [None]:
# Parse dl.cfg; the context manager closes the file handle as soon as the
# parser has consumed it (the bare open() call previously left it open).
# A missing dl.cfg still raises, exactly as before.
config = configparser.ConfigParser()
with open('dl.cfg') as cfg_file:
    config.read_file(cfg_file)

# Copy the [AWS] credentials from the config into this process's environment.
os.environ["AWS_ACCESS_KEY_ID"] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"] = config['AWS']['AWS_SECRET_ACCESS_KEY']

In [None]:
# Reuse the active SparkSession or create one, requesting the hadoop-aws
# package through spark.jars.packages.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)


In [None]:
# Prefix the source JSON globs are built from.
input_data = 's3a://udacity-dend/'
# Prefix every parquet output path is built from.
output_data = 's3a://udacity-dend-sparkproject4-osama/'

In [None]:
# Glob for the song JSON files, three directory levels below song_data/.
song_data = f"{input_data}song_data/*/*/*/*.json"

In [None]:
# Load every song JSON file matched by the glob into one DataFrame.
df = spark.read.json(path=song_data)

In [None]:
# Project the columns that make up the songs table.
songs_table = df.select('song_id', 'title', 'artist_id', 'year', 'duration')
# Bare expression: the notebook displays the DataFrame's repr.
songs_table

In [None]:
# Write the songs table as parquet partitioned by year and artist_id,
# replacing any output already present at the path.
s_path = "{}/songs.parquet".format(output_data)
(
    songs_table.write
    .mode("overwrite")
    .partitionBy('year', 'artist_id')
    .parquet(s_path)
)

In [None]:
# Build the artists table from the song records, renaming the artist_* columns.
# The same artist attributes can appear on many song rows (presumably one row
# per song), so distinct() collapses identical rows instead of writing an
# artist once per song -- the same de-duplication applied to users_table.
artists_table = df.selectExpr("artist_id",
                              "artist_name as name",
                              "artist_location as location",
                              "artist_latitude as latitude",
                              "artist_longitude as longitude").distinct()

In [None]:
# Write the artists table as parquet, replacing any existing output.
a_path = "{}/artists.parquet".format(output_data)
artists_table.write.mode("overwrite").parquet(a_path)

In [None]:
# Glob for the log JSON files, two directory levels below log_data/.
log_data = f"{input_data}log_data/*/*/*.json"

In [None]:
# Load every log JSON file matched by the glob into one DataFrame.
df_log_data = spark.read.json(path=log_data)

In [None]:
# Keep only the log rows whose page is 'NextSong' (song-play events).
df_log_data = df_log_data.where(col("page") == "NextSong")

# Users table: rename the user columns to snake_case and drop exact duplicate rows.
users_table = df_log_data.selectExpr(
    'userId as user_id',
    'firstName as first_name',
    'lastName as last_name',
    'gender',
    'level',
).distinct()


In [None]:
# Persist the users table as parquet, replacing any existing output.
users_path = f"{output_data}/users.parquet"
users_table.write.mode("overwrite").parquet(users_path)

In [None]:
# create timestamp column from original timestamp column
get_timestamp = udf(lambda ts: datetime.fromtimestamp(ts / 1000.0))
df_log_data = df_log_data.withColumn("timestamp", get_timestamp("ts"))

In [None]:
# create datetime column from original timestamp column
get_datetime = udf(lambda ts: datetime.fromtimestamp(ts / 1000.0).strftime('%Y-%m-%d %H:%M:%S'))
df_log_data = df_log_data.withColumn("datetime", get_datetime("ts"))

In [None]:
df_log_data.createOrReplaceTempView("logs") 

time_table = spark.sql("""
    SELECT  DISTINCT datetime AS start_time, 
                     hour(timestamp) AS hour, 
                     day(timestamp)  AS day, 
                     weekofyear(timestamp) AS week,
                     month(timestamp) AS month,
                     year(timestamp) AS year,
                     dayofweek(timestamp) AS weekday
    FROM logs
    ORDER BY start_time
""")


In [None]:
# Persist the time table as parquet partitioned by year and month,
# replacing any existing output.
times_path = f"{output_data}/times.parquet"
(
    time_table.write
    .mode("overwrite")
    .partitionBy('year', 'month')
    .parquet(times_path)
)

In [None]:
# Match each log row to a song record on artist name and song title, then tag
# every matched row with a generated songplay_id.
same_artist = df_log_data.artist == df.artist_name
same_title = df_log_data.song == df.title
df_joined = (
    df_log_data
    .join(df, same_artist & same_title)
    .withColumn("songplay_id", monotonically_increasing_id())
)

# Register the joined rows as the "songplays" view and project the
# songplays columns from it.
df_joined.createOrReplaceTempView("songplays")
songplays_query = """
    SELECT  songplay_id, 
            timestamp   AS start_time, 
            userId      AS user_id, 
            level,
            song_id,
            artist_id,
            sessionId   AS session_id,
            location,
            userAgent   AS user_agent
    FROM songplays
    ORDER BY (user_id, session_id) 
"""
songplays_table = spark.sql(songplays_query)


In [None]:
# Persist the songplays table as parquet, replacing any existing output.
songplays_path = f"{output_data}/songplays.parquet"
songplays_table.write.mode("overwrite").parquet(songplays_path)


In [None]:
# Preview the songplays rows. songplays_table is already defined by this exact
# query over the "songplays" view, so show it directly rather than keeping a
# second copy of the SQL that could drift from the table that was written.
songplays_table.show()