### List of things to be implemented:

1. Pre-filter the raw data by discarding nulls on read

In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, monotonically_increasing_id, from_unixtime, to_timestamp
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, dayofweek
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, DecimalType, DoubleType, IntegerType, LongType

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS_SECURITY']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS_SECURITY']['AWS_SECRET_ACCESS_KEY']

In [3]:
os.environ['AWS_ACCESS_KEY_ID']

'AKIAT7ISBRWEAQ355IYK'

In [4]:
def create_spark_session():
    """
    This method returns the SparkSession object. It adds the Apache Hadoop AWS library as configuration parameter.
    
    Returns:
        spark (pyspark.sql.session.SparkSession)
    """
    
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    
    return spark


In [5]:
spark = create_spark_session()

In [13]:
# Schema definitons to read raw data

# Schema song-data
sch_song_data = StructType([ \
    StructField("artist_id", StringType(), True), \
    StructField("artist_latitude", DecimalType(),True), \
    StructField("artist_location", StringType(), True), \
    StructField("artist_longitude", DecimalType(), True), \
    StructField("artist_name", StringType(), True), \
    StructField("duration", DoubleType(), True), \
    StructField("num_songs", IntegerType(), True), \
    StructField("song_id", StringType(), True), \
    StructField("title", StringType(), True), \
    StructField("year", IntegerType(), True) \
  ])

