# Draft notebook for Dataset check

In [1]:
import configparser
from datetime import datetime, date
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek
from pyspark.sql.types import TimestampType, DateType
import argparse

In [2]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

In [3]:
spark = create_spark_session()

In [4]:
def process_song_data(spark, input_data, output_data):
    # get filepath to song data file
    song_data =  input_data + "/song_data/*/*/*/*.json"
    
    # read song data file
    df = spark.read.json(song_data)
    df.printSchema()

    # extract columns to create songs table
    songs_table = df.select(["song_id", "title", "artist_id", "year", "duration"]).distinct()
    songs_table.createOrReplaceTempView("songs")
    
    # write songs table to parquet files partitioned by year and artist
    songs_table.write.parquet(output_data+"songs", 
                              mode='overwrite',
                              partitionBy=["year", "artist_id"]
                            )

    # extract columns to create artists table
    artists_table = df.select(["artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude"])
    artists_table = artists_table.withColumnRenamed("artist_name", "name") \
                                 .withColumnRenamed("artist_location", "location") \
                                 .withColumnRenamed("artist_latitude", "latitude") \
                                 .withColumnRenamed("artist_longitude", "longitude") \
                                 .distinct()      
    
    # write artists table to parquet files
    artists_table.write.parquet(output_data+"artists", 
                              mode='overwrite',
                            )


In [30]:
def process_log_data(spark, input_data, output_data):
    # get filepath to log data file
    log_data = input_data + "log_data/*/*/*.json"

    # read log data file
    df = spark.read.json(log_data)
    
    # filter by actions for song plays
    df = df.filter("page = 'NextSong'")

    # extract columns for users table    
    users = df.select(["ts", "userId", "firstName", "lastName", "gender", "level"]) \
                    .withColumnRenamed("userId", "user_id") \
                    .withColumnRenamed("firstName", "first_name") \
                    .withColumnRenamed("lastName", "last_name")
    users.createOrReplaceTempView("users")
    users_table = spark.sql(
            """
                WITH numbered_levels AS (
                  SELECT ROW_NUMBER() over (PARTITION by user_id ORDER BY ts DESC) AS row_num,
                         user_id,
                         first_name, 
                         last_name, 
                         gender, 
                         level
                    FROM users
                )
                SELECT DISTINCT user_id, first_name, last_name, gender, level
                  FROM numbered_levels
                 WHERE row_num = 1
            """
        )    
    
    # write users table to parquet files
    users_table.write.parquet(output_data + "users", 
                              mode='overwrite',
                            )

    # create timestamp column from original timestamp column
    get_timestamp = udf(lambda epoch: datetime.fromtimestamp(epoch / 1000.0), TimestampType())
    df = df.withColumn("start_time", get_timestamp("ts"))
    
    # create datetime column from original timestamp column
    get_datetime = udf(lambda epoch: date.fromtimestamp(epoch / 1000.0), DateType())
    df = df.withColumn("date", get_datetime("ts"))
    
    # extract columns to create time table
    time_table = df.select("start_time", 
                           hour("date").alias("hour"), 
                           dayofmonth("date").alias("day"), 
                           weekofyear("date").alias("week"), 
                           month("date").alias("month"),
                           year("date").alias("year"),
                           dayofweek("date").alias("weekday")
                        ).distinct()
    
    # write time table to parquet files partitioned by year and month
    time_table.write.parquet(output_data + "times", 
                              mode='overwrite',
                              partitionBy=["year", "month"]
                            )

    # read in song data to use for songplays table
    song_df = df.select("song", 
                         "length", 
                         "page", 
                         "start_time",
                         "userId", 
                         "level", 
                         "sessionId",
                         "location", 
                         "userAgent",
                        month("date").alias("month"),
                        year("date").alias("year"),
                        )

    # extract columns from joined song and log datasets to create songplays table 
    song_df.createOrReplaceTempView("staging_events") 

    songplays_table = spark.sql(
            """
            SELECT row_number() OVER (PARTITION BY start_time ORDER BY start_time) as songplay_id,
                   e.start_time, 
                   e.userId AS user_id, 
                   e.level AS level, 
                   s.song_id AS song_id, 
                   s.artist_id AS artist_id, 
                   e.sessionId AS session_id, 
                   e.location AS location, 
                   e.userAgent AS user_agent,
                   e.year,
                   e.month
            FROM staging_events e
            LEFT JOIN songs s 
                   ON e.song = s.title
                  AND e.length = s.duration
                  AND e.year = s.year
            """
        )
    print("songplays_table", songplays_table.limit(5).collect())
    
    # write songplays table to parquet files partitioned by year and month
    songplays_table.write.parquet(output_data + "songplays", 
                              mode='overwrite',
                              partitionBy=["year", "month"]
                            )

In [5]:
input_data = "data/"
output_data = "out/"

In [7]:
process_song_data(spark, input_data, output_data)

In [38]:
process_log_data(spark, input_data, output_data)

songplays_table [Row(songplay_id=1, start_time=datetime.datetime(2018, 11, 2, 12, 30, 22, 796000), user_id='15', level='paid', song_id=None, artist_id=None, session_id=172, location='Chicago-Naperville-Elgin, IL-IN-WI', user_agent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', year=2018, month=11), Row(songplay_id=1, start_time=datetime.datetime(2018, 11, 3, 19, 1, 33, 796000), user_id='95', level='paid', song_id=None, artist_id=None, session_id=152, location='Winston-Salem, NC', user_agent='"Mozilla/5.0 (iPhone; CPU iPhone OS 7_1_2 like Mac OS X) AppleWebKit/537.51.2 (KHTML, like Gecko) Version/7.0 Mobile/11D257 Safari/9537.53"', year=2018, month=11), Row(songplay_id=1, start_time=datetime.datetime(2018, 11, 4, 6, 33, 51, 796000), user_id='25', level='paid', song_id=None, artist_id=None, session_id=128, location='Marinette, WI-MI', user_agent='"Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/

In [14]:
song_data =  input_data + "/song_data/*/*/*/*.json"
    
# read song data file
df = spark.read.json(song_data)

In [15]:
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [16]:
songs_table = df.select(["song_id", "title", "artist_id", "year", "duration"]).distinct()

In [19]:
# songs_table.toPandas()

In [6]:
log_data = input_data + "log_data/*/*/*.json"
# log_data = input_data + "log_data/*.json"

In [7]:
df = spark.read.json(log_data)

In [8]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

