This is a notebook for quick development and testing

In this project, we use Spark and data lakes to build an ETL pipeline for a data lake hosted on S3 for music streaming startup Sparkify.
  
The datasets include the Song Dataset and Log Dataset.  

Output tables include  

- Fact Table  
1. songplays - records in log data associated with song plays i.e. records with page NextSong  
songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent

- Dimension Tables  
1. users - users in the app  
user_id, first_name, last_name, gender, level
  
2. songs - songs in music database  
song_id, title, artist_id, year, duration  

3. artists - artists in music database  
artist_id, name, location, lattitude, longitude  
  
4. time - timestamps of records in songplays broken down into specific units  
start_time, hour, day, week, month, year, weekday

In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, from_unixtime, to_timestamp
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.context import SQLContext
from pyspark.sql import functions as F
from pyspark.sql import types as T

config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['aws']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['aws']['AWS_SECRET_ACCESS_KEY']

In [2]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark
spark = create_spark_session()
spark

In [3]:
input_data = "s3a://udacity-dend/song-data/A/A/C/*.json"
df1 = spark.read.json(input_data)
%time

CPU times: user 3 µs, sys: 0 ns, total: 3 µs
Wall time: 6.68 µs


In [4]:
dfsong = df1.withColumnRenamed("name","artist_name")

songs table: song_id, title, artist_id, year, duration

