### IMPORT LIBRARIES

In [1]:
import configparser
import datetime
import os
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import monotonically_increasing_id

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1592810241253_0001,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### CONFIG KEYS

In [2]:
# Not necessary to run if we use the notebook directly in AWS
config = configparser.ConfigParser()
config.read('dl.cfg')

config.sections()
os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

'AWS'
Traceback (most recent call last):
  File "/usr/lib64/python3.6/configparser.py", line 959, in __getitem__
    raise KeyError(key)
KeyError: 'AWS'



### CREATE A SPARK SESSION

In [3]:
#Create a spark session with hadoop-aws package
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

print('Creating spark session on AWS')
spark = create_spark_session()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Creating spark session on AWS

### PROCESS SONG DATA

In [4]:
# data filepath
# user need to have AdministratorAccess and AmazonS3FullAccess permissions
input_data = "s3a://udacity-dend/"
song_input_data = "s3a://udacity-dend/song-data/A/A/A/*"
log_input_data = "s3a://udacity-dend/log_data/2018/11/*"
output_data = "s3a://aws-udacity-spark/"

# read song data file
print('Read song data from json file')
song_data = spark.read.json(song_input_data)
song_data.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Read song data from json file
root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

In [5]:
# extract columns to create songs table
print('Extract columns to create song table')
songs_table = song_data['song_id', 'title', 'artist_id', 'year', 'duration']
songs_table = songs_table.dropDuplicates(['song_id'])
    
songs_table.printSchema()
songs_table.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Extract columns to create song table
root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)

24

In [6]:
# write songs table to parquet files partitioned by year and artist
songs_table.write.partitionBy("year", "artist_id").parquet(path = output_data + "/songs/songs.parquet", mode = "overwrite")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
# extract columns to create artist table
print('Extract columns to create artist table')
artist_table = song_data['artist_id','artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']
artist_table = artist_table.dropDuplicates(['artist_id'])
    
artist_table.printSchema()
artist_table.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Extract columns to create artist table
root
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_longitude: double (nullable = true)

24

In [8]:
# write artist table to parquet files
artist_table.write.parquet(path = output_data + "/artists/artists.parquet", mode = "overwrite")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### PROCESS LOG DATA

In [9]:
# read log data file
print('Read log data from json file')
log_data = spark.read.json(log_input_data)
log_data.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Read log data from json file
root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

In [10]:
# filter by actions for song plays
songplays_table = log_data.filter(log_data.page == 'NextSong')['ts', 'userId', 'level','sessionId', 'location', 'userAgent']
#songplays_table.collect()
songplays_table.take(1)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(ts=1542241826796, userId='26', level='free', sessionId=583, location='San Jose-Sunnyvale-Santa Clara, CA', userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"')]

In [11]:
# extract columns to create users table
print('Extract columns to create users table')
users_table = log_data['userId', 'firstName', 'lastName', 'gender', 'level']
users_table = users_table.dropDuplicates(['userId'])
    
users_table.printSchema()
users_table.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Extract columns to create users table
root
 |-- userId: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)

98

In [12]:
# write users table to parquet files
users_table.write.parquet(path = output_data + "/users/users.parquet", mode = "overwrite")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
#create timestamp column from original timestamp column
get_timestamp = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000), TimestampType())
log_data = log_data.withColumn("timestamp", get_timestamp(log_data.ts))
log_data.printSchema()
log_data.head(1)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)

[Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=58

In [14]:
# create datetime column from original timestamp column
get_datetime = udf(lambda x: str(datetime.datetime.fromtimestamp(int(x) / 1000.0)))
log_data = log_data.withColumn("datetime", get_datetime(log_data.ts))
log_data.printSchema()
log_data.head(1)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- datetime: string (nullable = true)

[Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', re

In [15]:
#log_data = log_data.drop('datetime')
#log_data.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
# extract columns to create time table
print('Extract columns to create time table')
time_table = log_data.select(
    col('timestamp').alias('start_time'),
    hour('timestamp').alias('hour'),
    dayofmonth('timestamp').alias('day'),
    weekofyear('timestamp').alias('week'),
    month('timestamp').alias('month'),
    year('timestamp').alias('year'),
    dayofweek('timestamp').alias('weekday')
)
time_table = time_table.dropDuplicates(['start_time'])
    
time_table.printSchema()
time_table.count()
time_table.head(1)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Extract columns to create time table
root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)

[Row(start_time=datetime.datetime(2018, 11, 30, 4, 32, 2, 796000), hour=4, day=30, week=48, month=11, year=2018, weekday=6)]

In [17]:
# write time table to parquet files partitioned by year and month
time_table.write.partitionBy("year", "month").parquet(path = output_data + "/time/time.parquet", mode = "overwrite")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
# create some temporal views
log_data.createOrReplaceTempView("events")
song_data.createOrReplaceTempView("songs")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
# extract columns to create songplays table
songplays_table = spark.sql(
"""
SELECT  DISTINCT(e.ts)  AS start_time, 
        e.userId        AS user_id, 
        e.level         AS level, 
        s.song_id       AS song_id, 
        s.artist_id     AS artist_id, 
        e.sessionId     AS session_id, 
        e.location      AS location, 
        e.userAgent     AS user_agent
FROM events e
JOIN songs  s   ON (e.song = s.title AND e.artist = s.artist_name AND e.length = s.duration)
AND e.page  =  'NextSong'
""")

songplays_table.printSchema()
songplays_table.head(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

'java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient;'
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 767, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: 'java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient;'



In [20]:
# adding songplay_id column
songplays_table = songplays_table.withColumn("songplay_id", monotonically_increasing_id())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
# write songplays table to parquet files partitioned by year and month
songplays_table.write.partitionBy("year", "month").parquet(path = output_data + "/songplays/songplays.parquet", mode = "overwrite")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

'Partition column `year` not found in schema struct<ts:bigint,userId:string,level:string,sessionId:bigint,location:string,userAgent:string,songplay_id:bigint>;'
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 843, in parquet
    self._jwrite.parquet(path)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: 'Partition column `year` not found in schema struct<ts:bigint,userId:string,level:string,sessionId:bigint,location:string,userAgent:string,songplay_id:bigint>;'



In [22]:
#Unzip folder for jupyter lab try
# import zipfile
# path_to_zip_file = 'data/log-data.zip'
# directory_to_extract_to = 'data/log-data'
# zip_ref = zipfile.ZipFile(path_to_zip_file, 'r')
# zip_ref.extractall(directory_to_extract_to)
# zip_ref.close()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…