In [1]:
from pyspark.sql import SparkSession

def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

import os
import configparser

config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID'] #.get('AWS', 'AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY'] #.get('AWS', 'AWS_SECRET_ACCESS_KEY')

spark = create_spark_session()
input_data = "s3a://udacity-dend/"


In [2]:
# get filepath to song data file
song_data = os.path.join(input_data, "song_data/A/*/*/*.json")
print(song_data)



s3a://udacity-dend/song_data/A/*/*/*.json


In [2]:
# read song data file
song_data = os.path.join(input_data, "song_data/A/A/A/TRAAAAK128F9318786.json")
#song_data = os.path.join(input_data, "song_data/A/A/A/*.json")
df = spark.read.json(song_data)
print(df.count())
df.printSchema()

1
root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [3]:
df.limit(5).show()


+------------------+---------------+---------------+----------------+------------+--------+---------+------------------+------+----+
|         artist_id|artist_latitude|artist_location|artist_longitude| artist_name|duration|num_songs|           song_id| title|year|
+------------------+---------------+---------------+----------------+------------+--------+---------+------------------+------+----+
|ARJNIUY12298900C91|           null|               |            null|Adelitas Way|213.9424|        1|SOBLFFE12AF72AA5BA|Scream|2009|
+------------------+---------------+---------------+----------------+------------+--------+---------+------------------+------+----+



In [3]:
output_data = "s3a://test-rp/"

# extract columns to create songs table
songs_table = df['song_id', 'title', 'artist_id', 'year', 'duration']
    
# write songs table to parquet files partitioned by year and artist
songs_table.write.partitionBy('year', 'artist_id').parquet(os.path.join(output_data, 'songs.parquet'), 'overwrite')

# extract columns to create artists table
artists_table = df['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']
    
# write artists table to parquet files
artists_table.write.parquet(os.path.join(output_data, 'artists.parquet'), 'overwrite')

In [4]:
pf = spark.read.parquet("songs.parquet/*/*/*.parquet")
pf2 = spark.read.parquet("artists.parquet/*.parquet")

pf.limit(5).show()
pf2.limit(5).show()

+------------------+------+--------+
|           song_id| title|duration|
+------------------+------+--------+
|SOBLFFE12AF72AA5BA|Scream|213.9424|
+------------------+------+--------+

+------------------+------------+---------------+---------------+----------------+
|         artist_id| artist_name|artist_location|artist_latitude|artist_longitude|
+------------------+------------+---------------+---------------+----------------+
|ARJNIUY12298900C91|Adelitas Way|               |           null|            null|
+------------------+------------+---------------+---------------+----------------+



In [22]:
import pandas as pd

# get filepath to log data file
log_data = os.path.join(input_data,"log_data/2018/11/*.json")
#log_data = os.path.join(input_data,"log_data/*/*/*.json")
    
# read log data file
df = spark.read.json(log_data)

