In [8]:
import pandas as pd
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, ShortType, DoubleType, DateType
from pyspark.sql import functions as F

In [9]:
try:
    spark.stop()
except:
    print('spark not initialized')

spark not initialized


In [10]:
spark = SparkSession \
    .builder \
    .master('18.188.68.193:40823') \
    .appName("Sparkify AWS EMR ETL") \
    .getOrCreate()

spark.newSession()

Exception: Java gateway process exited before sending its port number

In [None]:
# /opt/data/song_data/
# └── A
#     ├── A
#     │   ├── A
#     │   │   ├── TRAAAAK128F9318786.json
#     │   │   ├── TRAAAAV128F421A322.json
#     │   │   ├── TRAAABD128F429CF47.json
#     │   │   ├── TRAAACN128F9355673.json

song_schema = StructType([
    StructField("artist_id", StringType(), True),
    StructField("artist_name", StringType(), True),
    StructField("artist_location", StringType(), True),
    StructField("artist_latitude", DoubleType(), True),
    StructField("artist_longitude", DoubleType(), True),
    StructField("song_id", StringType(), True),
    StructField("duration", DoubleType(), True),
    StructField("title", StringType(), True),
    StructField("year", ShortType(), True),
])

# song_df = spark.read.json('/opt/data/song_data/A/A/A/*.json', song_schema).cache()
song_df = spark.read.json('/opt/data/song_data/*/*/*/*.json', song_schema).cache()
song_df.printSchema()

In [None]:
print('song record count:', song_df.count())

In [None]:
song_df.limit(3).toPandas()

In [None]:
# workspace/data/log_data/
# └── 2018
#     └── 11
#         ├── 2018-11-01-events.json
#

event_df = spark.read.json('/opt/data/log_data/*/*/*.json') \
    .filter(F.col('page') == 'NextSong') \
    .withColumn('timestamp', F.from_unixtime(F.col('ts') / 1000)) \
    .withColumn('start_time', F.date_format('timestamp', 'yyyyMMddHH')) \
    .cache()

event_df.printSchema()

In [None]:
print('event (log) record count:', event_df.count())

In [None]:
event_df.limit(3).toPandas()

# Start Transform

## User Dimension DataFrame

In [None]:
d_user_df = event_df.select('userId', 'lastName', 'firstName', 'gender') \
    .dropDuplicates(['userId']) \
    .cache()


In [None]:
print('user dimension record count: ', d_user_df.count())

In [None]:
d_user_df.limit(3).toPandas()

## Artist Dimension DataFrame

In [None]:
d_artist_df = song_df.select('artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude') \
    .dropDuplicates(['artist_id']) \
    .cache()

In [None]:
print('artist dimension record count:', d_artist_df.count())

In [None]:
d_artist_df.limit(3).toPandas()

## Song Dimension DataFrame

In [None]:
d_song_df = song_df.select('song_id', 'title', 'artist_id', 'year', 'duration') \
    .cache()

In [None]:
print('song dimension record count:', d_song_df.count())

In [None]:
d_song_df.limit(3).toPandas()

## Time Dimension DataFrame

In [None]:
d_time_df = event_df.select('timestamp', 'start_time') \
    .withColumn('year', F.year('timestamp')) \
    .withColumn('month', F.month('timestamp')) \
    .withColumn('day', F.dayofmonth('timestamp')) \
    .withColumn('hour', F.hour('timestamp')) \
    .withColumn('week_of_year', F.weekofyear('timestamp')) \
    .withColumn('weekday', F.dayofweek('timestamp')) \
    .select(['start_time', 'year', 'month', 'day', 'hour', 'week_of_year', 'weekday']) \
    .dropDuplicates(['start_time'])


In [None]:
print('time dimension record count:', d_time_df.count())

In [None]:
d_time_df.limit(3).toPandas()

## Songplay Fact DataFrame

In [None]:
tmp_df = d_song_df.withColumnRenamed('artist_id', 'song_artist_id')
tmp_df = tmp_df.join(d_artist_df, d_artist_df.artist_id == tmp_df.song_artist_id) \
    .select('song_id', 'title', 'duration', 'artist_id', 'artist_name')

# print('tmp record count:', tmp_df.count())
tmp_df.limit(3).toPandas()

In [None]:
comparison = [event_df.song == tmp_df.title, event_df.length.cast(ShortType()) == tmp_df.duration.cast(ShortType())]

f_songplay_df = event_df.withColumn('songplay_id', F.sha1(F.concat_ws('|', 'timestamp', 'userId', 'song'))) \
    .join(tmp_df, comparison, 'left') \
    .select(['songplay_id', 'start_time', 'userId', 'level', 'song_id', 'artist_id', 'sessionId', 'location', 'userAgent'])


In [None]:
print('songplay fact record count:', f_songplay_df.count())

In [None]:
c = f_songplay_df.filter(F.col('song_id').isNotNull()).count()
print('songplay fact records with song_id value:', c)

In [None]:
# f_songplay_df.write.save('f_songplay_df.json', format='json')
f_songplay_df.toPandas().to_csv('f_songplay_df.csv')
d_user_df.toPandas().to_csv('d_user_df.csv')
d_song_df.toPandas().to_csv('d_song_df.csv')
d_artist_df.toPandas().to_csv('d_artist_df.csv')
d_time_df.toPandas().to_csv('d_time_df.csv')