## Import Libraries

In [1]:
import configparser
import os

import logging
import boto3
from botocore.exceptions import ClientError

import pandas as pd

from pyspark.sql import SparkSession
from pyspark import SparkContext

from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

## Make sure you have an AWS secret and access key

- Create a new IAM user in your AWS account
- Give it AdministratorAccess, From Attach existing policies directly Tab
- Take note of the access key and secret
- Edit the file dwh.cfg in the same folder as this notebook and fill
<font color='red'>
<BR>
[AWS]<BR>
KEY= YOUR_AWS_KEY<BR>
SECRET= YOUR_AWS_SECRET<BR>

## Read the Configuration file

In [2]:
config = configparser.ConfigParser()
config.read('dwh.cfg')

['dwh.cfg']

## Read the AWS User Access Key ID and Secret Key

In [3]:
os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

KEY= config['AWS']['AWS_ACCESS_KEY_ID']
SECRET= config['AWS']['AWS_SECRET_ACCESS_KEY']

## Create a Local Spark Session

In [4]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

In [5]:
spark = create_spark_session()

In [6]:
spark

## Read the Song data JSON file stored in S3 bucket

In [7]:
INPUT_DATA = config['S3']['SONG_DATA']

song_data = INPUT_DATA + "/A/A/*/*.json"
song_data

's3a://udacity-dend/song_data/A/A/*/*.json'

## Read the JSON file into a Spark DataFrame

In [47]:

song_f1 = spark.read.json(song_data)
song_f1.limit(2).toPandas()

TypeError: json() got an unexpected keyword argument 'inferSchema'

## Read the User log data JSON file stored in S3 bucket

In [9]:
INPUT_DATA = config['S3']['LOG_DATA']

log_data = INPUT_DATA + "/2018/*/*.json"
log_data

's3a://udacity-dend/log_data/2018/*/*.json'

### Filter only events with user selecting 'NextSong'

In [10]:
log_f1 = spark.read.json(log_data)
log_f1 = log_f1
log_f1.limit(2).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26


## Read the DataWareHouse Configuration details to create a Redshift Cluster

In [11]:
DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_ENDPOINT           = config.get("DWH", "DWH_ENDPOINT")
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH","DWH_PORT")

DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")
DWH_ROLE_ARN           = config.get("IAM_ROLE", "ARN")

(DWH_DB_USER, DWH_DB_PASSWORD, DWH_DB)


pd.DataFrame({"Param":
["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME", "DWH_ROLE_ARN"],
              "Value":
[DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME, DWH_ROLE_ARN]
             })


Unnamed: 0,Param,Value
0,DWH_CLUSTER_TYPE,multi-node
1,DWH_NUM_NODES,4
2,DWH_NODE_TYPE,dc2.large
3,DWH_CLUSTER_IDENTIFIER,dwhCluster
4,DWH_DB,dwh
5,DWH_DB_USER,dwhuser
6,DWH_DB_PASSWORD,Passw0rd
7,DWH_PORT,5439
8,DWH_IAM_ROLE_NAME,dwhRole
9,DWH_ROLE_ARN,arn:aws:iam::501327643673:role/dwhRole


## Create clients for EC2, S3, IAM, and RedShift

In [12]:
ec2 = boto3.resource("ec2", 
                     region_name="us-east-1",
                     aws_access_key_id=KEY,
                     aws_secret_access_key=SECRET)

s3 = boto3.resource("s3",
                    region_name="us-east-1",
                    aws_access_key_id=KEY,
                    aws_secret_access_key=SECRET)

iam = boto3.client("iam",
                    region_name="us-east-1",
                    aws_access_key_id=KEY,
                    aws_secret_access_key=SECRET)

redshift = boto3.client("redshift",
                          region_name="us-east-1",
                          aws_access_key_id=KEY,
                          aws_secret_access_key=SECRET)

## Filter the columns from the songs data dataframe to create dimension table

### Schema of the DataFrame

In [13]:
song_f1.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



### Create songs DataFrame for creating songs dimension table

- songs table: columns
- song_id, title, artist_id, year, duration

