In [89]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format


config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']


def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

In [90]:
spark = create_spark_session()

In [46]:
df = spark.read.json("s3a://karikari-udacity/song-data/*/*/*/*.json")

In [45]:
df.head()

Row(artist_id='ARDR4AC1187FB371A1', artist_latitude=None, artist_location='', artist_longitude=None, artist_name='Montserrat Caballé;Placido Domingo;Vicente Sardinero;Judith Blegen;Sherrill Milnes;Georg Solti', duration=511.16363, num_songs=1, song_id='SOBAYLL12A8C138AF9', title='Sono andati? Fingevo di dormire', year=0)

## Extract and Create songs_table and save in S3 as parquet files

In [51]:
songs_table = df.select('song_id','title','artist_id','year','duration').dropDuplicates(['song_id'])

In [58]:
songs_table.show(n=5)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOGOSOV12AF72A285E|   ¿Dónde va Chichi?|ARGUVEV1187B98BA17|1997|313.12934|
|SOMZWCG12A8C13C480|    I Didn't Mean To|ARD7TVE1187B99BFB1|   0|218.93179|
|SOUPIRU12A6D4FA1E1| Der Kleine Dompfaff|ARJIE2Y1187B994AB7|   0|152.92036|
|SOXVLOJ12AB0189215|     Amor De Cabaret|ARKRRTF1187B9984DA|   0|177.47546|
|SOWTBJW12AC468AC6E|Broken-Down Merry...|ARQGYP71187FB44566|   0|151.84934|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [56]:
songs_table.write.partitionBy('year','artist_id').parquet('s3a://karikari-udacity/outputs/songs.parquet','overwrite')

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOGOSOV12AF72A285E|   ¿Dónde va Chichi?|ARGUVEV1187B98BA17|1997|313.12934|
|SOMZWCG12A8C13C480|    I Didn't Mean To|ARD7TVE1187B99BFB1|   0|218.93179|
|SOUPIRU12A6D4FA1E1| Der Kleine Dompfaff|ARJIE2Y1187B994AB7|   0|152.92036|
|SOXVLOJ12AB0189215|     Amor De Cabaret|ARKRRTF1187B9984DA|   0|177.47546|
|SOWTBJW12AC468AC6E|Broken-Down Merry...|ARQGYP71187FB44566|   0|151.84934|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



## Extract and Create artists_table and save in S3 as parquet files

In [54]:
artists_table = df.select('artist_id','artist_name','artist_location','artist_latitude','artist_longitude').dropDuplicates(['artist_id'])

In [55]:
artists_table.show(n=5)

+------------------+--------------------+--------------------+---------------+----------------+
|         artist_id|         artist_name|     artist_location|artist_latitude|artist_longitude|
+------------------+--------------------+--------------------+---------------+----------------+
|AR9AWNF1187B9AB0B4|Kenny G featuring...|Seattle, Washingt...|           null|            null|
|AR0IAWL1187B9A96D0|        Danilo Perez|              Panama|         8.4177|       -80.11278|
|AR0RCMP1187FB3F427|    Billie Jo Spears|        Beaumont, TX|       30.08615|       -94.10158|
|AREDL271187FB40F44|        Soul Mekanik|                    |           null|            null|
|ARI3BMM1187FB4255E|        Alice Stuart|          Washington|        38.8991|         -77.029|
+------------------+--------------------+--------------------+---------------+----------------+
only showing top 5 rows



In [59]:
artists_table.write.parquet('s3a://karikari-udacity/outputs/artists.parquet','overwrite')

## Read log data from S3

In [61]:
log_data = spark.read.json("s3a://karikari-udacity/log-data/*.json")

In [70]:
log_data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [67]:
df = log_data.where(log_data['page'] == 'NextSong')


In [71]:
songplays_table = df.select('ts','userId','level','song','sessionId','location','userAgent')

## Extract Columns for Users table

In [72]:
users_table = df.select('userId','firstName','lastName','gender','level').dropDuplicates(['userId'])

In [74]:
users_table.write.parquet('s3a://karikari-udacity/outputs/users.parquet','overwrite')

In [78]:
 # create timestamp column from original timestamp column
get_timestamp = udf(lambda x: str(int(int(x)/1000)))
df = df.withColumn('timestamp', get_timestamp(df.ts)) 

In [79]:
 # create datetime column from original timestamp column
get_datetime = udf(lambda x: str(datetime.fromtimestamp(int(x) / 1000.0)))
df = df.withColumn("datetime", get_datetime(df.ts))



In [80]:
time_table = df.select(
        col('datetime').alias('start_time'),
        hour('datetime').alias('hour'),
        dayofmonth('datetime').alias('day'),
        weekofyear('datetime').alias('week'),
        month('datetime').alias('month'),
        year('datetime').alias('year') 
)

In [81]:
time_table = time_table.dropDuplicates(['start_time'])

In [82]:
time_table.write.partitionBy('year', 'month').parquet('s3a://karikari-udacity/outputs/time.parquet', 'overwrite')

In [83]:
song_df = spark.read.json("s3a://karikari-udacity/song-data/*/*/*/*.json")

In [84]:
df = df.join(song_df, song_df.title == df.song)

In [96]:
songplays_table = df.select(
        col('ts').alias('start_time'),
        col('userId').alias('user_id'),
        col('level').alias('level'),
        col('song_id').alias('song_id'),
        col('artist_id').alias('artist_id'),
        col('sessionId').alias('session_id'),
        col('location').alias('location'),
        col('userAgent').alias('user_agent'),
        col('year').alias('year'),
        month('datetime').alias('month')
    )

In [100]:
songplays_with_id = songplays_table.withColumn('songplay_id', monotonically_increasing_id())

In [101]:
songplays_with_id.show(n=5)

+-------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+----+-----+-----------+
|   start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|year|month|songplay_id|
+-------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+----+-----+-----------+
|1542837407796|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|   0|   11|          0|
|1542171963796|     10| free|SOGDBUF12A8C140FAA|AR558FS1187FB45658|       484|Washington-Arling...|"Mozilla/5.0 (Mac...|2003|   11|          1|
|1542618860796|     24| paid|SOGDBUF12A8C140FAA|AR558FS1187FB45658|       672|Lake Havasu City-...|"Mozilla/5.0 (Win...|2003|   11|          2|
|1543358159796|     80| paid|SOGDBUF12A8C140FAA|AR558FS1187FB45658|       992|Portland-South Po...|"Mozilla/5.0 (Mac...|2003|   11|     

In [102]:
songplays_with_id.write.partitionBy('year', 'month').parquet('s3a://karikari-udacity/outputs/songplays.parquet', 'overwrite')