In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession, Window, functions as F
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek
from pyspark.sql.types import TimestampType
from zipfile import ZipFile

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

#print(os.environ['AWS_ACCESS_KEY_ID'])
#print(os.environ['AWS_SECRET_ACCESS_KEY'])

### def main()

In [3]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
input_data = "s3a://udacity-dend/"
output_data = "output"

In [4]:
input_data = "s3a://udacity-dend/"
input_data = input_data + "song_data"
print(input_data)

s3a://udacity-dend/song_data


### def process_song_data(spark, input_data, output_data)

In [5]:
# get filepath to song data file
song_data_zip = 'data/song-data.zip'
song_data = 'data/song_data'

In [27]:
with ZipFile(song_data_zip, 'r') as zip_ref:
    zip_ref.extractall('data/')

In [6]:
print(song_data+'/*/*/*/*.json')
print(input_data+'/*/*/*/*.json')

data/song_data/*/*/*/*.json
s3a://udacity-dend/song_data/*/*/*/*.json


In [7]:
# read song data file
dfsongdata = spark.read.json(song_data+'/*/*/*/*.json')
#dfsongdata = spark.read.json(input_data+'/*/*/*/*.json')
#dfsongdata = spark.read.json('s3a://udacity-dend/song_data/A/A/A/TRAAAAK128F9318786.json')

In [8]:
# read song data file
#df = spark.read.text(song_data_zip)

In [9]:
dfsongdata.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [10]:
dfsongdata.limit(10).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sar...,511.16363,1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,0
1,AREBBGV1187FB523D2,,"Houston, TX",,Mike Jones (Featuring CJ_ Mello & Lil' Bran),173.66159,1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),0
2,ARMAC4T1187FB3FA4C,40.82624,"Morris Plains, NJ",-74.47995,The Dillinger Escape Plan,207.77751,1,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,2004
3,ARPBNLO1187FB3D52F,40.71455,"New York, NY",-74.00712,Tiny Tim,43.36281,1,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,2000
4,ARDNS031187B9924F0,32.67828,Georgia,-83.22295,Tim Wilson,186.48771,1,SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco ...,2005
5,ARNF6401187FB57032,40.79086,"New York, NY [Manhattan]",-73.96644,Sophie B. Hawkins,305.162,1,SONWXQJ12A8C134D94,The Ballad Of Sleeping Beauty,1994
6,ARLTWXK1187FB5A3F8,32.74863,"Fort Worth, TX",-97.32925,King Curtis,326.00771,1,SODREIN12A58A7F2E5,A Whiter Shade Of Pale (Live @ Fillmore West),0
7,ARPFHN61187FB575F6,41.88415,"Chicago, IL",-87.63241,Lupe Fiasco,279.97995,1,SOWQTQZ12A58A7B63E,Streets On Fire (Explicit Album Version),0
8,ARI2JSK1187FB496EF,51.50632,"London, England",-0.12714,Nick Ingman;Gavyn Wright,111.62077,1,SODUJBS12A8C132150,Wessex Loses a Bride,0
9,AR0RCMP1187FB3F427,30.08615,"Beaumont, TX",-94.10158,Billie Jo Spears,133.32853,1,SOGXHEG12AB018653E,It Makes No Difference Now,1992


In [12]:
# extract columns to create songs table
dfsongdata.createOrReplaceTempView("song_data")
songs_table = spark.sql("""
                SELECT DISTINCT song_id, title, artist_id, year, duration FROM song_data
            """)
songs_table.limit(10).toPandas()

Unnamed: 0,song_id,title,artist_id,year,duration
0,SOGNCJP12A58A80271,Do You Finally Need A Friend,ARB29H41187B98F0EF,1972,342.56934
1,SOOJPRH12A8C141995,Loaded Like A Gun,ARBGXIG122988F409D,0,173.19138
2,SOFCHDR12AB01866EF,Living Hell,AREVWGE1187B9B890A,0,282.43546
3,SOWTBJW12AC468AC6E,Broken-Down Merry-Go-Round,ARQGYP71187FB44566,0,151.84934
4,SOGOSOV12AF72A285E,¿Dónde va Chichi?,ARGUVEV1187B98BA17,1997,313.12934
5,SOTUKVB12AB0181477,Blessed Assurance,AR7ZKHQ1187B98DD73,1993,270.602
6,SOMVWWT12A58A7AE05,Knocked Out Of The Park,ARQ9BO41187FB5CF1F,0,183.17016
7,SOBEBDG12A58A76D60,Kassie Jones,ARI3BMM1187FB4255E,0,220.78649
8,SOILPQQ12AB017E82A,Sohna Nee Sohna Data,AR1ZHYZ1187FB3C717,0,599.24853
9,SOYMRWW12A6D4FAB14,The Moon And I (Ordinary Day Album Version),ARKFYS91187B98E58F,0,267.7024


In [13]:
# write songs table to parquet files partitioned by year and artist
songs_table.write.mode("overwrite").parquet(os.path.join(output_data, 'songs'), partitionBy=['year', 'artist_id'])

In [14]:
# extract columns to create artists table
artists_table = spark.sql("""
                SELECT DISTINCT artist_id, artist_name, artist_location, artist_latitude, artist_longitude FROM song_data
            """)
artists_table.limit(10).toPandas()

Unnamed: 0,artist_id,artist_name,artist_location,artist_latitude,artist_longitude
0,ARPBNLO1187FB3D52F,Tiny Tim,"New York, NY",40.71455,-74.00712
1,ARBEBBY1187B9B43DB,Tom Petty,"Gainesville, FL",,
2,AR0IAWL1187B9A96D0,Danilo Perez,Panama,8.4177,-80.11278
3,ARMBR4Y1187B9990EB,David Martin,California - SF,37.77916,-122.42005
4,ARD0S291187B9B7BF5,Rated R,Ohio,,
5,AR0RCMP1187FB3F427,Billie Jo Spears,"Beaumont, TX",30.08615,-94.10158
6,ARKRRTF1187B9984DA,Sonora Santanera,,,
7,ARHHO3O1187B989413,Bob Azzam,,,
8,ARJIE2Y1187B994AB7,Line Renaud,,,
9,ARGIWFO1187B9B55B7,Five Bolt Main,,,


In [15]:
# write artists table to parquet files
artists_table.write.mode("overwrite").parquet(os.path.join(output_data, 'artists'))

### def process_log_data(spark, input_data, output_data)

In [9]:
# get filepath to log data file
log_data_zip = 'data/log-data.zip'
log_data = 'data/log_data'

In [16]:
with ZipFile(log_data_zip, 'r') as zip_ref:
    zip_ref.extractall('data/log_data')

In [17]:
# read song data file
dflogdata = spark.read.json(log_data+'/*.json')

In [10]:
dflogdata = spark.read.json(log_data+'/')

In [11]:
dflogdata.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [12]:
input_data = "s3a://udacity-dend/"
input_data = input_data + "log_data/"
print(input_data)

s3a://udacity-dend/log_data/


In [13]:
dflogdata = spark.read.json(input_data)

AnalysisException: 'Unable to infer schema for JSON. It must be specified manually.;'

In [19]:
# filter by actions for song plays
dflogdata.filter(dflogdata.page == 'NextSong')
dflogdata.createOrReplaceTempView("log_data")

In [20]:
# extract columns for users table    
users_table = spark.sql("""
                SELECT DISTINCT userId, firstName, lastName, gender, level FROM log_data WHERE userId IS NOT NULL
            """)
users_table.limit(10).toPandas()

Unnamed: 0,userId,firstName,lastName,gender,level
0,98,Jordyn,Powell,F,free
1,34,Evelin,Ayala,F,free
2,85,Kinsley,Young,F,paid
3,38,Gianna,Jones,F,free
4,85,Kinsley,Young,F,free
5,63,Ayla,Johnson,F,free
6,37,Jordan,Hicks,F,free
7,6,Cecilia,Owens,F,free
8,15,Lily,Koch,F,paid
9,27,Carlos,Carter,M,free


In [21]:
# write users table to parquet files
users_table.write.mode("overwrite").parquet(os.path.join(output_data, 'users'))

In [22]:
# create timestamp column from original timestamp column
get_timestamp = udf(lambda x: datetime.fromtimestamp(x / 1000), TimestampType())
dflogdata_timestamp = dflogdata.withColumn("start_time", get_timestamp(dflogdata.ts))

In [23]:
dflogdata_timestamp = dflogdata_timestamp.select("start_time")
dflogdata_timestamp.select("start_time").limit(10).toPandas()

Unnamed: 0,start_time
0,2018-11-15 00:30:26.796
1,2018-11-15 00:41:21.796
2,2018-11-15 00:45:41.796
3,2018-11-15 01:57:51.796
4,2018-11-15 03:29:37.796
5,2018-11-15 03:44:09.796
6,2018-11-15 03:44:20.796
7,2018-11-15 05:34:34.796
8,2018-11-15 05:37:57.796
9,2018-11-15 05:48:55.796


In [24]:
dflogdata_timestamp.createOrReplaceTempView("logdata_timestamp")
dflogdata_timestamp = spark.sql("""
                SELECT DISTINCT start_time FROM logdata_timestamp
            """)
dflogdata_timestamp.limit(10).toPandas()

Unnamed: 0,start_time
0,2018-11-15 16:12:16.796
1,2018-11-21 06:18:12.796
2,2018-11-21 18:49:23.796
3,2018-11-14 15:20:15.796
4,2018-11-05 16:31:59.796
5,2018-11-13 18:00:26.796
6,2018-11-13 22:23:24.796
7,2018-11-30 04:32:02.796
8,2018-11-30 06:12:54.796
9,2018-11-16 12:46:18.796


In [25]:
# extract columns to create time table
#dflogdata.createOrReplaceTempView("log_data")
time_table = dflogdata_timestamp.withColumn('hour',hour(dflogdata_timestamp.start_time)) \
    .withColumn('day',dayofmonth(dflogdata_timestamp.start_time)) \
    .withColumn('week',weekofyear(dflogdata_timestamp.start_time)) \
    .withColumn('month',month(dflogdata_timestamp.start_time)) \
    .withColumn('year',year(dflogdata_timestamp.start_time)) \
    .withColumn('weekday',dayofweek(dflogdata_timestamp.start_time)) 
time_table.limit(10).toPandas()

Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,2018-11-15 16:12:16.796,16,15,46,11,2018,5
1,2018-11-21 06:18:12.796,6,21,47,11,2018,4
2,2018-11-21 18:49:23.796,18,21,47,11,2018,4
3,2018-11-14 15:20:15.796,15,14,46,11,2018,4
4,2018-11-05 16:31:59.796,16,5,45,11,2018,2
5,2018-11-13 18:00:26.796,18,13,46,11,2018,3
6,2018-11-13 22:23:24.796,22,13,46,11,2018,3
7,2018-11-30 04:32:02.796,4,30,48,11,2018,6
8,2018-11-30 06:12:54.796,6,30,48,11,2018,6
9,2018-11-16 12:46:18.796,12,16,46,11,2018,6


In [26]:
# write time table to parquet files partitioned by year and month
time_table.write.mode("overwrite").parquet(os.path.join(output_data, 'time'), partitionBy=['year', 'month'])

In [28]:
# read in song data to use for songplays table
#song_df = spark.read.parquet(output_data + '/songs'+'/*/*/*/*.parquet')
song_df = spark.sql("""
                SELECT DISTINCT song_id, title, artist_id, year, duration FROM song_data
            """)
song_df.createOrReplaceTempView("song_df")
song_df.limit(10).toPandas()

Unnamed: 0,song_id,title,artist_id,year,duration
0,SOGNCJP12A58A80271,Do You Finally Need A Friend,ARB29H41187B98F0EF,1972,342.56934
1,SOOJPRH12A8C141995,Loaded Like A Gun,ARBGXIG122988F409D,0,173.19138
2,SOFCHDR12AB01866EF,Living Hell,AREVWGE1187B9B890A,0,282.43546
3,SOWTBJW12AC468AC6E,Broken-Down Merry-Go-Round,ARQGYP71187FB44566,0,151.84934
4,SOGOSOV12AF72A285E,¿Dónde va Chichi?,ARGUVEV1187B98BA17,1997,313.12934
5,SOTUKVB12AB0181477,Blessed Assurance,AR7ZKHQ1187B98DD73,1993,270.602
6,SOMVWWT12A58A7AE05,Knocked Out Of The Park,ARQ9BO41187FB5CF1F,0,183.17016
7,SOBEBDG12A58A76D60,Kassie Jones,ARI3BMM1187FB4255E,0,220.78649
8,SOILPQQ12AB017E82A,Sohna Nee Sohna Data,AR1ZHYZ1187FB3C717,0,599.24853
9,SOYMRWW12A6D4FAB14,The Moon And I (Ordinary Day Album Version),ARKFYS91187B98E58F,0,267.7024


In [6]:
#song_id, title, artist_id, year, duration
song_df_parq=spark.read.parquet("output/songs/")
song_df_parq.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- artist_id: string (nullable = true)



In [5]:
song_df_parq=spark.read.option("mergeSchema", "true").parquet("output/songs/*/*/*.parquet")
song_df_parq.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)



In [30]:
song_df_parq.limit(10).toPandas()

Unnamed: 0,song_id,title,duration
0,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,43.36281
1,SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco ...,186.48771
2,SODREIN12A58A7F2E5,A Whiter Shade Of Pale (Live @ Fillmore West),326.00771
3,SOYMRWW12A6D4FAB14,The Moon And I (Ordinary Day Album Version),267.7024
4,SOWQTQZ12A58A7B63E,Streets On Fire (Explicit Album Version),279.97995
5,SOUDSGM12AC9618304,Insatiable (Instrumental Version),266.39628
6,SOPEGZN12AB0181B3D,Get Your Head Stuck On Your Neck,45.66159
7,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,207.77751
8,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,511.16363
9,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),173.66159


In [8]:
song_df_parq.createOrReplaceTempView("song_df_parq")
song_df = spark.sql("""
                SELECT song_id, title, artist_id, year, duration FROM song_df_parq
            """)
song_df.limit(10).toPandas()

Unnamed: 0,song_id,title,artist_id,year,duration
0,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,ARPBNLO1187FB3D52F,2000,43.36281
1,SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco ...,ARDNS031187B9924F0,2005,186.48771
2,SODREIN12A58A7F2E5,A Whiter Shade Of Pale (Live @ Fillmore West),ARLTWXK1187FB5A3F8,0,326.00771
3,SOYMRWW12A6D4FAB14,The Moon And I (Ordinary Day Album Version),ARKFYS91187B98E58F,0,267.7024
4,SOWQTQZ12A58A7B63E,Streets On Fire (Explicit Album Version),ARPFHN61187FB575F6,0,279.97995
5,SOUDSGM12AC9618304,Insatiable (Instrumental Version),ARNTLGG11E2835DDB9,0,266.39628
6,SOPEGZN12AB0181B3D,Get Your Head Stuck On Your Neck,AREDL271187FB40F44,0,45.66159
7,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,ARMAC4T1187FB3FA4C,2004,207.77751
8,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,ARDR4AC1187FB371A1,0,511.16363
9,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),AREBBGV1187FB523D2,0,173.66159


In [None]:
# extract columns from joined song and log datasets to create songplays table 
# songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
songplays_table = spark.sql("""
                SELECT TIMESTAMP WITHOUT TIME ZONE 'epoch' + (log_data.ts / 1000)  * INTERVAL '1 second' AS start_time, 
                    log_data.userId, log_data.level, song_df.song_id, song_df.artist_id, log_data.sessionId, log_data.location, log_data.userAgent 
                FROM log_data 
                JOIN song_df ON log_data.song = song_df.title
            """)
songplays_table.limit(10).toPandas()

In [30]:
#start_time_window = Window.orderBy(F.col("start_time"))
#.withColumn('songplay_id', F.row_number().over(start_time_window)) \
songplays_table = dflogdata.withColumn("start_time", get_timestamp(dflogdata.ts)).sort("start_time") \
    .join(song_df, (dflogdata.song == song_df.title), 'inner') \
    .select(monotonically_increasing_id().alias("songplay_id"),
                col("start_time"),
                col("userId").alias("user_id"),
                col("level"),
                col("song_id"),
                col("artist_id"),
                col("sessionId").alias('session_id'),
                col("location"),
                col("userAgent").alias("user_agent")
           ) \
    .withColumn("month", month(col("start_time"))) \
    .withColumn("year", year(col("start_time")))

songplays_table.sort("songplay_id").limit(10).toPandas()

Unnamed: 0,songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent,month,year
0,558345748480,2018-11-14 05:06:03.796,10,free,SOGDBUF12A8C140FAA,AR558FS1187FB45658,484,"Washington-Arlington-Alexandria, DC-VA-MD-WV","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",11,2018
1,936302870528,2018-11-19 09:14:20.796,24,paid,SOGDBUF12A8C140FAA,AR558FS1187FB45658,672,"Lake Havasu City-Kingman, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",11,2018
2,1159641169920,2018-11-21 21:56:47.796,15,paid,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,818,"Chicago-Naperville-Elgin, IL-IN-WI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",11,2018
3,1451698946048,2018-11-27 22:35:59.796,80,paid,SOGDBUF12A8C140FAA,AR558FS1187FB45658,992,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",11,2018


In [26]:
# write songplays table to parquet files partitioned by year and month
songplays_table.write.mode("overwrite").parquet(os.path.join(output_data, 'songplays'), partitionBy=['year', 'month'])

In [34]:
count_songplays_table = songplays_table.count()
print('The data in "{}" table have {} record(s)'.format('songplays', count_songplays_table))

The data in "songplays" table have 4 record(s)


In [None]:
song_df_parq=spark.read.parquet("output/songs/*/*/*.parquet")
song_df_parq.printSchema()