In [None]:
from pyspark.sql import SparkSession
from datetime import datetime
import pyspark.sql.functions as F
import pyspark.sql.types as t

In [None]:
# Start (or reuse) the Spark session, configured to use the in-memory
# catalog implementation.
spark = (
    SparkSession.builder
    .config("spark.sql.catalogImplementation", "in-memory")
    .getOrCreate()
)

In [None]:
# Debugging aid: print the names of all attributes available on the session object.
print(dir(spark))

In [None]:
# Load the raw song JSON files from S3 into a dataframe
# (no schema is supplied, so Spark infers it from the data).
df_song_data = spark.read.json(
    path="s3://udacity-dend2-mogo/song_data/"
)

In [None]:
# Load the raw log JSON files from S3 into a dataframe
# (no schema is supplied, so Spark infers it from the data).
df_log_data = spark.read.json(
    path="s3://udacity-dend2-mogo/log_data/"
)

In [None]:
# Preview the first five song rows
df_song_data.show(n=5)

In [None]:
# Preview the first five log rows
df_log_data.show(n=5)

<h3>Create Songs and Artist tables</h3>

Create songs table and write it to parquet file

In [None]:
# Register the raw song dataframe as the Spark SQL view "songs_table_view"
df_song_data.createOrReplaceTempView("songs_table_view")

# Project the song-level columns out of the view
songs_table = spark.sql(
    "SELECT artist_name, song_id, title, artist_id, year, duration FROM songs_table_view"
)

# Preview five rows without truncating long values
songs_table.show(n=5, truncate=False)

In [None]:
# Build a unique name for this run's songs parquet output by appending the
# current local time (down to microseconds) to the base name.
now = datetime.strftime(datetime.now(), "%Y-%m-%d-%H-%M-%S-%f")
song_table_path = "".join(['songs_table', '.parquet', "_", now])
print(song_table_path)

In [None]:
# Persist the songs table to S3 as parquet, partitioned by year and then artist_id
songs_table.write.parquet(
    "s3://udacity-dend-mogo-output/" + song_table_path,
    partitionBy=["year", "artist_id"],
)

Create artists table and write it to parquet file

In [None]:
# Create an artists view from the song dataframe and project the artist columns
# under their dimension-table names.
# DISTINCT is required because the source has song-level rows: an artist with
# several songs would otherwise be emitted once per song row.
df_song_data.createOrReplaceTempView("artist_table_view")
artist_table = spark.sql(
    """SELECT DISTINCT artist_id, artist_name AS name, artist_location AS location,
        artist_latitude AS latitude, artist_longitude AS longitude
    FROM artist_table_view"""
)

artist_table.show(1)

In [None]:
# Build a unique name for this run's artists parquet output by appending the
# current local time (down to microseconds) to the base name.
now = datetime.strftime(datetime.now(), "%Y-%m-%d-%H-%M-%S-%f")
artist_table_path = "".join(['artist_table', '.parquet', "_", now])
print(artist_table_path)

In [None]:
# Write the artists table to parquet on S3.
# The table is written unpartitioned: artist_id is a high-cardinality key, so
# partitioning by it would create one directory (and tiny file) per artist.
artist_table.write.parquet("s3://udacity-dend-mogo-output/" + artist_table_path)

<h3>Create Users, Time, and Songplays tables</h3>

Create Users table and write it to parquet file

In [None]:
# Keep only the log rows whose 'page' column equals 'NextSong'
filtered_ld_df = df_log_data.where(F.col("page") == 'NextSong')
filtered_ld_df.show(n=6)

In [None]:
# Create a users view from the filtered log dataframe and select the distinct
# combinations of the user attributes, renamed to snake_case.
filtered_ld_df.createOrReplaceTempView("users_table_view")
user_table = spark.sql(
    "SELECT DISTINCT userId AS user_id, firstName AS first_name, lastName AS last_name, gender, level"
    "    FROM users_table_view"
)
user_table.show(n=5)

In [None]:
# Build a unique name for this run's users parquet output by appending the
# current local time (down to microseconds) to the base name.
now = datetime.strftime(datetime.now(), "%Y-%m-%d-%H-%M-%S-%f")
users_table_path = "".join(['users_table', '.parquet', "_", now])
print(users_table_path)

In [None]:
# Write the users table to parquet on S3.
# The table is written unpartitioned: user_id is a high-cardinality key, so
# partitioning by it would create one directory (and tiny file) per user.
user_table.write.parquet("s3://udacity-dend-mogo-output/" + users_table_path)

Create Time table and write it to parquet file

In [None]:
# Add a 'timestamp' column to the filtered log dataframe by converting the
# epoch-millisecond 'ts' column to a Spark timestamp.

