In [1]:
from pyspark.sql import SparkSession

In [2]:
def create_spark_session(packages="org.apache.hadoop:hadoop-aws:2.7.0"):
    """Return a SparkSession configured with the given extra Maven packages.

    Uses ``SparkSession.builder.getOrCreate()``, so an already-running session
    is returned if one exists (package settings may not take effect on a
    session that is already started).

    Args:
        packages: Value for ``spark.jars.packages``. The default is the
            hadoop-aws coordinate that was previously hard-coded; it is exposed
            because hadoop-aws generally has to match the Hadoop version Spark
            was built against, which differs between environments.

    Returns:
        The SparkSession returned by ``getOrCreate()``.
    """
    return (
        SparkSession.builder
        .config("spark.jars.packages", packages)
        .getOrCreate()
    )

In [4]:
# Build (or reuse, via getOrCreate) the session the processing cells below use;
# the bare name on the last line makes Jupyter display the session object.
spark = create_spark_session()
spark

In [32]:
def process_song_data(spark, input_data, output_data) -> None:
    """Build the ``songs`` and ``artists`` tables from song JSON and write them as parquet.

    The raw song data is also registered as the temp view ``song_data_table``;
    ``process_log_data`` joins against that view, so this function must run
    first in the same Spark session.

    Args:
        spark: Active ``SparkSession``.
        input_data: Path or glob of the song JSON file(s); passed unchanged to
            ``spark.read.json``.
        output_data: Output prefix. Table directories are appended by plain
            string concatenation, so it should end with a path separator.
    """
    # get filepath to song data file
    song_data = input_data

    # read song data file
    df = spark.read.json(song_data)
    # printSchema() writes the schema itself and returns None; wrapping it in
    # print() used to emit a stray "None" line.
    df.printSchema()

    # Exposed for the songplays join in process_log_data.
    df.createOrReplaceTempView("song_data_table")

    # extract columns to create songs table; keep one row per song_id so the
    # dimension key is unique even if the input repeats a song.
    songs_table = (
        df.select("song_id", "artist_id", "year", "duration")
        .dropDuplicates(["song_id"])
    )

    # write songs table to parquet files partitioned by year and artist
    songs_table.write.partitionBy("year", "artist_id").mode("overwrite").parquet(
        output_data + "songs/songs_table.parquet"
    )

    # extract columns to create artists table. The song data presumably holds
    # one row per song, so an artist with several songs appeared several times;
    # keep one (arbitrary) row per artist_id.
    artists_table = (
        df.select("artist_id", "artist_name", "artist_latitude", "artist_longitude")
        .dropDuplicates(["artist_id"])
    )

    # write artists table to parquet files
    artists_table.write.mode("overwrite").parquet(output_data + "artists/artists_table.parquet")
    

In [235]:
def process_log_data(spark, input_data, output_data, log_data="../data/log_data/*.json"):
    """Build the ``songplays``, ``users`` and ``time`` tables from event logs.

    Requires the ``song_data_table`` temp view registered by
    ``process_song_data`` in the same Spark session (used by the songplays
    join). Registers ``log_data_table`` and ``songplays_data_table``.

    Args:
        spark: Active ``SparkSession``.
        input_data: Not used by this function; kept so the signature stays
            compatible with existing callers.
        output_data: Output prefix. Table directories are appended by plain
            string concatenation, so it should end with a path separator.
        log_data: Path or glob of the log JSON file(s). Defaults to the path
            that was previously hard-coded.
    """
    # read log data file
    df = spark.read.json(log_data)
    # printSchema() prints the schema and returns None; don't wrap it in print().
    df.printSchema()

    df.createOrReplaceTempView("log_data_table")

    # extract columns from joined song and log datasets to create songplays table.
    # - Spark SQL treats "..." as a string literal, so the former
    #   `order by "monotonically_increasing_id"` ordered by a constant and ids
    #   were handed out in arbitrary order; order by real columns instead.
    # - A window with no partition sends all rows through one task; acceptable
    #   for modest volumes, revisit for large logs.
    # - No trailing ';': spark.sql() takes a single statement and some Spark
    #   versions' parsers reject the terminator.
    query = """with songplay_data as (
        select e.userId    as user_id,
               s.song_id   as song_id,
               s.artist_id as artist_id,
               e.sessionId as session_id,
               e.ts        as start_time,
               e.level     as level,
               e.location  as location,
               e.userAgent as user_agent,
               year(TIMESTAMP 'epoch' + e.ts/1000 * INTERVAL '1 second')  as year,
               month(TIMESTAMP 'epoch' + e.ts/1000 * INTERVAL '1 second') as month
        from log_data_table e
        join song_data_table s
          on s.title = e.song
         and s.artist_name = e.artist
         and round(s.duration) = round(e.length)
        where e.page = 'NextSong'
    )
    select row_number() over (order by start_time, session_id, user_id) as songplay_id, *
    from songplay_data
    """

    songplays_table = spark.sql(query)
    # The time table below is derived from this view.
    songplays_table.createOrReplaceTempView("songplays_data_table")

    # write songplays table to parquet files partitioned by year and month
    songplays_table.write.partitionBy("year", "month").mode("overwrite").parquet(
        output_data + "songplays/songplays_table.parquet"
    )

    # extract columns for users table: keep each user's most recent event
    # (highest ts). obs_num is only the ranking helper and is not written out.
    query = """with user_data as (
        select userId, firstName, lastName, gender, level,
               row_number() over (partition by userId order by ts desc) as obs_num
        from log_data_table
        where page = 'NextSong'
    )
    select userId    as user_id,
           firstName as first_name,
           lastName  as last_name,
           gender,
           level
    from user_data
    where obs_num = 1
    """
    users_table = spark.sql(query)

    # write users table to parquet files
    users_table.write.mode("overwrite").parquet(output_data + "users/users_table.parquet")

    # extract columns to create time table: de-duplicate start_time first and
    # convert the epoch-milliseconds value to a timestamp once.
    # NOTE(review): `day` is day-of-year (dayofyear); confirm day-of-month was
    # not intended.
    query = """with distinct_times as (
        select distinct start_time,
               TIMESTAMP 'epoch' + start_time/1000 * INTERVAL '1 second' as start_ts
        from songplays_data_table
    )
    select start_time,
           hour(start_ts)       as hour,
           dayofyear(start_ts)  as day,
           weekofyear(start_ts) as week,
           month(start_ts)      as month,
           year(start_ts)       as year,
           dayofweek(start_ts)  as weekday
    from distinct_times
    """
    time_table = spark.sql(query)

    # write time table to parquet files partitioned by year and month
    time_table.write.partitionBy("year", "month").mode("overwrite").parquet(
        output_data + "time/time_table.parquet"
    )
