In [45]:
import configparser
from datetime import datetime
import os
from pyspark.sql.types import TimestampType, DateType
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [2]:
# List the contents of the local ./data/ directory (shell escape).
!ls ./data/

log_data  log-data.zip	processed_data	song_data  song-data.zip


In [3]:
def create_spark_session():
    """Return a SparkSession configured with the hadoop-aws package.

    The session is obtained through ``getOrCreate()``, so an already
    running session is reused instead of a second one being started.
    """
    builder = SparkSession.builder.config(
        "spark.jars.packages",
        "org.apache.hadoop:hadoop-aws:2.7.0",
    )
    return builder.getOrCreate()

In [4]:
# Load the AWS credentials from dl.cfg and expose them through the
# environment variables set below.
config = configparser.ConfigParser()
# Use a context manager so the file handle is closed after parsing;
# read_file (unlike read) still raises if dl.cfg is missing.
with open('dl.cfg') as cfg_file:
    config.read_file(cfg_file)

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']

In [5]:
# Spark session shared by every cell below.
spark = create_spark_session()

# Local folder with the raw JSON data, and the S3 prefix the tables live under.
input_data = "./data/"
output_data = "s3a://udacity-longnpp/sparkify/"

In [None]:
def process_log_data(spark, input_data, output_data):
    """Build the users, time and songplays tables from the raw event logs.

    Reads the JSON logs under ``input_data + "log_data/"``, keeps only the
    ``NextSong`` events and writes three parquet datasets below
    ``output_data``:

    * ``users_table``     - one row per user, taken from the user's most
      recent event.
    * ``time_table``      - timestamp parts of every event, partitioned by
      year and month.
    * ``songplays_table`` - events left-joined to the song table on
      title and duration, partitioned by year and month.

    Args:
        spark: active SparkSession.
        input_data: path prefix of the raw data (must end with a separator).
        output_data: path prefix for the output tables; the song table is
            expected to already exist at ``output_data + "song_tables"``.
    """
    # get filepath to log data file
    log_data = input_data + "log_data/"

    # read log data file
    df = spark.read.json(log_data)

    # filter by actions for song plays
    df = df.where(col("page") == "NextSong")

    # extract columns for users table
    # row_number (rather than rank) guarantees exactly one row per user even
    # when two events of the same user share the same ts.
    window = Window.partitionBy("userId").orderBy(F.desc("ts"))
    users_table = df.where(col('userId').isNotNull())\
                .withColumn("rank", F.row_number().over(window))\
                .where(col("rank") == 1)\
                .select(col('userId').alias('user_id'),
                        col('firstName').alias('first_name'),
                        col('lastName').alias('last_name'),
                        col('gender'),
                        col('level'))

    # write users table to parquet files
    users_table.write.parquet(output_data + "users_table")

    # create timestamp column from original timestamp column
    # ts is epoch milliseconds; a numeric -> timestamp cast reads seconds, and
    # the built-in cast is null-safe and avoids a Python UDF round trip.
    df = df.withColumn("timestamp", (col("ts") / 1000).cast(TimestampType()))

    # create datetime column from original timestamp column
    df = df.withColumn("date", F.to_date(col("timestamp")))

    # year/month are needed both by the time table and as the partition
    # columns of the songplays table, so derive them once on the log frame.
    df = df.withColumn("year", F.year("timestamp"))\
           .withColumn("month", F.month("timestamp"))

    # extract columns to create time table
    # NOTE(review): rows are not de-duplicated by ts - confirm whether the
    # time dimension should hold one row per distinct timestamp.
    time_table = df.select('ts',
                           'timestamp',
                           'date',
                           F.hour('timestamp').alias('hour'),
                           F.dayofmonth('timestamp').alias('day'),
                           F.weekofyear('timestamp').alias('week'),
                           'month',
                           'year',
                           F.dayofweek('timestamp').alias("weekday"))

    # write time table to parquet files partitioned by year and month
    time_table.write.partitionBy("year", "month").parquet(output_data + "time_table")

    # read in song data to use for songplays table
    # Only the join keys and ids are kept: the song table has its own `year`
    # column (release year), which would otherwise clash with the event year.
    song_df = spark.read.parquet(output_data + 'song_tables')\
                   .select('song_id', 'artist_id', 'title', 'duration')

    # a log event matches a song when both title and duration agree
    cond = [df.song == song_df.title,
            df.length == song_df.duration]

    # extract columns from joined song and log datasets to create songplays table
    songplays_table = df.join(song_df, on=cond, how="left")\
                        .select(col('userId').alias("user_id"),
                                'ts',
                                'year',
                                'month',
                                'level',
                                'song_id',
                                'artist_id',
                                col('sessionId').alias("session_id"),
                                'location',
                                col("userAgent").alias("user_agent"))

    # write songplays table to parquet files partitioned by year and month
    songplays_table.write.partitionBy("year", "month").parquet(output_data + "songplays_table")

