In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, row_number
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.window import Window

# Test basic read and write

In [2]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
#     spark = SparkSession.builder.getOrCreate()
    return spark
spark = create_spark_session()

## Process Song Data

In [3]:
data = 'data/song_data/*/*/*/*.json'
output_data = 'data/parquet/'

# read song data file

df = spark.read.json(data)

df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [70]:
# extract columns to create songs table
df.createOrReplaceTempView("staging_songs")
songs_table = spark.sql("""
    SELECT song_id, title, artist_id, year, duration
    FROM staging_songs
    ORDER BY song_id
""")

# write songs table to parquet files partitioned by year and artist
songs_table_path = "{}{}".format(output_data, 'songs.parquet')
songs_table.write.mode('overwrite').partitionBy('year', 'artist_id').parquet(songs_table_path)

# extract columns to create artists table
artists_table = spark.sql("""
    SELECT artist_id, artist_name AS name, artist_location AS location,
           artist_latitude AS latitude, artist_longitude AS longitude
    FROM staging_songs
    ORDER BY artist_id
""")

# write artists table to parquet files
artists_table_path = "{}{}".format(output_data, 'artists.parquet')
artists_table.write.mode('overwrite').parquet(artists_table_path)

## Process Log Data

In [71]:
# data = 'data/song_data/*/*/*/*.json'
data = 'data/log-data/*.json'
output_data = 'data/parquet/'

# read song data file

df = spark.read.json(data)

df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [72]:
df.createOrReplaceTempView("staging_events")
df = spark.sql("""SELECT * FROM staging_events WHERE page='NextSong'""")

In [73]:
df.limit(1).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26


In [74]:
# extract columns for users table    
users_table = spark.sql("""
    SELECT DISTINCT userId AS user_id, firstName AS first_name, lastName as last_name,
                    gender, level
    FROM staging_events
    ORDER BY user_id
""")

# write users table to parquet files
users_table_path = "{}{}".format(output_data, 'users.parquet')
users_table.write.mode('overwrite').parquet(users_table_path)

# create timestamp column from original timestamp column
get_timestamp = udf(lambda x: datetime.fromtimestamp(x / 1000.0))
df = df.withColumn('timestamp', get_timestamp('ts'))

# create datetime column from original timestamp column
get_datetime = udf(lambda x: datetime.fromtimestamp(x / 1000.0).strftime('%Y-%m-%d %H:%M:%S'))
df = df.withColumn('datetime', get_datetime('ts'))
df.createOrReplaceTempView("staging_events")

# extract columns to create time table
time_table = spark.sql("""
    SELECT  DISTINCT datetime AS start_time, hour(timestamp) AS hour, day(timestamp)  AS day,
                     weekofyear(timestamp) AS week, month(timestamp) AS month, year(timestamp) AS year,
                     dayofweek(timestamp) AS weekday
    FROM staging_events
    ORDER BY start_time
""")

# write time table to parquet files partitioned by year and month
time_table_path = "{}{}".format(output_data, 'time.parquet')
time_table.write.mode('overwrite').partitionBy('year', 'month').parquet(time_table_path)

## Create Songplays table

In [133]:
songs_table = spark.read.parquet("{}{}".format(output_data, 'songs.parquet'))
artists_table = spark.read.parquet("{}{}".format(output_data, 'artists.parquet'))
w = Window().orderBy('song_id')
df_joint = df.join(songs_table.alias("s"), df.song == col('s.title')) \
             .join(artists_table.alias("a"), df.artist == col('a.name')) \
             .select(
                col('timestamp').alias('start_time'),
                col('userId').alias('user_id'),
                'level',
                's.song_id',
                'a.artist_id',
                col('sessionId').alias('session_id'),
                'a.location',
                col('userAgent').alias('user_agent')) \
             .withColumn('songplay_id', row_number().over(w))

In [134]:
df_joint.limit(10).toPandas()

Unnamed: 0,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent,songplay_id
0,"java.util.GregorianCalendar[time=?,areFieldsSe...",15,paid,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,818,Dubai UAE,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",1


## Write to local parquet

In [31]:
df.write.parquet('data/parquet/test.parquet')