# Configuration

In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, to_date

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

create spark

In [3]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [4]:
# using test datasets
input_data='/home/workspace/data/'
output_data = "s3a://awsbucketofmxy/"

# Process song_data

In [5]:
# get filepath to song data file
song_data = input_data + "song-data/*/*/*/*.json"

In [None]:
# read song data file
df_song = spark.read.json(song_data)

print(df_song.count())
df_song.show(5)

In [42]:
df_song.createOrReplaceTempView("song_data")

In [43]:
# extract columns to create songs table
songs_table = spark.sql("""
    select 
        distinct (song_id) AS song_id, 
        title AS title,
        artist_id AS artist_id,
        year AS year,
        duration AS duration
    from song_data
    where song_id IS NOT NULL
""")
songs_table.show(5)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOGNCJP12A58A80271|Do You Finally Ne...|ARB29H41187B98F0EF|1972|342.56934|
|SOOJPRH12A8C141995|   Loaded Like A Gun|ARBGXIG122988F409D|   0|173.19138|
|SOFCHDR12AB01866EF|         Living Hell|AREVWGE1187B9B890A|   0|282.43546|
|SOWTBJW12AC468AC6E|Broken-Down Merry...|ARQGYP71187FB44566|   0|151.84934|
|SOGOSOV12AF72A285E|   ¿Dónde va Chichi?|ARGUVEV1187B98BA17|1997|313.12934|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [50]:
# write songs table to parquet files partitioned by year and artist
songs_table.write.parquet(output_data + 'songs', mode="overwrite")

In [47]:
# extract columns to create artists table
artists_table = spark.sql("""
    select 
        distinct (artist_id) AS artist_id, 
        artist_name AS name,
        artist_location AS location,
        artist_latitude AS lattitude,
        artist_longitude AS longitude
    from song_data
    where artist_id IS NOT NULL
""")
artists_table.show(5)

+------------------+------------+---------------+---------+----------+
|         artist_id|        name|       location|lattitude| longitude|
+------------------+------------+---------------+---------+----------+
|ARPBNLO1187FB3D52F|    Tiny Tim|   New York, NY| 40.71455| -74.00712|
|ARBEBBY1187B9B43DB|   Tom Petty|Gainesville, FL|     null|      null|
|AR0IAWL1187B9A96D0|Danilo Perez|         Panama|   8.4177| -80.11278|
|ARMBR4Y1187B9990EB|David Martin|California - SF| 37.77916|-122.42005|
|ARD0S291187B9B7BF5|     Rated R|           Ohio|     null|      null|
+------------------+------------+---------------+---------+----------+
only showing top 5 rows



In [49]:
# write artists table to parquet files
artists_table.write.parquet(output_data + 'artist', mode="overwrite")

# Process log data

In [7]:
log_data = input_data + "log-data/*.json"

In [8]:
# read log data file
df_log = spark.read.json(log_data)

print(df_log.count())
df_log.show(5)

8056
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|The Big Gun

In [9]:
# filter by actions for song plays
df_log = df_log.filter(df_log.page == 'NextSong')

In [10]:
df_log.createOrReplaceTempView("log_data")

In [45]:
# extract columns for users table    
users_table=spark.sql("""
    select 
        distinct (userId) AS user_id, 
        firstName AS first_name,
        lastName AS last_name,
        gender AS gender,
        level AS level
    from log_data
    where userId IS NOT NULL
""")
users_table.show(5)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     98|    Jordyn|   Powell|     F| free|
|     34|    Evelin|    Ayala|     F| free|
|     85|   Kinsley|    Young|     F| paid|
|     38|    Gianna|    Jones|     F| free|
|     85|   Kinsley|    Young|     F| free|
+-------+----------+---------+------+-----+
only showing top 5 rows



In [None]:
# write users table to parquet files
users_table.write.parquet(output_data + 'users', mode="overwrite")

In [11]:
# create timestamp column from original timestamp column
get_timestamp = udf(lambda x: int(x/1000))
df_log =df_log.withColumn('timestamp', get_timestamp(df_log.ts))

df_log.select('timestamp').drop_duplicates().show(5)


In [12]:
# create datetime column from original timestamp column
get_datetime = udf(lambda x: datetime.fromtimestamp(x).strftime('%Y-%m-%d %H:%M:%S'))
df_log = df_log.withColumn('datetime', get_datetime(df_log.timestamp))

df_log.select('timestamp','datetime').drop_duplicates().show(5)

+----------+-------------------+
| timestamp|           datetime|
+----------+-------------------+
|1542267665|2018-11-15 07:41:05|
|1542283677|2018-11-15 12:07:57|
|1542285148|2018-11-15 12:32:28|
|1542807206|2018-11-21 13:33:26|
|1542822073|2018-11-21 17:41:13|
+----------+-------------------+
only showing top 5 rows



In [13]:
# extract columns to create time table
time_table =  df_log.select(col('ts').alias('ts'),
                            col('timestamp').alias('start_time'),
                           hour('datetime').alias('hour'),
                           dayofmonth('datetime').alias('day'),
                           weekofyear('datetime').alias('week'),
                           month('datetime').alias('month'),
                           year('datetime').alias('year'),
                           date_format('datetime','E').alias('weekday')
                           ).drop_duplicates()
time_table.show(3)

+-------------+----------+----+---+----+-----+----+-------+
|           ts|start_time|hour|day|week|month|year|weekday|
+-------------+----------+----+---+----+-----+----+-------+
|1542280285796|1542280285|  11| 15|  46|   11|2018|    Thu|
|1542795222796|1542795222|  10| 21|  47|   11|2018|    Wed|
|1542181772796|1542181772|   7| 14|  46|   11|2018|    Wed|
+-------------+----------+----+---+----+-----+----+-------+
only showing top 3 rows



In [43]:
# write time table to parquet files partitioned by year and month
time_table.write.partitionBy('year','month').parquet(output_data + 'time', mode="overwrite")

In [14]:
time_table.createOrReplaceTempView("time")

In [39]:
# read in song data to use for songplays table
song_data = input_data + "song-data/*/*/*/*.json"
df_song = spark.read.json(song_data)

df_song.createOrReplaceTempView("song_data")
df_song.count()

71

In [41]:
# extract columns from joined song and log datasets to create songplays table 
songplays_table = spark.sql('''
    SELECT
        t.start_time AS start_time, 
        l.userId AS user_id, 
        l.level AS level, 
        s.song_id AS song_id, 
        s.artist_id AS artist_id, 
        l.sessionId AS session_id, 
        l.location AS location, 
        l.userAgent AS user_agent,
        t.year AS year,
        t.month AS month
    FROM
        log_data l 
        JOIN song_data s ON
            l.song=s.title
            AND l.artist=s.artist_name
        JOIN time t ON
            l.ts=t.ts
''')
songplays_table.show()

+----------+-------+-----+------------------+------------------+----------+--------------------+--------------------+----+-----+
|start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|year|month|
+----------+-------+-----+------------------+------------------+----------+--------------------+--------------------+----+-----+
|1542837407|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|2018|   11|
+----------+-------+-----+------------------+------------------+----------+--------------------+--------------------+----+-----+



In [42]:
# write songplays table to parquet files partitioned by year and month
songplays_table.write.partitionBy('year','month').parquet(output_data + 'songplays', mode="overwrite")