In [1]:
import configparser
import os

config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()

In [3]:
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Dat, TimestampType

song_schema = R([
    Fld("artist_id",Str()),
    Fld("artist_latitude",Dbl()),
    Fld("artist_location",Str()),
    Fld("artist_longitude",Dbl()),
    Fld("artist_name",Str()),
    Fld("duration",Dbl()),
    Fld("num_songs",Int()),
    Fld("title",Str()),
    Fld("year",Int()),
])

In [4]:
# read one record for debugging

# song_location = 's3a://udacity-dend/song_data/*/*/*/*.json'
song_location = 's3a://udacity-dend/song_data/A/B/C/*.json'

song_df = spark.read.json(song_location, schema = song_schema)
song_df.printSchema()
song_df.limit(5).toPandas()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)



Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,title,year
0,ARLTWXK1187FB5A3F8,32.74863,"Fort Worth, TX",-97.32925,King Curtis,326.00771,1,A Whiter Shade Of Pale (Live @ Fillmore West),0
1,ARIOZCU1187FB3A3DC,,"Hamlet, NC",,JOHN COLTRANE,220.44689,1,Giant Steps (Alternate Version_ Take 5_ Altern...,0
2,ARPFHN61187FB575F6,41.88415,"Chicago, IL",-87.63241,Lupe Fiasco,279.97995,1,Streets On Fire (Explicit Album Version),0
3,AR5S9OB1187B9931E3,34.05349,"Los Angeles, CA",-118.24532,Bullet Boys,156.62975,1,All Day & All Of The Night,0
4,AR5T40Y1187B9996C6,,"Lulea, Sweden",,The Bear Quartet,249.3122,1,I Remember Nights Wide Open,1998


In [7]:
from pyspark.sql.functions import monotonically_increasing_id

songs = song_df.select(['title','artist_id','year','duration'])
songs = songs.withColumn('song_id', monotonically_increasing_id())
songs.limit(5).toPandas()

Unnamed: 0,title,artist_id,year,duration,song_id
0,A Whiter Shade Of Pale (Live @ Fillmore West),ARLTWXK1187FB5A3F8,0,326.00771,0
1,Giant Steps (Alternate Version_ Take 5_ Altern...,ARIOZCU1187FB3A3DC,0,220.44689,1
2,Streets On Fire (Explicit Album Version),ARPFHN61187FB575F6,0,279.97995,2
3,All Day & All Of The Night,AR5S9OB1187B9931E3,0,156.62975,3
4,I Remember Nights Wide Open,AR5T40Y1187B9996C6,1998,249.3122,4


In [8]:
artists = song_df.select(['artist_id',
                           'artist_name',
                           'artist_location',
                           'artist_latitude',
                           'artist_longitude']).dropDuplicates()
artists.limit(5).toPandas()

Unnamed: 0,artist_id,artist_name,artist_location,artist_latitude,artist_longitude
0,AR0IAWL1187B9A96D0,Danilo Perez,Panama,8.4177,-80.11278
1,ARWB3G61187FB49404,Steve Morse,"Hamilton, Ohio",,
2,ARJIE2Y1187B994AB7,Line Renaud,,,
3,ARVBRGZ1187FB4675A,Gwen Stefani,,,
4,ARCKOJF1241B9C75B4,Eddie Sierra,,,


In [9]:
log_data = "s3a://udacity-dend/log_data/2018/11/2018-11-01-events.json"

df = spark.read.json(log_data)

In [10]:
df.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,,Logged In,Walter,M,0,Frye,,free,"San Francisco-Oakland-Hayward, CA",GET,Home,1540919000000.0,38,,200,1541105830796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",39
1,,Logged In,Kaylee,F,0,Summers,,free,"Phoenix-Mesa-Scottsdale, AZ",GET,Home,1540345000000.0,139,,200,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
2,Des'ree,Logged In,Kaylee,F,1,Summers,246.30812,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540345000000.0,139,You Gotta Be,200,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
3,,Logged In,Kaylee,F,2,Summers,,free,"Phoenix-Mesa-Scottsdale, AZ",GET,Upgrade,1540345000000.0,139,,200,1541106132796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
4,Mr Oizo,Logged In,Kaylee,F,3,Summers,144.03873,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540345000000.0,139,Flat 55,200,1541106352796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8


In [11]:
df = df.filter(df.page == 'NextSong')

In [12]:
df.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Des'ree,Logged In,Kaylee,F,1,Summers,246.30812,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540345000000.0,139,You Gotta Be,200,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
1,Mr Oizo,Logged In,Kaylee,F,3,Summers,144.03873,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540345000000.0,139,Flat 55,200,1541106352796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
2,Tamba Trio,Logged In,Kaylee,F,4,Summers,177.18812,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540345000000.0,139,Quem Quiser Encontrar O Amor,200,1541106496796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
3,The Mars Volta,Logged In,Kaylee,F,5,Summers,380.42077,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540345000000.0,139,Eriatarka,200,1541106673796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
4,Infected Mushroom,Logged In,Kaylee,F,6,Summers,440.2673,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540345000000.0,139,Becoming Insane,200,1541107053796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8


In [13]:
users = df.select(['userId','firstName','lastName','gender','level']).dropDuplicates()

In [14]:
users.limit(5).toPandas()

Unnamed: 0,userId,firstName,lastName,gender,level
0,101,Jayden,Fox,M,free
1,8,Kaylee,Summers,F,free
2,26,Ryan,Smith,M,free
3,10,Sylvie,Cruz,F,free


In [15]:
from pyspark.sql.functions import from_unixtime, col

df = df.withColumn('timestamp', from_unixtime(col('ts') / 1000))

In [16]:
df.select('timestamp').limit(5).toPandas()

Unnamed: 0,timestamp
0,2018-11-01 21:01:46
1,2018-11-01 21:05:52
2,2018-11-01 21:08:16
3,2018-11-01 21:11:13
4,2018-11-01 21:17:33


In [17]:
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

times =  df.select("timestamp").dropDuplicates()
times = times.withColumn("hour", hour(col("timestamp")))\
            .withColumn("day", dayofmonth(col("timestamp"))) \
            .withColumn("week", weekofyear(col("timestamp"))) \
            .withColumn("month", month(col("timestamp"))) \
            .withColumn("year", year(col("timestamp"))) \
            .withColumn("weekday", date_format(col("timestamp"), 'E'))

In [18]:
times.limit(5).toPandas()

Unnamed: 0,timestamp,hour,day,week,month,year,weekday
0,2018-11-01 21:42:00,21,1,44,11,2018,Thu
1,2018-11-01 21:28:54,21,1,44,11,2018,Thu
2,2018-11-01 21:17:33,21,1,44,11,2018,Thu
3,2018-11-01 21:01:46,21,1,44,11,2018,Thu
4,2018-11-01 21:11:13,21,1,44,11,2018,Thu


In [19]:
# need songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent

df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [20]:
songs.printSchema()
artists.printSchema()
times.printSchema()

root
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- duration: double (nullable = true)
 |-- song_id: long (nullable = false)

root
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_longitude: double (nullable = true)

root
 |-- timestamp: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: string (nullable = true)

