In [161]:
import configparser
from datetime import datetime
import os
import pandas as pd
from pyspark.sql.types import TimestampType, DateType
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession 
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, dayofweek, date_format

## Import AWS key and secret into OS environment variables

In [162]:
# Load the AWS credentials from the [IAM] section of the local `dl.cfg` file,
# export them as environment variables, and keep them in `key` / `secret`
# for the Spark/Hadoop configuration further down.
config = configparser.ConfigParser()
# read_file() on an opened file raises FileNotFoundError when dl.cfg is missing;
# ConfigParser.read() would silently skip it and fail later with KeyError: 'IAM'.
with open('dl.cfg') as cfg_file:
    config.read_file(cfg_file)

key = config['IAM']['AWS_ACCESS_KEY_ID']
secret = config['IAM']['AWS_SECRET_ACCESS_KEY']
os.environ['AWS_ACCESS_KEY_ID'] = key
os.environ['AWS_SECRET_ACCESS_KEY'] = secret

# Create Spark Session

In [163]:
# Build (or reuse) the Spark session. The hadoop-aws package is pulled in so
# that s3a:// paths can be read.
spark = (
    SparkSession.builder
    .appName("Sparkify_Lake")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .config("spark.hadoop.fs.s3a.fast.upload", "true")
    .getOrCreate()
)

In [164]:
# Pass the AWS credentials to the S3A connector via the session's Hadoop configuration.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", key)
hadoop_conf.set("fs.s3a.secret.key", secret)

# Load Songs and Logs Data

In [165]:
# Input locations: one song-metadata file and one day of event logs.
load_songs, load_logs = (
    "s3a://udacity-dend/song_data/A/A/A/TRAAAAK128F9318786.json",
    "s3a://udacity-dend/log_data/2018/11/2018-11-01-events.json",
)

In [166]:
# Read the song JSON into a DataFrame and print its schema.
song_data = spark.read.json(path=load_songs)
song_data.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [167]:
# Read the event-log JSON into a DataFrame and print its schema.
log_data = spark.read.json(path=load_logs)
log_data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



# Process Song and Artist Table

### Extract columns to create songs table 

In [168]:
# Songs table columns: song_id, title, artist_id, year, duration
song_columns = ['song_id', 'title', 'artist_id', 'year', 'duration']
songs_table = song_data.select(*song_columns)
songs_table.show(3)

+------------------+------+------------------+----+--------+
|           song_id| title|         artist_id|year|duration|
+------------------+------+------------------+----+--------+
|SOBLFFE12AF72AA5BA|Scream|ARJNIUY12298900C91|2009|213.9424|
+------------------+------+------------------+----+--------+



### Extract columns to create artists table

In [169]:
# Artists table columns: artist id, name, location, latitude, longitude
# (the source column names, with their artist_ prefix, are kept as-is).
artist_columns = [
    'artist_id',
    'artist_name',
    'artist_location',
    'artist_latitude',
    'artist_longitude',
]
artists_table = song_data.select(*artist_columns)
artists_table.show(3)

+------------------+------------+---------------+---------------+----------------+
|         artist_id| artist_name|artist_location|artist_latitude|artist_longitude|
+------------------+------------+---------------+---------------+----------------+
|ARJNIUY12298900C91|Adelitas Way|               |           null|            null|
+------------------+------------+---------------+---------------+----------------+



### Write songs table to parquet files partitioned by year and artist

In [170]:
# Write the songs table as parquet partitioned by year and artist_id,
# replacing any previous output, then read it back for verification.
songs_writer = songs_table.write.mode("overwrite").partitionBy("year", "artist_id")
songs_writer.parquet("songs.parquet")
songs_par = spark.read.parquet('songs.parquet')

In [174]:
# Register the re-read songs parquet as a temp view and preview it via SQL.
songs_par.createOrReplaceTempView("songs_view")
# DataFrame.show() prints and returns None, so keep the query result in
# s_view and display it in a separate step.
s_view = spark.sql("SELECT * FROM songs_view")
s_view.show(3)

+------------------+------+--------+----+------------------+
|           song_id| title|duration|year|         artist_id|
+------------------+------+--------+----+------------------+
|SOBLFFE12AF72AA5BA|Scream|213.9424|2009|ARJNIUY12298900C91|
+------------------+------+--------+----+------------------+



### Write artists table to parquet files

In [172]:
# Write the artists table as parquet, replacing any previous output,
# then read it back for verification.
artists_writer = artists_table.write.mode("overwrite")
artists_writer.parquet("artists.parquet")
artists_par = spark.read.parquet('artists.parquet')

In [175]:
# Register the re-read artists parquet as a temp view and preview it via SQL.
artists_par.createOrReplaceTempView("artists_view")
# DataFrame.show() prints and returns None, so keep the query result in
# a_view and display it in a separate step.
a_view = spark.sql("SELECT * FROM artists_view")
a_view.show(3)

+------------------+------------+---------------+---------------+----------------+
|         artist_id| artist_name|artist_location|artist_latitude|artist_longitude|
+------------------+------------+---------------+---------------+----------------+
|ARJNIUY12298900C91|Adelitas Way|               |           null|            null|
+------------------+------------+---------------+---------------+----------------+

