In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
import pyspark.sql.functions as f
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, from_unixtime, unix_timestamp, to_date
import zipfile
from pyspark.sql.types import TimestampType

In [2]:
def zip_to_folder(log_path, song_path,
                  log_zip='data/log-data.zip', song_zip='data/song-data.zip'):
    """Extract the log and song archives into the given directories.

    Args:
        log_path: Directory the log archive is extracted into.
        song_path: Directory the song archive is extracted into.
        log_zip: Path of the log archive. Defaults to the location the
            notebook originally hard-coded.
        song_zip: Path of the song archive. Defaults to the location the
            notebook originally hard-coded.

    Raises:
        FileNotFoundError: If either archive does not exist.
        zipfile.BadZipFile: If either file is not a valid zip archive.
    """
    # Same order as before: logs first, then songs.
    for archive, destination in ((log_zip, log_path), (song_zip, song_path)):
        with zipfile.ZipFile(archive, 'r') as zip_ref:
            zip_ref.extractall(destination)

In [3]:
# Read the AWS credentials from the local dl.cfg file ([default] section)
# and export them as environment variables for this process.
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ.update(AWS_ACCESS_KEY_ID=config.get('default', 'AWS_ACCESS_KEY_ID'))
os.environ.update(AWS_SECRET_ACCESS_KEY=config.get('default', 'AWS_SECRET_ACCESS_KEY'))

In [4]:
def create_spark_session():
    """Return a SparkSession whose builder requests the hadoop-aws package.

    The session is obtained with ``getOrCreate``, so calling this more than
    once hands back the already-running session instead of starting another.
    """
    builder = SparkSession.builder.config(
        "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
    )
    return builder.getOrCreate()


In [5]:
# Start (or reuse, via getOrCreate) the Spark session used by the cells below.
spark = create_spark_session()

In [6]:
# Display the session object to confirm it was created.
spark

In [7]:
# Remote dataset locations: the bucket root plus glob patterns for the
# song files (three directory levels deep) and the log files.
input_data = "s3a://udacity-dend/"
song_data = '{}song_data/*/*/*/*.json'.format(input_data)
log_path = '{}log_data/*.json'.format(input_data)

In [8]:
# Glob for the local song JSON files under a single A/A/A leaf directory.
songs = 'data/song_data/song_data/A/A/A/*.json'

In [9]:
# Load the local song JSON files into a DataFrame.
# NOTE: this rebinds `song_data`, which an earlier cell set to the S3 glob string.
song_data = spark.read.json(songs)

In [10]:
# Print the schema Spark inferred for the song records.
song_data.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [12]:
# Glob for the local log JSON files.
logs = 'data/log_data/*.json'

In [19]:
# Load the local log JSON files into a DataFrame.
log_data = spark.read.json(logs)

In [14]:
def process_song_data(spark, input_data, output_data,
                      song_glob='song_data/*/*/*/*.json'):
    """Build the songs and artists tables from the raw song JSON files.

    Args:
        spark: Active SparkSession used for reading and writing.
        input_data: Base location of the raw data, e.g. ``"s3a://udacity-dend/"``
            or ``"data/"``. A trailing slash is optional.
        output_data: Base location the parquet tables are written under.
        song_glob: Glob, relative to ``input_data``, that matches the song
            files. Override it when the local layout differs from the
            default three-level directory structure.

    Side effects:
        Writes ``songs_table.parquet`` (partitioned by ``year`` and
        ``artist_id``) and ``artist_table.parquet`` under ``output_data``.
        Existing output at those paths is overwritten so the step can be
        re-run.

    Returns:
        None.
    """
    # posixpath always joins with '/', which both local paths and s3a:// URIs
    # expect regardless of the host operating system.
    import posixpath

    # get filepath to song data file
    song_data = posixpath.join(input_data, song_glob)

    # read song data file
    df = spark.read.json(song_data)

    # extract columns to create songs table
    songs_table = df.select(
        'song_id', 'title', 'artist_id', 'year', 'duration'
    ).dropDuplicates()

    # write songs table to parquet files partitioned by year and artist
    songs_table.write.mode('overwrite') \
        .partitionBy('year', 'artist_id') \
        .parquet(posixpath.join(output_data, 'songs_table.parquet'))

    # extract columns to create artists table; the source has one row per
    # song, so identical artist rows are collapsed
    artists_table = df.select(
        'artist_id', 'artist_name', 'artist_location',
        'artist_latitude', 'artist_longitude'
    ).dropDuplicates()

    # write artists table to parquet files
    artists_table.write.mode('overwrite') \
        .parquet(posixpath.join(output_data, 'artist_table.parquet'))


In [20]:
# Keep only the events whose page is 'NextSong'.
log_filter = log_data.where(log_data.page == 'NextSong')

