# Data Lakes using AWS S3 and Apache Spark

In [32]:
import configparser
import datetime
import os

from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import udf, col, from_unixtime, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.functions import row_number
from pyspark.sql.types import IntegerType, TimestampType, DateType

## Creating a local SparkSession

In [12]:
# Build (or reuse) the SparkSession. The hadoop-aws package is requested via
# spark.jars.packages; presumably it is what the s3a:// paths in S3 mode need.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

In [None]:
# Parse dl.cfg and export the AWS credentials from its [AWS] section as
# environment variables of the same name.
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ.update(
    (name, config.get('AWS', name))
    for name in ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY')
)

In [None]:
# Input locations for local runs: glob patterns over the JSON files
# (presumably the directories produced by the unzip cells in this notebook).
song_data = "song_data/*/*/*/*.json"
log_data = "log_data/*.json"

# Alternative inputs for S3 runs; uncomment to replace the local paths above.
# NOTE(review): the song_data path below has no s3a:// scheme, so it looks like
# a local path rather than a bucket location — confirm before enabling.
#song_data = "data/song_data/*/*/*/*.json"
#log_data = "s3a://udacity-dend/log-data/*/*/*.json"

In [30]:
# Root location under which every Parquet table in this notebook is written
# (local run: a directory relative to the working directory).
output_data = "sparkify_data"

# Alternative output root for S3 runs; uncomment to write to the bucket instead.
#output_data = "s3a://udacitydends"

In [9]:
# Extract the song dataset archive into the current working directory.
!unzip data/song-data.zip 

Archive:  data/song-data.zip
caution: filename not matched:  log_data


In [11]:
# Extract the event-log archive into log_data/, the directory the local
# log_data glob pattern points at.
!unzip data/log-data.zip -d log_data

Archive:  data/log-data.zip
  inflating: log_data/2018-11-01-events.json  
  inflating: log_data/2018-11-02-events.json  
  inflating: log_data/2018-11-03-events.json  
  inflating: log_data/2018-11-04-events.json  
  inflating: log_data/2018-11-05-events.json  
  inflating: log_data/2018-11-06-events.json  
  inflating: log_data/2018-11-07-events.json  
  inflating: log_data/2018-11-08-events.json  
  inflating: log_data/2018-11-09-events.json  
  inflating: log_data/2018-11-10-events.json  
  inflating: log_data/2018-11-11-events.json  
  inflating: log_data/2018-11-12-events.json  
  inflating: log_data/2018-11-13-events.json  
  inflating: log_data/2018-11-14-events.json  
  inflating: log_data/2018-11-15-events.json  
  inflating: log_data/2018-11-16-events.json  
  inflating: log_data/2018-11-17-events.json  
  inflating: log_data/2018-11-18-events.json  
  inflating: log_data/2018-11-19-events.json  
  inflating: log_data/2018-11-20-events.json  
  inflating: log_data/2018-11-21

In [None]:
# Delete any previous local output directory (same name as the local
# output_data value), presumably so the Parquet writes start from a clean slate.
!rm -fr sparkify_data

In [13]:
# Load every song JSON file matched by the song_data pattern and show the
# schema Spark inferred for it.
df = spark.read.format("json").load(song_data)
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [17]:
# Songs dimension: the five song attributes, rows with an empty song_id
# removed, one row kept per song_id.
songs_df = (
    df.select('song_id', 'title', 'artist_id', 'year', 'duration')
    .filter(col('song_id') != '')
    .dropDuplicates(['song_id'])
)
songs_df.head(4)

[Row(song_id='SOGOSOV12AF72A285E', title='¿Dónde va Chichi?', artist_id='ARGUVEV1187B98BA17', year=1997, duration=313.12934),
 Row(song_id='SOMZWCG12A8C13C480', title="I Didn't Mean To", artist_id='ARD7TVE1187B99BFB1', year=0, duration=218.93179),
 Row(song_id='SOUPIRU12A6D4FA1E1', title='Der Kleine Dompfaff', artist_id='ARJIE2Y1187B994AB7', year=0, duration=152.92036),
 Row(song_id='SOXVLOJ12AB0189215', title='Amor De Cabaret', artist_id='ARKRRTF1187B9984DA', year=0, duration=177.47546)]

In [18]:
# Artists dimension: artist attributes from the song data, rows with an empty
# artist_id removed, one row kept per artist_id.
artists_table = (
    df.select(
        'artist_id',
        'artist_name',
        'artist_location',
        'artist_latitude',
        'artist_longitude',
    )
    .filter(col('artist_id') != '')
    .dropDuplicates(['artist_id'])
)
artists_table.head(4)

[Row(artist_id='AR9AWNF1187B9AB0B4', artist_name='Kenny G featuring Daryl Hall', artist_location='Seattle, Washington USA', artist_latitude=None, artist_longitude=None),
 Row(artist_id='AR0IAWL1187B9A96D0', artist_name='Danilo Perez', artist_location='Panama', artist_latitude=8.4177, artist_longitude=-80.11278),
 Row(artist_id='AR0RCMP1187FB3F427', artist_name='Billie Jo Spears', artist_location='Beaumont, TX', artist_latitude=30.08615, artist_longitude=-94.10158),
 Row(artist_id='AREDL271187FB40F44', artist_name='Soul Mekanik', artist_location='', artist_latitude=None, artist_longitude=None)]

In [19]:
# Load the event-log JSON files matched by the log_data pattern. This rebinds
# `df`, which from here on refers to log events rather than song data.
df = spark.read.format("json").load(log_data)
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [20]:
# Keep only the events whose page is 'NextSong'.
df = df.filter(col('page') == 'NextSong')

