# Prototype S3 to Spark Datalake Project for Udacity Data Engineer ND

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install -q findspark

In [0]:
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz

In [0]:
!tar xf spark-2.4.3-bin-hadoop2.7.tgz

In [4]:
!pip install py4j

Collecting py4j
[?25l  Downloading https://files.pythonhosted.org/packages/04/de/2d314a921ef4c20b283e1de94e0780273678caac901564df06b948e4ba9b/py4j-0.10.8.1-py2.py3-none-any.whl (196kB)
[K     |████████████████████████████████| 204kB 2.8MB/s 
[?25hInstalling collected packages: py4j
Successfully installed py4j-0.10.8.1


In [0]:
import os

In [0]:
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64/"
os.environ["SPARK_HOME"] = "/content/spark-2.4.3-bin-hadoop2.7/"

In [0]:
import findspark
findspark.init()

In [0]:
import pyspark
from pyspark.sql import SparkSession

import getpass

In [0]:
from pyspark.sql.functions import udf
import pyspark.sql.functions as F

In [9]:
aws_access_key = getpass.getpass()
aws_secret_access_key = getpass.getpass()

··········
··········


In [0]:
os.environ["AWS_ACCESS_KEY_ID"] = aws_access_key
os.environ["AWS_SECRET_ACCESS_KEY"] = aws_secret_access_key

In [0]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()

In [12]:
spark

## Read Song Data Files

In [0]:
song_raw = spark.read.json("s3a://udacity-dend/song_data/A/A/A/*.json")

In [14]:
song_raw.take(2)

[Row(artist_id='ARTC1LV1187B9A4858', artist_latitude=51.4536, artist_location="Goldsmith's College, Lewisham, Lo", artist_longitude=-0.01802, artist_name='The Bonzo Dog Band', duration=301.40036, num_songs=1, song_id='SOAFBCP12A8C13CC7D', title='King Of Scurf (2007 Digital Remaster)', year=1972),
 Row(artist_id='ARA23XO1187B9AF18F', artist_latitude=40.57885, artist_location='Carteret, New Jersey', artist_longitude=-74.21956, artist_name='The Smithereens', duration=192.522, num_songs=1, song_id='SOKTJDS12AF72A25E5', title='Drown In My Own Tears (24-Bit Digitally Remastered 04)', year=0)]

## Read Events Data Files

In [0]:
event_raw = spark.read.json("s3a://udacity-dend/log_data/2018/11/2018-11-12-events.json")

In [16]:
event_raw.take(2)

[Row(artist=None, auth='Logged In', firstName='Celeste', gender='F', itemInSession=0, lastName='Williams', length=None, level='free', location='Klamath Falls, OR', method='GET', page='Home', registration=1541077528796.0, sessionId=438, song=None, status=200, ts=1541990217796, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.103 Safari/537.36"', userId='53'),
 Row(artist='Pavement', auth='Logged In', firstName='Sylvie', gender='F', itemInSession=0, lastName='Cruz', length=99.16036, level='free', location='Washington-Arlington-Alexandria, DC-VA-MD-WV', method='PUT', page='NextSong', registration=1540266185796.0, sessionId=345, song='Mercy:The Laundromat', status=200, ts=1541990258796, userAgent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.77.4 (KHTML, like Gecko) Version/7.0.5 Safari/537.77.4"', userId='10')]

### Set tables to Spark SQL

In [0]:
event_raw.createOrReplaceTempView("stg_event_raw")

In [0]:
song_raw.createOrReplaceTempView("stg_song_raw")

# ETL Section

## Dim Tables

users - users in the app
  * user_id, first_name, last_name, gender, level

songs - songs in music database
  * song_id, title, artist_id, year, duration

artists - artists in music database
  * artist_id, name, location, lattitude, longitude

time - timestamps of records in songplays broken down into specific units
  * start_time, hour, day, week, month, year, weekday

In [0]:
# users table
users_tbl = spark.sql("""
    select distinct userId as user_id
          ,firstName as first_name
          ,lastName as last_name
          ,gender
          ,level
    from stg_event_raw
    where userId is not null
        and userId <> ''
""")

In [0]:
# songs table
songs_tbl = spark.sql("""
    select distinct song_id
          ,title
          ,artist_id
          ,year
          ,duration
    from stg_song_raw
    where song_id is not null
        and song_id <> ''
""")

In [0]:
# artists table
artists_tbl = spark.sql("""
    select distinct artist_id
          ,artist_name as name
          ,artist_location as location
          ,artist_latitude as latitude
          ,artist_longitude as longitude
    from stg_song_raw
    where artist_id is not null
        and artist_id <> ''
""")

In [23]:
# time table
spark.sql("""
    select start_time
          ,hour(start_time) as hour
          ,dayofmonth(start_time) as day
    from
    (
        select distinct to_timestamp(ts/1000) as start_time
        from stg_event_raw
        where ts is not null
    )
""").show(5)

+--------------------+----+---+
|          start_time|hour|day|
+--------------------+----+---+
|2018-11-12 15:55:...|  15| 12|
|2018-11-12 15:57:...|  15| 12|
|2018-11-12 19:45:...|  19| 12|
|2018-11-12 05:42:...|   5| 12|
|2018-11-12 20:50:...|  20| 12|
+--------------------+----+---+
only showing top 5 rows



In [0]:
time_tbl = spark.sql("""
    select start_time
          ,hour(start_time) as hour
          ,dayofmonth(start_time) as day
          ,weekofyear(start_time) as week
          ,month(start_time) as month
          ,year(start_time) as year
          ,dayofweek(start_time) as weekday
    from
    (
        select distinct to_timestamp(ts/1000) as start_time
        from stg_event_raw
        where ts is not null
    )
""")

In [34]:
time_tbl.count()

212

In [35]:
time_tbl.show(3)

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-12 15:55:...|  15| 12|  46|   11|2018|      2|
|2018-11-12 15:57:...|  15| 12|  46|   11|2018|      2|
|2018-11-12 19:45:...|  19| 12|  46|   11|2018|      2|
+--------------------+----+---+----+-----+----+-------+
only showing top 3 rows



## Fact Table

songplays - records in log data associated with song plays i.e. records with page NextSong

  * songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent

In [25]:
spark.sql("""
    select monotonically_increasing_id() as songplay_id
          ,to_timestamp(log.ts/1000) as start_time
          ,log.userId as user_id
          ,log.level 
          ,log.sessionId as session_id
          ,log.location
          ,log.userAgent as user_agent
    from stg_event_raw as log
    where page = 'NextSong'
""").limit(5).toPandas()

Unnamed: 0,songplay_id,start_time,user_id,level,session_id,location,user_agent
0,0,2018-11-12 02:37:38.796,10,free,345,"Washington-Arlington-Alexandria, DC-VA-MD-WV","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
1,1,2018-11-12 02:37:44.796,53,free,438,"Klamath Falls, OR","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK..."
2,2,2018-11-12 02:42:21.796,53,free,438,"Klamath Falls, OR","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK..."
3,3,2018-11-12 02:45:52.796,53,free,438,"Klamath Falls, OR","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK..."
4,4,2018-11-12 02:47:22.796,29,paid,389,"Atlanta-Sandy Springs-Roswell, GA","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."


In [0]:
songplays_tbl = spark.sql("""
    select monotonically_increasing_id() as songplay_id
          ,to_timestamp(log.ts/1000) as start_time
          ,month(to_timestamp(log.ts/1000)) as month
          ,year(to_timestamp(log.ts/1000)) as year
          ,log.userId as user_id
          ,log.level 
          ,song.song_id
          ,song.artist_id
          ,log.sessionId as session_id
          ,log.location
          ,log.userAgent as user_agent
    from stg_event_raw as log
    join stg_song_raw as song
        on log.artist = song.artist_name
        and log.song = song.title
    where page = 'NextSong'
""")

In [31]:
songplays_tbl.show(2)

+-----------+----------+-----+----+-------+-----+-------+---------+----------+--------+----------+
|songplay_id|start_time|month|year|user_id|level|song_id|artist_id|session_id|location|user_agent|
+-----------+----------+-----+----+-------+-----+-------+---------+----------+--------+----------+
+-----------+----------+-----+----+-------+-----+-------+---------+----------+--------+----------+



# Write to Parquet

Requirements:  Each of the five tables are written to parquet files in a separate analytics directory on S3. Each table has its own folder within the directory. Songs table files are partitioned by year and then artist. Time table files are partitioned by year and month. Songplays table files are partitioned by year and month.

In [0]:
output_data = "s3a://xuren-data-eng-nd/spark_dl/"
songplays_tbl.write.mode('overwrite').partitionBy("year", "month").parquet(output_data+'songplays_table/')

In [0]:
users_tbl.write.mode('overwrite').parquet(output_data+'users_table/')

In [0]:
artists_tbl.write.mode('overwrite').parquet(output_data+'artists_table/')

In [0]:
songs_tbl.write.mode('overwrite').partitionBy("year", "artist_id").parquet(output_data+'songs_table/')

In [0]:
time_tbl.write.mode("overwrite").partitionBy("year", "month").parquet(output_data+'time_table/')