### Lessons Learnt:
- Use hadoop aws jar file in config parameter while creating SparkSession
- Have AWS credentials with S3 access in config file
- Do not use quotes in AWS Credentials
- Use s3a to access S3 buckets. (s3 and s3n will need different drivers/jars)
- if any error, Restart the kernel  before executing again

In [1]:
import configparser
import os
from pyspark.sql.functions import col as Fcol, from_unixtime 
from pyspark.sql.functions import year as Fyear
from pyspark.sql.functions import month as Fmonth
import pyspark.sql.functions as F

### Setup AWS credentials

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')
os.environ['AWS_ACCESS_KEY_ID']=config['AWS_CREDENTIALS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS_CREDENTIALS']['AWS_SECRET_ACCESS_KEY']

### Initiate Spark Session

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("demo") \
    .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()

### Location for Input and Output data

In [4]:
input_song_data = "s3a://udacity-dend/song_data/A/A/A/*.json"
input_log_data = "s3a://udacity-dend/log_data/2018/11/*.json"
output_data = "./spark-warehouse"

### Read Input data into dataframe

In [5]:
song_data_json_df = spark.read.json(input_song_data)

In [21]:
#song_data_json_df.printSchema()

In [22]:
song_data_json_df.count()

24

### Create a Temp view on song data

In [6]:
song_data_json_df.createOrReplaceTempView("song_data_view")

## Songs Dimension Table ##

#### Extract columns for loading songs table

In [7]:
songs_table = spark.sql("select distinct song_id, title, artist_id, year, duration from song_data_view")

### Write into songs table in parquet mode

In [8]:
songs_table.write.partitionBy('year', 'artist_id').parquet(os.path.join(output_data, 'songs'))

In [9]:
song_data_from_parquet = spark.read.parquet(os.path.join(output_data, 'songs'))

In [10]:
song_data_from_parquet.show(5)

