In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

from pyspark.sql.types import *

import boto3
import configparser
import os

In [14]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

s3_target_bucket = "s3://jjudacitydatalake"

# Create spark session

In [3]:
spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()

spark

# Set up configuration

In [4]:
# location of log data files (later to be replaced with s3 folder path)
log_data_path = './data/log_data'

# location of song data files (later to be replaced with s3 folder path, small subset)
song_data_path = './data/song_data/*/*/*/*.json'

# Reading data

In [5]:
log_data = spark.read.json(log_data_path)

log_data.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12


# Examine schema
## I prefer to specify the schema explicitly as to avoid any errors when loading

In [6]:
log_data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [7]:
# copy data types from automatically inferred schema but modify some fields
log_data_schema = StructType([
    StructField("artist", StringType()),
    StructField("auth", StringType()),
    StructField("firstName", StringType()),
    StructField("gender", StringType()),
    StructField("itemInSession", LongType()),
    StructField("lastName", StringType()),
    StructField("length", DoubleType()),
    StructField("level", StringType()),
    StructField("location", StringType()),
    StructField("method", StringType()),
    StructField("page", StringType()),
    StructField("registration", DoubleType()),
    StructField("sessionId", LongType()),
    StructField("song", StringType()),
    StructField("status", LongType()),
    StructField("ts", LongType()),
    StructField("userAgent", StringType()),
    StructField("userId", StringType()),
])

# and reload data
log_data = spark.read\
    .schema(log_data_schema)\
    .option('mode', 'DROPMALFORMED')\
    .json(log_data_path)\
    .withColumn("ts", F.to_timestamp(F.col("ts") / 1000.)) # convert milliseconds to seconds and then convert to timestamp
    
print(log_data.limit(5).toPandas())

print(log_data.printSchema())

# there's some entries that have userId = "" causing the type to be string
# log_data.select("userId").dropDuplicates().sort("userId").show(5)

        artist       auth firstName gender  itemInSession lastName     length  \
0     Harmonia  Logged In      Ryan      M              0    Smith  655.77751   
1  The Prodigy  Logged In      Ryan      M              1    Smith  260.07465   
2        Train  Logged In      Ryan      M              2    Smith  205.45261   
3         None  Logged In     Wyatt      M              0    Scott        NaN   
4         None  Logged In    Austin      M              0  Rosales        NaN   

  level                               location method      page  registration  \
0  free     San Jose-Sunnyvale-Santa Clara, CA    PUT  NextSong  1.541017e+12   
1  free     San Jose-Sunnyvale-Santa Clara, CA    PUT  NextSong  1.541017e+12   
2  free     San Jose-Sunnyvale-Santa Clara, CA    PUT  NextSong  1.541017e+12   
3  free              Eureka-Arcata-Fortuna, CA    GET      Home  1.540872e+12   
4  free  New York-Newark-Jersey City, NY-NJ-PA    GET      Home  1.541060e+12   

   sessionId             s

In [8]:
song_data = spark.read.json(song_data_path)
print(song_data.limit(5).toPandas())

song_data.printSchema()



            artist_id  artist_latitude    artist_location  artist_longitude  \
0  ARDR4AC1187FB371A1              NaN                                  NaN   
1  AREBBGV1187FB523D2              NaN        Houston, TX               NaN   
2  ARMAC4T1187FB3FA4C         40.82624  Morris Plains, NJ         -74.47995   
3  ARPBNLO1187FB3D52F         40.71455       New York, NY         -74.00712   
4  ARDNS031187B9924F0         32.67828            Georgia         -83.22295   

                                         artist_name   duration  num_songs  \