In [14]:
songs_table = song_f1.select(["song_id", "title", "artist_id", "year", "duration"]).dropDuplicates()
songs_table.show()

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOHQZIB12A6D4F9FAF|N****_ What's Up ...|ARWAFY51187FB5C4EF|2006|196.85832|
|SOCOHAX12A8C13B6B2|Walking With The ...|ARE5F2F1187B9AB7E9|1966|152.16281|
|SOKNGDE12AB017CA4D| Step Into Your Skin|ARE4SDM1187FB4D7E4|   0|139.72853|
|SOQBZDP12AB0180E28|   Depths Of Bavaria|ARWRO6T1187B98C5D6|2008| 257.4624|
|SODZYPO12A8C13A91E|Burn My Body (Alb...|AR1C2IX1187B99BF74|   0|177.99791|
|SOTUBOA12AB017E8B5|             Mi Papá|AR2WBR31187B9AA24E|   0|300.61669|
|SOYQDUJ12A8C13F773|Shine On (Acousti...|ARWDPT81187B99C656|   0|173.63546|
|SOLHFZF12AB018AB3E|Pinned To The Gro...|ARHRP6Q1187FB3F016|2002|239.98649|
|SORWXUP12A58A79E65|Groovin' In The M...|ARI0PUX1187FB3F215|1991|301.81832|
|SOIRGRL12AB0186BC2|            Exchange|ARLDW6Z1187FB3F2A0|1995|368.16934|
|SOIGHOD12A8

### Create artists DataFrame for creating artists dimension table

- artists table: columns
- artist_id, name, location, lattitude, longitude

In [15]:
artists_table = song_f1.select(["artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude"]).dropDuplicates()
artists_table.limit(10).show()

+------------------+-----------------+--------------------+---------------+----------------+
|         artist_id|      artist_name|     artist_location|artist_latitude|artist_longitude|
+------------------+-----------------+--------------------+---------------+----------------+
|AR1S3NH1187B98C2BC|        Anthony B|Clarks Town, Jamaica|           null|            null|
|ARPIKA31187FB4C233|       The Action|            New York|       40.71455|       -74.00712|
|ARYL56G11C8A41634E|    Mick Flannery|                    |           null|            null|
|AR1XL241187FB3F4AB|Nortec Collective|                    |           null|            null|
|ARMI4NV1187B99D55D|          Man Man|    Philadelphia, PA|       39.95227|       -75.16237|
|ARMMJ2R1187B993A72|         S.O.A.P.|             Denmark|       56.27609|         9.51695|
|ARGS47D1187FB40225| Peter And Gordon|     London, England|           null|            null|
|ARKKQBD1187FB3715B|          Juniper|   San Francisco, CA|       37.7

## Filter the columns from the user log data dataframe to create dimension table

### Schema of the user log dataframe

In [16]:
log_f1.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



### Create users DataFrame for creating users dimension table

- users - users in the app
- user_id, first_name, last_name, gender, level

In [17]:
users = log_f1.select(['userId', 'firstName', 'lastName', 'gender', 'level']).dropDuplicates()
users.limit(5).toPandas()

Unnamed: 0,userId,firstName,lastName,gender,level
0,57,Katherine,Gay,F,free
1,84,Shakira,Hunt,F,free
2,22,Sean,Wilson,F,free
3,52,Theodore,Smith,M,free
4,80,Tegan,Levine,F,paid


### Create time DataFrame for creating time dimension table

- time - timestamps of records in songplays broken down into specific units
- start_time, hour, day, week, month, year, weekday

In [18]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType, TimestampType, DateType

from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum
from pyspark.sql import functions as f

from datetime import datetime

In [19]:
time = log_f1.select(['ts'])
time.limit(5).toPandas()

Unnamed: 0,ts
0,1542241826796
1,1542242481796
2,1542242741796
3,1542247071796
4,1542252577796


In [20]:
def format_datetime(ts):
    return datetime.fromtimestamp(ts/1000.0)

In [21]:
get_timestamp = udf(lambda x: datetime.fromtimestamp(x/1000.0),TimestampType())

In [22]:
time = time.withColumn("timestamp", get_timestamp(time.ts))

In [23]:
get_datetime = udf(lambda x: datetime.fromtimestamp(x/1000.0), DateType())

In [24]:
time = time.withColumn("datetime", get_datetime(time.ts))

In [25]:
time.limit(5).show()

+-------------+--------------------+----------+
|           ts|           timestamp|  datetime|
+-------------+--------------------+----------+
|1542241826796|2018-11-14 19:30:...|2018-11-14|
|1542242481796|2018-11-14 19:41:...|2018-11-14|
|1542242741796|2018-11-14 19:45:...|2018-11-14|
|1542247071796|2018-11-14 20:57:...|2018-11-14|
|1542252577796|2018-11-14 22:29:...|2018-11-14|
+-------------+--------------------+----------+



In [26]:
time_table = time.select(col('ts').alias('start_time'),
                        f.hour(time.timestamp).alias('hour'),
                        f.dayofweek(time.datetime).alias('day'),
                        f.weekofyear(time.datetime).alias('week'),
                        f.month(time.datetime).alias('month'), 
                        f.year(time.datetime).alias('year'),
                        date_format(time.datetime, 'E').alias('weekday')).dropDuplicates()

In [27]:
time_table.limit(5).toPandas()

Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,1542285613796,7,5,46,11,2018,Thu
1,1542290569796,9,5,46,11,2018,Thu
2,1542292733796,9,5,46,11,2018,Thu
3,1542810192796,9,4,47,11,2018,Wed
4,1542833047796,15,4,47,11,2018,Wed


In [28]:
print(time_table.count())

8023


### Create songs DataFrame for creating songplays fact table

- songs - songs in music database
- song_id, title, artist_id, year, duration

In [29]:
songs_df = songs_table

## Create the Fact table by joining all the dimension tables created above

- songplays - records in log data associated with song plays i.e. records with page NextSong
- songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent

In [30]:
songplays = log_f1.join(song_f1,
            (log_f1.song == song_f1.title) & (log_f1.artist == song_f1.artist_name) & (log_f1.length == song_f1.duration), how='left')

In [31]:
songplays = songplays.select(log_f1.ts.alias('start_time'), log_f1.userId.alias('user_id'), log_f1.level, song_f1.song_id,
            song_f1.artist_id, log_f1.sessionId.alias('session_id'), log_f1.location, 
            log_f1.userAgent.alias('user_agent')).filter(log_f1.page == 'NextSong')

In [37]:
# Filter songplays dataframe for song_id filed not equal to NULL
songplays.filter(songplays.song_id.isNotNull()).limit(5).show()

+-------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|   start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|
+-------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|1542837407796|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|
|1541440182796|     73| paid|SOHDWWH12A6D4F7F6A|ARC0IOF1187FB3F6E6|       255|Tampa-St. Petersb...|"Mozilla/5.0 (Mac...|
|1542148779796|     55| free|SOXQYSC12A6310E908|AR0L04E1187B9AE90C|       415|Minneapolis-St. P...|"Mozilla/5.0 (Mac...|
|1542378072796|     85| paid|SOLRYQR12A670215BF|ARNLO5S1187B9B80CC|       436|       Red Bluff, CA|"Mozilla/5.0 (Mac...|
|1542735998796|     49| paid|SOCHRXB12A8AE48069|ARTDQRC1187FB4EFD4|       758|San Francisco-Oak...|Mozilla/5.0 (Wind...|
|1543063380796|     73| paid|SON

In [38]:
songplays.limit(10).toPandas()

Unnamed: 0,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent
0,1542241826796,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."
1,1542242481796,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."
2,1542242741796,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."
3,1542253449796,61,free,,,597,"Houston-The Woodlands-Sugar Land, TX","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
4,1542260935796,80,paid,,,602,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
5,1542261224796,80,paid,,,602,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
6,1542261356796,80,paid,,,602,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
7,1542261662796,80,paid,,,602,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
8,1542262057796,80,paid,,,602,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
9,1542262233796,80,paid,,,602,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."


In [33]:
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.access.key", KEY)
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.secret.key", SECRET)

spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.amazonaws.com")

## Create a S3 Bucket for a given region

In [34]:
def create_bucket(bucket_name, region=None):
    """Create an S3 bucket in a specified region

    If a region is not specified, the bucket is created in the S3 default
    region (us-east-1).

    :param bucket_name: Bucket to create
    :param region: String region to create bucket in, e.g., 'us-west-2'
    :return: True if bucket created, else False
    """

    # Create bucket
    try:
        if region is None:
            s3_client = boto3.client('s3')
            s3_client.create_bucket(Bucket=bucket_name)
        else:
            s3_client = boto3.client('s3', region_name=region)
            location = {'LocationConstraint': region}
            s3_client.create_bucket(Bucket=bucket_name,
                                    CreateBucketConfiguration=location)
    except ClientError as e:
        logging.error(e)
        return False
    return True

### Create a bucket to store the dimension tables

In [42]:
S3 = create_bucket("desh1gvrk", "us-west-2")

## Save the songs DataFrame (table) in Parquet format in S3 bucket

In [43]:
spark.conf.set("spark.sql.parquet.compression.codec", "gzip")

In [None]:
# Save in Parquet format in S3 bucket
songplays.write.mode('overwrite').partitionBy('year', 'artist_id').parquet('s3a://desh1gvrk/parquet/songs')

In [None]:
# Save in Parquet format in S3 bucket
songs_table.write.mode('overwrite').parquet('s3a://desh1gvrk/parquet/songs')

## Save the Dimension (table) DataFrame in Parquet format in local directory

In [56]:
songs_table.write.mode('overwrite').parquet('songs_table')

In [58]:
artists_table.write.mode('overwrite').parquet('artists_table')

In [59]:
users.write.mode('overwrite').parquet('users')

In [60]:
time_table.write.mode('overwrite').parquet('time_table')

## Save the Fact (table) DataFrame in Parquet format in local directory

In [62]:
songplays.write.mode('overwrite').partitionBy('artist_id', 'song_id').parquet('songplays')