+------------------+--------------------+---------+----+------------------+
|           song_id|               title| duration|year|         artist_id|
+------------------+--------------------+---------+----+------------------+
|SOKTJDS12AF72A25E5|Drown In My Own T...|  192.522|   0|ARA23XO1187B9AF18F|
|SOEKAZG12AB018837E|I'll Slap Your Fa...|129.85424|2001|ARSVTNL1187B992A91|
|SOAFBCP12A8C13CC7D|King Of Scurf (20...|301.40036|1972|ARTC1LV1187B9A4858|
|SORRNOC12AB017F52B|The Last Beat Of ...|337.81506|2004|ARSZ7L31187FB4E610|
|SOQPWCR12A6D4FB2A3|A Poor Recipe For...|118.07302|2005|AR73AIO1187B9AD57B|
+------------------+--------------------+---------+----+------------------+
only showing top 5 rows



## Artists Dimension Table

### Extract columns for loading artists table

In [11]:
# extract columns to create artists table
artists_table = spark.sql('''
    select distinct artist_id
        , artist_name as name
        , artist_location as location
        , artist_latitude as lattitude
        , artist_longitude as longitude 
    from song_data_view
    ''') 

### Write into artists table in parquet mode

In [12]:
# write artists table to parquet files
artists_table.write.parquet(os.path.join(output_data, 'artists'), 'overwrite')

### Data Loading validation - Read from artists parquet file

In [14]:
artist_data_from_parquet = spark.read.parquet(os.path.join(output_data, 'artists'))
artist_data_from_parquet.show(5)

+------------------+--------------------+--------------------+---------+---------+
|         artist_id|                name|            location|lattitude|longitude|
+------------------+--------------------+--------------------+---------+---------+
|ARTC1LV1187B9A4858|  The Bonzo Dog Band|Goldsmith's Colle...|  51.4536| -0.01802|
|ARA23XO1187B9AF18F|     The Smithereens|Carteret, New Jersey| 40.57885|-74.21956|
|AR5LMPY1187FB573FE|   Chaka Khan_ Rufus|         Chicago, IL| 41.88415|-87.63241|
|ARSVTNL1187B992A91|       Jonathan King|     London, England| 51.50632| -0.12714|
|AR10USD1187B99F3F1|Tweeterfriendly M...|Burlington, Ontar...|     null|     null|
+------------------+--------------------+--------------------+---------+---------+
only showing top 5 rows



In [15]:
log_data_json_df = spark.read.json(input_log_data)

In [16]:
log_data_json_df.count()

8056

In [17]:
log_data_df = log_data_json_df
log_data_df = log_data_df.filter(col("page")=="NextSong")
log_data_df = log_data_df.withColumn('start_time', from_unixtime(log_data_df.ts / 1000).cast('timestamp').alias('start_time'))

In [18]:
log_data_df.select("ts", "start_time").show(5)

+-------------+-------------------+
|           ts|         start_time|
+-------------+-------------------+
|1542241826796|2018-11-15 00:30:26|
|1542242481796|2018-11-15 00:41:21|
|1542242741796|2018-11-15 00:45:41|
|1542253449796|2018-11-15 03:44:09|
|1542260935796|2018-11-15 05:48:55|
+-------------+-------------------+
only showing top 5 rows



In [19]:
log_data_df.createOrReplaceTempView("log_data_view")

In [20]:
time_table = spark.sql("\
select distinct start_time \
, hour(start_time) as hour \
, day(start_time) as day \
, weekofyear(start_time) as week \
, month(start_time) as month \
, year(start_time) as year \
, weekday(start_time) as weekday \
from log_data_view")

In [24]:
time_table.write.partitionBy("year","month").parquet(os.path.join(output_data, 'time'),'overwrite')

In [30]:
time_data_from_parquet = spark.read.parquet(os.path.join(output_data, 'time'))
time_data_from_parquet.show(5)

+-------------------+----+---+----+-------+----+-----+
|         start_time|hour|day|week|weekday|year|month|
+-------------------+----+---+----+-------+----+-----+
|2018-11-15 10:34:50|  10| 15|  46|      3|2018|   11|
|2018-11-15 17:56:18|  17| 15|  46|      3|2018|   11|
|2018-11-21 01:57:34|   1| 21|  47|      2|2018|   11|
|2018-11-21 05:40:26|   5| 21|  47|      2|2018|   11|
|2018-11-21 07:56:51|   7| 21|  47|      2|2018|   11|
+-------------------+----+---+----+-------+----+-----+
only showing top 5 rows



In [32]:
user_table = spark.sql(" \
select  \
      cast(userId as integer) as userId \
    , firstName \
    , lastName \
    , gender \
    , max(level) as level \
from log_data_view \
where nvl(trim(userid), '') <> '' \
group by userId, firstName, lastName, gender \
")

In [33]:
user_table.write.mode('overwrite').parquet(os.path.join(output_data, 'users'))

In [35]:
users_data_from_parquet = spark.read.parquet(os.path.join(output_data, 'users'))
users_data_from_parquet.show(5)

+------+---------+--------+------+-----+
|userId|firstName|lastName|gender|level|
+------+---------+--------+------+-----+
|    14| Theodore|  Harris|     M| free|
|     5|   Elijah|   Davis|     M| free|
|    67|     Colm| Santana|     M| free|
|    77|Magdalene|  Herman|     F| free|
|     2|  Jizelle|Benjamin|     F| free|
+------+---------+--------+------+-----+
only showing top 5 rows



### Read the songs and artists dimension tables from parquet files

In [37]:
songs = spark.read.parquet(os.path.join(output_data, 'songs'))
songs.createOrReplaceTempView("songs_view")
artists = spark.read.parquet(os.path.join(output_data, 'artists'))
artists.createOrReplaceTempView("artists_view")

In [52]:
song_data_json_df = spark.read.json(input_song_data)

In [54]:
song_data_json_df.createOrReplaceTempView("song_data_view")

In [55]:
song_data_json_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



### Extract columns for loading songplays table


In [103]:
songplays_table = spark.sql('''
select e.start_time 
    , case when nvl(userId, '') = '' then NULL else cast(e.userId as integer) end as userId 
    , e.level 
    , sa.song_id 
    , sa.artist_id 
    , e.sessionId
    , e.location
    , e.userAgent
    , year(start_time) as year
    , month(start_time) as month
from log_data_view e
left join 
(
    select distinct song_id, title as song_title, duration, artist_id, artist_name
    from song_data_view 
) sa
on e.song = sa.song_title
and e.length = sa.duration
and e.artist = sa.artist_name
where e.page = 'NextSong'
''')

### Write into songplays table in parquet format

In [104]:
songplays_table.write.partitionBy("year", "month").parquet(os.path.join(output_data, 'songplays'), 'overwrite')

In [105]:
songplays_parquet = spark.read.parquet(os.path.join(output_data, 'songplays'))
songplays_parquet.show(5)

+-------------------+------+-----+-------+---------+---------+--------------------+--------------------+----+-----+
|         start_time|userId|level|song_id|artist_id|sessionId|            location|           userAgent|year|month|
+-------------------+------+-----+-------+---------+---------+--------------------+--------------------+----+-----+
|2018-11-15 00:30:26|    26| free|   null|     null|      583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|2018|   11|
|2018-11-15 00:41:21|    26| free|   null|     null|      583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|2018|   11|
|2018-11-15 00:45:41|    26| free|   null|     null|      583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|2018|   11|
|2018-11-15 03:44:09|    61| free|   null|     null|      597|Houston-The Woodl...|"Mozilla/5.0 (Mac...|2018|   11|
|2018-11-15 05:48:55|    80| paid|   null|     null|      602|Portland-South Po...|"Mozilla/5.0 (Mac...|2018|   11|
+-------------------+------+-----+-------+---------+---------+----------

In [107]:
song_data_from_parquet = spark.read.parquet(os.path.join(output_data, 'songs'))
song_data_from_parquet.show(10)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOKTJDS12AF72A25E5|Drown In My Own T...|ARA23XO1187B9AF18F|   0|  192.522|
|SOEKAZG12AB018837E|I'll Slap Your Fa...|ARSVTNL1187B992A91|2001|129.85424|
|SOAFBCP12A8C13CC7D|King Of Scurf (20...|ARTC1LV1187B9A4858|1972|301.40036|
|SOAPERH12A58A787DC|The One And Only ...|ARZ5H0P1187B98A1DD|   0|230.42567|
|SOSMJFC12A8C13DE0C|Is That All There...|AR1KTV21187B9ACD72|   0|343.87546|
|SONRWUU12AF72A4283|  Into The Nightlife|ARGE7G11187FB37E05|2008|240.63955|
|SOXZYWX12A6310ED0C|     It's About Time|ARC1IHZ1187FB4E920|   0| 246.9873|
|SOOVHYF12A8C134892|     I'll Be Waiting|ARCLYBR1187FB53913|1989|304.56118|
|SOFSOCN12A8C143F5D|      Face the Ashes|ARXR32B1187FB57099|2007|209.60608|
|SOHKNRJ12A6701D1F8|        Drop of Rain|AR10USD1187B99F3F1|   0|189.57016|
+-----------

In [10]:
try:
    songplays_data_from_parquet = spark.read.parquet(os.path.join(output_data, 'songplays'))
except:
    print("File do not exist")
#songplays_data_from_parquet.show(10)

In [11]:
songplays_data_from_parquet.show(5)

+-----------+-------------------+------+-----+-------+---------+---------+--------------------+--------------------+----+-----+
|songplay_id|         start_time|userId|level|song_id|artist_id|sessionId|            location|           userAgent|year|month|
+-----------+-------------------+------+-----+-------+---------+---------+--------------------+--------------------+----+-----+
|          1|2018-11-01 21:01:46|     8| free|   null|     null|      139|Phoenix-Mesa-Scot...|"Mozilla/5.0 (Win...|2018|   11|
|          2|2018-11-01 21:05:52|     8| free|   null|     null|      139|Phoenix-Mesa-Scot...|"Mozilla/5.0 (Win...|2018|   11|
|          3|2018-11-01 21:08:16|     8| free|   null|     null|      139|Phoenix-Mesa-Scot...|"Mozilla/5.0 (Win...|2018|   11|
|          4|2018-11-01 21:11:13|     8| free|   null|     null|      139|Phoenix-Mesa-Scot...|"Mozilla/5.0 (Win...|2018|   11|
|          5|2018-11-01 21:17:33|     8| free|   null|     null|      139|Phoenix-Mesa-Scot...|"Mozilla/

In [7]:
!rm -rf spark-warehouse/songplays