In [1]:
# Import all libraries used in this notebook
# configparser: read the AWS credentials from dl.cfg
import configparser
# datetime: convert the epoch `ts` column into datetime values
from datetime import datetime
# os: export the credentials as environment variables via os.environ
import os
# SparkSession: entry point used to create the Spark session
from pyspark.sql import SparkSession
# udf: wraps the timestamp conversion; col: column references and aliases
from pyspark.sql.functions import udf, col
# NOTE(review): of this line only date_format is used below; hour/weekofyear/
# month/year are accessed through the `F` alias imported in the time_table cell,
# and dayofmonth is not used.
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
# time: measure how long the S3 write takes
import time

In [2]:
# Read the AWS credentials from dl.cfg (expects an [AWS] section with
# AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY) and export them as environment
# variables — presumably picked up by the hadoop-aws s3a connector used below.
config = configparser.ConfigParser()

# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed; fail fast here instead of hitting a confusing
# KeyError on config['AWS'] a few lines down.
if not config.read('dl.cfg'):
    raise FileNotFoundError("Could not read 'dl.cfg' from the current working directory")

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [3]:
# Create (or reuse) the Spark session. The hadoop-aws package is pulled in so
# the s3a:// paths used below can be read and written.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

#### 1.1. Read the song_data files (all JSON files in one folder)

In [4]:
# S3 location of the song data. The glob only matches the JSON files in the
# song_data/A/A/A folder, i.e. a subset of the full dataset.
song_data_path = "s3a://udacity-dend/song_data/A/A/A/*.json"

# Load every matched JSON file into a single DataFrame
df = spark.read.format("json").load(song_data_path)

##### 1.1.a. Retrieve songs_table

In [5]:
# Columns that make up songs_table. artist_name is kept because the songplays
# join further down matches songs to log events on it.
song_columns = ["song_id", "title", "artist_id", "artist_name", "year", "duration"]
songs_table = df.select(*song_columns).dropDuplicates()

##### 1.1.b.a. Write songs_table to a local parquet file

In [6]:
# Write songs_table as parquet to a local folder (not S3), partitioned by year
# and artist_name. mode="overwrite" replaces the folder if it already exists.
songs_table.write.parquet("song1.parquet", mode="overwrite", partitionBy=["year", "artist_name"])

##### 1.1.b.b. Write songs_table to the S3 bucket as parquet

In [7]:
output_data="s3a://datalake-spark-project/"

# Write songs_table to S3 with the same partitioning as the local copy;
# mode("overwrite") replaces any existing output under songs/.
# perf_counter() is a monotonic clock intended for measuring elapsed time
# (in seconds), unlike time.time(), which can jump if the system clock changes.
start = time.perf_counter()
songs_table.write.mode("overwrite").partitionBy("year","artist_name").parquet(output_data+"songs/")
end = time.perf_counter()
print('The time taken to write back song_table to s3 is :',(end-start))

The time taken to write back song_table to s3 is : 224.30409908294678


##### 1.1.c. Read the parquet files back (useful if they are needed later)

In [8]:
# Read the parquet output back (handy if the songs data is needed later)

# Local copy
localdf = spark.read.format("parquet").load("song1.parquet")

# Copy on S3
s3df = spark.read.format("parquet").load(output_data + "songs/")

##### 1.2. Retrieve artists_table

In [9]:
# artists_table: the artist_* columns of the song data, renamed to the table's
# column names, with duplicate rows removed.
# NOTE(review): dropDuplicates() compares all five columns, so an artist_id that
# appears with differing location/coordinates yields several rows — confirm
# whether artist_id is required to be unique.
artists_table = df.selectExpr(
    "artist_id",
    "artist_name AS name",
    "artist_location AS location",
    "artist_longitude AS longitude",
    "artist_latitude AS latitude",
).dropDuplicates()

In [10]:
artists_table.show(n=2)  # preview the first two artist rows

+------------------+-------------+---------------+---------+--------+
|         artist_id|         name|       location|longitude|latitude|
+------------------+-------------+---------------+---------+--------+
|ARSVTNL1187B992A91|Jonathan King|London, England| -0.12714|51.50632|
|ARXR32B1187FB57099|          Gob|               |     null|    null|
+------------------+-------------+---------------+---------+--------+
only showing top 2 rows



#### 2.1. Read the log_data file 

In [11]:
# Only a single day of log events is read here. To read every log file, a glob
# such as "s3a://udacity-dend/log_data/*/*/*.json" should work, assuming all
# files follow the <year>/<month>/<file>.json layout of this path.
log_data_path = "s3a://udacity-dend/log_data/2018/11/2018-11-13-events.json"

df = spark.read.format("json").load(log_data_path)

##### 2.1.a. Filter by page = "NextSong"

In [12]:
# Keep only song-play events (page == "NextSong")
df = df.where(col("page") == "NextSong")

##### 2.1.b. Retrieve user_table from the log data

In [13]:
# Extract the users table columns from the NextSong events. Source columns are
# spelled exactly as in the log schema (firstName, lastName — see the
# printSchema output) so the select does not depend on Spark's
# case-insensitive column resolution.
# NOTE(review): dropDuplicates() compares all columns, so a user whose level
# changes (e.g. free -> paid) within the logs keeps one row per level.
user_table = df.select(col("userId").alias("user_id"),
                       col("firstName").alias("first_name"),
                       col("lastName").alias("last_name"),
                       "gender",
                       "level").dropDuplicates()

