In [54]:
import configparser
import datetime as dt
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import TimestampType

In [2]:
spark = SparkSession \
    .builder \
    .appName("datalake_test") \
    .getOrCreate()


# SONG DATA

In [10]:
song_data ="data/song_data"


In [11]:
df=spark.read.json(song_data)

In [12]:
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [13]:
df.show()

+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARKFYS91187B98E58F|           null|                    |            null|Jeff And Sheri Ea...| 267.7024|        1|SOYMRWW12A6D4FAB14|The Moon And I (O...|   0|
|AR10USD1187B99F3F1|           null|Burlington, Ontar...|            null|Tweeterfriendly M...|189.57016|        1|SOHKNRJ12A6701D1F8|        Drop of Rain|   0|
|ARGSJW91187B9B1D6B|       35.21962|      North Carolina|       -80.01955|        JennyAnyKind|218.77506|        1|SOQHXMF12AB0182363|     Young Boy Blues|   0|
|ARMJAGH1187FB546F3|       35.1496

In [14]:
df.createOrReplaceTempView('stagingsongs_table')
songs_table= spark.sql('''
            SELECT song_id, title, artist_id, year, duration
            FROM stagingsongs_table''')

In [15]:
songs_table.show()

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOYMRWW12A6D4FAB14|The Moon And I (O...|ARKFYS91187B98E58F|   0| 267.7024|
|SOHKNRJ12A6701D1F8|        Drop of Rain|AR10USD1187B99F3F1|   0|189.57016|
|SOQHXMF12AB0182363|     Young Boy Blues|ARGSJW91187B9B1D6B|   0|218.77506|
|SOCIWDW12A8C13D406|           Soul Deep|ARMJAGH1187FB546F3|1969|148.03546|
|SONHOTT12A8C13493C|     Something Girls|AR7G5I41187FB4CE6C|1982|233.40363|
|SOMZWCG12A8C13C480|    I Didn't Mean To|ARD7TVE1187B99BFB1|   0|218.93179|
|SOUDSGM12AC9618304|Insatiable (Instr...|ARNTLGG11E2835DDB9|   0|266.39628|
|SOMJBYD12A6D4F8557|Keepin It Real (S...|ARD0S291187B9B7BF5|   0|114.78159|
|SOXVLOJ12AB0189215|     Amor De Cabaret|ARKRRTF1187B9984DA|   0|177.47546|
|SOIAZJW12AB01853F1|          Pink World|AR8ZCNI1187B9A069B|1984|269.81832|
|SOFSOCN12A8

In [16]:
# write songs table to parquet files partitioned by year and artist
#example df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
songs_table.write.partitionBy('year', 'artist_id').format('parquet').save('songs.parquet')

In [17]:
songs_parquet=spark.read.parquet("songs.parquet")

In [18]:
songs_parquet.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- artist_id: string (nullable = true)



In [19]:
songs_parquet.show()

+------------------+--------------------+---------+----+------------------+
|           song_id|               title| duration|year|         artist_id|
+------------------+--------------------+---------+----+------------------+
|SOYMRWW12A6D4FAB14|The Moon And I (O...| 267.7024|   0|ARKFYS91187B98E58F|
|SOUDSGM12AC9618304|Insatiable (Instr...|266.39628|   0|ARNTLGG11E2835DDB9|
|SOMJBYD12A6D4F8557|Keepin It Real (S...|114.78159|   0|ARD0S291187B9B7BF5|
|SOMZWCG12A8C13C480|    I Didn't Mean To|218.93179|   0|ARD7TVE1187B99BFB1|
|SOQHXMF12AB0182363|     Young Boy Blues|218.77506|   0|ARGSJW91187B9B1D6B|
|SONHOTT12A8C13493C|     Something Girls|233.40363|1982|AR7G5I41187FB4CE6C|
|SOXVLOJ12AB0189215|     Amor De Cabaret|177.47546|   0|ARKRRTF1187B9984DA|
|SOFSOCN12A8C143F5D|      Face the Ashes|209.60608|2007|ARXR32B1187FB57099|
|SOHKNRJ12A6701D1F8|        Drop of Rain|189.57016|   0|AR10USD1187B99F3F1|
|SOIAZJW12AB01853F1|          Pink World|269.81832|1984|AR8ZCNI1187B9A069B|
|SOCIWDW12A8

In [20]:
    # extract columns to create artists table

artists_table = spark.sql('''
                SELECT artist_id, 
                artist_name        AS name, 
                artist_location    AS location, 
                artist_latitude    AS latitude, 
                artist_longitude   AS longitude
                FROM stagingsongs_table
                ''')


In [21]:
artists_table.show()

+------------------+--------------------+--------------------+--------+---------+
|         artist_id|                name|            location|latitude|longitude|
+------------------+--------------------+--------------------+--------+---------+
|ARKFYS91187B98E58F|Jeff And Sheri Ea...|                    |    null|     null|
|AR10USD1187B99F3F1|Tweeterfriendly M...|Burlington, Ontar...|    null|     null|
|ARGSJW91187B9B1D6B|        JennyAnyKind|      North Carolina|35.21962|-80.01955|
|ARMJAGH1187FB546F3|        The Box Tops|         Memphis, TN|35.14968|-90.04892|
|AR7G5I41187FB4CE6C|            Adam Ant|     London, England|    null|     null|
|ARD7TVE1187B99BFB1|              Casual|     California - LA|    null|     null|
|ARNTLGG11E2835DDB9|                 Clp|                    |    null|     null|
|ARD0S291187B9B7BF5|             Rated R|                Ohio|    null|     null|
|ARKRRTF1187B9984DA|    Sonora Santanera|                    |    null|     null|
|AR8ZCNI1187B9A0

In [22]:
    
# write artists table to parquet files
artists_table.write.format('parquet').save('artist.parquet')

In [23]:
artists_parquet=spark.read.parquet("artist.parquet")

In [24]:
artists_parquet.select('name','location').orderBy('name').show()

+--------------------+--------------------+
|                name|            location|
+--------------------+--------------------+
|            Adam Ant|     London, England|
|              Casual|     California - LA|
|                 Clp|                    |
|                 Gob|                    |
|Jeff And Sheri Ea...|                    |
|        JennyAnyKind|      North Carolina|
|    Planet P Project|                    |
|             Rated R|                Ohio|
|    Sonora Santanera|                    |
|        The Box Tops|         Memphis, TN|
|Tweeterfriendly M...|Burlington, Ontar...|
+--------------------+--------------------+



# LOG DATA

In [27]:
log_data ="data/log_data"



In [28]:
# read log data file
df1 = spark.read.json(log_data)

In [29]:
df1.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|The Big Gundown|

In [30]:
df1.count()

8056

In [31]:
# filter by actions for song plays
df2 =df1[df1.page =='NextSong']

In [32]:
df2.show(2)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|The Big Gundown|

In [33]:
df2.count()

6820

In [34]:
# extract columns for users table    
df2.createOrReplaceTempView('stagingevents_table')
artists_table = spark.sql('''
                SELECT userId    AS user_id, 
                firstName        AS first_name, 
                lastName         AS last_name, 
                gender, 
                level
                FROM stagingevents_table
                ''')

In [35]:
artists_table.show()

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     26|      Ryan|    Smith|     M| free|
|     26|      Ryan|    Smith|     M| free|
|     26|      Ryan|    Smith|     M| free|
|     61|    Samuel| Gonzalez|     M| free|
|     80|     Tegan|   Levine|     F| paid|
|     80|     Tegan|   Levine|     F| paid|
|     80|     Tegan|   Levine|     F| paid|
|     80|     Tegan|   Levine|     F| paid|
|     80|     Tegan|   Levine|     F| paid|
|     80|     Tegan|   Levine|     F| paid|
|     80|     Tegan|   Levine|     F| paid|
|     15|      Lily|     Koch|     F| paid|
|     80|     Tegan|   Levine|     F| paid|
|     15|      Lily|     Koch|     F| paid|
|     15|      Lily|     Koch|     F| paid|
|     15|      Lily|     Koch|     F| paid|
|     15|      Lily|     Koch|     F| paid|
|     26|      Ryan|    Smith|     M| free|
|     26|      Ryan|    Smith|     M| free|
|     49|     Chloe|   Cuevas|  

In [36]:
    
# write users table to parquet files
artists_table.write.format('parquet').save('users.parquet')

In [37]:
users= spark.read.parquet('users.parquet')

In [38]:
users.show(5)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     26|      Ryan|    Smith|     M| free|
|     26|      Ryan|    Smith|     M| free|
|     26|      Ryan|    Smith|     M| free|
|     61|    Samuel| Gonzalez|     M| free|
|     80|     Tegan|   Levine|     F| paid|
+-------+----------+---------+------+-----+
only showing top 5 rows



In [55]:
#from pyspark.sql import functions as f
# create timestamp column from original timestamp column
#df.withColumn('epoch', f.to_date(df.epoch.cast(dataType=t.TimestampType())))
get_timestamp = udf(lambda x: dt.datetime.fromtimestamp(x / 1000), TimestampType())
#

In [56]:
df_ts =df2.withColumn("timestamp", get_timestamp(df2.ts))

In [66]:
df_ts=df_ts.drop('ts')

In [80]:
df_ts.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+--------------------+------+--------------------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           userAgent|userId|           timestamp|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+--------------------+------+--------------------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|"Mozilla/5.0 (X11...|    26|2018-11-15 00:30:...|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextS

In [118]:
    
# create datetime column from original timestamp column
#get_datetime = udf(lambda x: datetime.fromtimestamp(x/1000), TimestampType())
#df_dt = df_ts.withColumn('starttime', get_datetime('timestamp'))

NameError: name 'TimestampType' is not defined

In [70]:
# extract columns to create time table
df_ts.createOrReplaceTempView('df_ts_table')
time_table = spark.sql('''
                    SELECT timestamp                   AS start_time, 
                    EXTRACT(hour    FROM timestamp)    AS hour, 
                    EXTRACT(day     FROM timestamp)    AS day, 
                    EXTRACT(week    FROM timestamp)    AS week, 
                    EXTRACT(month   FROM timestamp)    AS month, 
                    EXTRACT(year    FROM timestamp)    AS year, 
                    EXTRACT(dayofweek FROM timestamp)  AS weekday
                    FROM df_ts_table
                    ''')

In [71]:
time_table.show()

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-15 00:30:...|   0| 15|  46|   11|2018|      5|
|2018-11-15 00:41:...|   0| 15|  46|   11|2018|      5|
|2018-11-15 00:45:...|   0| 15|  46|   11|2018|      5|
|2018-11-15 03:44:...|   3| 15|  46|   11|2018|      5|
|2018-11-15 05:48:...|   5| 15|  46|   11|2018|      5|
|2018-11-15 05:53:...|   5| 15|  46|   11|2018|      5|
|2018-11-15 05:55:...|   5| 15|  46|   11|2018|      5|
|2018-11-15 06:01:...|   6| 15|  46|   11|2018|      5|
|2018-11-15 06:07:...|   6| 15|  46|   11|2018|      5|
|2018-11-15 06:10:...|   6| 15|  46|   11|2018|      5|
|2018-11-15 06:13:...|   6| 15|  46|   11|2018|      5|
|2018-11-15 06:14:...|   6| 15|  46|   11|2018|      5|
|2018-11-15 06:17:...|   6| 15|  46|   11|2018|      5|
|2018-11-15 06:18:...|   6| 15|  46|   11|2018|      5|
|2018-11-15 06:21:...|   6| 15|  46|   11|2018| 

In [81]:
# write time table to parquet files partitioned by year and month
time_table.write.partitionBy('year', 'month').format('parquet').save('time.parquet')

In [82]:
time_table_parquet = spark.read.parquet('time.parquet')

In [83]:
time_table_parquet.show(5)

+--------------------+----+---+----+-------+----+-----+
|          start_time|hour|day|week|weekday|year|month|
+--------------------+----+---+----+-------+----+-----+
|2018-11-15 00:30:...|   0| 15|  46|      5|2018|   11|
|2018-11-15 00:41:...|   0| 15|  46|      5|2018|   11|
|2018-11-15 00:45:...|   0| 15|  46|      5|2018|   11|
|2018-11-15 03:44:...|   3| 15|  46|      5|2018|   11|
|2018-11-15 05:48:...|   5| 15|  46|      5|2018|   11|
+--------------------+----+---+----+-------+----+-----+
only showing top 5 rows



In [None]:
# read in song data to use for songplays table
songplays_df =

In [85]:

# extract columns from joined song and log datasets to create songplays table 
# artist|      auth|firstName|gender|itemInSession|lastName|length|level|location|method|page|registration|sessionId|song|status|ts|userAgent|userId
songplays_table = spark.sql('''
                        SELECT monotonically_increasing_id() AS songplay_id, 
                        se.timestamp                         AS start_time, 
                        se.userId                            AS user_id, 
                        se.level, 
                        ss.song_id, 
                        ss.artist_id, 
                        se.sessionId                         AS session_id, 
                        se.location, 
                        se.userAgent                         AS user_agent
                        FROM df_ts_table as se
                        JOIN stagingsongs_table as ss
                        ON ss.artist_name = se.artist
                        ''')

In [87]:
songplays_table.show()

+-----------+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|songplay_id|          start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|
+-----------+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|          0|2018-11-19 15:36:...|     49| paid|SOFSOCN12A8C143F5D|ARXR32B1187FB57099|       724|San Francisco-Oak...|Mozilla/5.0 (Wind...|
+-----------+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+



In [88]:
# write songplays table to parquet files partitioned by year and month
songplays_table.write.format('parquet').save('songplays.parquet')