In [1]:
import configparser
import boto3
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl
from pyspark.sql.types import StringType as Str, IntegerType as Int, DateType as Date, TimestampType as Timestamp

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config.get('AWS', 'AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY']=config.get('AWS', 'AWS_SECRET_ACCESS_KEY')


In [3]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark
spark = create_spark_session()

In [4]:
input_data = "s3a://udacity-dend/"
output_data = "s3a://millionsong-project/analytics/"

#### Songs Data

In [5]:
song_data_path = input_data + 'song_data/A/A/*/*.json'

In [7]:
df = spark.read.json(song_data_path)

In [7]:
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [8]:
df.createOrReplaceTempView('songs')

In [11]:
songs = spark.sql('''
                    SELECT DISTINCT song_id,
                           title,
                           artist_id,
                           year,
                           duration
                    FROM songs
                    WHERE song_id IS NOT NULL AND 
                          artist_id IS NOT NULL
''')

In [12]:
songs.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



In [13]:
songs_schema = R([
    Fld('song_id', Str(), nullable=False),
    Fld('title', Str()),
    Fld('artist_id', Str(), nullable=False),
    Fld('year', Int()),
    Fld('duration', Dbl())
])

In [14]:
songs = spark.createDataFrame(songs.rdd, songs_schema)

In [15]:
songs.limit(5).toPandas()

Unnamed: 0,song_id,title,artist_id,year,duration
0,SOBBWUL12A8C139989,I'm Falling In Love,AR3WWZM1187B996646,1997,230.66077
1,SOLPPOD12A6701D227,Me And The Farmer,ARBVX4Y1187FB42E5E,1987,176.79628
2,SOXUMIO12AB01817EB,Push Dis,AR4BILX1187B989036,0,379.34975
3,SOLOOSA12AC4688A3C,Corazon Partio Club Mix Edit,ARQATCR1187FB4D3E6,0,270.0273
4,SORIMDD12A6D4F8B32,Dream/Memory?,ARC5JAZ1187B98EA82,2003,209.65832


In [9]:
artists_schema = R([Fld('artist_id', Str(), nullable=False),
                   Fld('name', Str()),
                   Fld('location', Str()),
                   Fld('latitude', Dbl()),
                   Fld('longitude', Dbl())])

In [10]:
artists = spark.sql('''
                       SELECT DISTINCT artist_id, 
                              artist_name AS name,
                              artist_location AS location,
                              artist_latitude AS latitude, 
                              artist_longitude AS longitude
                       FROM SONGS
                       WHERE artist_id IS NOT NULL
''')

In [11]:
artists = spark.createDataFrame(artists.rdd, artists_schema)

In [12]:
artists.printSchema()

root
 |-- artist_id: string (nullable = false)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [13]:
artists.limit(5).toPandas()

Unnamed: 0,artist_id,name,location,latitude,longitude
0,ARB57BN1187B9B5EAF,36 Crazyfists,"Anchorage, AK",61.21756,-149.85776
1,ARMI4NV1187B99D55D,Man Man,"Philadelphia, PA",39.95227,-75.16237
2,ARKCTSM11F4C83C839,Igor Stravinsky,,,
3,AR8YYNB1187B9A4BB3,Assemblage 23,,,
4,ARDDQKN1187FB50651,Rednex,,,


#### Save songs and artists as parquet

#### Songs

In [8]:
songs_dir = output_data + 'songs'

In [25]:
songs.write.partitionBy('year', 'artist_id').parquet(songs_dir)

#### Artists

In [12]:
artists_dir = output_data + 'artists'

In [15]:
artists.write.parquet(artists_dir)

### Events data

In [18]:
logs_path = input_data + 'log_data/*/*/*.json'

In [19]:
df_log = spark.read.json(logs_path)

In [20]:
df_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [47]:
df_log = df_log.filter(df_log.page=='NextSong')

In [48]:
# users
# user_id, first_name, last_name, gender, level
df_log.createOrReplaceTempView('events')

In [49]:
users = spark.sql('''
            SELECT DISTINCT CAST(userId AS INT) AS user_id,
                            firstName AS first_name,
                            lastName AS last_name,
                            gender,
                            level
            FROM events
            WHERE userId IS NOT NULL
''')

