In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date, TimestampType as Ts
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [2]:
# Load AWS credentials from the local dl.cfg file and export them as
# environment variables for this process.
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ.update({
    name: config['AWS'][name]
    for name in ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY')
})

In [3]:
def create_spark_session():
    """Return a SparkSession configured with the hadoop-aws package.

    The session is obtained through ``getOrCreate``, so an already running
    session is returned instead of building a second one.
    """
    builder = SparkSession.builder.config(
        'spark.jars.packages',
        'org.apache.hadoop:hadoop-aws:2.7.0',
    )
    return builder.getOrCreate()

In [4]:
# Root S3 location (s3a scheme) that the song and log input paths are built from.
input_data = 's3a://udacity-dend/'

In [5]:
# Obtain the Spark session used by the read cells below.
spark = create_spark_session()
# Bare expression: shows the session object as the cell output.
spark

In [8]:
# Subset of the song dataset (the A/A/A prefix) under the input root.
song_data = f'{input_data}song_data/A/A/A/'

# read song data file
song_df = spark.read.json(song_data)

# extract columns to create songs table
songs_table = song_df.select(['song_id', 'title', 'artist_id', 'year', 'duration'])

# extract columns to create artists table
# The artist fields are selected from the per-song records, so an artist with
# several songs would be repeated; drop exact duplicate rows, as users_table does.
artists_table = song_df.select(
    ['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']
).dropDuplicates()

In [10]:
# November 2018 slice of the log dataset under the input root.
log_data = f'{input_data}log_data/2018/11/'

# read log data file
log_df = spark.read.json(log_data)

# filter by actions for song plays
log_df = log_df.where(log_df.page == 'NextSong')

# extract columns for users table
users_table = log_df.selectExpr('userId as user_id', 'firstName as first_name', 'lastName as last_name', 'gender', 'level').dropDuplicates()

# create timestamp column from original timestamp column
# `ts` is treated as epoch milliseconds; dividing by 1000 gives fractional epoch
# seconds. This is done with native column arithmetic rather than a Python UDF
# declared as IntegerType: a UDF returning a float under an integer return type
# produces nulls, and a UDF also pays Python serialization cost per row.
log_df = log_df.withColumn('timestamp', (col('ts') / 1000).cast(Dbl()))

# create datetime column from original timestamp column
# Casting a numeric column to TimestampType interprets it as seconds since the
# epoch; null inputs stay null instead of raising inside a Python UDF.
log_df = log_df.withColumn('datetime', col('timestamp').cast(Ts()))

# extract columns to create time table
time_table = log_df.select(
    col('datetime').alias('start_time'),
    hour('datetime').alias('hour'),
    dayofmonth('datetime').alias('day'),
    weekofyear('datetime').alias('week'),
    month('datetime').alias('month'),
    year('datetime').alias('year'),
    date_format('datetime', 'E').alias('weekday')
)

AttributeError: 'str' object has no attribute 'alias'

In [11]:
# Build the time table from the `datetime` column of log_df: the timestamp
# itself plus its hour, day of month, week of year, month, year and
# abbreviated weekday name.
time_table = log_df.select(*[
    part.alias(label)
    for label, part in [
        ('start_time', col('datetime')),
        ('hour', hour(col('datetime'))),
        ('day', dayofmonth(col('datetime'))),
        ('week', weekofyear(col('datetime'))),
        ('month', month(col('datetime'))),
        ('year', year(col('datetime'))),
        ('weekday', date_format(col('datetime'), 'E')),
    ]
])

In [14]:
# Preview: fetch the first five rows of the songs table as a list of Row objects.
songs_table.take(5)

[Row(song_id='SOAFBCP12A8C13CC7D', title='King Of Scurf (2007 Digital Remaster)', artist_id='ARTC1LV1187B9A4858', year=1972, duration=301.40036),
 Row(song_id='SOKTJDS12AF72A25E5', title='Drown In My Own Tears (24-Bit Digitally Remastered 04)', artist_id='ARA23XO1187B9AF18F', year=0, duration=192.522),
 Row(song_id='SOEKAZG12AB018837E', title="I'll Slap Your Face (Entertainment USA Theme)", artist_id='ARSVTNL1187B992A91', year=2001, duration=129.85424),
 Row(song_id='SOQPWCR12A6D4FB2A3', title='A Poor Recipe For Civic Cohesion', artist_id='AR73AIO1187B9AD57B', year=2005, duration=118.07302),
 Row(song_id='SOBRKGM12A8C139EF6', title='Welcome to the Pleasuredome', artist_id='ARXQBR11187B98A2CC', year=1985, duration=821.05424)]

In [15]:
# Preview: fetch the first five rows of the artists table as a list of Row objects.
artists_table.take(5)

[Row(artist_id='ARTC1LV1187B9A4858', artist_name='The Bonzo Dog Band', artist_location="Goldsmith's College, Lewisham, Lo", artist_latitude=51.4536, artist_longitude=-0.01802),
 Row(artist_id='ARA23XO1187B9AF18F', artist_name='The Smithereens', artist_location='Carteret, New Jersey', artist_latitude=40.57885, artist_longitude=-74.21956),
 Row(artist_id='ARSVTNL1187B992A91', artist_name='Jonathan King', artist_location='London, England', artist_latitude=51.50632, artist_longitude=-0.12714),
 Row(artist_id='AR73AIO1187B9AD57B', artist_name='Western Addiction', artist_location='San Francisco, CA', artist_latitude=37.77916, artist_longitude=-122.42005),
 Row(artist_id='ARXQBR11187B98A2CC', artist_name='Frankie Goes To Hollywood', artist_location='Liverpool, England', artist_latitude=None, artist_longitude=None)]

