In [None]:
import configparser
import os
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import TimestampType, DateType


# Load AWS credentials from the [AWS] section of dl.cfg and export them as the
# standard AWS environment variables for the S3 connector.
# NOTE(review): confirm the hadoop-aws version in use reads credentials from the
# environment; otherwise also set spark.hadoop.fs.s3a.access.key / secret.key.
config = configparser.ConfigParser()

# ConfigParser.read() silently skips files it cannot open and returns the list of
# files it actually parsed; fail fast here instead of with an opaque KeyError: 'AWS'.
if not config.read('dl.cfg'):
    raise FileNotFoundError("Could not read AWS credentials file 'dl.cfg'")

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']

In [None]:
# Get (or create) the Spark session with the hadoop-aws package on the classpath so
# that s3a:// paths can be read.
# NOTE(review): hadoop-aws must match the Hadoop build Spark ships with — confirm
# 2.7.0 is correct for this cluster.
spark = (
    SparkSession.builder
    .config(key="spark.jars.packages", value="org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

In [None]:
# Root of the raw JSON datasets on S3 (read through the s3a:// connector), and the
# prefix prepended to every parquet output. An empty output prefix writes relative
# to the notebook's working directory.
input_data, output_data = "s3a://udacity-dend/", ""

In [None]:
# NOTE(review): the A/A/A glob reads only a slice of the song dataset, presumably
# to keep development runs fast — widen it for a full load.
song_data = "".join((input_data, "song_data/A/A/A/*.json"))

In [None]:
# Load the raw song metadata JSON and inspect the inferred schema and a sample.
df = spark.read.format("json").load(song_data)
df.printSchema()
df.limit(5).toPandas()

In [None]:
# songs dimension: one row per song_id. dropDuplicates protects the key in case
# the same song appears in more than one source file.
songs_table = (
    df.select(["song_id", "title", "artist_id", "year", "duration"])
    .dropDuplicates(["song_id"])
)
songs_table.limit(5).toPandas()

In [None]:
# Write songs partitioned by year and artist_id. mode("overwrite") makes the cell
# re-runnable; the default mode raises once the output path already exists.
(
    songs_table.write
    .mode("overwrite")
    .partitionBy("year", "artist_id")
    .parquet(output_data + "songs_table.parquet")
)

In [None]:
# artists dimension: one row per artist_id. The source has one row per song, so an
# artist with several songs would otherwise appear several times.
artists_table = (
    df.selectExpr([
        "artist_id",
        "artist_name AS name",
        "artist_location AS location",
        "artist_latitude AS latitude",
        "artist_longitude AS longitude",
    ])
    .dropDuplicates(["artist_id"])
)
artists_table.show(5)

In [None]:
# Write artists under the same output prefix as the other tables; overwrite so the
# cell can be re-run.
(
    artists_table.write
    .mode("overwrite")
    .parquet(output_data + "artists_table.parquet")
)

In [None]:
# Glob over every event-log JSON file two folder levels below log_data/
# (presumably year/month — confirm against the bucket layout).
log_data = "".join((input_data, "log_data/*/*/*.json"))

In [None]:
# Load the raw event log; this rebinds `df`, replacing the song data loaded earlier.
df = spark.read.format("json").load(log_data)
df.printSchema()
df.limit(5).toPandas()

In [None]:
# Keep only events whose page is 'NextSong' — the song plays the tables below use.
df = df.where(col("page") == "NextSong")
df.limit(5).toPandas()

In [None]:
# users dimension derived from the play events. Each user appears once per event,
# so exact duplicate rows are removed.
# NOTE(review): a user whose `level` changed still yields one row per level; decide
# whether to keep only the latest level per user_id (e.g. by max ts).
users_table = (
    df.selectExpr([
        "userId AS user_id",
        "firstName AS first_name",
        "lastName AS last_name",
        "gender",
        "level",
    ])
    .dropDuplicates()
)
users_table.limit(5).toPandas()

In [None]:
# Write users under the shared output prefix; overwrite so the cell can be re-run.
(
    users_table.write
    .mode("overwrite")
    .parquet(output_data + "users_table.parquet")
)

In [None]:
# Add a `timestamp` column from `ts` (epoch milliseconds, hence the /1000 to get
# seconds). A native cast replaces the former Python UDF: no per-row Python
# serialization, and a null `ts` yields null instead of a TypeError in the worker.
df = df.withColumn('timestamp', (col('ts') / 1000).cast('timestamp'))
df.printSchema()
df.limit(5).toPandas()

In [None]:
# Add a `datetime` column holding the calendar date of each event, derived from the
# epoch-millisecond `ts` via native casts (ms -> timestamp -> date) rather than a
# Python UDF; a null `ts` yields a null date.
df = df.withColumn('datetime', (col('ts') / 1000).cast('timestamp').cast('date'))
df.printSchema()
df.limit(5).toPandas()

In [None]:
# time dimension: one row per distinct start_time, with calendar parts extracted
# from the `timestamp` and `datetime` columns of df.
time_table = (
    df.selectExpr([
        "timestamp AS start_time",
        "hour(timestamp) AS hour",
        "dayofmonth(datetime) AS day",
        "weekofyear(datetime) AS week",
        "month(datetime) AS month",
        "year(datetime) AS year",
    ])
    .dropDuplicates(["start_time"])
)
time_table.limit(5).toPandas()

In [None]:
# Write the time table partitioned by year and month under the shared output
# prefix; overwrite so the cell can be re-run.
(
    time_table.write
    .mode("overwrite")
    .partitionBy("year", "month")
    .parquet(output_data + "time_table.parquet")
)

In [None]:
# Read the songs table back from the same prefixed path the songs cell writes to.
# Its partition columns (year, artist_id) come back as regular columns.
song_df = spark.read.parquet(output_data + "songs_table.parquet")
song_df.limit(5).toPandas()

In [None]:
# songplays fact table: each NextSong event, enriched with song_id / artist_id by
# matching the event's song title and length against the songs table. The left
# join keeps events with no matching song (their song_id / artist_id are null).
# monotonically_increasing_id() adds a unique (but not consecutive) surrogate key.
songplays_table = (
    df.join(song_df, [df.song == song_df.title, df.length == song_df.duration], "left")
    .selectExpr([
        "monotonically_increasing_id() AS songplay_id",
        "timestamp AS start_time",
        "userId AS user_id",
        "level",
        "song_id",
        "artist_id",
        "sessionId AS session_id",
        "location",
        "userAgent AS user_agent",
        "month(datetime) AS month",
        "year(datetime) AS year",
    ])
)
songplays_table.limit(5).toPandas()

In [None]:
# Write songplays partitioned by year and month under the shared output prefix;
# overwrite so the cell can be re-run.
(
    songplays_table.write
    .mode("overwrite")
    .partitionBy("year", "month")
    .parquet(output_data + "songplays_table.parquet")
)