In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format


In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

print("**"+config['FILES']['SONG_DATA_PATH']+"**")
print("**"+config['AWS']['AWS_ACCESS_KEY_ID']+"**")

log_path = config['FILES']['LOG_DATA_PATH']
print(log_path)

**"s3://udacity-spark-project/song-data/A/A/A/TRAAAAK128F9318786.json"**
**AKIAY4Z2D55M235INTU6**
"s3a://udacity-dend/log_data/2018/11/2018-11-19-events.json"


In [3]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .config("spark.hadoop.fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.hadoop.fs.s3a.awsAccessKeyId", os.environ['AWS_ACCESS_KEY_ID']) \
        .config("spark.hadoop.fs.s3a.awsSecretAccessKey", os.environ['AWS_SECRET_ACCESS_KEY']) \
        .getOrCreate()


In [None]:
#df1 = spark.read.json("s3a://udacity-dend/song-data/*/*/*/*.json")
#df1.show(5)

In [5]:
df_staging_songs = spark.read.json("s3a://udacity-dend/song-data/A/A/A/TRAAAAK128F9318786.json")
df_staging_songs = df_staging_songs.withColumnRenamed("year","song_year")
print('Staging_Songs Count --> ', df_staging_songs.count())
df_staging_songs.show(5)

Staging_Songs Count -->  1
+------------------+---------------+---------------+----------------+------------+--------+---------+------------------+------+---------+
|         artist_id|artist_latitude|artist_location|artist_longitude| artist_name|duration|num_songs|           song_id| title|song_year|
+------------------+---------------+---------------+----------------+------------+--------+---------+------------------+------+---------+
|ARJNIUY12298900C91|           null|               |            null|Adelitas Way|213.9424|        1|SOBLFFE12AF72AA5BA|Scream|     2009|
+------------------+---------------+---------------+----------------+------------+--------+---------+------------------+------+---------+



In [5]:
df_staging_songs.createOrReplaceTempView("staging_songs")

songs_table = spark.sql("""select
    distinct song_id,
    title,
    artist_id,
    song_year,
    duration
from
staging_songs""")

songs_table.show()
songs_table.write.mode('overwrite').partitionBy("song_year","artist_id").parquet("/home/workspace/data/out/songs_table.parquet") 

print('Songs_table Count --> ', songs_table.count())


artist_table = spark.sql("""select
    distinct artist_id,
    artist_name,
    artist_location,
    artist_latitude,
    artist_longitude
from
staging_songs""")
      
artist_table.show()
artist_table.write.mode('overwrite').parquet("/home/workspace/data/out/artist_table.parquet") 

print('artist_table Count --> ', artist_table.count())


+------------------+------+------------------+---------+--------+
|           song_id| title|         artist_id|song_year|duration|
+------------------+------+------------------+---------+--------+
|SOBLFFE12AF72AA5BA|Scream|ARJNIUY12298900C91|     2009|213.9424|
+------------------+------+------------------+---------+--------+

Songs_table Count -->  1
+------------------+------------+---------------+---------------+----------------+
|         artist_id| artist_name|artist_location|artist_latitude|artist_longitude|
+------------------+------------+---------------+---------------+----------------+
|ARJNIUY12298900C91|Adelitas Way|               |           null|            null|
+------------------+------------+---------------+---------------+----------------+

artist_table Count -->  1


In [6]:
df_staging_events = spark.read.json("s3a://udacity-dend/log_data/2018/11/2018-11-19-events.json")
print('Before NextSong Filter --> ',df_staging_events.count())

df_staging_events = df_staging_events.filter("page == 'NextSong'")
print('After NextSong Filter --> ',df_staging_events.count())

df_staging_events.printSchema()
df_staging_events.show(5)

Before NextSong Filter -->  327
After NextSong Filter -->  276
root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|              a

In [7]:
from pyspark.sql import types as t

@udf(t.TimestampType())
def get_timestamp (ts):
    return datetime.fromtimestamp(ts/1000)
    
df_staging_events = df_staging_events.withColumn("timestamp",get_timestamp(df_staging_events.ts))
df_staging_events.printSchema()
df_staging_events.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)

+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------------------+
|             

In [8]:
@udf(t.StringType())
def get_timestamp (ts):
    return datetime.fromtimestamp(ts/1000).strftime('%Y-%m-%d %H:%M:%S')