In [5]:
songs_table = dfsong.select(*['song_id', 'title', 'artist_id', 'year','duration']).distinct()
songs_table.show()

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
|SONYPOM12A8C13B2D7|I Think My Wife I...|ARDNS031187B9924F0|2005|186.48771|
|SODIWID12A6D4F803A|Lucille (1999 Dig...|ARGS47D1187FB40225|   0|127.97342|
|SOLLHMX12AB01846DC|   The Emperor Falls|AR1Y2PT1187FB5B9CE|   0|484.62322|
|SOIGIVK12AB018E9AA|             Ionized|ARNCNV91187FB4D552|1996|444.83873|
|SOAFBKM12AB01837A7|          Brain Dead|ARL14X91187FB4CF14|1995| 94.22322|
|SOEBWSU12AB018377B|      Sane As Can Be|ARDTIOA1187B99E779|2000|105.87383|
|SOWCWAD12AB017FD51|     Memories & Rust|ARPLTRF11E8F5C15C5|   0|222.82404|
|SOTCOTZ12A8C136BCB|            Elevator|AR7WK5411A348EF5EA|2008|248.31955|
|SOMZZON12A6701D3B9|My Lady (2003 Dig...|ARKUI581187B9A6856|1997|162.40281|
|SOAOJYY12A5

In [6]:
output_data = "s3a://de-zwmtrue/data-lake/"
songs_table.write.partitionBy('year','artist_id').parquet(output_data + "songs.parquet", mode="overwrite")

artists : artist_id, name, location, lattitude, longitude

In [7]:
artists_table = dfsong['artist_id','artist_name','artist_location', 'artist_latitude', 'artist_longitude'].\
    distinct().withColumnRenamed("artist_name","name").\
    withColumnRenamed("artist_location","location").\
    withColumnRenamed("artist_lattitude","lattitude").\
    withColumnRenamed("artist_longitude","longitude")

In [8]:
artists_table = artists_table.distinct()

In [9]:
artists_table.write.parquet(output_data + "artists.parquet", mode="overwrite")

In [10]:
input_data = "s3a://udacity-dend/log-data/2018/11/*.json"
dflog = spark.read.json(input_data)

In [11]:
dflog.columns

['artist',
 'auth',
 'firstName',
 'gender',
 'itemInSession',
 'lastName',
 'length',
 'level',
 'location',
 'method',
 'page',
 'registration',
 'sessionId',
 'song',
 'status',
 'ts',
 'userAgent',
 'userId']

In [12]:
dflog = dflog.where(dflog["page"] == 'NextSong')

In [13]:
users_table = dflog.select(
            col('userId').alias('user_id'),
            col('firstName').alias('first_name'),
            col('lastName').alias('last_name'),
            col('gender').alias('gender'),
            col('level').alias('level')
        ).distinct()

In [14]:
get_timestamp = udf(lambda ms: datetime.fromtimestamp(ms/1000.0), T.TimestampType())

In [15]:
dflog=dflog.withColumn("ts_timestamp",get_timestamp(dflog.ts))

In [16]:
dflog.show()

+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------------------+
|              artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|        ts_timestamp|
+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------------------+
|            Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|2018-11-15 00:30:...|
|         The Prodig

In [17]:
time_table = (
    dflog.withColumn('hour', hour(col('ts_timestamp')))
        .withColumn('day', dayofmonth(col('ts_timestamp')))
        .withColumn('week', weekofyear(col('ts_timestamp')))
        .withColumn('month', month(col('ts_timestamp')))
        .withColumn('year', year(col('ts_timestamp')))
        .select(col("hour"),
            col("day"),
            col("week"),
            col("month"),
            col("year"),
            date_format('ts_timestamp', 'EEEE').alias('week_day')
               )
        )

In [18]:
time_table.show()

+----+---+----+-----+----+--------+
|hour|day|week|month|year|week_day|
+----+---+----+-----+----+--------+
|   0| 15|  46|   11|2018|Thursday|
|   0| 15|  46|   11|2018|Thursday|
|   0| 15|  46|   11|2018|Thursday|
|   3| 15|  46|   11|2018|Thursday|
|   5| 15|  46|   11|2018|Thursday|
|   5| 15|  46|   11|2018|Thursday|
|   5| 15|  46|   11|2018|Thursday|
|   6| 15|  46|   11|2018|Thursday|
|   6| 15|  46|   11|2018|Thursday|
|   6| 15|  46|   11|2018|Thursday|
|   6| 15|  46|   11|2018|Thursday|
|   6| 15|  46|   11|2018|Thursday|
|   6| 15|  46|   11|2018|Thursday|
|   6| 15|  46|   11|2018|Thursday|
|   6| 15|  46|   11|2018|Thursday|
|   6| 15|  46|   11|2018|Thursday|
|   6| 15|  46|   11|2018|Thursday|
|   7| 15|  46|   11|2018|Thursday|
|   7| 15|  46|   11|2018|Thursday|
|   7| 15|  46|   11|2018|Thursday|
+----+---+----+-----+----+--------+
only showing top 20 rows



In [19]:
songs_df = spark.read.option("mergeSchema", "true").parquet(output_data + "songs.parquet")

In [20]:
songs_df.show()

+------------------+--------------------+---------+----+------------------+
|           song_id|               title| duration|year|         artist_id|
+------------------+--------------------+---------+----+------------------+
|SONYPOM12A8C13B2D7|I Think My Wife I...|186.48771|2005|ARDNS031187B9924F0|
|SODIWID12A6D4F803A|Lucille (1999 Dig...|127.97342|   0|ARGS47D1187FB40225|
|SOBBUGU12A8C13E95D|Setting Fire to S...|207.77751|2004|ARMAC4T1187FB3FA4C|
|SOMZZON12A6701D3B9|My Lady (2003 Dig...|162.40281|1997|ARKUI581187B9A6856|
|SOQXJID12A8AE456CD| Domingo astronómico|264.69832|2005|ARBJ3VU1187B9B472D|
|SOBIODA12A8C13CDAD|   Cradle the Crater|258.29832|2007|ARNUJQM1187FB3EE72|
|SOLLHMX12AB01846DC|   The Emperor Falls|484.62322|   0|AR1Y2PT1187FB5B9CE|
|SOYTPEP12AB0180E7B|     Twist and Shout|164.80608|1964|ARAJPHH1187FB5566A|
|SORUZFR12AB01866C1|     Hypnotic Trance|280.37179|1987|ARZJDBC1187FB52056|
|SOWCWAD12AB017FD51|     Memories & Rust|222.82404|   0|ARPLTRF11E8F5C15C5|
|SOEBWSU12AB

In [21]:
songs_df.columns

['song_id', 'title', 'duration', 'year', 'artist_id']

songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent

In [28]:
 songplays_table = (
     dflog.join(songs_df, songs_df.title == dflog.song)
     .select(
         col('ts_timestamp').alias('start_time'),
         col('userId').alias('user_id'),
         col('level'),
         col('song_id'),
         col('artist_id'),
         col('sessionId').alias('session_id'),
         col('location'),
         col('userAgent').alias('user_agent'),
         month(col('ts_timestamp')).alias('month'),
         year(col('ts_timestamp')).alias("year")
     ).withColumn('songplay_id', F.monotonically_increasing_id()
        )
 )

In [29]:
songplays_table.columns

['start_time',
 'user_id',
 'level',
 'song_id',
 'artist_id',
 'session_id',
 'location',
 'user_agent',
 'month',
 'year',
 'songplay_id']

In [None]:
songplays_table.write.partitionBy('year','month').parquet(output_data + "songplays.parquet", mode="overwrite")