In [20]:
import configparser
import os
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window
from pyspark.sql.types import StructType as R, StructField as Fld, \
    DoubleType as Dbl, LongType as Long, StringType as Str, IntegerType as Int, TimestampType as timestamp
from pyspark.sql.types import TimestampType

In [2]:
# Load AWS credentials from dl.cfg and export them as environment variables.
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list of
# files it did parse. Fail fast here with a clear message instead of a bare
# KeyError: 'AWS' on the next line.
if not config.read('dl.cfg'):
    raise FileNotFoundError("Could not read AWS credentials file 'dl.cfg'")

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']

In [3]:
def create_spark_session():
    """Build (or reuse, via ``getOrCreate``) the notebook's SparkSession.

    The ``spark.jars.packages`` setting requests the Maven package
    ``org.apache.hadoop:hadoop-aws:2.7.0``, presumably for S3 access — TODO
    confirm; the paths used below are local.

    NOTE(review): if a session already exists, getOrCreate returns it, and the
    package setting may not take effect for that running session.

    Returns:
        The active ``pyspark.sql.SparkSession``.
    """
    builder = SparkSession.builder.config(
        "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
    )
    return builder.getOrCreate()


spark = create_spark_session()
spark

In [4]:
# Local input/output roots. Every path below is built by string concatenation,
# so both roots keep their trailing slash.
input_data, output_data = 'data/', 'output/'

# Song Data

In [12]:
# Song JSON files are nested three directory levels below song_data/.
song_data = input_data + 'song_data/*/*/*/'

# Explicit, all-nullable schema for the song files, used instead of schema inference.
song_schema = R([
    Fld(field_name, field_type())
    for field_name, field_type in (
        ('artist_id', Str),
        ('artist_latitude', Dbl),
        ('artist_location', Str),
        ('artist_longitude', Dbl),
        ('artist_name', Str),
        ('duration', Dbl),
        ('num_songs', Int),
        ('song_id', Str),
        ('title', Str),
        ('year', Int),
    )
])

song_df = spark.read.schema(song_schema).json(song_data)
song_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: integer (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)



## Songs Table

In [46]:
# Songs dimension: one row per song_id. Among duplicates, whichever row Spark
# keeps wins.
songs_table = (
    song_df
    .select("song_id", "title", "artist_id", "year", "duration")
    .dropDuplicates(["song_id"])
)
songs_table.show(5)
songs_table.printSchema()

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOGOSOV12AF72A285E|   ¿Dónde va Chichi?|ARGUVEV1187B98BA17|1997|313.12934|
|SOMZWCG12A8C13C480|    I Didn't Mean To|ARD7TVE1187B99BFB1|   0|218.93179|
|SOUPIRU12A6D4FA1E1| Der Kleine Dompfaff|ARJIE2Y1187B994AB7|   0|152.92036|
|SOXVLOJ12AB0189215|     Amor De Cabaret|ARKRRTF1187B9984DA|   0|177.47546|
|SOWTBJW12AC468AC6E|Broken-Down Merry...|ARQGYP71187FB44566|   0|151.84934|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- duration: double (nullable = true)



In [35]:
(songs_table.write.mode("overwrite").partitionBy("year", "artist_id").parquet(output_data + 'songs/'))

## Artists Table

In [47]:
# Artists dimension: one row per artist_id, taken from the song files.
artists_table = (
    song_df
    .select("artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude")
    .dropDuplicates(["artist_id"])
)
artists_table.show(5)
artists_table.printSchema()

+------------------+--------------------+--------------------+---------------+----------------+
|         artist_id|         artist_name|     artist_location|artist_latitude|artist_longitude|
+------------------+--------------------+--------------------+---------------+----------------+
|AR9AWNF1187B9AB0B4|Kenny G featuring...|Seattle, Washingt...|           null|            null|
|AR0IAWL1187B9A96D0|        Danilo Perez|              Panama|         8.4177|       -80.11278|
|AR0RCMP1187FB3F427|    Billie Jo Spears|        Beaumont, TX|       30.08615|       -94.10158|
|AREDL271187FB40F44|        Soul Mekanik|                    |           null|            null|
|ARI3BMM1187FB4255E|        Alice Stuart|          Washington|        38.8991|         -77.029|
+------------------+--------------------+--------------------+---------------+----------------+
only showing top 5 rows

root
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- artist_location: 

In [37]:
artists_table.write.mode("overwrite").parquet(output_data + 'artists/')

# Log Data

In [41]:
log_data = input_data + "log_data/"

# Explicit, all-nullable schema for the event-log JSON files. `ts` is read as a
# long and converted to a timestamp further down.
log_schema = R([
    Fld(field_name, field_type())
    for field_name, field_type in (
        ('artist', Str),
        ('auth', Str),
        ('firstName', Str),
        ('gender', Str),
        ('itemInSession', Str),
        ('lastName', Str),
        ('length', Dbl),
        ('level', Str),
        ('location', Str),
        ('method', Str),
        ('page', Str),
        ('registration', Dbl),
        ('sessionId', Str),
        ('song', Str),
        ('status', Str),
        ('ts', Long),
        ('userAgent', Str),
        ('userId', Str),
    )
])