In [22]:
# UDF that divides the raw `ts` value by 1000 (milliseconds to whole seconds)
# and returns the truncated result as a string.
get_timestamp = udf(lambda millis: str(int(int(millis) / 1000)))
# Attach the converted value to every row as the 'timestamp' column.
log_filter = log_filter.withColumn('timestamp', get_timestamp(log_filter['ts']))

In [24]:
# Preview one row, including the derived 'timestamp' column.
log_filter.show(1)

+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+----------+
|  artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|         song|status|           ts|           userAgent|userId| timestamp|
+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+----------+
|Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|1542241826|
+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+----------+

In [29]:
# Load the songs table from parquet.
# NOTE(review): no visible cell writes this path before this one runs; confirm
# the file exists when the notebook is executed from a fresh kernel.
song_df = spark.read.parquet('data/songs_table.parquet')

In [30]:
# List the columns available in the songs table.
song_df.columns

['song_id', 'title', 'duration', 'year', 'artist_id']

In [31]:
# List the columns available in the filtered log data.
log_filter.columns

['artist',
 'auth',
 'firstName',
 'gender',
 'itemInSession',
 'lastName',
 'length',
 'level',
 'location',
 'method',
 'page',
 'registration',
 'sessionId',
 'song',
 'status',
 'ts',
 'userAgent',
 'userId',
 'timestamp']

In [None]:
def process_log_data(spark, input_data, output_data, log_glob='log_data/*.json'):
    """Build the users, time and songplays tables from the raw event logs.

    Args:
        spark: Active SparkSession used for reading and writing.
        input_data: Base location of the raw data, e.g. ``"s3a://udacity-dend/"``
            or ``"data/"``. A trailing slash is optional.
        output_data: Base location the parquet tables are written under. The
            songs table is also read from ``songs_table.parquet`` here.
        log_glob: Glob, relative to ``input_data``, that matches the log files.

    Side effects:
        Writes ``users_table.parquet``, ``time_table.parquet`` (partitioned by
        ``year`` and ``month``) and ``songplays_table.parquet`` (partitioned by
        ``year`` and ``month``) under ``output_data``. Existing output at
        those paths is overwritten so the step can be re-run.

    Returns:
        None.
    """
    # posixpath always joins with '/', which both local paths and s3a:// URIs
    # expect regardless of the host operating system.
    import posixpath

    # get filepath to log data file
    log_data = posixpath.join(input_data, log_glob)

    # read log data file
    df = spark.read.json(log_data)

    # filter by actions for song plays
    df = df.filter(df['page'] == 'NextSong')

    # extract columns for users table; identical rows are collapsed, but a
    # user whose level changed keeps one row per level
    users = df.select(
        'userId', 'firstName', 'lastName', 'gender', 'level'
    ).dropDuplicates()

    # write users table to parquet files
    users.write.mode('overwrite') \
        .parquet(posixpath.join(output_data, 'users_table.parquet'))

    # create start_time from the epoch-millisecond `ts` column; the cast to
    # long truncates to whole seconds before the value becomes a timestamp
    df = df.withColumn(
        'start_time', (df['ts'] / 1000).cast('long').cast('timestamp')
    )

    # extract columns to create time table; built-in functions replace the
    # Python UDFs, so the parts are integers instead of strings.
    # `week` is the ISO week number and `weekday` is the full day name.
    time_table = df.select(
        'start_time',
        f.hour('start_time').alias('hour'),
        f.dayofmonth('start_time').alias('day'),
        f.weekofyear('start_time').alias('week'),
        f.month('start_time').alias('month'),
        f.year('start_time').alias('year'),
        f.date_format('start_time', 'EEEE').alias('weekday'),
    ).dropDuplicates()

    # write time table to parquet files partitioned by year and month
    time_table.write.mode('overwrite') \
        .partitionBy('year', 'month') \
        .parquet(posixpath.join(output_data, 'time_table.parquet'))

    # read in song data to use for songplays table; only the join key and the
    # ids are kept so the song release `year` cannot clash with the play year
    song_df = spark.read \
        .parquet(posixpath.join(output_data, 'songs_table.parquet')) \
        .select('song_id', 'artist_id', 'title')

    # join the tables together to have all info needed for the songplays table
    # NOTE(review): matching on title alone can pair a play with the wrong
    # song when titles repeat; confirm whether a stricter key is wanted.
    df = df.join(song_df, df['song'] == song_df['title'])

    # extract columns from joined song and log datasets to create songplays
    # table; year and month are added because they are the partition columns
    songplays_table = df.select(
        'ts', 'userId', 'level', 'song_id', 'artist_id',
        'sessionId', 'location', 'userAgent',
        f.year('start_time').alias('year'),
        f.month('start_time').alias('month'),
    )

    # write songplays table to parquet files partitioned by year and month
    songplays_table.write.mode('overwrite') \
        .partitionBy('year', 'month') \
        .parquet(posixpath.join(output_data, 'songplays_table.parquet'))
