In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1653324128788_0001,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
config = configparser.ConfigParser()
# ConfigParser.read() skips files it cannot open and returns only the names it
# actually parsed, so keep that list and report when 'dl.cfg' was not loaded
# instead of letting the problem go unnoticed.
loaded_files = config.read('dl.cfg', encoding='utf-8-sig')
if not loaded_files:
    print("WARNING: 'dl.cfg' was not found or could not be read; "
          "no settings were loaded from it.")
# Last expression of the cell: display which files were parsed.
loaded_files

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[]

In [None]:
# Export the AWS credentials from the [AWS] section of the parsed config as
# environment variables, access key first and secret key second.
for _aws_var in ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'):
    os.environ[_aws_var] = config['AWS'][_aws_var]

In [None]:
def create_spark_session():
    """
        Description: Build (or fetch, via getOrCreate) a SparkSession whose
        "spark.jars.packages" setting requests the hadoop-aws 2.7.0 package.

        Returns:
            the SparkSession produced by the builder
    """
    builder = SparkSession.builder.config(
        "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
    )
    return builder.getOrCreate()

In [None]:
def process_song_data(spark, input_data, output_data):
    """
        Description: Load the song_data JSON files found under input_data, derive the
        songs and artists tables from them and write both tables as parquet
        under output_data.

        Parameters:
            spark       : the active Spark Session
            input_data  : root location that contains the song_data/ directory tree
            output_data : root location where the 'songs' and 'artists' parquet
                          outputs are written

        Notes:
            * The files are parsed with spark.read.json; spark.read.text would
              produce a single 'value' column and none of the selects below
              could be resolved.
            * Assumes the song files expose artist_name, artist_location,
              artist_latitude and artist_longitude -- confirm against the
              actual dataset schema.
            * Outputs are written with mode("overwrite") so the job can be
              re-run against the same output location.
    """
    song_data = input_data + 'song_data/*/*/*/*.json'

    # read song data file
    df = spark.read.json(song_data)

    columns_for_songs = ["song_id", "title", "artist_id", "year", "duration"]
    # SQL expressions mapping the source columns onto the artists-table names
    columns_for_artists = [
        "artist_id",
        "artist_name AS name",
        "artist_location AS location",
        "artist_latitude AS latitude",
        "artist_longitude AS longitude",
    ]

    # extract columns to create songs table (one row per song_id)
    songs_table = df.select(columns_for_songs).dropDuplicates(["song_id"])

    # write songs table to parquet files partitioned by year and artist
    songs_table.write.mode("overwrite") \
        .partitionBy("year", "artist_id") \
        .parquet(output_data + 'songs')

    # extract columns to create artists table (one row per artist_id)
    artists_table = df.selectExpr(*columns_for_artists).dropDuplicates(["artist_id"])

    # write artists table to parquet files
    artists_table.write.mode("overwrite").parquet(output_data + 'artists')

In [None]:
def process_log_data(spark, input_data, output_data):
    """
        Description: Load the log_data JSON files found under input_data, keep the
        "NextSong" events and derive the users, time and songplays tables,
        writing each one as parquet under output_data. The songplays table is
        built by joining the events with the 'songs' and 'artists' parquet
        outputs that process_song_data wrote under the same output_data.

        Parameters:
            spark       : the active Spark Session
            input_data  : root location that contains the log_data/ directory
            output_data : root location where the results are stored and from
                          which the 'songs' and 'artists' tables are read back

        Notes:
            * Assumes the event-log schema userId, firstName, lastName, gender,
              level, sessionId, location, userAgent, song, artist, ts, page --
              confirm against the actual dataset.
            * ts is treated as epoch milliseconds (divided by 1000 before the
              cast to timestamp).
            * NOTE(review): the glob only matches files directly under
              log_data/; confirm that matches the layout of the input bucket.
            * Outputs are written with mode('overwrite') so the job can be
              re-run against the same output location.
    """
    log_data = input_data + 'log_data/*.json'

    # read log data file (JSON, so use the JSON reader rather than read.text)
    df = spark.read.json(log_data)

    # filter by actions for song plays
    events = df.filter(df['page'] == 'NextSong')

    # create the start_time timestamp column from the original ts column;
    # built-in column arithmetic and cast, so no UDF is required
    events = events.withColumn('start_time', (events['ts'] / 1000).cast('timestamp'))

    # extract columns for users table, renamed to the users-table names.
    # One row per user_id; if a user appears with several levels, which of
    # those rows is kept is not specified.
    users_table = events.selectExpr(
        'userId AS user_id',
        'firstName AS first_name',
        'lastName AS last_name',
        'gender',
        'level',
    ).dropDuplicates(['user_id'])

    # write users table to parquet files
    users_table.write.mode('overwrite').parquet(output_data + 'users')

    # extract columns to create time table: one row per distinct start_time,
    # with each date part in its own column (weekday no longer overwrites hour)
    time_table = events.select('start_time').dropDuplicates().selectExpr(
        'start_time',
        'hour(start_time) AS hour',
        'dayofmonth(start_time) AS day',
        'weekofyear(start_time) AS week',
        'month(start_time) AS month',
        'year(start_time) AS year',
        'dayofweek(start_time) AS weekday',
    )

    # write time table to parquet files partitioned by year and month
    time_table.write.mode('overwrite') \
        .partitionBy('year', 'month') \
        .parquet(output_data + 'time')

    # read in song data to use for songplays table
    song_df = spark.read.parquet(output_data + 'songs')
    artist_df = spark.read.parquet(output_data + 'artists')

    # (song title, artist name) -> ids lookup; de-duplicated on the join key so
    # a single event can never match more than one lookup row
    song_lookup = (
        song_df.join(artist_df, on='artist_id', how='inner')
        .selectExpr('song_id', 'artist_id', 'title AS song_title', 'name AS artist_name')
        .dropDuplicates(['song_title', 'artist_name'])
    )

    # extract columns from joined song and log datasets to create songplays table;
    # left join keeps every event even when no song/artist match exists
    match_on = (
        (events['song'] == song_lookup['song_title'])
        & (events['artist'] == song_lookup['artist_name'])
    )
    songplays_table = events.join(song_lookup, match_on, how='left').selectExpr(
        'monotonically_increasing_id() AS songplay_id',
        'start_time',
        'userId AS user_id',
        'level',
        'song_id',
        'artist_id',
        'sessionId AS session_id',
        'location',
        'userAgent AS user_agent',
        # year/month are only carried to drive the partitioning below
        'year(start_time) AS year',
        'month(start_time) AS month',
    )

    # write songplays table to parquet files partitioned by year and month
    songplays_table.write.mode('overwrite') \
        .partitionBy('year', 'month') \
        .parquet(output_data + 'songplays')

In [None]:
def main():
    """
        Description: Entry point of the ETL. Creates the Spark session and runs
        process_song_data followed by process_log_data with the same input and
        output locations.

        NOTE(review): output_data is an empty string, so every table is written
        to a bare relative path such as 'songs' -- confirm this is intended.
    """
    session = create_spark_session()
    job_args = (session, "s3a://udacity-dend/", "")

    # song data first, then log data
    process_song_data(*job_args)
    process_log_data(*job_args)


if __name__ == "__main__":
    main()