In [87]:
users_schema = R([
    Fld('user_id', Int(), nullable=False),
    Fld('first_name', Str()),
    Fld('last_name', Str()),
    Fld('gender', Str()),
    Fld('level', Str())
])

In [88]:
users = spark.createDataFrame(users.rdd, users_schema)

In [28]:
users.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



In [29]:
users.limit(5).toPandas()

Unnamed: 0,user_id,first_name,last_name,gender,level
0,26,Ryan,Smith,M,free
1,49,Chloe,Cuevas,F,free
2,29,Jacqueline,Lynch,F,free
3,15,Lily,Koch,F,paid
4,8,Kaylee,Summers,F,free


#### Write Users dataframe

In [25]:
users_path = output_data + "users"

In [30]:
users.write.parquet(users_path)

#### Time dataframe

In [31]:
# time
# start_time, hour, day, week, month, year, weekday
ts_ = spark.sql('''
                SELECT to_timestamp(ts/1000) AS start_time
                FROM events
''')

ts_.createOrReplaceTempView('times')

In [32]:
time = spark.sql('''
                SELECT start_time,
                       hour(start_time) AS hour,
                       day(start_time) AS day,
                       weekofyear(start_time) AS week,
                       month(start_time) AS month,
                       year(start_time) AS year,
                       weekday(start_time) AS weekday
                FROM times 
''')

In [33]:
time.printSchema()

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [34]:
time.limit(5).toPandas()

Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,2018-11-15 00:30:26.796,0,15,46,11,2018,3
1,2018-11-15 00:41:21.796,0,15,46,11,2018,3
2,2018-11-15 00:45:41.796,0,15,46,11,2018,3
3,2018-11-15 01:57:51.796,1,15,46,11,2018,3
4,2018-11-15 03:29:37.796,3,15,46,11,2018,3


#### Write Time dataframe

In [35]:
time_path = output_data + 'time'

In [36]:
time.write.partitionBy('year', 'month').parquet(time_path)

#### Songplays
- read artists and songs dataframes

In [10]:
songs = spark.read.parquet(songs_dir)

In [13]:
artists = spark.read.parquet(artists_dir)

In [15]:
songs.createOrReplaceTempView('songs_data')

In [16]:
artists.createOrReplaceTempView('artists')

In [22]:
# song plays
# songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
# also extracting year, month from start_time for partitioning
songplays = spark.sql('''
                        SELECT to_timestamp(events.ts/1000) AS start_time,
                               events.userId AS user_id,
                               events.level,
                               songs_data.song_id,
                               artists.artist_id,
                               events.sessionId AS session_id,
                               artists.location,
                               events.userAgent AS user_agent
                        FROM artists 
                        JOIN songs_data ON artists.artist_id = songs_data.artist_id
                        JOIN events ON artists.name = events.artist
''')

In [23]:
songplays.printSchema()

root
 |-- start_time: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)



In [24]:
songplays.limit(5).toPandas()

Unnamed: 0,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent
0,2018-11-17 13:38:10.796,73,paid,SOGXFIF12A58A78CC4,AR5LZJD1187FB4C5E5,518,NC,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
1,2018-11-05 11:30:51.796,95,paid,SOGXFIF12A58A78CC4,AR5LZJD1187FB4C5E5,222,NC,"""Mozilla/5.0 (iPhone; CPU iPhone OS 7_1_2 like..."
2,2018-11-29 21:31:34.796,49,paid,SOZCRVP12A81C21F40,AR4503S1187FB43199,1041,,Mozilla/5.0 (Windows NT 5.1; rv:31.0) Gecko/20...
3,2018-11-04 22:58:36.796,69,free,SOHHANU12A58A77C66,ARMQHX71187B9890D3,249,"Atlanta, GA","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
4,2018-11-08 12:00:00.796,72,paid,SOHQZIB12A6D4F9FAF,ARWAFY51187FB5C4EF,117,"Atlanta, GA",Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; r...


In [37]:
songplays_path = output_data + 'songplays'

In [40]:
songplays.createOrReplaceTempView('songplays')

In [43]:
songplays = spark.sql('''
            SELECT *, year(start_time) AS year,
                    month(start_time) AS month
            FROM songplays
''')

In [44]:
songplays.write.partitionBy('year', 'month').parquet(songplays_path)