log_df = spark.read.schema(log_schema).json(log_data)
log_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: string (nullable = true)
 |-- song: string (nullable = true)
 |-- status: string (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [42]:
# Keep only events whose page is "NextSong" (presumably the actual song plays).
log_df = log_df.filter(col("page") == "NextSong")
log_df

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: string, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: double, sessionId: string, song: string, status: string, ts: bigint, userAgent: string, userId: string]

## Users Table

In [48]:
# Users dimension: one row per userId. Each user's events are ranked newest-first
# by `ts` and only the top row is kept. This way `level` reflects the user's most
# recent event (e.g. after a free -> paid change). Plain dropDuplicates would keep
# an arbitrary duplicate. Rows with a null `ts` sort last under desc().
latest_event_first = Window.partitionBy("userId").orderBy(col("ts").desc())
users_table = (
    log_df
    .withColumn("_event_rank", row_number().over(latest_event_first))
    .where(col("_event_rank") == 1)
    .select("userId", "firstName", "lastName", "gender", "level")
)
users_table.show(5)
users_table.printSchema()

+------+---------+--------+------+-----+
|userId|firstName|lastName|gender|level|
+------+---------+--------+------+-----+
|    51|     Maia|   Burke|     F| free|
|     7|   Adelyn|  Jordan|     F| free|
|    15|     Lily|    Koch|     F| paid|
|    54|    Kaleb|    Cook|     M| free|
|   101|   Jayden|     Fox|     M| free|
+------+---------+--------+------+-----+
only showing top 5 rows

root
 |-- userId: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



In [44]:
# Write to output/users/ like every other table (songs/, artists/, time/, songplays/)
# rather than to a nested users/users.parquet/ directory.
users_table.write.parquet(output_data + 'users/', mode="overwrite")

## Time Table

In [45]:
def _epoch_ms_to_datetime(ms):
    """Convert epoch milliseconds to a naive local-time datetime; None stays None.

    The previous lambda evaluated ``None / 1000`` on a null ``ts`` and failed the task.
    """
    if ms is None:
        return None
    return datetime.fromtimestamp(ms / 1000)


# Python UDF (rather than a double -> timestamp cast) so the conversion keeps
# exact millisecond values.
get_timestamp = udf(_epoch_ms_to_datetime, TimestampType())
log_df = log_df.withColumn("timestamp", get_timestamp(col("ts")))
log_df.select('timestamp').show(2)

+--------------------+
|           timestamp|
+--------------------+
|2018-11-15 00:30:...|
|2018-11-15 00:41:...|
+--------------------+
only showing top 2 rows



In [49]:
# Time dimension: one row per distinct event timestamp, split into calendar parts.
# The date_format pattern "E" yields the abbreviated weekday name (e.g. "Fri").
time_table = (
    log_df
    .select(
        col("timestamp").alias("start_time"),
        hour(col("timestamp")).alias("hour"),
        dayofmonth(col("timestamp")).alias("day"),
        weekofyear(col("timestamp")).alias("week"),
        month(col("timestamp")).alias("month"),
        year(col("timestamp")).alias("year"),
        date_format("timestamp", "E").alias("weekday"),
    )
    .dropDuplicates(["start_time"])
)
time_table.show(5)
time_table.printSchema()

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-02 12:30:...|  12|  2|  44|   11|2018|    Fri|
|2018-11-03 19:01:...|  19|  3|  44|   11|2018|    Sat|
|2018-11-04 06:33:...|   6|  4|  44|   11|2018|    Sun|
|2018-11-04 19:19:...|  19|  4|  44|   11|2018|    Sun|
|2018-11-05 16:31:...|  16|  5|  45|   11|2018|    Mon|
+--------------------+----+---+----+-----+----+-------+
only showing top 5 rows

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: string (nullable = true)



In [50]:
time_table.write.mode("overwrite").partitionBy("year", "month").parquet(output_data + 'time/')

## Songplays Table

In [51]:
# Register both DataFrames under the view names the query below refers to.
song_df.createOrReplaceTempView("song_data")
log_df.createOrReplaceTempView("log_data")

# Fact table: each log event matched to a song on (title, duration, artist name).
# The inner join keeps only events that match a song. monotonically_increasing_id()
# gives unique, increasing, but not consecutive ids.
songplays_table = spark.sql("""
    SELECT
        monotonically_increasing_id() AS songplay_id,
        log.timestamp                 AS start_time,
        log.userId                    AS user_id,
        log.level,
        song.song_id,
        song.artist_id,
        log.sessionId                 AS session_id,
        log.location,
        log.userAgent                 AS user_agent,
        year(log.timestamp)           AS year,
        month(log.timestamp)          AS month
    FROM log_data AS log
    INNER JOIN song_data AS song
        ON  log.song   = song.title
        AND log.length = song.duration
        AND log.artist = song.artist_name
""")
songplays_table.limit(5).toPandas()

Unnamed: 0,songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent,year,month
0,0,2018-11-21 21:56:47.796,15,paid,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,818,"Chicago-Naperville-Elgin, IL-IN-WI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",2018,11


In [52]:
songplays_table.write.mode("overwrite").partitionBy("year", "month").parquet(output_data + 'songplays/')