In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek

In [2]:
# Load AWS credentials from the local config file and export them as
# environment variables for the rest of the notebook.
config = configparser.ConfigParser()

# ConfigParser.read() skips files it cannot open and returns the list of files
# it actually parsed, so an empty result means 'dl.cfg' was not found/readable.
# Fail here with a clear message instead of a NoSectionError further down.
if not config.read('dl.cfg'):
    raise FileNotFoundError(
        "Could not read 'dl.cfg'; it must define an [AWS] section with "
        "AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY"
    )

os.environ['AWS_ACCESS_KEY_ID'] = config.get('AWS', 'AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY'] = config.get('AWS', 'AWS_SECRET_ACCESS_KEY')

In [3]:
# Create (or reuse) the Spark session with the hadoop-aws package so that
# s3a:// paths can be read and written.
# The S3A connector takes its credentials from `fs.s3a.access.key` and
# `fs.s3a.secret.key`; the `awsAccessKeyId` / `awsSecretAccessKey` suffixes
# belong to the older s3/s3n connectors and are not read by S3A.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.access.key", os.environ['AWS_ACCESS_KEY_ID'])
    .config("spark.hadoop.fs.s3a.secret.key", os.environ['AWS_SECRET_ACCESS_KEY'])
    .getOrCreate()
)

In [4]:
# Read the song metadata JSON files under the A/A/A prefix into a DataFrame.
song_data_path = "s3a://udacity-dend/song_data/A/A/A/*.json"
song_data = spark.read.json(song_data_path)

In [5]:
# Songs table: keep only the song-level columns of the song metadata.
songs_table = song_data.select(
    'song_id',
    'title',
    'artist_id',
    'year',
    'duration',
)

In [6]:
#songs_table.write.mode("overwrite").partitionBy("year", "artist_id").parquet("s3a://sparkify01/parquet/"+ "songs")

In [7]:
# Artists table: project the artist columns under their shorter target names,
# then remove rows that are identical across all five columns.
artists_table = (
    song_data
    .select(
        'artist_id',
        song_data['artist_name'].alias('name'),
        song_data['artist_location'].alias('location'),
        song_data['artist_latitude'].alias('latitude'),
        song_data['artist_longitude'].alias('longitude'),
    )
    .dropDuplicates()
)

In [8]:
#artists_table.write.mode("overwrite").parquet("s3a://sparkify01/parquet/" + "artists")

In [9]:
# Read a single events file (2018-11-12) of the log data into a DataFrame.
log_data_path = "s3a://udacity-dend/log_data/2018/11/2018-11-12-events.json"
log_data = spark.read.json(log_data_path)

In [10]:
# Keep only the events whose page is "NextSong".
log_data = log_data.where(col("page") == "NextSong")

In [11]:
# Users table: project the user columns with snake_case names, then remove
# rows that are identical across all five columns.
users_table = (
    log_data
    .select(
        col('userId').alias('user_id'),
        col('firstName').alias('first_name'),
        col('lastName').alias('last_name'),
        'gender',
        'level',
    )
    .dropDuplicates()
)

In [12]:
#users_table.write.mode("overwrite").parquet("s3a://sparkify01/parquet/"+ "users")

In [13]:
# Wildcard import kept as-is: it provides DoubleType/TimestampType here and
# other cells may rely on further type names it brings into scope.
from pyspark.sql.types import *

# `ts` is divided by 1000 to turn milliseconds into seconds. The result is a
# plain number, so the UDF must be declared DoubleType: declaring TimestampType
# while returning a float fails when Spark converts the value, and a timestamp
# column could not be fed to datetime.fromtimestamp() afterwards either.
# Dividing by 1000.0 always yields a float; nulls are passed through unchanged.
get_timestamp = udf(lambda x: x / 1000.0 if x is not None else None, DoubleType())
log_data = log_data.withColumn("timestamp", get_timestamp(log_data.ts))

In [14]:
def _to_datetime(x):
    """Convert the value of the `timestamp` column with datetime.fromtimestamp()."""
    return datetime.fromtimestamp(x)


# Add `start_time` as a timestamp column computed from `timestamp`.
get_datetime = udf(_to_datetime, TimestampType())
log_data = log_data.withColumn("start_time", get_datetime(log_data["timestamp"]))

In [15]:
# Time table: one row per distinct (ts, start_time) with the calendar parts
# extracted from start_time.
time_table = (
    log_data
    .select(
        "ts",
        "start_time",
        hour("start_time").alias("hour"),
        dayofmonth("start_time").alias("day"),
        weekofyear("start_time").alias("week"),
        month("start_time").alias("month"),
        year("start_time").alias("year"),
        dayofweek("start_time").alias("weekday"),
    )
    .dropDuplicates()
)

In [16]:
#time_table.write.mode("overwrite").partitionBy("year", "month").parquet("s3a://sparkify01/parquet/" + "time")

In [17]:
# Read the songs table back from its parquet location.
# NOTE(review): the only write to this path in the notebook is commented out,
# so this read relies on the parquet files already existing — confirm.
songs_parquet_path = "s3a://sparkify01/parquet/songs/"
songs_data = spark.read.parquet(songs_parquet_path)

In [18]:
# Pair each play event with a song whose title equals the event's `song`.
# NOTE(review): the match is on title only, so an event could be paired with a
# same-titled song by a different artist — confirm whether that is intended.
title_match = log_data["song"] == songs_data["title"]
joined_table = log_data.join(songs_data, on=title_match, how='inner')

# Songplays table: event columns under snake_case names plus the matched
# song/artist ids, with a generated id column appended last.
songplays_table = (
    joined_table
    .select(
        "start_time",
        col("userId").alias("user_id"),
        "level",
        "song_id",
        "artist_id",
        col("sessionId").alias("session_id"),
        "location",
        col("userAgent").alias("user_agent"),
    )
    .withColumn("songplay_id", monotonically_increasing_id())
)

In [19]:
# Add the `year` and `month` columns and put the columns in their final order.
# Both values are derived directly from `start_time` rather than by joining
# back to time_table: time_table holds exactly one row per start_time built
# from the same log_data, so the join only re-attached these same values at
# the cost of an extra shuffle, compared two columns with the same lineage,
# and failed with an ambiguous `year` reference when the cell was run twice.
# withColumn simply overwrites the columns on a re-run.
songplays_table = (
    songplays_table
    .withColumn("year", year("start_time"))
    .withColumn("month", month("start_time"))
    .select(
        "songplay_id",
        "start_time",
        "user_id",
        "level",
        "song_id",
        "artist_id",
        "session_id",
        "location",
        "user_agent",
        "year",
        "month",
    )
)

In [20]:
#songplays_table.drop_duplicates().write.mode("overwrite").partitionBy("year", "month").parquet("s3a://sparkify01/parquet/" + "songplays")

In [21]:
#%run etl-test.py

In [22]:
# Print the column names, types and nullability of the final songplays table.
songplays_table.printSchema()

root
 |-- songplay_id: long (nullable = false)
 |-- start_time: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)

