# Import libraries

In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
import databricks.koalas as ks



In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']


# create Spark session with no hadoop-AWS package

In [3]:

spark = SparkSession \
.builder \
.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
.getOrCreate()

# Load data From S3

In [4]:
song_data = 'song_data/A/B/C/*.json'
log_data ='data/*.json'

In [5]:
kdf = ks.read_json('data/song_data/A/B/C/*.json')

In [6]:
kdf.dtypes


artist_id            object
artist_latitude     float64
artist_location      object
artist_longitude    float64
artist_name          object
duration            float64
num_songs             int64
song_id              object
title                object
year                  int64
dtype: object

In [7]:
kdf.to_spark().printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [7]:
kdf.head()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARNF6401187FB57032,40.79086,"New York, NY [Manhattan]",-73.96644,Sophie B. Hawkins,305.162,1,SONWXQJ12A8C134D94,The Ballad Of Sleeping Beauty,1994
1,ARLTWXK1187FB5A3F8,32.74863,"Fort Worth, TX",-97.32925,King Curtis,326.00771,1,SODREIN12A58A7F2E5,A Whiter Shade Of Pale (Live @ Fillmore West),0
2,ARPFHN61187FB575F6,41.88415,"Chicago, IL",-87.63241,Lupe Fiasco,279.97995,1,SOWQTQZ12A58A7B63E,Streets On Fire (Explicit Album Version),0
3,AR0IAWL1187B9A96D0,8.4177,Panama,-80.11278,Danilo Perez,197.19791,1,SONSKXP12A8C13A2C9,Native Soul,2003
4,AREVWGE1187B9B890A,-13.442,Noci (BA),-41.9952,Bitter End,282.43546,1,SOFCHDR12AB01866EF,Living Hell,0


In [8]:
#song_id, title, artist_id, year, duration
songs_table = (ks.sql("""select
                        DISTINCt
                        row_number() over (ORDER BY year,title,artist_id) id,
                        song_id,
                        title,
                        artist_id,
                        year,
                        duration
                        FROM {kdf}
"""))
songs_table.head()
## Or you can use reset_index to create Id insted of row_number
## EX: songs_table.reset_index().head()

Unnamed: 0,id,song_id,title,artist_id,year,duration
0,1,SODREIN12A58A7F2E5,A Whiter Shade Of Pale (Live @ Fillmore West),ARLTWXK1187FB5A3F8,0,326.00771
1,2,SOUPIRU12A6D4FA1E1,Der Kleine Dompfaff,ARJIE2Y1187B994AB7,0,152.92036
2,3,SOFCHDR12AB01866EF,Living Hell,AREVWGE1187B9B890A,0,282.43546
3,4,SOSWKAV12AB018FC91,Midnight Star,ARULZCI1241B9C8611,0,335.51628
4,5,SOLYIBD12A8C135045,Music is what we love,AR051KA1187B98B2FF,0,261.51138


In [39]:
# write songs table to parquet files partitioned by year and artist
(songs_table
    .to_spark()
    .write
    .partitionBy("year","artist_id")
    .parquet('songs/'))

In [10]:

# extract columns to create artists table
#artist_id, name, location, lattitude, longitude
artists_table = (ks.sql(""" SELECT
                            DISTINCT
                            row_number() over (ORDER BY artist_name) id,
                            artist_id,
                            artist_name,
                            artist_location,
                            artist_latitude,
                            artist_longitude
                            FROM {kdf}
                            
                            
"""))

artists_table.head()

Unnamed: 0,id,artist_id,artist_name,artist_location,artist_latitude,artist_longitude
0,1,AR558FS1187FB45658,40 Grit,,,
1,2,AREVWGE1187B9B890A,Bitter End,Noci (BA),-13.442,-41.9952
2,3,AR0IAWL1187B9A96D0,Danilo Perez,Panama,8.4177,-80.11278
3,4,ARVBRGZ1187FB4675A,Gwen Stefani,,,
4,5,ARLTWXK1187FB5A3F8,King Curtis,"Fort Worth, TX",32.74863,-97.32925


In [14]:
# write artists table to parquet files
(artists_table
     .to_spark()
     .write
     .parquet('artists/'))

In [11]:
log_data ='data/*.json'
log_kdf = ks.read_json(log_data)
log_kdf.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12


In [5]:
log_kdf.dtypes

artist            object
auth              object
firstName         object
gender            object
itemInSession      int64
lastName          object
length           float64
level             object
location          object
method            object
page              object
registration     float64
sessionId          int64
song              object
status             int64
ts                 int64
userAgent         object
userId            object
dtype: object

In [12]:
    
# filter by actions for song plays
#songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
df = (ks.sql(""" SELECT * 
                 FROM {log_kdf}
                 WHERE page='NextSong' """))
df.head()


Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80


In [13]:

# extract columns for users table
# user_id, first_name, last_name, gender, level
artists_table = ks.sql(""" SELECT 
                           DISTINCT
                           userId,
                           firstName,
                           lastName,
                           gender,
                           level 
                           FROM {df}""")
    
artists_table.head()

Unnamed: 0,userId,firstName,lastName,gender,level
0,98,Jordyn,Powell,F,free
1,34,Evelin,Ayala,F,free
2,85,Kinsley,Young,F,paid
3,38,Gianna,Jones,F,free
4,85,Kinsley,Young,F,free


In [9]:
# write users table to parquet files
(artists_table
     .to_spark()
     .write
     .parquet('users/'))



In [12]:
df.dtypes

artist            object
auth              object
firstName         object
gender            object
itemInSession      int64
lastName          object
length           float64
level             object
location          object
method            object
page              object
registration     float64
sessionId          int64
song              object
status             int64
ts                 int64
userAgent         object
userId            object
dtype: object