0  Montserrat Caballé;Placido Domingo;Vicente Sar...  511.16363          1   
1       Mike Jones (Featuring CJ_ Mello & Lil' Bran)  173.66159          1   
2                          The Dillinger Escape Plan  207.77751          1   
3                                           Tiny Tim   43.36281          1   
4                                         Tim Wilson  186.48771          1   

              song_id                                   

In [9]:
song_data_schema = StructType([
    StructField("artist_id", StringType()),
    StructField("artist_latitude", DoubleType()),
    StructField("artist_location", StringType()),
    StructField("artist_longitude", DoubleType()),
    StructField("artist_name", StringType()),
    StructField("duration", DoubleType()),
    StructField("num_songs", LongType()),
    StructField("song_id", StringType()),
    StructField("title", StringType()),
    StructField("year", LongType()),
])

# re-read with explicitly specified schema
song_data = spark.read \
    .schema(song_data_schema) \
    .option('mode', 'DROPMALFORMED') \
    .json(song_data_path)

song_data.limit(5).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sar...,511.16363,1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,0
1,AREBBGV1187FB523D2,,"Houston, TX",,Mike Jones (Featuring CJ_ Mello & Lil' Bran),173.66159,1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),0
2,ARMAC4T1187FB3FA4C,40.82624,"Morris Plains, NJ",-74.47995,The Dillinger Escape Plan,207.77751,1,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,2004
3,ARPBNLO1187FB3D52F,40.71455,"New York, NY",-74.00712,Tiny Tim,43.36281,1,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,2000
4,ARDNS031187B9924F0,32.67828,Georgia,-83.22295,Tim Wilson,186.48771,1,SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco ...,2005


# Writing queries

We need to create the following:

Fact Table

    songplays - records in log data associated with song plays i.e. records with page NextSong
        songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent

Dimension Tables

    users - users in the app
        user_id, first_name, last_name, gender, level
    songs - songs in music database
        song_id, title, artist_id, year, duration
    artists - artists in music database
        artist_id, name, location, lattitude, longitude
    time - timestamps of records in songplays broken down into specific units
        start_time, hour, day, week, month, year, weekday

In [10]:
time = log_data.select(F.col("ts").alias("start_time"))\
        .distinct()\
        .withColumn("hour", F.hour("start_time"))\
        .withColumn("day", F.dayofweek("start_time"))\
        .withColumn("week", F.weekofyear("start_time"))\
        .withColumn("month", F.month("start_time"))\
        .withColumn("year", F.year("start_time"))\
        .withColumn("weekday", F.udf(lambda x: 0 if x in [7, 1] else 1, LongType())(F.col("day")))

time.show(5)

time.write.mode("overwrite").save('./data/time_table.parquet')

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-15 16:12:...|  16|  5|  46|   11|2018|      1|
|2018-11-21 06:18:...|   6|  4|  47|   11|2018|      1|
|2018-11-21 18:49:...|  18|  4|  47|   11|2018|      1|
|2018-11-14 15:20:...|  15|  4|  46|   11|2018|      1|
|2018-11-05 16:31:...|  16|  2|  45|   11|2018|      1|
+--------------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [11]:
artists = song_data.select('artist_id',\
                           F.col('artist_name').alias("name"),\
                           F.col('artist_location').alias("location"),\
                           F.col('artist_latitude').alias('latitude'),\
                           F.col('artist_longitude').alias('longitude'))\
                    .drop_duplicates(subset = ['artist_id'])\

artists.show(5)

artists.write.mode("overwrite").save('./data/artists_table.parquet')

+------------------+--------------------+--------------------+--------+---------+
|         artist_id|                name|            location|latitude|longitude|
+------------------+--------------------+--------------------+--------+---------+
|AR9AWNF1187B9AB0B4|Kenny G featuring...|Seattle, Washingt...|    null|     null|
|AR0IAWL1187B9A96D0|        Danilo Perez|              Panama|  8.4177|-80.11278|
|AR0RCMP1187FB3F427|    Billie Jo Spears|        Beaumont, TX|30.08615|-94.10158|
|AREDL271187FB40F44|        Soul Mekanik|                    |    null|     null|
|ARI3BMM1187FB4255E|        Alice Stuart|          Washington| 38.8991|  -77.029|
+------------------+--------------------+--------------------+--------+---------+
only showing top 5 rows



In [12]:
songs = song_data.select('song_id',\
                        'title',\
                        'artist_id',\
                         'year',\
                         'duration')\
                .drop_duplicates(subset = ['song_id'])

songs.show(5)
                  
songs.write.mode("overwrite").save('./data/songs_table.parquet')

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOGOSOV12AF72A285E|   ¿Dónde va Chichi?|ARGUVEV1187B98BA17|1997|313.12934|
|SOMZWCG12A8C13C480|    I Didn't Mean To|ARD7TVE1187B99BFB1|   0|218.93179|
|SOUPIRU12A6D4FA1E1| Der Kleine Dompfaff|ARJIE2Y1187B994AB7|   0|152.92036|
|SOXVLOJ12AB0189215|     Amor De Cabaret|ARKRRTF1187B9984DA|   0|177.47546|
|SOWTBJW12AC468AC6E|Broken-Down Merry...|ARQGYP71187FB44566|   0|151.84934|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [13]:
users = log_data.select(F.col("userId").alias("user_id"),\
                       F.col("firstName").alias("first_name"),
                       F.col("lastName").alias("last_name"),
                       F.col("gender").alias("gender"),
                       F.col("level").alias("level"))\
                    .drop_duplicates(subset = ['user_id'])

users.show(5)

users.write.mode("overwrite").save(f'{s3_target_bucket}/users_table.parquet')

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     51|      Maia|    Burke|     F| free|
|      7|    Adelyn|   Jordan|     F| free|
|     15|      Lily|     Koch|     F| paid|
|     54|     Kaleb|     Cook|     M| free|
|    101|    Jayden|      Fox|     M| free|
+-------+----------+---------+------+-----+
only showing top 5 rows



NameError: name 's3_target_bucket' is not defined

In [15]:
# and lastly, the last table:
# songplays - records in log data associated with song plays i.e. records with page NextSong
#        songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent

songplays = log_data.join(song_data, \
              [log_data.artist == song_data.artist_name, log_data.song == song_data.title])\
            .filter(F.col('page') == 'NextSong')\
            .select(F.col("ts").alias("start_time"),\
                    F.col("userId").alias("user_id"),\
                    "level",\
                    "song_id",\
                    "artist_id",\
                    F.col("sessionId").alias("session_id"),\
                    "location",\
                    F.col("userAgent").alias("user_agent"),\
                    F.year("ts").alias("year"),\
                    F.month("ts").alias("month"))

print(songplays.limit(5).toPandas())
songplays.write.mode("overwrite").save('./data/songplays_table.parquet')

songplays.write.mode("overwrite").save('./data/songplays_table.parquet')


               start_time user_id level             song_id  \
0 2018-11-21 21:56:47.796      15  paid  SOZCTXZ12AB0182364   

            artist_id  session_id                            location  \
0  AR5KOSW1187FB35FF4         818  Chicago-Naperville-Elgin, IL-IN-WI   

                                          user_agent  year  month  
0  "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...  2018     11  


# Reading from s3

Now that we can read and write locally, set up code to read from AWS S3 remotely

In [None]:
s3_song_data_path = "s3a://udacity-dend/song_data/A/A/A/*.json"
s3_log_data_path = "s3a://udacity-dend/log_data/*/*/*.json"

s3_target_bucket = "s3a://jjudacitydatalake"
s3 = boto3.resource('s3')

In [None]:
full_log_data = spark.read.schema(log_data_schema).option('mode', 'DROPMALFORMED').json(f"{s3_log_data_path}")
full_log_data.show(5)

In [None]:
full_song_data = spark.read.schema(song_data_schema).option('mode', 'DROPMALFORMED').json(f"{s3_song_data_path}")
full_song_data.show(5)