df_staging_events = df_staging_events.withColumn("datetime",get_timestamp(df_staging_events.ts))
df_staging_events = df_staging_events.withColumn("year",year(df_staging_events.datetime)).withColumn("month",month(df_staging_events.datetime))
df_staging_events.printSchema()
df_staging_events.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- datetime: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)

+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+-

In [9]:
df_staging_events.createOrReplaceTempView("staging_events")
#staging_events = spark.sql("Select * from staging_events where page='NextSong'").toPandas()

users_table = spark.sql("""select 
    distinct userid,
    firstname,
    lastname,
    gender,
    level
from
staging_events
""")
users_table.show()

users_table.write.mode('overwrite').parquet("/home/workspace/data/out/users_table.parquet") 
print('users_table Count --> ', users_table.count())

+------+---------+---------+------+-----+
|userid|firstname| lastname|gender|level|
+------+---------+---------+------+-----+
|    37|   Jordan|    Hicks|     F| free|
|    63|     Ayla|  Johnson|     F| free|
|    15|     Lily|     Koch|     F| paid|
|    74|   Braden|   Parker|     M| free|
|    75|   Joseph|Gutierrez|     M| free|
|    61|   Samuel| Gonzalez|     M| free|
|    88| Mohammad|Rodriguez|     M| free|
|    69| Anabelle|  Simpson|     F| free|
|    52| Theodore|    Smith|     M| free|
|    49|    Chloe|   Cuevas|     F| paid|
|    41|  Brayden|    Clark|     M| free|
|    67|     Colm|  Santana|     M| free|
|     7|   Adelyn|   Jordan|     F| free|
|    24|    Layla|  Griffin|     F| paid|
|    35|    Molly|   Taylor|     F| free|
|    54|    Kaleb|     Cook|     M| free|
|    17| Makinley|    Jones|     F| free|
|    45| Dominick|   Norris|     M| free|
|    58|    Emily|   Benson|     F| paid|
|    71|   Ayleen|     Wise|     F| free|
+------+---------+---------+------

In [24]:
df_songplay = df_staging_events.join(df_staging_songs, df_staging_events.song == df_staging_songs.title, "left")
df_songplay.printSchema()
df_songplay = df_songplay.select("datetime","year","month", "userId", "level","song_id","artist_id","sessionId","location","useragent")
df_songplay = df_songplay.withColumn("songplay_id", F.monotonically_increasing_id())
df_songplay.show()

df_songplay.write.mode('overwrite').partitionBy("year","month").parquet("/home/workspace/data/out/songplay_table.parquet") 

print('df_songplay Count --> ', df_songplay.count())

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- datetime: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artis

In [25]:
time_table = spark.sql("""SELECT 
    distinct datetime as start_time,
    hour(datetime) as hour,
    dayofyear(datetime) as day,
    weekofyear(datetime) as week,
    month(datetime) as month,
    year(datetime) as year,
    dayofweek(datetime) as weekday
from
staging_events
where page='NextSong'
""")

time_table.show()
time_table.write.mode('overwrite').partitionBy("year","month").parquet("/home/workspace/data/out/time_table.parquet") 
print('time_table Count --> ', time_table.count())


+-------------------+----+---+----+-----+----+-------+
|         start_time|hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-19 21:48:53|  21|323|  47|   11|2018|      2|
|2018-11-19 03:16:41|   3|323|  47|   11|2018|      2|
|2018-11-19 10:53:17|  10|323|  47|   11|2018|      2|
|2018-11-19 04:39:59|   4|323|  47|   11|2018|      2|
|2018-11-19 10:46:18|  10|323|  47|   11|2018|      2|
|2018-11-19 04:56:47|   4|323|  47|   11|2018|      2|
|2018-11-19 04:32:05|   4|323|  47|   11|2018|      2|
|2018-11-19 06:21:04|   6|323|  47|   11|2018|      2|
|2018-11-19 16:29:46|  16|323|  47|   11|2018|      2|
|2018-11-19 21:17:39|  21|323|  47|   11|2018|      2|
|2018-11-19 23:47:16|  23|323|  47|   11|2018|      2|
|2018-11-19 03:14:29|   3|323|  47|   11|2018|      2|
|2018-11-19 05:47:22|   5|323|  47|   11|2018|      2|
|2018-11-19 08:35:58|   8|323|  47|   11|2018|      2|
|2018-11-19 21:44:59|  21|323|  47|   11|2018|      2|
|2018-11-1