In [24]:
# Location of the raw event logs.
log_data = input_data + "log_data/"

# Load the JSON logs and keep only the song-play events.
df = spark.read.json(log_data).filter(col("page") == "NextSong")

df.printSchema()
df.count()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



6820

In [19]:
# extract columns for users table
# Keep each user's most recent event. row_number (rather than rank) guarantees
# exactly one row per user even if two of their events share the same ts.
window = Window.partitionBy("userId").orderBy(F.desc("ts"))
users_table = df.where(col('userId').isNotNull())\
                .withColumn("rank", F.row_number().over(window))\
                .where(col("rank") == 1)\
                .select(col('userId').alias('user_id'),
                        col('firstName').alias('first_name'),
                        col('lastName').alias('last_name'),
                        col('gender'),
                        col('level'))

# The frame holds users, not artists; the old name is kept as an alias so
# any cell still referring to `artists_table` keeps working.
artists_table = users_table

users_table.count()

96

In [50]:
# Add a `timestamp` column derived from the epoch-millisecond `ts` column.
@udf(returnType=TimestampType())
def get_timestamp(epoch_ms):
    """Convert epoch milliseconds to a datetime."""
    return datetime.fromtimestamp(epoch_ms / 1000)

df = df.withColumn("timestamp", get_timestamp(col("ts")))

In [57]:
# Add a `date` column derived from the epoch-millisecond `ts` column.
@udf(returnType=DateType())
def get_datetime(epoch_ms):
    """Convert epoch milliseconds to a calendar date."""
    return datetime.fromtimestamp(epoch_ms / 1000).date()

df = df.withColumn("date", get_datetime(col("ts")))

In [58]:
# Preview the raw ts next to the two derived columns.
df.select("ts", "timestamp", "date").limit(5).toPandas()

Unnamed: 0,ts,timestamp,date
0,1542241826796,2018-11-15 00:30:26.796,2018-11-15
1,1542242481796,2018-11-15 00:41:21.796,2018-11-15
2,1542242741796,2018-11-15 00:45:41.796,2018-11-15
3,1542253449796,2018-11-15 03:44:09.796,2018-11-15
4,1542260935796,2018-11-15 05:48:55.796,2018-11-15


In [59]:
# Show the schema of df, now including the derived timestamp and date columns.
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- date: date (nullable = true)



In [70]:
# Break every event timestamp down into its calendar parts.
event_time = col('timestamp')
time_columns = [
    col('ts'),
    event_time,
    col('date'),
    F.hour(event_time).alias('hour'),
    F.dayofmonth(event_time).alias('day'),
    F.weekofyear(event_time).alias('week'),
    F.month(event_time).alias('month'),
    F.year(event_time).alias("year"),
    F.dayofweek(event_time).alias("weekday"),
]
time_table = df.select(*time_columns)

time_table.printSchema()

root
 |-- ts: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- date: date (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [73]:
# Load the song table previously written under the output location.
song_table_path = output_data + 'song_tables'
song_df = spark.read.format("parquet").load(song_table_path)

song_df.printSchema()
song_df.count()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- artist_id: string (nullable = true)



71

In [75]:
# A log event matches a song when both the title and the duration agree.
cond = [
    df.song == song_df.title,
    df.length == song_df.duration,
]

# Left join so events without a matching song are kept (with null ids).
events_with_songs = df.join(song_df, on=cond, how="left")

songplays_table = events_with_songs.select(
    col('userId').alias("user_id"),
    col('ts'),
    col('level'),
    col('song_id'),
    col('artist_id'),
    col('sessionId').alias("session_id"),
    col('location'),
    col("userAgent").alias("user_agent"),
)

In [76]:
# Row count of the joined table (the join above is a left join on the log events).
songplays_table.count()

6820

In [78]:
# Show only the events that actually matched a song.
matched_plays = songplays_table.filter(col('song_id').isNotNull())
matched_plays.show()

+-------+-------------+-----+------------------+------------------+----------+--------------------+--------------------+
|user_id|           ts|level|           song_id|         artist_id|session_id|            location|          user_agent|
+-------+-------------+-----+------------------+------------------+----------+--------------------+--------------------+
|     15|1542837407796| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|
+-------+-------------+-----+------------------+------------------+----------+--------------------+--------------------+



In [79]:
# Look up the song record behind the single matched play.
song_df.filter(song_df.song_id == 'SOZCTXZ12AB0182364').show()

+------------------+--------------+---------+----+------------------+
|           song_id|         title| duration|year|         artist_id|
+------------------+--------------+---------+----+------------------+
|SOZCTXZ12AB0182364|Setanta matins|269.58322|   0|AR5KOSW1187FB35FF4|
+------------------+--------------+---------+----+------------------+