user_table.show(3)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     54|     Kaleb|     Cook|     M| free|
|     80|     Tegan|   Levine|     F| paid|
|     55|    Martin|  Johnson|     M| free|
+-------+----------+---------+------+-----+
only showing top 3 rows



#### 2.2. Convert the column ts to timestamp

In [14]:
# Inspect the log schema: the ts column is a long holding epoch time in
# milliseconds (the conversion in the next cell divides it by 1000).
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



##### 2.2.a. Create a timestamp column from ts

In [15]:
# Create a timestamp column from ts (epoch milliseconds)

# Import the Spark SQL timestamp type used as the UDF's return type
from pyspark.sql.types import TimestampType


def _epoch_ms_to_datetime(epoch_ms):
    """Convert epoch milliseconds to a naive ``datetime``; ``None`` stays ``None``.

    ts is nullable in the log schema, and dividing ``None`` by 1000 would raise
    TypeError inside the Spark job, so null values are passed through.
    NOTE(review): datetime.fromtimestamp() uses the local timezone of the Python
    worker — confirm that matches the intended timezone.
    """
    if epoch_ms is None:
        return None
    return datetime.fromtimestamp(epoch_ms / 1000)


# Declaring TimestampType() as the return type makes Spark store the result as
# a timestamp column rather than the default string type.
get_timestamp = udf(_epoch_ms_to_datetime, TimestampType())
df = df.withColumn("timestamp", get_timestamp(df.ts))

##### 2.2.b. Retrieve time_table: extract hour, day, week, month and year from the new timestamp column

In [16]:
# Build time_table from the timestamp column created above: hour, abbreviated
# weekday name (date pattern "E", e.g. "Tue"), week of year, month and year.
# dropDuplicates() mirrors the songs, artists and users tables, so repeated
# timestamps produce a single row.
import pyspark.sql.functions as F
time_table = df.select(col("timestamp").alias("start_time"),
                       F.hour("timestamp").alias("hour"),
                       date_format(col("timestamp"), "E").alias("day"),
                       F.weekofyear("timestamp").alias("week"),
                       F.month("timestamp").alias("month"),
                       F.year("timestamp").alias("year")
                      ).dropDuplicates()

time_table.show(3)

+--------------------+----+---+----+-----+----+
|          start_time|hour|day|week|month|year|
+--------------------+----+---+----+-----+----+
|2018-11-13 00:40:...|   0|Tue|  46|   11|2018|
|2018-11-13 01:12:...|   1|Tue|  46|   11|2018|
|2018-11-13 03:19:...|   3|Tue|  46|   11|2018|
+--------------------+----+---+----+-----+----+
only showing top 3 rows



#### 3. Retrieve songplays table

In [17]:
# Preview the two inputs of the songplays join: songs_table and the log events.
songs_table.show(n=2)

# Alternatively, the songs data could be loaded back from parquet:
#   local: localdf = spark.read.parquet("song1.parquet")
#   S3:    s3df = spark.read.parquet(output_data + "songs/")

df.limit(2).toPandas()

+------------------+--------------------+------------------+--------------------+----+---------+
|           song_id|               title|         artist_id|         artist_name|year| duration|
+------------------+--------------------+------------------+--------------------+----+---------+
|SOEKAZG12AB018837E|I'll Slap Your Fa...|ARSVTNL1187B992A91|       Jonathan King|2001|129.85424|
|SOHOZBI12A8C132E3C|         Smash It Up|AR0MWD61187B9B2B12|International Noi...|2000|195.39546|
+------------------+--------------------+------------------+--------------------+----+---------+
only showing top 2 rows



Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,timestamp
0,Fu,Logged In,Kevin,M,1,Arellano,280.05832,free,"Harrisburg-Carlisle, PA",PUT,NextSong,1540007000000.0,514,Ja I Ty,200,1542069637796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",66,2018-11-13 00:40:37.796
1,All Time Low,Logged In,Maia,F,1,Burke,177.84118,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540677000000.0,510,A Party Song (The Walk of Shame),200,1542071549796,"""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebK...",51,2018-11-13 01:12:29.796


In [18]:
# Register both DataFrames as temporary views so they can be joined in Spark SQL
songs_table.createOrReplaceTempView("song_table")
df.createOrReplaceTempView("log_table")

In [19]:
# Build songplays by matching each NextSong event to a song on title, artist
# name and duration. Log columns use the schema's exact casing (userId,
# sessionId, userAgent) so the query does not rely on Spark SQL's
# case-insensitive name resolution.
# NOTE(review): the inner join keeps only events that have a matching song, and
# duration is matched with exact float equality. With only song_data/A/A/A and a
# single day of logs loaded, the run shown below matched no rows — presumably
# due to the data subset; confirm against the full dataset.
songplays=spark.sql('''
select
    l.timestamp as start_time,
    l.userId as user_id,
    l.level,
    s.song_id,
    s.artist_id,
    l.sessionId as session_id,
    l.location,
    l.userAgent as user_agent,
    year(l.timestamp) as year,
    month(l.timestamp) as month
from
    log_table as l
inner join
    song_table as s
on
    s.duration=l.length and s.title=l.song and s.artist_name=l.artist
''')


songplays.show()

+----------+-------+-----+-------+---------+----------+--------+----------+----+-----+
|start_time|user_id|level|song_id|artist_id|session_id|location|user_agent|year|month|
+----------+-------+-----+-------+---------+----------+--------+----------+----+-----+
+----------+-------+-----+-------+---------+----------+--------+----------+----+-----+

