In [101]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek

In [2]:
def create_spark_session():
    """Build a SparkSession, or reuse the active one, with hadoop-aws attached.

    Returns:
        The session produced by ``getOrCreate()`` on a builder whose
        ``spark.jars.packages`` option is set to
        ``org.apache.hadoop:hadoop-aws:2.7.0``.
    """
    builder = SparkSession.builder.config(
        "spark.jars.packages",
        "org.apache.hadoop:hadoop-aws:2.7.0",
    )
    return builder.getOrCreate()

In [3]:
# Notebook-wide Spark session; the cells below read and write through it.
spark = create_spark_session()

In [9]:
# !unzip data/song-data.zip -d data/

In [26]:
# Root folders used by the notebook: input_data is the base for the raw
# log files, output_data is the base for every parquet table written below.
input_data = "data/"
output_data = "data/out/"

## Songs

In [17]:
# read song data files; the path is built from input_data (like the log path)
# so the source location only has to be changed in one place
songs_data = spark.read.json(input_data + "song_data/*/*/*/*.json")

In [18]:
# Print the schema Spark inferred for the song JSON files.
songs_data.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [133]:
# build the songs table: keep the song attributes and drop exact duplicate rows
songs_table = (
    songs_data
    .select("song_id", "title", "artist_id", "year", "duration")
    .dropDuplicates()
)
# show two rows as a quick sanity check
songs_table.take(2)

[Row(song_id='SOGOSOV12AF72A285E', title='¿Dónde va Chichi?', artist_id='ARGUVEV1187B98BA17', year=1997, duration=313.12934),
 Row(song_id='SOTTDKS12AB018D69B', title='It Wont Be Christmas', artist_id='ARMBR4Y1187B9990EB', year=0, duration=241.47546)]

In [134]:
# persist the songs table as parquet, partitioned by year and then artist_id;
# anything already stored at the target path is overwritten
songs_table.write.parquet(
    output_data + "songs",
    mode="overwrite",
    partitionBy=["year", "artist_id"],
)

In [117]:
from pyspark.sql.functions import col

In [118]:
# build the artists table: pick the artist_* columns, strip the "artist_" prefix
# from everything except the id, and drop exact duplicate rows
artists_table = (
    songs_data
    .select(
        "artist_id",
        "artist_name",
        "artist_location",
        "artist_latitude",
        "artist_longitude",
    )
    # toDF renames positionally, so the order must match the select above
    .toDF("artist_id", "name", "location", "latitude", "longitude")
    .dropDuplicates()
)

In [120]:
# persist the artists table as (unpartitioned) parquet, replacing any previous output
artists_table.write.parquet(output_data + "artists", mode="overwrite")

## Logs

In [33]:
# !unzip data/log-data.zip -d data/log_data/2018/11

In [34]:
# Glob for the event-log JSON files, two directory levels below log_data/.
log_data = input_data + "log_data/*/*/*.json"

In [35]:
# load every log file matched by the glob into a single DataFrame
df = spark.read.format("json").load(log_data)

In [39]:
# keep only the events whose page is "NextSong", i.e. actual song plays
df = df.filter(col("page") == "NextSong")

In [40]:
# extract columns for users table
artists_table = df.select("userId", "firstName", "lastName", "gender", "level").distinct()

In [41]:
# write users table to parquet files
artists_table.write.mode("overwrite").parquet(output_data + "users")

In [42]:
# Inspect one filtered log row to see the available fields (ts is epoch milliseconds).
df.take(1)

[Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26')]

In [94]:
from pyspark.sql.types import FloatType, TimestampType

In [91]:
# create timestamp column (epoch seconds) from the original millisecond ts column
# A 32-bit FloatType keeps only ~7 significant digits, so present-day epoch seconds
# (~1.5e9) would be rounded to 128-second steps. Keep the value as a 64-bit double.
# Native column arithmetic also avoids a Python UDF and yields null (instead of
# raising) when ts is null.
df = df.withColumn("timestamp", (col("ts") / 1000).cast("double"))

In [93]:
from datetime import datetime

In [98]:
# create datetime column from the epoch-seconds timestamp column
# Casting a numeric column to timestamp interprets it as seconds since the Unix
# epoch, so no Python UDF is needed; null timestamps stay null rather than
# raising inside datetime.fromtimestamp.
df = df.withColumn("start_time", col("timestamp").cast("timestamp"))

In [105]:
# extract columns to create time table
# De-duplicate start_time first: several log events can share a timestamp, and
# duplicate rows here would multiply rows in any later join on start_time.
time_table = (
    df
    .select("start_time")
    .distinct()
    .select(
        "start_time",
        hour("start_time").alias("hour"),
        dayofmonth("start_time").alias("day"),
        weekofyear("start_time").alias("week"),
        month("start_time").alias("month"),
        year("start_time").alias("year"),
        dayofweek("start_time").alias("weekday"),
    )
)

In [106]:
# persist the time table as parquet, partitioned by year and then month;
# previous output at the target path is replaced
time_table.write.parquet(
    output_data + "time",
    mode="overwrite",
    partitionBy=["year", "month"],
)

In [141]:
# load the previously written songs parquet files for the songplays join
# NOTE(review): the glob descends below the year=/artist_id= partition directories,
# so the partition columns are presumably not recovered here. Confirm with
# song_df.printSchema() before relying on song_df.year or song_df.artist_id.
song_df = spark.read.format("parquet").load(output_data + "songs/*/*/*")

In [111]:
from pyspark.sql.functions import monotonically_increasing_id

In [172]:
# extract columns from joined song and log datasets to create songplays table
artists_table = spark.read.parquet(output_data + "artists")

# Read the songs table from its root directory so the year/artist_id partition
# columns are restored; artist_id is what ties each song to its artist.
songs_with_partitions = spark.read.parquet(output_data + "songs")

# One row per (song, artist name) pair. distinct() guards against the same
# artist_id/name appearing several times with different location values.
song_catalog = (
    songs_with_partitions
    .select("song_id", "title", "artist_id")
    .join(
        artists_table.select("artist_id", col("name").alias("artist_name")).distinct(),
        on="artist_id",
    )
)

# A play is matched only when BOTH the song title and the artist name agree.
# Matching title and name in two independent joins could pair a song with an
# unrelated artist that merely shares the name found in the log row.
song_log_df = df.alias("log").join(
    song_catalog.alias("cat"),
    (col("log.song") == col("cat.title"))
    & (col("log.artist") == col("cat.artist_name")),
)

# year/month are derived directly from start_time. Joining back to time_table
# (itself derived from df) is a self-join on start_time and multiplies rows
# whenever time_table holds repeated timestamps.
# NOTE(review): sessionId is not renamed to snake_case like user_id/user_agent;
# left unchanged so the written schema stays the same.
songplays_table = song_log_df.select(
    monotonically_increasing_id().alias("songplay_id"),
    col("log.start_time"),
    col("log.userId").alias("user_id"),
    col("log.level"),
    col("cat.song_id"),
    col("cat.artist_id"),
    col("log.sessionId"),
    col("log.location"),
    col("log.userAgent").alias("user_agent"),
    year(col("log.start_time")).alias("year"),
    month(col("log.start_time")).alias("month"),
).repartition("year", "month")

In [173]:
# persist the songplays table as parquet, partitioned by year and then month;
# previous output at the target path is replaced
songplays_table.write.parquet(
    output_data + 'songplays',
    mode="overwrite",
    partitionBy=["year", "month"],
)