# Udacity Data Engineering - Project 4 - Data Lake

Provide a functional Data Lake with songs users are listening to the  analytics team.

## Create Spark Session and configs

In [1]:
# import libraries
import os
import configparser
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, from_unixtime

In [2]:
# Aws env config
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['SECRET_ACCESS_KEY']

my_aws_path = config['AWS']['S3_FOLDER']

In [3]:
# create sparksession
spark = SparkSession \
    .builder \
    .appName("Pyspark DataLake") \
    .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()

## Song Data ETL

Read Song Data

In [None]:
songs_df = spark.read.json("s3a://{}/song_data/*/*/*/*.json".format(my_aws_path))

Understanding Data

In [12]:
songs_df.take(5)

[Row(artist_id='ARDR4AC1187FB371A1', artist_latitude=None, artist_location='', artist_longitude=None, artist_name='Montserrat Caballé;Placido Domingo;Vicente Sardinero;Judith Blegen;Sherrill Milnes;Georg Solti', duration=511.16363, num_songs=1, song_id='SOBAYLL12A8C138AF9', title='Sono andati? Fingevo di dormire', year=0),
 Row(artist_id='AREBBGV1187FB523D2', artist_latitude=None, artist_location='Houston, TX', artist_longitude=None, artist_name="Mike Jones (Featuring CJ_ Mello & Lil' Bran)", duration=173.66159, num_songs=1, song_id='SOOLYAZ12A6701F4A6', title='Laws Patrolling (Album Version)', year=0),
 Row(artist_id='ARMAC4T1187FB3FA4C', artist_latitude=40.82624, artist_location='Morris Plains, NJ', artist_longitude=-74.47995, artist_name='The Dillinger Escape Plan', duration=207.77751, num_songs=1, song_id='SOBBUGU12A8C13E95D', title='Setting Fire to Sleeping Giants', year=2004),
 Row(artist_id='ARPBNLO1187FB3D52F', artist_latitude=40.71455, artist_location='New York, NY', artist_lo

In [13]:
songs_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [14]:
songs_df.describe('duration').show()

+-------+------------------+
|summary|          duration|
+-------+------------------+
|  count|                71|
|   mean|239.72967605633804|
| stddev|106.56277912134071|
|    min|          29.54404|
|    max|         599.24853|
+-------+------------------+



### Extract Data for Song Table

In [15]:
song_table_df = songs_df.select(['song_id','title','artist_id','year','duration']).dropDuplicates()
song_table_df.show()

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOGOSOV12AF72A285E|   ¿Dónde va Chichi?|ARGUVEV1187B98BA17|1997|313.12934|
|SOTTDKS12AB018D69B|It Wont Be Christmas|ARMBR4Y1187B9990EB|   0|241.47546|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
|SOIAZJW12AB01853F1|          Pink World|AR8ZCNI1187B9A069B|1984|269.81832|
|SONYPOM12A8C13B2D7|I Think My Wife I...|ARDNS031187B9924F0|2005|186.48771|
|SOYMRWW12A6D4FAB14|The Moon And I (O...|ARKFYS91187B98E58F|   0| 267.7024|
|SOAOIBZ12AB01815BE|I Hold Your Hand ...|ARPBNLO1187FB3D52F|2000| 43.36281|
|SOBCOSW12A8C13D398|  Rumba De Barcelona|AR7SMBG1187B9B9066|   0|218.38322|
|SOWTBJW12AC468AC6E|Broken-Down Merry...|ARQGYP71187FB44566|   0|151.84934|
|SOQHXMF12AB0182363|     Young Boy Blues|ARGSJW91187B9B1D6B|   0|218.77506|
|SOGDBUF12A8

Write File to S3

In [16]:
song_table_df.count()

71

In [33]:
song_table_df.write \
    .partitionBy('year', 'artist_id') \
    .mode('overwrite') \
    .parquet('s3a://{}/song_table/'.format(my_aws_path))

KeyboardInterrupt: 

### Extract Data for Artist Table

In [17]:
artist_table_df = \
    songs_df.select('artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude') \
    .dropDuplicates()
artist_table_df.show()

+------------------+--------------------+--------------------+---------------+----------------+
|         artist_id|         artist_name|     artist_location|artist_latitude|artist_longitude|
+------------------+--------------------+--------------------+---------------+----------------+
|AR3JMC51187B9AE49D|     Backstreet Boys|         Orlando, FL|       28.53823|       -81.37739|
|AR0IAWL1187B9A96D0|        Danilo Perez|              Panama|         8.4177|       -80.11278|
|ARWB3G61187FB49404|         Steve Morse|      Hamilton, Ohio|           null|            null|
|AR47JEX1187B995D81|        SUE THOMPSON|          Nevada, MO|       37.83721|       -94.35868|
|ARHHO3O1187B989413|           Bob Azzam|                    |           null|            null|
|ARAGB2O1187FB3A161|Pucho & His Latin...|                    |           null|            null|
|AREBBGV1187FB523D2|Mike Jones (Featu...|         Houston, TX|           null|            null|
|ARGSAFR1269FB35070|          Blingtones

Write to S3

In [18]:
artist_table_df.count()

69

In [39]:
artist_table_df.write \
    .mode('overwrite') \
    .parquet('s3a://{}/artist_table/'.format(my_aws_path))

KeyboardInterrupt: 

## Process Log Data

In [34]:
logs_df = spark.read.json("s3a://{}/log-data/".format(my_aws_path))
song_plays_df = logs_df.filter(logs_df.page == "NextSong")

In [35]:
song_plays_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



### Users Table

In [6]:
users_table_df = song_plays_df.select(['userId', 'firstName', 'lastName', 'gender', 'level']).dropDuplicates()
users_table_df.show()

+------+---------+---------+------+-----+
|userId|firstName| lastName|gender|level|
+------+---------+---------+------+-----+
|    57|Katherine|      Gay|     F| free|
|    84|  Shakira|     Hunt|     F| free|
|    22|     Sean|   Wilson|     F| free|
|    52| Theodore|    Smith|     M| free|
|    80|    Tegan|   Levine|     F| paid|
|    15|     Lily|     Koch|     F| paid|
|    37|   Jordan|    Hicks|     F| free|
|    98|   Jordyn|   Powell|     F| free|
|    48|   Marina|   Sutton|     F| free|
|    17| Makinley|    Jones|     F| free|
|    45| Dominick|   Norris|     M| free|
|    43|   Jahiem|    Miles|     M| free|
|     3|    Isaac|   Valdez|     M| free|
|    62|   Connar|   Moreno|     M| free|
|     5|   Elijah|    Davis|     M| free|
|    44|   Aleena|    Kirby|     F| paid|
|    50|      Ava| Robinson|     F| free|
|    68|   Jordan|Rodriguez|     F| free|
|    56|   Cienna|  Freeman|     F| free|
|    63|     Ayla|  Johnson|     F| free|
+------+---------+---------+------

In [None]:
users_table_df.write \
    .mode('overwrite') \
    .parquet('s3a://{}/users_table/'.format(my_aws_path))

### Time Table

In [36]:
get_timestamp = udf(lambda x: datetime.timestamp(datetime.fromtimestamp(x/1000)))
song_plays_df = song_plays_df.withColumn('ts_', get_timestamp(song_plays_df.ts))

In [33]:
song_plays_df.select(song_plays_df.ts).show()

+----------------+
|              ts|
+----------------+
|1.542241826796E9|
|1.542242481796E9|
|1.542242741796E9|
|1.542253449796E9|
|1.542260935796E9|
|1.542261224796E9|
|1.542261356796E9|
|1.542261662796E9|
|1.542262057796E9|
|1.542262233796E9|
|1.542262434796E9|
|1.542262456796E9|
|1.542262679796E9|
|1.542262728796E9|
|1.542262893796E9|
|1.542263158796E9|
|1.542263378796E9|
|1.542265716796E9|
|1.542265929796E9|
|1.542266927796E9|
+----------------+
only showing top 20 rows



In [44]:
song_plays_df = song_plays_df.withColumn('datetime', from_unixtime(song_plays_df.ts/1000))
song_plays_df.select(song_plays_df.datetime).show()

+-------------------+
|           datetime|
+-------------------+
|2018-11-15 00:30:26|
|2018-11-15 00:41:21|
|2018-11-15 00:45:41|
|2018-11-15 03:44:09|
|2018-11-15 05:48:55|
|2018-11-15 05:53:44|
|2018-11-15 05:55:56|
|2018-11-15 06:01:02|
|2018-11-15 06:07:37|
|2018-11-15 06:10:33|
|2018-11-15 06:13:54|
|2018-11-15 06:14:16|
|2018-11-15 06:17:59|
|2018-11-15 06:18:48|
|2018-11-15 06:21:33|
|2018-11-15 06:25:58|
|2018-11-15 06:29:38|
|2018-11-15 07:08:36|
|2018-11-15 07:12:09|
|2018-11-15 07:28:47|
+-------------------+
only showing top 20 rows



In [48]:
time_table_cols = [
    song_plays_df.ts_, hour(song_plays_df.datetime).alias('hour'), dayofmonth(song_plays_df.datetime).alias('day'), 
    weekofyear(song_plays_df.datetime).alias('week'), month(song_plays_df.datetime).alias('month'), 
    year(song_plays_df.datetime).alias('year'), date_format(song_plays_df.datetime, 'u').alias('weekday')]
time_table = song_plays_df.select(time_table_cols).dropDuplicates()
time_table.show()

+----------------+----+---+----+-----+----+-------+
|             ts_|hour|day|week|month|year|weekday|
+----------------+----+---+----+-----+----+-------+
|1.542281719796E9|  11| 15|  46|   11|2018|      4|
|1.542292969796E9|  14| 15|  46|   11|2018|      4|
|1.542803140796E9|  12| 21|  47|   11|2018|      3|
|1.542807656796E9|  13| 21|  47|   11|2018|      3|
|1.542831615796E9|  20| 21|  47|   11|2018|      3|
|1.542185570796E9|   8| 14|  46|   11|2018|      3|
|1.542185663796E9|   8| 14|  46|   11|2018|      3|
|1.543429989796E9|  18| 28|  48|   11|2018|      3|
|1.543445467796E9|  22| 28|  48|   11|2018|      3|
|1.541437361796E9|  17|  5|  45|   11|2018|      1|
|1.542141280796E9|  20| 13|  46|   11|2018|      2|
|1.542149171796E9|  22| 13|  46|   11|2018|      2|
|1.542150360796E9|  23| 13|  46|   11|2018|      2|
|1.543582073796E9|  12| 30|  48|   11|2018|      5|
|1.543585687796E9|  13| 30|  48|   11|2018|      5|
|1.543601616796E9|  18| 30|  48|   11|2018|      5|
|1.542356417

In [None]:
time_table.write \
    .partitionBy('year', 'month') \
    .mode('overwrite') \
    .parquet('s3a://{}/time_table/'.format(my_aws_path))

### Songplays table

In [60]:
songplays_songs_df = song_plays_df.join(
    songs_df, (song_plays_df.song == songs_df.title) & (song_plays_df.artist == songs_df.artist_name), how='left')
songplays_table = songplays_songs_df.select(
    'ts_', 'userId', 'level', 'song_id', 'artist_id', 'sessionId', 'location', 'userAgent').dropDuplicates()
songplays_table.show()

+----------------+------+-----+-------+---------+---------+--------------------+--------------------+
|             ts_|userId|level|song_id|artist_id|sessionId|            location|           userAgent|
+----------------+------+-----+-------+---------+---------+--------------------+--------------------+
|1.542285036796E9|    80| paid|   null|     null|      611|Portland-South Po...|"Mozilla/5.0 (Mac...|
|1.542291430796E9|    97| paid|   null|     null|      605|Lansing-East Lans...|"Mozilla/5.0 (X11...|
|1.542315638796E9|    44| paid|   null|     null|      619|Waterloo-Cedar Fa...|Mozilla/5.0 (Maci...|
|1.542808324796E9|    37| free|   null|     null|      715|         Salinas, CA|"Mozilla/5.0 (Mac...|
|1.542165995796E9|    80| paid|   null|     null|      548|Portland-South Po...|"Mozilla/5.0 (Mac...|
|1.542172087796E9|    10| free|   null|     null|      484|Washington-Arling...|"Mozilla/5.0 (Mac...|
|1.542179665796E9|    80| paid|   null|     null|      548|Portland-South Po...|"M

In [None]:
songplays_table.write \
    .mode('overwrite') \
    .parquet('s3a://{}/songplays_table/'.format(my_aws_path))