## Import packages and load ASW credentials

In [2]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [3]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

## Create Spark Session

In [4]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [4]:
print(os.environ["AWS_ACCESS_KEY_ID"])
print(os.environ["AWS_SECRET_ACCESS_KEY"])

AKIA3M62BPFRKHT4HB6D
6+IeYc9XSfahU4bNAp9qfU7938jZDaiZoFBL7pEJ


## Load Song Data from S3

#### Create songs table and artists table

In [8]:
# read song data file. process data as a JSON file 
song_df = spark.read.json("s3a://udacity-dend/song_data/A/A/A/*.json")

In [6]:
#see schema and first five rows
song_df.printSchema()
song_df.show(5)
#Latitude and longitude are strings. Should be doubles

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARTC1LV1187B9A4858|        51.4536|Goldsmith's Colle...|        -0.01802|  The Bonzo Dog Band|3

In [7]:
# extract columns to create songs table
#see songs_table and schema
col_names = ['song_id', 'title', 'artist_id', 'year', 'duration']
songs_table = song_df.select(col_names).dropDuplicates()
songs_table.show(5)
songs_table.printSchema()

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SODZYPO12A8C13A91E|Burn My Body (Alb...|AR1C2IX1187B99BF74|   0|177.99791|
|SOIGHOD12A8C13B5A1|        Indian Angel|ARY589G1187B9A9F4E|2004|171.57179|
|SOOVHYF12A8C134892|     I'll Be Waiting|ARCLYBR1187FB53913|1989|304.56118|
|SOAPERH12A58A787DC|The One And Only ...|ARZ5H0P1187B98A1DD|   0|230.42567|
|SOHKNRJ12A6701D1F8|        Drop of Rain|AR10USD1187B99F3F1|   0|189.57016|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



##### Columns look good. Each row unique and has appropriate data types. 
##### Next, write to parquet file

In [9]:
# write songs table to parquet files partitioned by year and artist
# writing to own S3 bucket, so create a S3 bucket before doing this.


songs_table.write.partitionBy('year', 'artist_id').parquet('s3a://sparkify-datalake-project/songs')

In [9]:
# extract columns to create artists table
col_names = ['artist_id', 'artist_name as name', 'artist_location as location', 'artist_latitude as latitude', 'artist_longitude as longitutde']
artists_table = song_df.selectExpr(*col_names).dropDuplicates()

In [13]:
artists_table.createOrReplaceTempView('artists')
spark.sql(
    """
    SELECT (*)
    FROM artists
    LIMIT 5
    """
).show()


+------------------+-------------+---------------+--------+----------+
|         artist_id|         name|       location|latitude|longitutde|
+------------------+-------------+---------------+--------+----------+
|ARSVTNL1187B992A91|Jonathan King|London, England|51.50632|  -0.12714|
|ARXR32B1187FB57099|          Gob|               |    null|      null|
|ARZKCQM1257509D107|   Dataphiles|               |    null|      null|
|ARC1IHZ1187FB4E920| Jamie Cullum|               |    null|      null|
|AR1KTV21187B9ACD72|     Cristina|California - LA|34.05349|-118.24532|
+------------------+-------------+---------------+--------+----------+



In [11]:
# write artists table to parquet files

artists_table.write.parquet('s3a://sparkify-datalake-project/artists')

## Load Log Data from S3

In [5]:
# read log data file
log_df = spark.read.json("s3a://udacity-dend/log_data/2018/11/2018-11-12-events.json")

In [6]:
# filter by actions for song plays
log_df = log_df.filter(log_df.page == 'NextSong')

In [14]:
log_df.show(5)

log_df.createOrReplaceTempView('log')
spark.sql(
    """
    SELECT page, COUNT(*) as count
    FROM log
    GROUP BY page
    """
).show()


spark.sql(
    """
    SELECT COUNT(*)
    FROM log
    """
).show()



spark.sql(
    """
    SELECT level, COUNT(*)
    FROM log
    GROUP BY level
    """
).show()

+--------------------+---------+----------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|              artist|     auth| firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+--------------------+---------+----------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|            Pavement|Logged In|    Sylvie|     F|            0|    Cruz| 99.16036| free|Washington-Arling...|   PUT|NextSong|1.540266185796E12|      345|Mercy:The Laundromat|   200|1541990258796|"Mozilla/5.0 (Mac...|    10|
|Barry Tuckwell/Ac...|Logged In|   Celeste|     F|            1|Williams|277.15873| free|   Klamath 

In [14]:
# extract columns for users table    
col = ['userId as user_id', 'firstName as first_name', 'lastName as last_name', 'gender', 'level']
users_table = log_df.selectExpr(*col).dropDuplicates()

In [38]:
# write users table to parquet files
users_table.write.parquet('s3a://sparkify-datalake-project/users')

In [25]:
from pyspark.sql.types import TimestampType
from datetime import datetime

In [26]:
to_timestamp = udf(lambda x: datetime.fromtimestamp( (x/1000.0) ), TimestampType())

In [19]:
# create timestamp column from original timestamp column
time_df = log_df.withColumn('start_time', to_timestamp('ts'))

In [20]:
# extract columns to create time table
time_table = time_df.select('start_time').dropDuplicates() \
    .withColumn('hour', hour('start_time')) \
    .withColumn('day', dayofmonth('start_time')) \
    .withColumn('week', weekofyear('start_time')) \
    .withColumn('month', month('start_time')) \
    .withColumn('year', year('start_time')) \
    .withColumn('weekday', date_format('start_time', 'EEEE'))

In [21]:
# write time table to parquet files partitioned by year and month

time_table.write.parquet('s3a://sparkify-datalake-project/time')

### Join Log and Songs Tables to get Songplays

In [21]:
from pyspark.sql.functions import monotonically_increasing_id

In [27]:
log_df = log_df.withColumn('start_time', to_timestamp('ts')).withColumn("songplay_id", monotonically_increasing_id())
    
songplays_df = log_df.join(song_df, (song_df.artist_name == log_df.artist) & (song_df.title == log_df.song), 'left')

songplays_df.show(2)

+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------------------+-----------+---------+---------------+---------------+----------------+-----------+--------+---------+-------+-----+----+
|              artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|          start_time|songplay_id|artist_id|artist_latitude|artist_location|artist_longitude|artist_name|duration|num_songs|song_id|title|year|
+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------------------+-----------+---------+--

In [29]:
# extract columns from joined song and log datasets to create songplays table 
cols = ["songplay_id", "start_time", "userId as user_id", "level", "song_id", "artist_id", "sessionID as session_id", "location", "userAgent as user_agent"]
songplays_table = songplays_df.selectExpr(*cols).\
        withColumn('year', year('start_time')).\
        withColumn('month', month('start_time'))
    
    
songplays_table.limit(5).toPandas()

Unnamed: 0,songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent,year,month
0,0,2018-11-12 02:37:38.796,10,free,,,345,"Washington-Arlington-Alexandria, DC-VA-MD-WV","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",2018,11
1,1,2018-11-12 02:37:44.796,53,free,,,438,"Klamath Falls, OR","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",2018,11
2,2,2018-11-12 02:42:21.796,53,free,,,438,"Klamath Falls, OR","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",2018,11
3,3,2018-11-12 02:45:52.796,53,free,,,438,"Klamath Falls, OR","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",2018,11
4,4,2018-11-12 02:47:22.796,29,paid,,,389,"Atlanta-Sandy Springs-Roswell, GA","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",2018,11


In [34]:
songplays_table.createOrReplaceTempView('songplays')
spark.sql(
    """
    SELECT *
    FROM songplays
    LIMIT 6
    """
).show()

+-----------+--------------------+-------+-----+-------+---------+----------+--------------------+--------------------+----+-----+
|songplay_id|          start_time|user_id|level|song_id|artist_id|session_id|            location|          user_agent|year|month|
+-----------+--------------------+-------+-----+-------+---------+----------+--------------------+--------------------+----+-----+
|          0|2018-11-12 02:37:...|     10| free|   null|     null|       345|Washington-Arling...|"Mozilla/5.0 (Mac...|2018|   11|
|          1|2018-11-12 02:37:...|     53| free|   null|     null|       438|   Klamath Falls, OR|"Mozilla/5.0 (Win...|2018|   11|
|          2|2018-11-12 02:42:...|     53| free|   null|     null|       438|   Klamath Falls, OR|"Mozilla/5.0 (Win...|2018|   11|
|          3|2018-11-12 02:45:...|     53| free|   null|     null|       438|   Klamath Falls, OR|"Mozilla/5.0 (Win...|2018|   11|
|          4|2018-11-12 02:47:...|     29| paid|   null|     null|       389|Atlant

In [29]:
songplays_table.printSchema()

root
 |-- songplay_id: long (nullable = false)
 |-- start_time: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)



In [36]:
# write songplays table to parquet files partitioned by year and month
songplays_table.write.partitionBy('year', 'month').parquet('s3a://sparkify-datalake-project/songplays')