In [16]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions  as F 
# udf, col, year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import *


# config = configparser.ConfigParser()
# config.read('dl.cfg')

# #os.environ['AWS_ACCESS_KEY_ID']=config['AWS_ACCESS_KEY_ID']
# #os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS_SECRET_ACCESS_KEY']

# def create_spark_session():
#     spark = SparkSession \
#         .builder \
#         .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
#         .getOrCreate()
#     return spark


def process_song_data(spark, input_data, output_data):
    """
    Processes the songs data files and extracts info from it.
    :param spark: a spark session instance
    :param input_data: input file path
    :param output_data: output file path
    """
    # get filepath to song data file
    song_data = input_data + "song_data/*/*/*/*"
    
    # read song data file
    df = spark.read.json(song_data, mode='PERMISSIVE', \
                         columnNameOfCorruptRecord='corrupt_record')\
                .drop_duplicates()

    # extract columns to create songs table
    songs_table = df.select("song_id","title","artist_id","year","duration").distinct()
    
    # write songs table to parquet files partitioned by year and artist
    songs_table.write.parquet\
    (output_data + "songs/", mode="overwrite", partitionBy=["year","artist_id"])

    # extract columns to create artists table
    artists_table = df.select("artist_id","artist_name","artist_location","artist_latitude","artist_longitude")\
    .distinct()
    
    # write artists table to parquet files
    artists_table.write.parquet(output_data + "artists/", mode="overwrite")


def process_log_data(spark, input_data, output_data):
    """
    Processes the event log file and extracts info from it.
    :param spark: a spark session instance
    :param input_data: input file path
    :param output_data: output file path
    """
    log_data = os.path.join(input_data, "log_data/")

    # read log data file
    df = spark.read.json(log_data, mode='PERMISSIVE', \
                         columnNameOfCorruptRecord='corrupt_record').distinct()
    
    # filter by actions for song plays
    df = df.filter(F.col('page') == "NextSong")

    # extract columns for users table    
    users_table = df.select("userId","firstName","lastName","gender","level").drop_duplicates()
    
    # write users table to parquet files
    users_table.write.parquet(os.path.join(output_data, "users/") , mode="overwrite")

    # create timestamp column from original timestamp column
    get_timestamp = F.udf(lambda x : datetime.utcfromtimestamp(int(x)/1000), TimestampType())
    df = df.withColumn("start_time", get_timestamp("ts"))
    
    # extract columns to create time table
    time_table =  df.withColumn("hour",F.hour("start_time"))\
                    .withColumn("day",F.dayofmonth("start_time"))\
                    .withColumn("week",F.weekofyear("start_time"))\
                    .withColumn("month",F.month("start_time"))\
                    .withColumn("year",F.year("start_time"))\
                    .withColumn("weekday",F.dayofweek("start_time"))\
                    .select("ts","start_time","hour", "day", "week", "month", "year", "weekday").distinct()

    
    # write time table to parquet files partitioned by year and month
    time_table.write.parquet\
    (os.path.join(output_data, "time_table/"), mode='overwrite', partitionBy=["year","month"])

    # read in song data to use for songplays table
    song_df = spark.read\
                .format("parquet")\
                .option("basePath", os.path.join(output_data, "songs/"))\
                .load(os.path.join(output_data, "songs/*/*/"))

    # extract columns from joined song and log datasets to create songplays table 
    songplays_table = df.join(song_df, df.song == song_df.title, how='inner')\
                        .select(F.monotonically_increasing_id().alias("songplay_id")\
                            ,F.col("start_time"),F.col("userId").alias("user_id"),"level","song_id","artist_id", 
                            F.col("sessionId").alias("session_id"), "location", F.col("userAgent").alias("user_agent"))\
                        .join(time_table, ['start_time'], 'inner')\
                         .select("songplay_id", 'start_time', "user_id", "level", "song_id", "artist_id", "session_id", 
                                 "location", "user_agent", "year", "month")

    # write songplays table to parquet files partitioned by year and month
    songplays_table.drop_duplicates().write.parquet(os.path.join(output_data, "songplays/"), mode="overwrite", partitionBy=["year","month"])





VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
def main():
    spark = create_spark_session()
    input_data = "s3://patrickudacityde/"
    output_data = "s3://patrickudacityde/output/"

    process_song_data(spark, input_data, output_data)    
    process_log_data(spark, input_data, output_data)


if __name__ == "__main__":
    main()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
input_data = "s3://patrickudacityde/"
output_data = "s3://patrickudacityde/output/"

# process_song_data(spark, input_data, output_data)    
process_log_data(spark, input_data, output_data)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
songplays_df = spark.read\
        .format("parquet")\
        .option("basePath", os.path.join(output_data, "songplays/"))\
        .load(os.path.join(output_data, "songplays/*/*/"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
songplays_df.show(10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+----+-----+
| songplay_id|          start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|year|month|
+------------+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+----+-----+
|           0|2018-11-21 21:56:...|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|2018|   11|
| 17179869184|2018-11-14 05:06:...|     10| free|SOGDBUF12A8C140FAA|AR558FS1187FB45658|       484|Washington-Arling...|"Mozilla/5.0 (Mac...|2018|   11|
| 17179869185|2018-11-27 22:35:...|     80| paid|SOGDBUF12A8C140FAA|AR558FS1187FB45658|       992|Portland-South Po...|"Mozilla/5.0 (Mac...|2018|   11|
|188978561024|2018-11-19 09:14:...|     24| paid|SOGDBUF12A8C140FAA|AR558FS1187FB45658| 