@F.udf(t.TimestampType())
def get_timestamp (ts):
    """Convert an epoch time in milliseconds to a datetime.

    Args:
        ts: Epoch time in milliseconds, or None for a null cell.

    Returns:
        The corresponding naive local datetime, or None when ts is None.
    """
    if ts is None:
        # A SQL NULL reaches the UDF as None; propagate it instead of letting
        # the division raise TypeError and fail the whole job.
        return None
    return datetime.fromtimestamp(ts / 1000.0)

filtered_ld_df = filtered_ld_df.withColumn("timestamp", get_timestamp("ts"))
filtered_ld_df.printSchema()

In [None]:
# Add a 'datetime' column to the filtered log dataframe holding the
# epoch-millisecond 'ts' column formatted as a 'YYYY-MM-DD HH:MM:SS' string.
@F.udf(t.StringType())
def get_datetime (ts):
    """Format an epoch time in milliseconds as a date-time string.

    Args:
        ts: Epoch time in milliseconds, or None for a null cell.

    Returns:
        The local date-time formatted as '%Y-%m-%d %H:%M:%S', or None when ts
        is None.
    """
    if ts is None:
        # A SQL NULL reaches the UDF as None; propagate it instead of letting
        # the division raise TypeError and fail the whole job.
        return None
    return datetime.fromtimestamp(ts / 1000.0).strftime('%Y-%m-%d %H:%M:%S')

filtered_ld_df = filtered_ld_df.withColumn("datetime", get_datetime("ts"))

filtered_ld_df.printSchema()

In [None]:
# Create a time view from the filtered log dataframe and derive the calendar
# parts of every distinct play time.
# start_time is taken from the 'timestamp' column (not the second-resolution
# 'datetime' string) so that its type and precision match songplays.start_time,
# which is also built from 'timestamp'; otherwise the two tables cannot be
# joined reliably on start_time.
filtered_ld_df.createOrReplaceTempView("time_table_view")
time_table = spark.sql(
    """SELECT DISTINCT timestamp AS start_time, HOUR(timestamp) AS hour, 
        DAY(timestamp) AS day, WEEKOFYEAR(timestamp) AS week, 
        MONTH(timestamp) AS month, YEAR(timestamp) AS year, 
        DAYOFWEEK(timestamp) AS weekday
    FROM time_table_view"""
)

time_table.show(5)

In [None]:
# Build a unique name for this run's time parquet output by appending the
# current local time (down to microseconds) to the base name.
now = datetime.strftime(datetime.now(), "%Y-%m-%d-%H-%M-%S-%f")
time_table_path = "".join(['time_table', '.parquet', "_", now])
print(time_table_path)

In [None]:
# Persist the time table to S3 as parquet, partitioned by year
time_table.write.parquet(
    "s3://udacity-dend-mogo-output/" + time_table_path,
    partitionBy=["year"],
)

Create songplays table and write it to parquet file

In [None]:
# Inner-join the filtered log rows to the song rows: a log row matches a song
# row when both the artist name and the song title are equal.
joined_ld_sd_df = filtered_ld_df.join(
    df_song_data,
    on=[
        filtered_ld_df.artist == df_song_data.artist_name,
        filtered_ld_df.song == df_song_data.title,
    ],
    how="inner",
)
joined_ld_sd_df.show(n=5)

In [None]:
# Tag every joined row with a generated 64-bit id to serve as songplay_id
joined_ld_sd_df = joined_ld_sd_df.withColumn(
    "songplay_id",
    F.monotonically_increasing_id(),
)

# Expose the joined rows to Spark SQL, then project and rename the songplay
# columns, ordered by user and session.
joined_ld_sd_df.createOrReplaceTempView("songplays_table_view")
songplays_table = spark.sql("""
    SELECT  songplay_id AS songplay_id, 
            timestamp   AS start_time, 
            userId      AS user_id, 
            level       AS level,
            song_id     AS song_id,
            artist_id   AS artist_id,
            sessionId   AS session_id,
            location    AS location,
            userAgent   AS user_agent
    FROM songplays_table_view
    ORDER BY (user_id, session_id) 
""")

songplays_table.show(n=5)

In [None]:
# Build a unique name for this run's songplays parquet output by appending the
# current local time (down to microseconds) to the base name.
now = datetime.strftime(datetime.now(), "%Y-%m-%d-%H-%M-%S-%f")
songplay_table_path = "".join(['songplay_table', '.parquet', "_", now])
print(songplay_table_path)

In [None]:
# Persist the songplays table to S3 as an unpartitioned parquet dataset
songplays_table.write.parquet(
    path="s3://udacity-dend-mogo-output/" + songplay_table_path
)