In [1]:
import boto3
import configparser
import os
import json

from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import StringType, TimestampType
from pyspark.sql.functions import unix_timestamp, from_unixtime, year, month, dayofmonth, \
                                  dayofyear, hour, weekofyear, date_format, \
                                  monotonically_increasing_id

In [2]:
spark = SparkSession \
    .builder \
    .appName('DEND P4') \
    .getOrCreate()

## Connect to AWS

In [41]:
config = configparser.ConfigParser()
config.read('dl.cfg')

access_key = config.get('IAM', 'AWS_ACCESS_KEY_ID')
secret_key = config.get('IAM', 'AWS_SECRET_ACCESS_KEY')

In [4]:
s3 = boto3.resource('s3')

content_object = s3.Object('udacity-dend', 'song_data/A/B/C/TRABCEI128F424C983.json')
file_content = content_object.get()['Body'].read().decode('utf-8')
json_content = json.loads(file_content)
# print(json_content)

In [5]:
s3.Bucket('udacity-dend').download_file('song_data/A/B/C/TRABCEI128F424C983.json', 'songs_data.json')

In [6]:
content_object2 = s3.Object('udacity-dend', 'log-data/2018/11/2018-11-01-events.json')
file_content2 = content_object2.get()['Body'].read().decode('utf-8')
# print(file_content2)

In [7]:
s3.Bucket('udacity-dend').download_file('log-data/2018/11/2018-11-01-events.json', 'logs_data.json')

## Songs Data

In [8]:
df_songs = spark.read.json('songs_data.json')
df_songs.show()

+------------------+---------------+---------------+----------------+-----------+---------+---------+------------------+-------------------+----+
|         artist_id|artist_latitude|artist_location|artist_longitude|artist_name| duration|num_songs|           song_id|              title|year|
+------------------+---------------+---------------+----------------+-----------+---------+---------+------------------+-------------------+----+
|ARJIE2Y1187B994AB7|           null|               |            null|Line Renaud|152.92036|        1|SOUPIRU12A6D4FA1E1|Der Kleine Dompfaff|   0|
+------------------+---------------+---------------+----------------+-----------+---------+---------+------------------+-------------------+----+



In [9]:
df_songs.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



#### Songs Table

In [10]:
songs_table = df_songs.select('year', 'artist_id', 'song_id', 'title', 'duration')
songs_table.show()

+----+------------------+------------------+-------------------+---------+
|year|         artist_id|           song_id|              title| duration|
+----+------------------+------------------+-------------------+---------+
|   0|ARJIE2Y1187B994AB7|SOUPIRU12A6D4FA1E1|Der Kleine Dompfaff|152.92036|
+----+------------------+------------------+-------------------+---------+



In [11]:
songs_table.write.partitionBy('year', 'artist_id').parquet('songs.parquet', mode='overwrite')

In [12]:
# a = spark.read.parquet('song_parquet.parquet')
# a.show()

#### Artists Table

In [13]:
artists_table = df_songs.select('artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude')
artists_table.show()

+------------------+-----------+---------------+---------------+----------------+
|         artist_id|artist_name|artist_location|artist_latitude|artist_longitude|
+------------------+-----------+---------------+---------------+----------------+
|ARJIE2Y1187B994AB7|Line Renaud|               |           null|            null|
+------------------+-----------+---------------+---------------+----------------+



In [14]:
artists_table.write.partitionBy('artist_id').parquet('artists.parquet', mode='overwrite')

## Logs Data

In [15]:
df_logs = spark.read.json('logs_data.json')
df_logs.show()

+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|              artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|                null|Logged In|   Walter|     M|            0|    Frye|     null| free|San Francisco-Oak...|   GET|    Home|1.540919166796E12|       38|                null|   200|1541105830796|"Mozilla/5.0 (Mac...|    39|
|                null|Logged In|   Kaylee|     F|            0| Summers|     null| free|Phoenix-Mesa-Sco

In [16]:
df_logs.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



#### Time Table

In [17]:
df_time = df_logs.withColumn('ts', from_unixtime(df_logs.ts/1000, 'yyyy-MM-dd HH:mm:ss')) \
                  .select('ts')
df_time.show()

+-------------------+
|                 ts|
+-------------------+
|2018-11-01 13:57:10|
|2018-11-01 14:01:46|
|2018-11-01 14:01:46|
|2018-11-01 14:02:12|
|2018-11-01 14:05:52|
|2018-11-01 14:08:16|
|2018-11-01 14:11:13|
|2018-11-01 14:17:33|
|2018-11-01 14:24:53|
|2018-11-01 14:28:54|
|2018-11-01 14:42:00|
|2018-11-01 14:50:15|
|2018-11-01 14:52:05|
|2018-11-01 14:55:25|
|2018-11-01 15:23:14|
+-------------------+



In [18]:
# df_time.printSchema()

In [19]:
df_time = df_time.select(unix_timestamp(df_time.ts).alias('timestamp'), \
                       hour(df_time.ts).alias('hour'), \
                       dayofmonth(df_time.ts).alias('day'), \
                       weekofyear(df_time.ts).alias('weekofyear'), \
                       month(df_time.ts).alias('month'), \
                       year(df_time.ts).alias('year'), \
                       date_format(df_time.ts, 'EEEE').alias('weekday'))

df_time.show()