In [16]:
# Preview: fetch the first five rows of the users table as a list of Row objects.
users_table.take(5)

[Row(user_id='26', first_name='Ryan', last_name='Smith', gender='M', level='free'),
 Row(user_id='7', first_name='Adelyn', last_name='Jordan', gender='F', level='free'),
 Row(user_id='71', first_name='Ayleen', last_name='Wise', gender='F', level='free'),
 Row(user_id='81', first_name='Sienna', last_name='Colon', gender='F', level='free'),
 Row(user_id='87', first_name='Dustin', last_name='Lee', gender='M', level='free')]

In [17]:
# Preview: fetch the first five rows of the time table as a list of Row objects.
time_table.take(5)

[Row(start_time=datetime.datetime(2018, 11, 15, 0, 30, 26, 796000), hour=0, day=15, week=46, month=11, year=2018, weekday='Thu'),
 Row(start_time=datetime.datetime(2018, 11, 15, 0, 41, 21, 796000), hour=0, day=15, week=46, month=11, year=2018, weekday='Thu'),
 Row(start_time=datetime.datetime(2018, 11, 15, 0, 45, 41, 796000), hour=0, day=15, week=46, month=11, year=2018, weekday='Thu'),
 Row(start_time=datetime.datetime(2018, 11, 15, 3, 44, 9, 796000), hour=3, day=15, week=46, month=11, year=2018, weekday='Thu'),
 Row(start_time=datetime.datetime(2018, 11, 15, 5, 48, 55, 796000), hour=5, day=15, week=46, month=11, year=2018, weekday='Thu')]

In [22]:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

In [24]:
# Build the songplays table: every log event, enriched with song_id/artist_id
# from the song data when a matching song exists (left join keeps unmatched plays
# with null ids).
#
# A play is matched on BOTH song title and artist name. Matching on title alone
# would attach the wrong ids, and duplicate the play row, whenever two artists
# have a song with the same title.
#
# NOTE: the window has no partitioning, so Spark evaluates row_number() over the
# whole dataset in a single partition. songplay_id is therefore a consecutive
# 1..N sequence ordered by play time, at the cost of parallelism.
songplays_table = log_df.join(
                        song_df,
                        (log_df.song == song_df.title) & (log_df.artist == song_df.artist_name),
                        'left'
                    )\
                    .select(
                        row_number().over(Window.partitionBy().orderBy([log_df.datetime])).alias('songplay_id'),
                        log_df.datetime.alias('start_time'),
                        log_df.userId.alias('user_id'),
                        log_df.level,
                        song_df.song_id,
                        song_df.artist_id,
                        log_df.sessionId.alias('session_id'),
                        log_df.location,
                        log_df.userAgent.alias('user_agent')
                    )

In [25]:
# Preview: fetch the first five rows of the songplays table as a list of Row objects.
songplays_table.take(5)

[Row(songplay_id=1, start_time=datetime.datetime(2018, 11, 1, 21, 1, 46, 796000), user_id='8', level='free', song_id=None, artist_id=None, session_id=139, location='Phoenix-Mesa-Scottsdale, AZ', user_agent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.153 Safari/537.36"'),
 Row(songplay_id=2, start_time=datetime.datetime(2018, 11, 1, 21, 5, 52, 796000), user_id='8', level='free', song_id=None, artist_id=None, session_id=139, location='Phoenix-Mesa-Scottsdale, AZ', user_agent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.153 Safari/537.36"'),
 Row(songplay_id=3, start_time=datetime.datetime(2018, 11, 1, 21, 8, 16, 796000), user_id='8', level='free', song_id=None, artist_id=None, session_id=139, location='Phoenix-Mesa-Scottsdale, AZ', user_agent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.153 Safari/537.36"'),
 Row(songplay_id=4, start_time=dat

In [None]:
# Reference notes for the songplays table (comments only).
# This cell intentionally assigns nothing: a placeholder assignment to
# `songplays_table` here would replace the joined DataFrame with a plain string
# whenever the notebook is run top to bottom.
#
# Target columns and where each comes from (U = generated, L = log data, S = song data):
#   songplay_id (U), start_time (L), user_id (L), level (L), song_id (S), artist_id (S), session_id (L), location (L), user_agent (L)
#
# Sample song record:
#   {"num_songs": 1, "artist_id": "ARJIE2Y1187B994AB7", "artist_latitude": null, "artist_longitude": null, "artist_location": "", "artist_name": "Line Renaud", "song_id": "SOUPIRU12A6D4FA1E1", "title": "Der Kleine Dompfaff", "duration": 152.92036, "year": 0}
#
# Log record fields:
#   artist, auth, gender, level, firstName, lastName, length, location, method, page, registration, sessionId, song, status, ts, userAgent, userId