In [1]:
import configparser
import datetime
import calendar
import os
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

['dl.cfg']

In [3]:
os.environ['AWS_ACCESS_KEY_ID']=config['IAM_USER']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['IAM_USER']['AWS_SECRET_ACCESS_KEY']

In [4]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

In [5]:
spark = create_spark_session()

In [6]:
output_data = 's3a://data-lake-pmb2/'

In [7]:
songs_table = spark.read.parquet(output_data + 'songs.parquet')
artists_table = spark.read.parquet(output_data + 'artists.parquet')

In [8]:
log_data = 'data/log_data/*'

df = spark.read.json(log_data)

# filter by actions for song plays
df = df.where(df.page == 'NextSong')

# extract columns for users table    
users_table = df.select('userId',
                        'firstName',
                        'lastName',
                        'gender',
                        'level') \
                      .withColumnRenamed('userId', 'user_id') \
                      .withColumnRenamed('firstName', 'first_name') \
                      .withColumnRenamed('lastName', 'last_name') \
                      .distinct()

# create start_time column from original timestamp column
df = df.withColumn(
    'start_time',
    F.to_timestamp(F.from_unixtime((col('ts') / 1000) , 'yyyy-MM-dd HH:mm:ss.SSS')).cast('Timestamp')
)

In [9]:
def get_weekday(date):
    """
    Gets day of week from date.
    Parameters: date
    Returns: day of week
    """
    date = date.strftime("%m-%d-%Y")
    month, day, year = (int(x) for x in date.split('-'))
    weekday = datetime.date(year, month, day)
    return calendar.day_name[weekday.weekday()]

udf_week_day = udf(get_weekday, T.StringType())

# extract columns to create time table
time_table = df.withColumn('hour', hour(col('start_time'))) \
               .withColumn('day', dayofmonth(col('start_time'))) \
               .withColumn('week', weekofyear(col('start_time'))) \
               .withColumn('month', month(col('start_time'))) \
               .withColumn('year', year(col('start_time'))) \
               .withColumn('weekday', udf_week_day(col('start_time'))) \
               .select('start_time',
                        'hour',
                        'day',
                        'week',
                        'month',
                        'year',
                        'weekday').distinct()

In [10]:
time_table.show()

+-------------------+----+---+----+-----+----+---------+
|         start_time|hour|day|week|month|year|  weekday|
+-------------------+----+---+----+-----+----+---------+
|2018-11-15 21:21:37|  21| 15|  46|   11|2018| Thursday|
|2018-11-21 06:15:25|   6| 21|  47|   11|2018|Wednesday|
|2018-11-14 04:13:46|   4| 14|  46|   11|2018|Wednesday|
|2018-11-14 04:58:37|   4| 14|  46|   11|2018|Wednesday|
|2018-11-14 07:29:17|   7| 14|  46|   11|2018|Wednesday|
|2018-11-28 19:00:08|  19| 28|  48|   11|2018|Wednesday|
|2018-11-28 21:03:23|  21| 28|  48|   11|2018|Wednesday|
|2018-11-28 22:40:20|  22| 28|  48|   11|2018|Wednesday|
|2018-11-05 19:02:48|  19|  5|  45|   11|2018|   Monday|
|2018-11-13 21:10:36|  21| 13|  46|   11|2018|  Tuesday|
|2018-11-30 07:22:12|   7| 30|  48|   11|2018|   Friday|
|2018-11-30 16:31:25|  16| 30|  48|   11|2018|   Friday|
|2018-11-16 08:43:25|   8| 16|  46|   11|2018|   Friday|
|2018-11-20 22:17:06|  22| 20|  47|   11|2018|  Tuesday|
|2018-11-24 12:15:27|  12| 24| 

In [11]:
songs_table.show()

+------------------+--------------------+---------+----+------------------+
|           song_id|               title| duration|year|         artist_id|
+------------------+--------------------+---------+----+------------------+
|SOKTJDS12AF72A25E5|Drown In My Own T...|  192.522|   0|ARA23XO1187B9AF18F|
|SOEKAZG12AB018837E|I'll Slap Your Fa...|129.85424|2001|ARSVTNL1187B992A91|
|SOAFBCP12A8C13CC7D|King Of Scurf (20...|301.40036|1972|ARTC1LV1187B9A4858|
|SORRNOC12AB017F52B|The Last Beat Of ...|337.81506|2004|ARSZ7L31187FB4E610|
|SOQPWCR12A6D4FB2A3|A Poor Recipe For...|118.07302|2005|AR73AIO1187B9AD57B|
|SODZYPO12A8C13A91E|Burn My Body (Alb...|177.99791|   0|AR1C2IX1187B99BF74|
|SOBRKGM12A8C139EF6|Welcome to the Pl...|821.05424|1985|ARXQBR11187B98A2CC|
|SOERIDA12A6D4F8506|I Want You (Album...|192.28689|2006|ARBZIN01187FB362CC|
|SOAPERH12A58A787DC|The One And Only ...|230.42567|   0|ARZ5H0P1187B98A1DD|
|SOSMJFC12A8C13DE0C|Is That All There...|343.87546|   0|AR1KTV21187B9ACD72|
|SONRWUU12AF

In [12]:
df.show()

+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+-------------------+
|              artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|         start_time|
+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+-------------------+
|            Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|2018-11-15 00:30:26|
|         The Prodigy|Lo

In [13]:
songplays_table = df.withColumn('songplay_id', F.monotonically_increasing_id()) \
                    .join(songs_table, (songs_table.title == df.song) & (df.length == songs_table.duration)) \
                    .join(artists_table, (df.artist == artists_table.name) & (songs_table.artist_id == artists_table.artist_id)) \
                    .join(time_table, time_table.start_time == df.start_time) \
                    .select('songplay_id',
                            df['start_time'],
                            df['userId'],
                            df['level'],
                            songs_table['song_id'],
                            songs_table['artist_id'],
                            df['sessionId'],
                            artists_table['location'],
                            df['userAgent'],
                            time_table['year'],
                            time_table['month']) \
                         .withColumnRenamed('userId', 'user_id') \
                         .withColumnRenamed('sessionId', 'session_id') \
                         .withColumnRenamed('userAgent', 'user_agent') 

In [14]:
songplays_table.show()

+-----------+----------+-------+-----+-------+---------+----------+--------+----------+----+-----+
|songplay_id|start_time|user_id|level|song_id|artist_id|session_id|location|user_agent|year|month|
+-----------+----------+-------+-----+-------+---------+----------+--------+----------+----+-----+
+-----------+----------+-------+-----+-------+---------+----------+--------+----------+----+-----+

