In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp
import pyspark.sql.functions as F
import datetime
import os
import configparser

In [2]:
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

Make sure that your AWS credentials are loaded as env vars

In [3]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [20]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

# Load data from S3

In [36]:
input_data = "s3a://udacity-dend/"
output_data = "s3a://wejdan-udacity-dend/output_data/"

process_song_data

In [45]:
song_data = input_data + 'song_data/*/*/*/*.json'
log_data= input_data + 'log_data/*/*/*.json'
print(log_data)
print(song_data)

s3a://udacity-dend/log_data/*.json
s3a://udacity-dend/song_data/*/*/*/*.json


In [50]:
log_df = spark.read.json('s3a://udacity-dend/log_data/*.json')

In [None]:
song_df = spark.read.json(song_data)

In [8]:
song_df.printSchema()
song_df.limit(5).toPandas()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,AR4T2IF1187B9ADBB7,63.96027,"<a href=""http://billyidol.net"" onmousedown='Un...",10.22442,Billy Idol,233.22077,1,SOVIYJY12AF72A4B00,The Dead Next Door (Digitally Remastered 99),1983
1,AR4T2IF1187B9ADBB7,63.96027,"<a href=""http://billyidol.net"" onmousedown='Un...",10.22442,Billy Idol,287.92118,1,SOVYXYL12AF72A3373,Rebel Yell (1999 Digital Remaster),1983
2,ARQ846I1187B9A7083,,,,Yvonne S. Moriarty / Walt Fowler / Ladd McInto...,196.04853,1,SOEPTVC12A67ADD0DA,"To Zucchabar [""Gladiator"" - Music from the Mot...",0
3,AR4T2IF1187B9ADBB7,63.96027,"<a href=""http://billyidol.net"" onmousedown='Un...",10.22442,Billy Idol,247.53587,1,SOLQYSZ12AB0181F97,Mony Mony (Live),1987
4,AR3TZ691187FB3DBB1,,,,Russell Watson / Pino Palladino / Robbie McInt...,273.44934,1,SOVPFJK12A6701CB16,Barcelona - (Friends until the end),2000


In [7]:
song_df.createOrReplaceTempView("song_df")

## extract columns to create songs table

In [8]:
songs_table = spark.sql("""
    SELECT DISTINCT 
    row_number() over (ORDER BY year,artist_id)  id,
    song_id, 
    title, 
    artist_id, 
    year, 
    duration
    FROM song_df
""")
#other solution for the id
# song_tabel.withColumn("id",  monotonicallyIncreasingId())
songs_table.toPandas()

Unnamed: 0,id,song_id,title,artist_id,year,duration
0,1,SOLYIBD12A8C135045,Music is what we love,AR051KA1187B98B2FF,0,261.51138
1,2,SOHKNRJ12A6701D1F8,Drop of Rain,AR10USD1187B99F3F1,0,189.57016
2,3,SOLLHMX12AB01846DC,The Emperor Falls,AR1Y2PT1187FB5B9CE,0,484.62322
3,4,SOILPQQ12AB017E82A,Sohna Nee Sohna Data,AR1ZHYZ1187FB3C717,0,599.24853
4,5,SOBKWDJ12A8C13B2F3,Wild Rose (Back 2 Basics Mix),AR36F9J1187FB406F1,0,230.71302
5,6,SOZCTXZ12AB0182364,Setanta matins,AR5KOSW1187FB35FF4,0,269.58322
6,7,SOBCOSW12A8C13D398,Rumba De Barcelona,AR7SMBG1187B9B9066,0,218.38322
7,8,SOZHPGD12A8C1394FE,Baby Come To Me,AR9AWNF1187B9AB0B4,0,236.93016
8,9,SOLEYHO12AB0188A85,Got My Mojo Workin,ARAGB2O1187FB3A161,0,338.23302
9,10,SOOJPRH12A8C141995,Loaded Like A Gun,ARBGXIG122988F409D,0,173.19138


In [9]:
print((songs_table.count(), len(songs_table.columns)))

(71, 6)


### write songs table to parquet files partitioned by year and artist

In [10]:
#parquet file shoud be the S3 location
songs_table.write.partitionBy("year", "artist_id").parquet('data/output/songs/', mode = "overwrite")

## extract columns to create artists table

In [11]:
artists_table = spark.sql("""
    SELECT DISTINCT 
    artist_id, 
    artist_name as name, 
    artist_location as location, 
    artist_latitude as lattitude, 
    artist_longitude as longitude
    FROM song_df
""")
artists_table =artists_table.withColumn("id",  F.monotonically_increasing_id())

In [12]:
artists_table.toPandas()

Unnamed: 0,artist_id,name,location,lattitude,longitude,id
0,ARPBNLO1187FB3D52F,Tiny Tim,"New York, NY",40.71455,-74.00712,60129542144
1,ARBEBBY1187B9B43DB,Tom Petty,"Gainesville, FL",,,68719476736
2,AR0IAWL1187B9A96D0,Danilo Perez,Panama,8.41770,-80.11278,120259084288
3,ARMBR4Y1187B9990EB,David Martin,California - SF,37.77916,-122.42005,146028888064
4,ARD0S291187B9B7BF5,Rated R,Ohio,,,146028888065
5,AR0RCMP1187FB3F427,Billie Jo Spears,"Beaumont, TX",30.08615,-94.10158,223338299392
6,ARKRRTF1187B9984DA,Sonora Santanera,,,,231928233984
7,ARHHO3O1187B989413,Bob Azzam,,,,231928233985
8,ARJIE2Y1187B994AB7,Line Renaud,,,,257698037760
9,ARGIWFO1187B9B55B7,Five Bolt Main,,,,283467841536


write artists table to parquet files