# Schema log-data
sch_log_data = StructType([ \
    StructField("artist", StringType(), True), \
    StructField("auth", StringType(),True), \
    StructField("firstName", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("itemInSession", LongType(), True), \
    StructField("lastName", StringType(), True), \
    StructField("length", DoubleType(), True), \
    StructField("level", StringType(), True), \
    StructField("location", StringType(), True), \
    StructField("method", StringType(), True), \
    StructField("page", StringType(), True), \
    StructField("registration", DoubleType(), True), \
    StructField("sessionId", LongType(), True), \
    StructField("song", StringType(), True), \
    StructField("status", StringType(), True), \
    StructField("ts", DoubleType(), True), \
    StructField("userAgent", StringType(), True), \
    StructField("userId", StringType(), True) \
  ])

In [14]:
# get filepath to song data files
song_data_file = "data/song-data/song_data/A/A/A/TRAAAAW128F429D538.json"
song_data      = "./data/song-data/song_data/*/*/*/*.json"
song_data_s3   = "s3a://udacity-dend/song_data/A/B/C/*.json"

# get filepath to log data files
log_data_file = "data/log-data/2018-11-01-events.json"
log_data = "./data/log-data/*.json"

In [17]:
# read song data files
df_song = spark.read.json(song_data, schema = sch_song_data).na.drop("all").show()

# read log data files
df_log = spark.read.json(log_data, schema = sch_log_data)

# song and log data joined on artist name

df_songs_logs = df_log.\
    join(df_song, df_song.artist_name == df_log.artist, how = "inner")

+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARDR4AC1187FB371A1|           null|                    |            null|Montserrat Caball...|511.16363|        1|SOBAYLL12A8C138AF9|Sono andati? Fing...|   0|
|AREBBGV1187FB523D2|           null|         Houston, TX|            null|Mike Jones (Featu...|173.66159|        1|SOOLYAZ12A6701F4A6|Laws Patrolling (...|   0|
|ARMAC4T1187FB3FA4C|             41|   Morris Plains, NJ|             -74|The Dillinger Esc...|207.77751|        1|SOBBUGU12A8C13E95D|Setting Fire to S...|2004|
|ARPBNLO1187FB3D52F|             4

AttributeError: 'NoneType' object has no attribute 'artist_name'

In [7]:
df_song.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: decimal(10,0) (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: decimal(10,0) (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: integer (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)



In [8]:
df_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: string (nullable = true)
 |-- ts: double (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [9]:
# processing song data

# extract columns to create songs table
songs_table = df_song.select("song_id", "title", "artist_id", "year", "duration")
songs_table.show(5)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOBAYLL12A8C138AF9|Sono andati? Fing...|ARDR4AC1187FB371A1|   0|511.16363|
|SOOLYAZ12A6701F4A6|Laws Patrolling (...|AREBBGV1187FB523D2|   0|173.66159|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
|SOAOIBZ12AB01815BE|I Hold Your Hand ...|ARPBNLO1187FB3D52F|2000| 43.36281|
|SONYPOM12A8C13B2D7|I Think My Wife I...|ARDNS031187B9924F0|2005|186.48771|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [10]:
# extract columns to create artists table

artists_table = df_song.select("artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude")
artists_table.show(5)

+------------------+--------------------+-----------------+---------------+----------------+
|         artist_id|         artist_name|  artist_location|artist_latitude|artist_longitude|
+------------------+--------------------+-----------------+---------------+----------------+
|ARDR4AC1187FB371A1|Montserrat Caball...|                 |           null|            null|
|AREBBGV1187FB523D2|Mike Jones (Featu...|      Houston, TX|           null|            null|
|ARMAC4T1187FB3FA4C|The Dillinger Esc...|Morris Plains, NJ|             41|             -74|
|ARPBNLO1187FB3D52F|            Tiny Tim|     New York, NY|             41|             -74|
|ARDNS031187B9924F0|          Tim Wilson|          Georgia|             33|             -83|
+------------------+--------------------+-----------------+---------------+----------------+
only showing top 5 rows



In [11]:
# extract columns to create users table

users_table = df_log.select("userId", "firstName", "lastName", "gender", "userAgent")
users_table.show(5)

+------+---------+--------+------+--------------------+
|userId|firstName|lastName|gender|           userAgent|
+------+---------+--------+------+--------------------+
|    26|     Ryan|   Smith|     M|"Mozilla/5.0 (X11...|
|    26|     Ryan|   Smith|     M|"Mozilla/5.0 (X11...|
|    26|     Ryan|   Smith|     M|"Mozilla/5.0 (X11...|
|     9|    Wyatt|   Scott|     M|Mozilla/5.0 (Wind...|
|    12|   Austin| Rosales|     M|Mozilla/5.0 (Wind...|
+------+---------+--------+------+--------------------+
only showing top 5 rows



In [60]:
# extract columns to create songplays table
start_time = from_unixtime(df_songs_logs.ts/1000).alias("start_time")

songplays_table = df_songs_logs.\
    select(start_time, "userId", "level", "sessionId", "location", "userAgent", "artist_id", "song_id").\
    withColumn("songplay_id", monotonically_increasing_id()).\
    where(col("page") == "NextSong")

In [61]:
songplays_table.printSchema()

root
 |-- start_time: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- level: string (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- location: string (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- songplay_id: long (nullable = false)



In [62]:
songplays_table.show()

+-------------------+------+-----+---------+--------------------+--------------------+------------------+------------------+-----------+
|         start_time|userId|level|sessionId|            location|           userAgent|         artist_id|           song_id|songplay_id|
+-------------------+------+-----+---------+--------------------+--------------------+------------------+------------------+-----------+
|2018-11-15 20:32:47|    44| paid|      619|Waterloo-Cedar Fa...|Mozilla/5.0 (Maci...|ARIK43K1187B9AE54C|SOBONFF12A6D4F84D8|          0|
|2018-11-21 21:56:47|    15| paid|      818|Chicago-Napervill...|"Mozilla/5.0 (X11...|AR5KOSW1187FB35FF4|SOZCTXZ12AB0182364|          1|
|2018-11-14 13:11:26|    34| free|      495|Milwaukee-Waukesh...|Mozilla/5.0 (Maci...|ARPFHN61187FB575F6|SOWQTQZ12A58A7B63E|          2|
|2018-11-14 20:16:39|   101| free|      603|New Orleans-Metai...|"Mozilla/5.0 (Win...|ARVBRGZ1187FB4675A|SORRZGD12A6310DBC3|          3|
|2018-11-28 23:22:57|    24| paid|      9

In [77]:
# extract columns to time songplays table

df_time = songplays_table.select("start_time").\
    withColumn("year", year(to_timestamp(col("start_time")))).\
    withColumn("month", month(to_timestamp(col("start_time")))).\
    withColumn("day", dayofmonth(to_timestamp(col("start_time")))).\
    withColumn("week", weekofyear(to_timestamp(col("start_time")))).\
    withColumn("weekday", dayofweek(to_timestamp(col("start_time"))))
    

In [87]:
def write_to_parquet(dataframe, path, mode = "errorifexists", partition_cols = None):
    """
    This method can be used to write dataframes as parquet files specifying the output path (local), 
    the mode and the partition columns
    
    Arguments:
        dataframe:
        path (str):
        mode (str, optional):
        partition_cols (str, optional):
    """
    
    dataframe.write.parquet(path, mode, partition_cols)

In [None]:
df_time.write.parquet(path = "./data/output-data/time.parquet", mode = "overwrite", partitionBy=None)

In [88]:
write_to_parquet(dataframe = df_time, path = "./data/output-data/time.parquet", mode = "overwrite", partition_cols = None)