# Data Lake on S3

In [None]:
from pyspark.sql import SparkSession
import databricks.koalas as ks
import os
import configparser
import pandas as pd


# Make sure that your AWS credentials are loaded as env vars

In [None]:
config = configparser.ConfigParser()

config.read('dl.cfg')

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

# Create spark session with hadoop-aws package

In [None]:
spark = SparkSession.builder\
                    .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()

# Load data from S3

In [4]:
input_data = "s3a://udacity-dend/"
output_data = "s3a://project4dend/"
    

In [None]:
kdf = ks.read_json("data/song_data/A/B/C/*.json")# s3a://udacity-dend/song_data/*/*/*/*.json to read the whole data

In [None]:
kdf.head()

In [66]:
kdf.dtypes

artist_id            object
artist_latitude     float64
artist_location      object
artist_longitude    float64
artist_name          object
duration            float64
num_songs             int64
song_id              object
title                object
year                  int64
dtype: object

In [67]:
kdf.to_spark().printSchema()
kdf.head()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARNF6401187FB57032,40.79086,"New York, NY [Manhattan]",-73.96644,Sophie B. Hawkins,305.162,1,SONWXQJ12A8C134D94,The Ballad Of Sleeping Beauty,1994
1,ARLTWXK1187FB5A3F8,32.74863,"Fort Worth, TX",-97.32925,King Curtis,326.00771,1,SODREIN12A58A7F2E5,A Whiter Shade Of Pale (Live @ Fillmore West),0
2,ARPFHN61187FB575F6,41.88415,"Chicago, IL",-87.63241,Lupe Fiasco,279.97995,1,SOWQTQZ12A58A7B63E,Streets On Fire (Explicit Album Version),0
3,AR0IAWL1187B9A96D0,8.4177,Panama,-80.11278,Danilo Perez,197.19791,1,SONSKXP12A8C13A2C9,Native Soul,2003
4,AREVWGE1187B9B890A,-13.442,Noci (BA),-41.9952,Bitter End,282.43546,1,SOFCHDR12AB01866EF,Living Hell,0


# saving the data

