## ETL with Spark

#### Import Libraries

In [3]:
import os
import configparser
from pyspark.sql import functions as F
from pyspark.sql import SparkSession

#### Create spark session w/ S3 config

In [6]:
config = configparser.ConfigParser()

#Normally this file should be in ~/.aws/credentials
config.read_file(open('dl.cfg'))

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

In [9]:
spark = SparkSession\
        .builder\
        .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
        .getOrCreate()

Exception: Java gateway process exited before sending its port number

In [36]:
# Update to ne s3 bucket url
output_data = 's3a://patrickudacity-songplays/'

#### Load data from S3 instead

In [37]:
song_df = spark.read.json('s3a://udacity-dend/song_data/A/A/B/TRAABJL12903CDCF1A.json')

Py4JJavaError: An error occurred while calling o552.json.
: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: 4228CF52B22F2054, AWS Error Code: null, AWS Error Message: Forbidden, S3 Extended Request ID: 1GeZWjlGGn02tXi3G+XyA50M0eITGpi8vkuLdxigo5qAzyYV/VCU7/g06rdiBI8mu4oB4HUXhGM=
	at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
	at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
	at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:976)
	at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:956)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:892)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:77)
	at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:557)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.immutable.List.flatMap(List.scala:355)
	at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:391)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [4]:
song_df.count()

71

In [5]:
song_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



#### Creat songs table

In [6]:
song_df.createOrReplaceTempView("songdata")

# song_id, title, artist_id, year, duration
songs_table = spark.sql("""
    SELECT song_id, title, artist_id, year, duration
    FROM songdata
""")

songs_table.limit(3).toPandas()

Unnamed: 0,song_id,title,artist_id,year,duration
0,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,ARDR4AC1187FB371A1,0,511.16363
1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),AREBBGV1187FB523D2,0,173.66159
2,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,ARMAC4T1187FB3FA4C,2004,207.77751


#### Create artists table

In [7]:
# artist_id, name, location, lattitude, longitude
artists_table = spark.sql("""
    SELECT DISTINCT artist_id, artist_name, artist_location, artist_latitude, artist_longitude
    FROM songdata
""")

artists_table.limit(3).toPandas()

Unnamed: 0,artist_id,artist_name,artist_location,artist_latitude,artist_longitude
0,ARPBNLO1187FB3D52F,Tiny Tim,"New York, NY",40.71455,-74.00712
1,ARBEBBY1187B9B43DB,Tom Petty,"Gainesville, FL",,
2,AR0IAWL1187B9A96D0,Danilo Perez,Panama,8.4177,-80.11278


#### Write artists table and song table to parquet file

In [11]:
songs_table.write.partitionBy('year', 'artist_id').parquet(os.path.join(output_data, 'songs_table'), 'overwrite')
artists_table.write.parquet(os.path.join(output_data, 'artists_table'), 'overwrite')

#### Load log data to dataframe

In [30]:
# Limiting to Nov 2018 data
log_df = spark.read.json('s3://udacity-dend/log_data/2018-11*.json')
log_df.limit(3).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26


In [13]:
log_df.count()

8056

In [14]:
log_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [15]:
# user_id, first_name, last_name, gender, level
log_df.createOrReplaceTempView("logdata")

users_table = spark.sql("""
    SELECT DISTINCT userId, firstName, lastName, gender, level
    FROM logdata
    WHERE page = 'NextSong'
""")

In [16]:
#### Write to parquet file

In [17]:
users_table.write.parquet(os.path.join(output_data, 'users_table'), 'overwrite')

#### Create time table. Convert ts to time_stamp

In [34]:
# start_time, hour, day, week, month, year, weekday
log_df = log_df.withColumn('ts', F.to_timestamp(F.col('ts')/1000))

log_df.createOrReplaceTempView("logdata")

time_table = spark.sql("""
    SELECT DISTINCT ts as start_time, hour(ts) as hour, day(ts) as day, month(ts) as month, year(ts) as year, dayofweek(ts) as weekday
    FROM logdata
""")


In [35]:
time_table.limit(3).toPandas()

Unnamed: 0,start_time,hour,day,month,year,weekday
0,2018-11-15 14:55:46.796,14,15,11,2018,5
1,2018-11-14 18:13:04.796,18,14,11,2018,4
2,2018-11-28 09:39:01.796,9,28,11,2018,4


#### Write time table to parquet file. Partition by year / month

In [36]:
time_table.write.partitionBy('year', 'month').parquet(os.path.join(output_data, 'time_table'), 'overwrite')

#### Creat Songplay table

In [49]:
# songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
songplay_table = spark.sql("""
    SELECT l.ts as start_time, year(l.ts) as year, month(l.ts) as month,
           l.userId, l.level, s.song_id, s.artist_id, 
           l.sessionId, l.location, l.userAgent
    FROM logdata l
    LEFT JOIN songdata s
    ON l.artist = s.artist_name AND l.song = s.title
""")
songplay_table.limit(3).toPandas()

Unnamed: 0,start_time,year,month,userId,level,song_id,artist_id,sessionId,location,userAgent,artist,artist_name,song,title
0,2018-11-15 00:30:26.796,2018,11,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",Harmonia,,Sehr kosmisch,
1,2018-11-15 00:41:21.796,2018,11,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",The Prodigy,,The Big Gundown,
2,2018-11-15 00:45:41.796,2018,11,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",Train,,Marry Me,


In [46]:
songplay_table.count()

8056

#### Add song_play id

In [47]:
songplay_table = songplay_table.withColumn('songplay_id' , F.monotonically_increasing_id())
songplay_table.limit(2).toPandas()

Unnamed: 0,ts,year,month,userId,level,song_id,artist_id,sessionId,location,userAgent,songplay_id
0,2018-11-15 00:30:26.796,2018,11,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",0
1,2018-11-15 00:41:21.796,2018,11,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",1


#### Write time table to parquet file. Partition by year / month

In [48]:
songplay_table.write.partitionBy('year', 'month').parquet(os.path.join(output_data, 'songplay_table'), 'overwrite')