In [58]:
# create timestamp column from original timestamp column
# get_timestamp = udf()
# df['timestamp']=ks.to_datetime(df['ts'][0],unit='ns')
# ks.to_datetime(1490195805433502912, unit='ns')
# from datetime import datetime

# get_timestamp = ks.to_datetime(df['ts'])
# timestamp = get_timestamp.apply(lambda x:ks.to_date(x,format='%y%m%d'))
# print(timestamp)

df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,timestamp
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,1970-01-01 00:25:42.241826
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,1970-01-01 00:25:42.242481
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,1970-01-01 00:25:42.242741
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61,1970-01-01 00:25:42.253449
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,1970-01-01 00:25:42.260935


In [27]:
df.dtypes

artist                   object
auth                     object
firstName                object
gender                   object
itemInSession             int64
lastName                 object
length                  float64
level                    object
location                 object
method                   object
page                     object
registration            float64
sessionId                 int64
song                     object
status                    int64
ts                        int64
userAgent                object
userId                   object
timestamp        datetime64[ns]
dtype: object

In [14]:
   
#     # create datetime column from original timestamp column
#     get_datetime = udf()
df['datetime']=ks.to_datetime(df['ts']) 
    
df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,datetime
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,1970-01-01 00:25:42.241826
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,1970-01-01 00:25:42.242481
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,1970-01-01 00:25:42.242741
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61,1970-01-01 00:25:42.253449
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,1970-01-01 00:25:42.260935


In [29]:
# extract columns to create time table
#start_time, hour, day, week, month, year, weekday

time_table =(ks.sql("""
            SELECT
            DISTINCT
           datetime as start_time,
           extract(day from datetime) as day,
           extract(week from datetime) as week,
           extract(month from datetime) as month,
           extract(year from datetime) as year,
           extract(hour from datetime) as hour
           from {df}
                        """
    ))
time_table.head()
    


Unnamed: 0,start_time,day,week,month,year,hour
0,1970-01-01 00:25:42.295698,1,1,1,1970,0
1,1970-01-01 00:25:42.766329,1,1,1,1970,0
2,1970-01-01 00:25:42.798065,1,1,1,1970,0
3,1970-01-01 00:25:42.169615,1,1,1,1970,0
4,1970-01-01 00:25:42.172734,1,1,1,1970,0


In [31]:
ks.set_option('compute.ops_on_diff_frames', True)

In [32]:
#            extract(weekday from datetime) as weekday
time_table['weekday']=df.datetime.dt.weekday
time_table.head()

Unnamed: 0,start_time,day,week,month,year,hour,weekday
0,1970-01-01 00:25:42.295698,1,1,1,1970,0,3
1,1970-01-01 00:25:42.766329,1,1,1,1970,0,3
2,1970-01-01 00:25:42.798065,1,1,1,1970,0,3
3,1970-01-01 00:25:42.169615,1,1,1,1970,0,3
4,1970-01-01 00:25:42.172734,1,1,1,1970,0,3


In [75]:
# write time table to parquet files partitioned by year and month
(time_table
    .to_spark()
    .write
    .partitionBy('year','month')
    .parquet('time/'))



In [35]:
# read in song data to use for songplays table
song_df = ks.read_json('data/song_data/*/*/*/*.json')
song_df.head()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sar...,511.16363,1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,0
1,AREBBGV1187FB523D2,,"Houston, TX",,Mike Jones (Featuring CJ_ Mello & Lil' Bran),173.66159,1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),0
2,ARMAC4T1187FB3FA4C,40.82624,"Morris Plains, NJ",-74.47995,The Dillinger Escape Plan,207.77751,1,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,2004
3,ARPBNLO1187FB3D52F,40.71455,"New York, NY",-74.00712,Tiny Tim,43.36281,1,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,2000
4,ARNF6401187FB57032,40.79086,"New York, NY [Manhattan]",-73.96644,Sophie B. Hawkins,305.162,1,SONWXQJ12A8C134D94,The Ballad Of Sleeping Beauty,1994


In [36]:
log_kdf["ts"]= ks.to_datetime(log_kdf['ts'])
log_kdf.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1970-01-01 00:25:42.241826,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1970-01-01 00:25:42.242481,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1970-01-01 00:25:42.242741,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1970-01-01 00:25:42.247071,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1970-01-01 00:25:42.252577,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12


In [37]:
# extract columns from joined song and log datasets to create songplays table 
songplays_table = (ks.sql(""" SELECT 
                             DISTINCT
                             row_number() over (ORDER BY e.userId) songplay_id,
                             e.ts AS start_time,
                             extract(month from e.ts) as month,
                             extract(year from e.ts) as year,
                             e.userId AS user_id,
                             e.level AS level,
                             s.song_id AS song_id,
                             s.artist_id AS artist_id,
                             e.sessionId as session_id,
                             e.location AS location,
                             e.userAgent AS user_agent
                             FROM {log_kdf} as e join {song_df} as s ON
                             (e.artist = s.artist_name AND 
                             e.song = s.title AND 
                             e.length= s.duration)
                             WHERE e.page='NextSong'

             """))
songplays_table.head()

Unnamed: 0,songplay_id,start_time,month,year,user_id,level,song_id,artist_id,session_id,location,user_agent
0,1,1970-01-01 00:25:42.837407,1,1970,15,paid,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,818,"Chicago-Naperville-Elgin, IL-IN-WI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."


In [38]:

# write songplays table to parquet files partitioned by year and month
(songplays_table
    .to_spark()
    .write
    .partitionBy("year","month") 
    .parquet('songplayes/')
    )