In [41]:
kdf.head()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sar...,511.16363,1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,0
1,AREBBGV1187FB523D2,,"Houston, TX",,Mike Jones (Featuring CJ_ Mello & Lil' Bran),173.66159,1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),0
2,ARMAC4T1187FB3FA4C,40.82624,"Morris Plains, NJ",-74.47995,The Dillinger Escape Plan,207.77751,1,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,2004
3,ARPBNLO1187FB3D52F,40.71455,"New York, NY",-74.00712,Tiny Tim,43.36281,1,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,2000
4,ARNF6401187FB57032,40.79086,"New York, NY [Manhattan]",-73.96644,Sophie B. Hawkins,305.162,1,SONWXQJ12A8C134D94,The Ballad Of Sleeping Beauty,1994


In [None]:
    # extracting columns to create songs table


songs_table = (ks.sql('''
               SELECT 
               DISTINCT
               row_number() over (ORDER BY year,title,artist_id) id,
               title,
               artist_id,
               year,
               duration
               FROM 
                   {kdf}''')
              )

songs_table

In [43]:
songs_table.shape


(71, 5)

In [69]:
##writing users table to parquet files partitioned year and artist_id

(songs_table
 .to_spark()
 .write
 .partitionBy("year", "artist_id")
 .parquet('songs/')
)

AnalysisException: 'path file:/home/workspace/songs already exists.;'

In [70]:
 # extracting columns to create artist table

# artist_id, name, location, lattitude, longitude
artists_table = (ks.sql('''
               SELECT 
               DISTINCT
               artist_id,
               artist_name,
               artist_location,
               artist_latitude,
               artist_longitude
               FROM 
                   {kdf}''')
              )

artists_table

Unnamed: 0,artist_id,artist_name,artist_location,artist_latitude,artist_longitude
0,AR0IAWL1187B9A96D0,Danilo Perez,Panama,8.4177,-80.11278
1,ARJIE2Y1187B994AB7,Line Renaud,,,
2,ARNF6401187FB57032,Sophie B. Hawkins,"New York, NY [Manhattan]",40.79086,-73.96644
3,ARPFHN61187FB575F6,Lupe Fiasco,"Chicago, IL",41.88415,-87.63241
4,AREVWGE1187B9B890A,Bitter End,Noci (BA),-13.442,-41.9952
5,AR051KA1187B98B2FF,Wilks,,,
6,ARULZCI1241B9C8611,Luna Orbit Project,,,
7,AR8IEZO1187B99055E,Marc Shaiman,,,
8,ARWB3G61187FB49404,Steve Morse,"Hamilton, Ohio",,
9,ARLTWXK1187FB5A3F8,King Curtis,"Fort Worth, TX",32.74863,-97.32925


In [46]:
artists_table.shape


(69, 5)

In [71]:
##writing artist table to parquet files

(artists_table
 .to_spark()
 .write
 .parquet('artists/')
)
    

AnalysisException: 'path file:/home/workspace/artists already exists.;'

In [72]:
##reading the json for the log_data
kdfLog = ks.read_json("data/log_data/*.json")


In [73]:
## datatype by spark
kdfLog.dtypes
kdfLog.to_spark().printSchema()
kdfLog.head()


root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12


In [74]:
# extracting columns to create users table

##user_id, first_name, last_name, gender, level
users_table = (ks.sql('''
                SELECT
                DISTINCT
                userId,
                firstName,
                lastName,
                gender,
                level
                FROM
                    {kdfLog}
                ''')
              )
users_table.head()

Unnamed: 0,userId,firstName,lastName,gender,level
0,98,Jordyn,Powell,F,free
1,34,Evelin,Ayala,F,free
2,85,Kinsley,Young,F,paid
3,38,Gianna,Jones,F,free
4,85,Kinsley,Young,F,free


In [50]:
##writing users table to parquet files
(users_table
 .to_spark()
 .write.parquet('users_table/')
)


AnalysisException: 'path file:/home/workspace/users_table already exists.;'

In [75]:
## creating datetime column from original timestamp column
ts = ks.to_datetime(kdfLog.ts, unit='ms')
ts.head()

0   2018-11-15 00:30:26.796
1   2018-11-15 00:41:21.796
2   2018-11-15 00:45:41.796
3   2018-11-15 01:57:51.796
4   2018-11-15 03:29:37.796
Name: ts, dtype: datetime64[ns]

In [76]:
ts.dt.hour.head()


0    0
1    0
2    0
3    1
4    3
Name: ts, dtype: int64

In [77]:
# extracting columns to create time table

time_table = (ks.sql('''
                SELECT
                ts as start_time,
                HOUR(ts) as hour,
                DAY(ts) as day,
                EXTRACT(week FROM ts) as week,
                MONTH(ts) as month,
                YEAR(ts) as year,
                WEEKDAY(ts) as weekday
                FROM
                    {ts}
                ''')
              )
time_table.head()

Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,2018-11-15 00:30:26.796,0,15,46,11,2018,3
1,2018-11-15 00:41:21.796,0,15,46,11,2018,3
2,2018-11-15 00:45:41.796,0,15,46,11,2018,3
3,2018-11-15 01:57:51.796,1,15,46,11,2018,3
4,2018-11-15 03:29:37.796,3,15,46,11,2018,3


In [54]:
##writing time table to parquet files

(time_table
 .to_spark()
 .write
 .partitionBy('year', 'month')
 .parquet("time_table/")
)


AnalysisException: 'path file:/home/workspace/time_table already exists.;'

In [78]:
##reading in song data to use for songplays table

song_data = 'data/song_data/A/B/C/*.json'
song_df = ks.read_json(song_data)

In [79]:
song_df.head()


Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARNF6401187FB57032,40.79086,"New York, NY [Manhattan]",-73.96644,Sophie B. Hawkins,305.162,1,SONWXQJ12A8C134D94,The Ballad Of Sleeping Beauty,1994
1,ARLTWXK1187FB5A3F8,32.74863,"Fort Worth, TX",-97.32925,King Curtis,326.00771,1,SODREIN12A58A7F2E5,A Whiter Shade Of Pale (Live @ Fillmore West),0
2,ARPFHN61187FB575F6,41.88415,"Chicago, IL",-87.63241,Lupe Fiasco,279.97995,1,SOWQTQZ12A58A7B63E,Streets On Fire (Explicit Album Version),0
3,AR0IAWL1187B9A96D0,8.4177,Panama,-80.11278,Danilo Perez,197.19791,1,SONSKXP12A8C13A2C9,Native Soul,2003
4,AREVWGE1187B9B890A,-13.442,Noci (BA),-41.9952,Bitter End,282.43546,1,SOFCHDR12AB01866EF,Living Hell,0


In [80]:
# extract columns to create songplays_table table

##songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
songplays_table = (ks.sql('''
                SELECT
                row_number() over (ORDER BY userId) AS songplay_id,
                to_timestamp(ts / 1000) AS start_time,
                YEAR(to_timestamp(ts / 1000)) AS year,
                MONTH(to_timestamp(ts / 1000)) AS month,
                userId AS user_id,
                dfl.level,
                sdf.song_id,
                sdf.artist_id,
                sessionId AS session_id,
                location,
                userAgent AS user_agent
                FROM {kdfLog} dfl JOIN {song_df} sdf
                ON dfl.artist = sdf.artist_name
                WHERE page = 'NextSong' 
                ''')
                )
songplays_table.head()

Unnamed: 0,songplay_id,start_time,year,month,user_id,level,song_id,artist_id,session_id,location,user_agent
0,1,2018-11-14 20:16:39.796,2018,11,101,free,SORRZGD12A6310DBC3,ARVBRGZ1187FB4675A,603,"New Orleans-Metairie, LA","""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebK..."
1,2,2018-11-06 16:04:44.796,2018,11,2,free,SOWQTQZ12A58A7B63E,ARPFHN61187FB575F6,126,"Plymouth, IN","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK..."
2,3,2018-11-28 23:22:57.796,2018,11,24,paid,SOWQTQZ12A58A7B63E,ARPFHN61187FB575F6,984,"Lake Havasu City-Kingman, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK..."
3,4,2018-11-14 13:11:26.796,2018,11,34,free,SOWQTQZ12A58A7B63E,ARPFHN61187FB575F6,495,"Milwaukee-Waukesha-West Allis, WI",Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; r...
4,5,2018-11-10 07:47:51.796,2018,11,44,paid,SOWQTQZ12A58A7B63E,ARPFHN61187FB575F6,350,"Waterloo-Cedar Falls, IA",Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; r...


In [41]:
##writing songplays table to parquet files

(songplays_table
 .to_spark()
 .write
 .partitionBy('year', 'month')
 .parquet("song_play_table/")
)


(12, 5)

### Apache Parquet Introduction

Apache Parquet is a columnar file format that provides optimizations to speed up queries and is a far more efficient file format than CSV or JSON, supported by many data processing systems.




### Spark Write DataFrame to Parquet file format

Using spark.write.parquet() function we can write Spark DataFrame to Parquet file.




### Spark parquet partition – Improving performance

> Partitioning is a feature of many databases and data processing frameworks and it is key to make jobs work at scale. We can do a parquet file partition using spark partitionBy function.

> Parquet Partition creates a folder hierarchy for each spark partition; we have mentioned the first partition as year followed by artist_id hence, it creates a artist_id folder inside the year folder.

1. Install `awscli`

2. run `aws configure` 
    * AWS Access Key ID : 
    * AWS Secret Access Key : 
    * Default region name: `us-west-2`
    * Default output format : `json`
    
3. **copy all the necessary files to an s3 bucket**

    * Ex: `aws s3 cp <filename> s3://<bucket_name>`


4. **Run EMR create script with the etl job**

```
aws emr create-cluster --name "Spark cluster with step" \
    --release-label emr-5.30.1 \
    --applications Name=Spark \
    --log-uri s3://dendsparktutorial/logs/ \
    --ec2-attributes KeyName=emr-key \
    --instance-type m5.xlarge \
    --instance-count 3 \
    --bootstrap-actions Path=s3://dendsparktutorial/emr_bootstrap.sh \
    --steps Type=Spark,Name="Spark program",ActionOnFailure=CONTINUE,Args=[--deploy-mode,cluster,--master,yarn,s3://dendsparktutorial/src/koalas_etl.py] \
    --use-default-roles \
    --auto-terminate
```