In [13]:
artists_table.write.parquet('data/output/artists/', mode = "overwrite")

process_log_data

In [29]:
# read log data file
log_df = spark.read.json(log_data)

Py4JJavaError: An error occurred while calling o356.json.
: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: ABF81DDB874DF1B7, AWS Error Code: null, AWS Error Message: Forbidden, S3 Extended Request ID: FGSj9zY9EQZ6XZY4kUsherbu0OHvW3uuYtoVsaXYNcV9fpbVxvSztmXPh3igJCMSDxdqJenwv8o=
	at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
	at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
	at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:976)
	at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:956)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:892)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.listStatus(S3AFileSystem.java:734)
	at org.apache.hadoop.fs.Globber.listStatus(Globber.java:69)
	at org.apache.hadoop.fs.Globber.glob(Globber.java:217)
	at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1657)
	at org.apache.spark.deploy.SparkHadoopUtil.globPath(SparkHadoopUtil.scala:245)
	at org.apache.spark.deploy.SparkHadoopUtil.globPathIfNecessary(SparkHadoopUtil.scala:255)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:549)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.immutable.List.flatMap(List.scala:355)
	at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:391)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [15]:
log_df.printSchema()
log_df.limit(5).toPandas()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12


In [16]:
#filter by actions for song plays
log_df = log_df.where("page = 'NextSong'")
log_df.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80


In [17]:
log_df.createOrReplaceTempView("log_df")

extract columns for users table 

In [18]:
users_table = spark.sql("""
            SELECT DISTINCT
            userId    as user_id,
            firstName as first_name,
            lastName  as last_name,
            gender    as gender,
            level
            from log_df           
""")
users_table =users_table.withColumn("id",  F.monotonically_increasing_id())

In [19]:
users_table.toPandas()

Unnamed: 0,user_id,first_name,last_name,gender,level,id
0,98,Jordyn,Powell,F,free,17179869184
1,34,Evelin,Ayala,F,free,51539607552
2,85,Kinsley,Young,F,paid,68719476736
3,38,Gianna,Jones,F,free,77309411328
4,85,Kinsley,Young,F,free,85899345920
5,63,Ayla,Johnson,F,free,103079215104
6,37,Jordan,Hicks,F,free,103079215105
7,6,Cecilia,Owens,F,free,120259084288
8,15,Lily,Koch,F,paid,154618822656
9,27,Carlos,Carter,M,free,154618822657


write users table to parquet files

In [20]:
users_table.write.parquet('data/output/users', mode = "overwrite")

create timestamp column from original timestamp column

In [21]:
spark.udf.register("date_time", lambda x: str(datetime.datetime.fromtimestamp(x / 1000.0)))

<function __main__.<lambda>(x)>

In [22]:
time_table = spark.sql('''
                  SELECT DISTINCT 
                  date_time,
                  date_time AS start_time,
                  hour(date_time) AS hour,
                  dayofmonth(date_time) AS day,
                  weekofyear(date_time) AS week,
                  month(date_time) AS month,
                  year(date_time) AS year,
                  dayofweek(date_time) AS weekday
                  FROM (SELECT date_time(ts) AS date_time FROM log_df) 
                  ORDER BY year, month
          '''
          )
time_table.toPandas()

Unnamed: 0,date_time,start_time,hour,day,week,month,year,weekday
0,2018-11-15 11:04:44.796000,2018-11-15 11:04:44.796000,11,15,46,11,2018,5
1,2018-11-15 17:16:12.796000,2018-11-15 17:16:12.796000,17,15,46,11,2018,5
2,2018-11-15 19:42:40.796000,2018-11-15 19:42:40.796000,19,15,46,11,2018,5
3,2018-11-21 08:57:22.796000,2018-11-21 08:57:22.796000,8,21,47,11,2018,4
4,2018-11-14 03:46:36.796000,2018-11-14 03:46:36.796000,3,14,46,11,2018,4
5,2018-11-14 09:56:29.796000,2018-11-14 09:56:29.796000,9,14,46,11,2018,4
6,2018-11-14 11:23:20.796000,2018-11-14 11:23:20.796000,11,14,46,11,2018,4
7,2018-11-14 12:18:35.796000,2018-11-14 12:18:35.796000,12,14,46,11,2018,4
8,2018-11-14 23:10:05.796000,2018-11-14 23:10:05.796000,23,14,46,11,2018,4
9,2018-11-28 14:23:48.796000,2018-11-28 14:23:48.796000,14,28,48,11,2018,4


In [23]:
time_table.write.partitionBy("year", "month").parquet('data/output/time/', mode = "overwrite")

read in song data to use for songplays table

In [24]:
song_df = spark.read.json("/home/workspace/data/song-data/*/*/*/*.json")
song_df.createOrReplaceTempView("song_df")

In [25]:
songplays_table = spark.sql("""
        SELECT DISTINCT
        date_time(ts)        as start_time,
        year(date_time(ts))  as year,
        month(date_time(ts)) as month,
        l.userId             as user_id,
        l.level              as level,
        s.song_id            as song_id,
        s.artist_id          as artist_id,
        l.sessionId          as session_id,
        l.location           as location,
        l.userAgent          as user_agent
        FROM log_df   AS l
        JOIN song_df  AS s
        ON (l.artist = s.artist_name
        AND l.song   = s.title)  
""")

songplays_table.toPandas()

Unnamed: 0,start_time,year,month,user_id,level,song_id,artist_id,session_id,location,user_agent
0,2018-11-21 21:56:47.796000,2018,11,15,paid,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,818,"Chicago-Naperville-Elgin, IL-IN-WI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."


In [26]:
songplays_table.write.partitionBy("year", "month").parquet('data/output/songplays/', mode = "overwrite")