pd.set_option('max_colwidth', 200)
df.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36""",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36""",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36""",26
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko,9
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20100101 Firefox/31.0,12


In [23]:
output_data = ""

# filter by actions for song plays  ## 'song_id', 'artist_id'
song_plays_df = df['ts', 'userId', 'level','sessionId', 'location', 'userAgent', 'song']

# extract columns for users table    
users_table = df['userId', 'firstName', 'lastName', 'gender', 'level']
    
# write users table to parquet files
users_table.write.parquet(os.path.join(output_data, 'users.parquet'), 'overwrite')

song_plays_df.limit(5).show()
users_table.limit(5).show()

+-------------+------+-----+---------+--------------------+--------------------+---------------+
|           ts|userId|level|sessionId|            location|           userAgent|           song|
+-------------+------+-----+---------+--------------------+--------------------+---------------+
|1542241826796|    26| free|      583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|  Sehr kosmisch|
|1542242481796|    26| free|      583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|The Big Gundown|
|1542242741796|    26| free|      583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|       Marry Me|
|1542247071796|     9| free|      563|Eureka-Arcata-For...|Mozilla/5.0 (Wind...|           null|
|1542252577796|    12| free|      521|New York-Newark-J...|Mozilla/5.0 (Wind...|           null|
+-------------+------+-----+---------+--------------------+--------------------+---------------+

+------+---------+--------+------+-----+
|userId|firstName|lastName|gender|level|
+------+---------+--------+------+-----+
|  

In [9]:
pf3 = spark.read.parquet("users.parquet/*.parquet")

pf3.limit(5).show()

+------+---------+--------+------+-----+
|userId|firstName|lastName|gender|level|
+------+---------+--------+------+-----+
|    26|     Ryan|   Smith|     M| free|
|    26|     Ryan|   Smith|     M| free|
|    26|     Ryan|   Smith|     M| free|
|     9|    Wyatt|   Scott|     M| free|
|    12|   Austin| Rosales|     M| free|
+------+---------+--------+------+-----+



In [24]:
from datetime import datetime
from pyspark.sql.functions import udf
import  pyspark.sql.functions as F
from pyspark.sql import types as T

get_timestamp = F.udf(lambda x: datetime.fromtimestamp( (x/1000.0) ), T.TimestampType()) 
df = df.withColumn('datetime', get_timestamp(df.ts))
    
    # extract columns to create time table
    #time_table  = df['start_time', 'hour', 'day', 'week', 'month', 'year', 'weekday']
time_table = df.select([df.ts.alias('start_time'),
                            F.hour(df.datetime).alias('hour'),
                            F.dayofmonth(df.datetime).alias('day'),
                            F.weekofyear(df.datetime).alias('week'),
                            F.month(df.datetime).alias('month'),
                            F.year(df.datetime).alias('year'),
                            F.date_format(df.datetime, 'u').alias('weekday')])

time_table.limit(5).toPandas()

Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,1542241826796,0,15,46,11,2018,4
1,1542242481796,0,15,46,11,2018,4
2,1542242741796,0,15,46,11,2018,4
3,1542247071796,1,15,46,11,2018,4
4,1542252577796,3,15,46,11,2018,4


In [25]:
time_table.write.partitionBy('year', 'month').parquet(os.path.join(output_data, 'time.parquet'), 'overwrite')

# read in song data to use for songplays table
song_df = spark.read.parquet("songs.parquet")

In [26]:
song_df.limit(5).show()
song_plays_df.limit(5).show()

+------------------+------+--------+----+------------------+
|           song_id| title|duration|year|         artist_id|
+------------------+------+--------+----+------------------+
|SOBLFFE12AF72AA5BA|Scream|213.9424|2009|ARJNIUY12298900C91|
+------------------+------+--------+----+------------------+

+-------------+------+-----+---------+--------------------+--------------------+---------------+
|           ts|userId|level|sessionId|            location|           userAgent|           song|
+-------------+------+-----+---------+--------------------+--------------------+---------------+
|1542241826796|    26| free|      583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|  Sehr kosmisch|
|1542242481796|    26| free|      583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|The Big Gundown|
|1542242741796|    26| free|      583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|       Marry Me|
|1542247071796|     9| free|      563|Eureka-Arcata-For...|Mozilla/5.0 (Wind...|           null|
|1542252577796|

In [27]:
# extract columns from joined song and log datasets to create songplays table 
df = song_plays_df.join(song_df, song_df.title == song_plays_df.song)

df.head()

Row(ts=1542298745796, userId='97', level='paid', sessionId=605, location='Lansing-East Lansing, MI', userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36"', song='Scream', song_id='SOBLFFE12AF72AA5BA', title='Scream', duration=213.9424, year=2009, artist_id='ARJNIUY12298900C91')

In [41]:
from pyspark.sql.functions import monotonically_increasing_id
songplays_table = df.select([df.ts.alias('start_time'), 'userId', 'level', 'song_id', 'artist_id', 'sessionId', 'location', 'userAgent'])
songplays_table.select(monotonically_increasing_id().alias('songplay_id')).collect()
songplays_table.show()
# write songplays table to parquet files partitioned by year and month
songplays_table.write.parquet(os.path.join(output_data, 'songplays.parquet'), 'overwrite')

+-------------+------+-----+------------------+------------------+---------+--------------------+--------------------+
|   start_time|userId|level|           song_id|         artist_id|sessionId|            location|           userAgent|
+-------------+------+-----+------------------+------------------+---------+--------------------+--------------------+
|1542298745796|    97| paid|SOBLFFE12AF72AA5BA|ARJNIUY12298900C91|      605|Lansing-East Lans...|"Mozilla/5.0 (X11...|
+-------------+------+-----+------------------+------------------+---------+--------------------+--------------------+



In [42]:
pf5 = spark.read.parquet("songplays.parquet/*.parquet")

pf5.limit(5).show()

+-------------+------+-----+------------------+------------------+---------+--------------------+--------------------+
|   start_time|userId|level|           song_id|         artist_id|sessionId|            location|           userAgent|
+-------------+------+-----+------------------+------------------+---------+--------------------+--------------------+
|1542298745796|    97| paid|SOBLFFE12AF72AA5BA|ARJNIUY12298900C91|      605|Lansing-East Lans...|"Mozilla/5.0 (X11...|
+-------------+------+-----+------------------+------------------+---------+--------------------+--------------------+