+----------+----+---+----------+-----+----+--------+
| timestamp|hour|day|weekofyear|month|year| weekday|
+----------+----+---+----------+-----+----+--------+
|1541105830|  13|  1|        44|   11|2018|Thursday|
|1541106106|  14|  1|        44|   11|2018|Thursday|
|1541106106|  14|  1|        44|   11|2018|Thursday|
|1541106132|  14|  1|        44|   11|2018|Thursday|
|1541106352|  14|  1|        44|   11|2018|Thursday|
|1541106496|  14|  1|        44|   11|2018|Thursday|
|1541106673|  14|  1|        44|   11|2018|Thursday|
|1541107053|  14|  1|        44|   11|2018|Thursday|
|1541107493|  14|  1|        44|   11|2018|Thursday|
|1541107734|  14|  1|        44|   11|2018|Thursday|
|1541108520|  14|  1|        44|   11|2018|Thursday|
|1541109015|  14|  1|        44|   11|2018|Thursday|
|1541109125|  14|  1|        44|   11|2018|Thursday|
|1541109325|  14|  1|        44|   11|2018|Thursday|
|1541110994|  15|  1|        44|   11|2018|Thursday|
+----------+----+---+----------+-----+----+---

In [20]:
df_time.write.partitionBy('year', 'month').parquet('time.parquet', mode='overwrite')

#### Users Table

In [21]:
users_table = df_logs.select('userId','firstName','lastName', 'gender', 'level') \
                     .drop_duplicates(subset=['userId'])
users_table.show()

+------+---------+--------+------+-----+
|userId|firstName|lastName|gender|level|
+------+---------+--------+------+-----+
|   101|   Jayden|     Fox|     M| free|
|     8|   Kaylee| Summers|     F| free|
|    26|     Ryan|   Smith|     M| free|
|    10|   Sylvie|    Cruz|     F| free|
|    39|   Walter|    Frye|     M| free|
+------+---------+--------+------+-----+



In [22]:
users_table.write.partitionBy('userId').parquet('users.parquet', mode='overwrite')

#### Songplays Table

In [23]:
songplays_table = df_logs.join(df_songs, df_logs.song == df_songs.title, how='left') \
                 .withColumn('id', monotonically_increasing_id()).alias('id') \
                 .select('id', df_logs.ts, \
                         df_logs.userId, df_logs.level, df_songs.song_id, df_songs.artist_id, \
                         df_logs.sessionId, df_logs.location, df_logs.userAgent) \
                 .drop_duplicates(subset=['sessionId']) \
                 .withColumn('year', year(from_unixtime(df_logs.ts/1000, 'yyyy-MM-dd HH:mm:ss'))).alias('year') \
                 .withColumn('month', month(from_unixtime(df_logs.ts/1000, 'yyyy-MM-dd HH:mm:ss'))).alias('month')
songplays_table.show()

+---+-------------+------+-----+-------+---------+---------+--------------------+--------------------+----+-----+
| id|           ts|userId|level|song_id|artist_id|sessionId|            location|           userAgent|year|month|
+---+-------------+------+-----+-------+---------+---------+--------------------+--------------------+----+-----+
| 10|1541108520796|    10| free|   null|     null|        9|Washington-Arling...|"Mozilla/5.0 (Mac...|2018|   11|
|  1|1541106106796|     8| free|   null|     null|      139|Phoenix-Mesa-Scot...|"Mozilla/5.0 (Win...|2018|   11|
| 14|1541110994796|   101| free|   null|     null|      100|New Orleans-Metai...|"Mozilla/5.0 (Win...|2018|   11|
|  0|1541105830796|    39| free|   null|     null|       38|San Francisco-Oak...|"Mozilla/5.0 (Mac...|2018|   11|
| 11|1541109015796|    26| free|   null|     null|      169|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|2018|   11|
+---+-------------+------+-----+-------+---------+---------+--------------------+-------

In [24]:
songplays_table.write.partitionBy('year', 'month').parquet('songplays.parquet', mode='overwrite')

In [25]:
# a = spark.read.parquet('songplays.parquet')
# a.show()

## Upload Parquet Files to S3

In [36]:
s3_bucket = 'dend-parquet'
s3_bucket_region = 'us-west-2'
folder = 'songplays.parquet'
key_name = folder + '/'

In [38]:
s3_connect = boto3.client('s3', s3_bucket_region)
bucket = s3_connect.put_object(Bucket=s3_bucket, Key=key_name)
print("Bucket:", bucket)

Bucket: {'ResponseMetadata': {'RequestId': 'EC2A43285EED0CED', 'HostId': 'E6JitwC2iodyogW143ABDQPRQafuvbZyCdYt0eeXNz8hz+SLxpgSiihFiDtGdZG6lgNyaNnHNf4=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'E6JitwC2iodyogW143ABDQPRQafuvbZyCdYt0eeXNz8hz+SLxpgSiihFiDtGdZG6lgNyaNnHNf4=', 'x-amz-request-id': 'EC2A43285EED0CED', 'date': 'Mon, 25 Nov 2019 21:24:41 GMT', 'etag': '"d41d8cd98f00b204e9800998ecf8427e"', 'content-length': '0', 'server': 'AmazonS3'}, 'RetryAttempts': 1}, 'ETag': '"d41d8cd98f00b204e9800998ecf8427e"'}