In [22]:
# Users dimension: one row per non-empty userId, taken from that user's most
# recent event so that `level` reflects the latest known subscription level.
#
# A sort followed by dropDuplicates does not guarantee which duplicate is
# kept, so the latest row is selected explicitly: rank each user's events by
# ts descending and keep rank 1.
latest_event_first = Window.partitionBy('userId').orderBy(col('ts').desc())
users_table = (
    df.where(col('userId') != '')
    .withColumn('_event_rank', row_number().over(latest_event_first))
    .where(col('_event_rank') == 1)
    .select('userId', 'firstName', 'lastName', 'gender', 'level')
)
users_table.head(4)

[Row(userId='51', firstName='Maia', lastName='Burke', gender='F', level='free'),
 Row(userId='7', firstName='Adelyn', lastName='Jordan', gender='F', level='free'),
 Row(userId='15', firstName='Lily', lastName='Koch', gender='F', level='paid'),
 Row(userId='54', firstName='Kaleb', lastName='Cook', gender='M', level='free')]

In [27]:
def get_timestamp(ms_column):
    """Convert an epoch-milliseconds column into whole epoch seconds.

    Implemented with native Column arithmetic rather than a Python UDF, so the
    values are not round-tripped through a Python worker and a null input
    yields null instead of raising inside the UDF.

    Args:
        ms_column: Column holding a timestamp in milliseconds.

    Returns:
        Integer Column holding the value divided by 1000, fraction truncated.
    """
    return (ms_column / 1000).cast(IntegerType())


# Derive the epoch-seconds column and its formatted date-time string.
df = df.withColumn("timestamp", get_timestamp(df.ts))
df = df.withColumn("start_time", from_unixtime(df.timestamp))

# Time dimension: one row per distinct start_time plus its calendar parts.
# 'day' and 'weekday' are produced by date_format and are therefore strings,
# while the other parts are integers.
# NOTE(review): the 'u' pattern is, as far as I know, rejected by Spark 3's
# datetime formatter unless the legacy time parser policy is enabled —
# confirm against the Spark version this notebook targets.
time_table = (
    df.dropDuplicates(['start_time'])
    .select('start_time')
    .withColumn('hour', hour(col('start_time')))
    .withColumn('day', date_format(col('start_time'), 'd'))
    .withColumn('week', weekofyear(col('start_time')))
    .withColumn('month', month(col('start_time')))
    .withColumn('year', year(col('start_time')))
    .withColumn('weekday', date_format(col('start_time'), 'u'))
)
time_table.head(4)

[Row(start_time='2018-11-15 07:56:18', hour=7, day='15', week=46, month=11, year=2018, weekday='4'),
 Row(start_time='2018-11-15 16:51:56', hour=16, day='15', week=46, month=11, year=2018, weekday='4'),
 Row(start_time='2018-11-15 18:31:38', hour=18, day='15', week=46, month=11, year=2018, weekday='4'),
 Row(start_time='2018-11-14 00:41:15', hour=0, day='14', week=46, month=11, year=2018, weekday='3')]

In [31]:
# Songplays fact table: every log event, left-joined to the songs dimension on
# song title so that song_id / artist_id are filled in where a title matches.
#
# The join uses the in-memory songs_df rather than reading songs_table.parquet
# back from output_data: that file is only written by a later cell, so reading
# it here fails on a clean run of the notebook from top to bottom.
songdf = songs_df
song_df = df.join(songdf, df.song == songdf.title, how='left')
songplays_table = song_df.select(
    'start_time',
    'userId',
    'level',
    'song_id',
    'artist_id',
    'sessionId',
    'location',
    'userAgent',
)
# Add a generated row id and the year/month columns used to partition the
# table when it is written.
songplays_table = (
    songplays_table
    .withColumn("songplay_id", monotonically_increasing_id())
    .withColumn('month', month(col('start_time')))
    .withColumn('year', year(col('start_time')))
)
songplays_table.head(4)

[Row(start_time='2018-11-15 00:30:26', userId='26', level='free', song_id=None, artist_id=None, sessionId=583, location='San Jose-Sunnyvale-Santa Clara, CA', userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', songplay_id=0, month=11, year=2018),
 Row(start_time='2018-11-15 00:41:21', userId='26', level='free', song_id=None, artist_id=None, sessionId=583, location='San Jose-Sunnyvale-Santa Clara, CA', userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', songplay_id=1, month=11, year=2018),
 Row(start_time='2018-11-15 00:45:41', userId='26', level='free', song_id=None, artist_id=None, sessionId=583, location='San Jose-Sunnyvale-Santa Clara, CA', userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"',

## Storing the result into Parquet files

In [None]:
# Persist the songs dimension as Parquet, partitioned by year and artist_id.
# The songs DataFrame in this notebook is bound to `songs_df`; no `songs_table`
# variable is ever defined, so the write must use `songs_df`.
songs_df.write.partitionBy(['year', 'artist_id']).parquet(output_data + "/songs_table/songs_table.parquet")

In [None]:
# Persist the artists dimension as Parquet under the output root.
artists_path = output_data + "/artists_table/artists_table.parquet"
artists_table.write.parquet(artists_path)

In [None]:
# Persist the users dimension as Parquet under the output root.
users_path = output_data + "/users_table/users_table.parquet"
users_table.write.parquet(users_path)

In [None]:
# Persist the time dimension as Parquet, partitioned by year and month.
time_table.write.parquet(
    output_data + "/time_table/time_table.parquet",
    partitionBy=['year', 'month'],
)

In [None]:
# Persist the songplays fact table as Parquet, partitioned by year and month.
songplays_table.write.parquet(
    output_data + "/songplays_table/songplays_table.parquet",
    partitionBy=['year', 'month'],
)