In [25]:
import configparser
from datetime import datetime
import os
import posixpath

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
# NOTE: importing `abs` here shadows the Python builtin `abs` for the rest of the notebook.
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, udf, col, to_timestamp, monotonically_increasing_id, abs
from pyspark.sql.types import TimestampType as Stamp
import pyspark.sql.functions as F

In [8]:

# Load AWS credentials from dl.cfg and export them as environment variables
# for the S3 reads/writes later in the notebook.
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed; fail fast here rather than with an opaque
# KeyError('AWS') on the lookups below.
if not config.read('dl.cfg'):
    raise FileNotFoundError("Could not read AWS config file 'dl.cfg'")

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']


In [9]:
# Create (or reuse) the Spark session, pulling in the hadoop-aws package that
# provides the s3a:// filesystem used by the input/output paths below.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

In [10]:
# Root URIs: where raw JSON is read from, and where results are written to.
input_data, output_data = "s3a://udacity-dend/", "s3a://tien-bucket/"

In [11]:
# Glob for every log JSON file under input_data. S3 URIs always use "/", so join
# with posixpath rather than the OS-dependent os.path (which uses "\" on Windows).
log_data = posixpath.join(input_data, 'log-data/*/*/*.json')

In [12]:
# Load all log files matched by the glob and show the schema Spark inferred.
df = spark.read.format("json").load(log_data)
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [17]:
# Convert the epoch-milliseconds `ts` column into a Spark timestamp.
# A native cast replaces the original Python UDF: it stays in the JVM (no
# per-row Python round trip) and yields null for a null `ts`, whereas the UDF
# raised TypeError on `None / 1000` (`ts` is nullable in the log schema).
df = df.withColumn("timestamp", (col("ts") / 1000).cast("timestamp"))

# `dfs` is the log frame later registered as the `log_data` view. The original
# cell re-ran the identical UDF to add `datetime`; reuse the computed column so
# both columns still exist with the same values.
dfs = df.withColumn("datetime", col("timestamp"))

# Time dimension: calendar breakdown of each event's start_time.
# NOTE(review): this yields one row per log event, so start_time may repeat —
#   deduplicate (e.g. dropDuplicates(["start_time"])) if a unique dimension
#   table is intended.
time_table = dfs.select(
    F.col("timestamp").alias("start_time"),
    F.hour("timestamp").alias('hour'),
    F.dayofmonth("timestamp").alias('day'),
    F.weekofyear("timestamp").alias('week'),
    F.month("timestamp").alias('month'),
    F.year("timestamp").alias('year'),
    # Pattern "E" formats the abbreviated day-of-week name.
    F.date_format(F.col("timestamp"), "E").alias("weekday"),
)

In [18]:
# Re-inspect the log schema: the derived `timestamp` column should now appear last.
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [19]:
# Glob for song metadata JSON. Only the `song_data/A/` prefix is read —
# presumably a subset to keep exploration fast; widen it for the full dataset.
# S3 URIs always use "/", so join with posixpath rather than os.path.
song_data = posixpath.join(input_data, 'song_data/A/*/*/*.json')

# read song data file
song_df = spark.read.json(song_data)

In [22]:
# Expose both frames to Spark SQL under the view names used in the query below.
song_df.createOrReplaceTempView("song_data")
dfs.createOrReplaceTempView("log_data")

# Build songplays rows by matching each log event to a song record on title,
# duration and artist name. JOIN here is an inner join, so log events with no
# matching song record are dropped.
# NOTE(review): `log.length = song.duration` is an exact floating-point
#   comparison; `length` is double in the log schema — confirm `duration` is too.
# NOTE(review): per Spark docs, monotonically_increasing_id() is unique and
#   increasing but not guaranteed consecutive, so songplay_id is not a dense key.
songplays_table = spark.sql("""
                                SELECT monotonically_increasing_id() as songplay_id,
                                log.timestamp as start_time,
                                year(log.timestamp) as year,
                                month(log.timestamp) as month,
                                log.userId as user_id,
                                log.level as level,
                                song.song_id as song_id,
                                song.artist_id as artist_id,
                                log.sessionId as session_id,
                                log.location as location,
                                log.userAgent as user_agent
                                FROM log_data log
                                JOIN song_data song
                                ON (log.song = song.title
                                AND log.length = song.duration
                                AND log.artist = song.artist_name)
                                """)

In [24]:
# Preview the first rows of the songplays result (long cell values truncated).
songplays_table.show(n=20, truncate=True)

+-----------+--------------------+----+-----+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|songplay_id|          start_time|year|month|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|
+-----------+--------------------+----+-----+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|          0|2018-11-21 21:56:...|2018|   11|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|
|          1|2018-11-05 17:49:...|2018|   11|     73| paid|SOHDWWH12A6D4F7F6A|ARC0IOF1187FB3F6E6|       255|Tampa-St. Petersb...|"Mozilla/5.0 (Mac...|
|          2|2018-11-13 22:39:...|2018|   11|     55| free|SOXQYSC12A6310E908|AR0L04E1187B9AE90C|       415|Minneapolis-St. P...|"Mozilla/5.0 (Mac...|
|          3|2018-11-16 14:21:...|2018|   11|     85| paid|SOLRYQR12A670215BF|ARNLO5S1187B9B80

In [None]:
# DataFrame-API version of the songplays match. Include duration alongside title
# and artist so it agrees with the SQL `songplays_table` join; matching on title
# and artist alone can pair one log event with several song records that share
# a title and artist name, duplicating rows.
songplay = dfs.join(
    song_df,
    (song_df.title == dfs.song)
    & (dfs.artist == song_df.artist_name)
    & (dfs.length == song_df.duration),
)

In [None]:
# Number of joined songplay rows (count() is an action and runs a Spark job).
songplay_count = songplay.count()
songplay_count

In [None]:
# songplay = songplay.select(
#     "song_id", col("userId").alias("user_id"), "level","artist_id", col("sessionId").alias("session_id"), "location", col("userAgent"
# ).alias("user_agent"), "year", month("timestamp").alias("month"), "ts")

In [None]:
# songplay = songplay.selectExpr("ts as start_time")
# songplay.select(monotonically_increasing_id().alias('songplay_id')).collect()

In [None]:
# songplay.write.partitionBy('year', 'month').parquet(os.path.join(output_data, 'songplays.parquet'), 'overwrite')