# Data Lake on S3

In [2]:
from pyspark.sql import SparkSession
import databricks.koalas as ks

import os
import configparser

# Make sure that your AWS credentials are loaded as env vars

In [3]:
# Parse the local credentials file and export the AWS key pair as
# environment variables for this process.
config = configparser.ConfigParser()

# Normally this file should be in ~/.aws/credentials
config.read('dl.cfg')

aws_section = config['AWS']
for credential_name in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"):
    # A missing option raises KeyError, exactly like a direct lookup would.
    os.environ[credential_name] = aws_section[credential_name]

# Create spark session with hadoop-aws package

In [4]:
# Build (or reuse) the Spark session, asking Spark to pull in the hadoop-aws
# package through spark.jars.packages.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

# Load data from S3

In [4]:
# S3 bucket roots, addressed through the s3a:// scheme.
# NOTE(review): output_data is not referenced by any cell shown here — confirm
# whether the Parquet write below is meant to target it.
input_data, output_data = "s3a://udacity-dend/", "s3a://project4dend/"
    

In [5]:
# Read a local subset of the song JSON files into a Koalas DataFrame.
# Use s3a://udacity-dend/song_data/*/*/*/*.json to read the whole data.
song_data_glob = "data/song_data/A/B/C/*.json"
kdf = ks.read_json(song_data_glob)

In [6]:
# Display the dtype of each column in the Koalas DataFrame.
kdf.dtypes

artist_id            object
artist_latitude     float64
artist_location      object
artist_longitude    float64
artist_name          object
duration            float64
num_songs             int64
song_id              object
title                object
year                  int64
dtype: object

In [8]:
# Print the schema of the equivalent Spark DataFrame, then display the first
# rows of the Koalas DataFrame as the cell output.
kdf_as_spark = kdf.to_spark()
kdf_as_spark.printSchema()
kdf.head()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARNF6401187FB57032,40.79086,"New York, NY [Manhattan]",-73.96644,Sophie B. Hawkins,305.162,1,SONWXQJ12A8C134D94,The Ballad Of Sleeping Beauty,1994
1,ARLTWXK1187FB5A3F8,32.74863,"Fort Worth, TX",-97.32925,King Curtis,326.00771,1,SODREIN12A58A7F2E5,A Whiter Shade Of Pale (Live @ Fillmore West),0
2,ARPFHN61187FB575F6,41.88415,"Chicago, IL",-87.63241,Lupe Fiasco,279.97995,1,SOWQTQZ12A58A7B63E,Streets On Fire (Explicit Album Version),0
3,AR0IAWL1187B9A96D0,8.4177,Panama,-80.11278,Danilo Perez,197.19791,1,SONSKXP12A8C13A2C9,Native Soul,2003
4,AREVWGE1187B9B890A,-13.442,Noci (BA),-41.9952,Bitter End,282.43546,1,SOFCHDR12AB01866EF,Living Hell,0


# Saving the data

In [13]:


# Build the songs table (id, title, artist_id, year, duration) from the raw
# song records in `kdf`.
#
# De-duplication happens in the inner query, *before* ids are assigned. When
# DISTINCT shares a SELECT list with row_number(), every row already carries a
# unique id, so DISTINCT cannot remove anything.
#
# The id comes solely from row_number(). No follow-up withColumn(...) call is
# made on the Spark frame: its result would be discarded unless assigned, and
# the id column already exists.
#
# NOTE(review): a window with ORDER BY but no PARTITION BY typically makes
# Spark move all rows to a single partition — confirm this is acceptable
# before running against the full dataset.
songs_table = ks.sql('''
    SELECT
        row_number() OVER (ORDER BY year, title, artist_id) AS id,
        title,
        artist_id,
        year,
        duration
    FROM (
        SELECT DISTINCT
            title,
            artist_id,
            year,
            duration
        FROM {kdf}
    ) AS distinct_songs
''')

# Last expression of the cell: render the resulting table.
songs_table

Unnamed: 0,id,title,artist_id,year,duration
0,1,A Whiter Shade Of Pale (Live @ Fillmore West),ARLTWXK1187FB5A3F8,0,326.00771
1,2,Der Kleine Dompfaff,ARJIE2Y1187B994AB7,0,152.92036
2,3,Living Hell,AREVWGE1187B9B890A,0,282.43546
3,4,Midnight Star,ARULZCI1241B9C8611,0,335.51628
4,5,Music is what we love,AR051KA1187B98B2FF,0,261.51138
5,6,Streets On Fire (Explicit Album Version),ARPFHN61187FB575F6,0,279.97995
6,7,The Ballad Of Sleeping Beauty,ARNF6401187FB57032,1994,305.162
7,8,Prognosis,ARWB3G61187FB49404,2000,363.85914
8,9,Intro,AR558FS1187FB45658,2003,75.67628
9,10,Native Soul,AR0IAWL1187B9A96D0,2003,197.19791


In [14]:
# Display the (rows, columns) shape of the songs table.
songs_table.shape

(12, 5)

### Apache Parquet Introduction

Apache Parquet is a columnar file format, supported by many data processing systems, that provides optimizations to speed up queries and is far more efficient than CSV or JSON.




### Spark Write DataFrame to Parquet file format

Using the `DataFrame.write.parquet()` method, we can write a Spark DataFrame to Parquet files.




### Spark parquet partition – Improving performance

> Partitioning is a feature of many databases and data processing frameworks, and it is key to making jobs work at scale. We can partition Parquet output using Spark's `partitionBy` function.

> Parquet partitioning creates a folder hierarchy with one level per partition column; here we specify `year` first, followed by `artist_id`, so Spark creates an `artist_id` folder inside each `year` folder.

In [15]:
    
# Write the songs table as Parquet under the relative path 'songs/',
# partitioned by year and then artist_id (one folder level per column).
#
# mode("overwrite") makes the cell re-runnable: with the writer's default
# save mode, a second run fails because the target path already exists.
(
    songs_table
    .to_spark()
    .write
    .mode("overwrite")
    .partitionBy("year", "artist_id")
    .parquet('songs/')
)

## Deployment

1. Install `awscli`

2. Run `aws configure`
    * AWS Access Key ID : 
    * AWS Secret Access Key : 
    * Default region name: `us-west-2`
    * Default output format : `json`
    
3. **Copy all the necessary files to an S3 bucket**

    * Ex: `aws s3 cp <filename> s3://<bucket_name>`


4. **Run EMR create script with the etl job**

```
aws emr create-cluster --name "Spark cluster with step" \
    --release-label emr-5.30.1 \
    --applications Name=Spark \
    --log-uri s3://dendsparktutorial/logs/ \
    --ec2-attributes KeyName=emr-key \
    --instance-type m5.xlarge \
    --instance-count 3 \
    --bootstrap-actions Path=s3://dendsparktutorial/emr_bootstrap.sh \
    --steps Type=Spark,Name="Spark program",ActionOnFailure=CONTINUE,Args=[--deploy-mode,cluster,--master,yarn,s3://dendsparktutorial/src/koalas_etl.py] \
    --use-default-roles \
    --auto-terminate
```