In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek, monotonically_increasing_id

In [2]:
# Interactive session for the exploratory cells in this notebook.
spark = (
    SparkSession.builder
    .appName("Wrangling Data")
    .getOrCreate()
)

In [4]:
def create_spark_session():
    """
    Return a Spark session configured with the hadoop-aws package.

    Uses getOrCreate(), so an already-running session is returned instead of
    a new one being started.
    """
    builder = SparkSession.builder.config(
        "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
    )
    return builder.getOrCreate()

In [None]:
def process_song_data(spark, input_data, output_data):
    """
    Function: Read the JSON files under <input_data>/song_data, extract the
    columns for the songs and artists tables, and write both tables as
    parquet files under output_data.

    parameter list

    spark:        session, spark session has been created.
    input_data:   string of path, root folder (or bucket) holding song_data/.
                  A trailing separator is optional.
    output_data:  string of path, root folder (or bucket) for the parquet output.

    Side effects: registers the temp views 'songs' and 'artists' and
    overwrites any existing parquet output at the target paths.
    Returns None.
    """
    # os.path.join instead of string concatenation, so the glob is correct
    # whether or not input_data ends with a separator
    song_data = os.path.join(input_data, 'song_data/*/*/*/*.json')

    # read song data file
    df = spark.read.json(song_data)

    # extract columns to create songs table (duplicate rows removed)
    songs_table = df.select('song_id', 'title', 'artist_id', 'year', 'duration').dropDuplicates()

    songs_table.createOrReplaceTempView('songs')

    # write songs table to parquet files partitioned by year and artist
    songs_table.write.partitionBy('year', 'artist_id').parquet(os.path.join(output_data, 'songs/songs.parquet'), 'overwrite')

    # extract columns to create artists table, dropping the 'artist_' prefix
    # from the descriptive columns (artist_id keeps its name)
    artists_table = df.select('artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude')\
                      .withColumnRenamed('artist_name', 'name')\
                      .withColumnRenamed('artist_location', 'location')\
                      .withColumnRenamed('artist_latitude', 'latitude')\
                      .withColumnRenamed('artist_longitude', 'longitude')\
                      .dropDuplicates()
    artists_table.createOrReplaceTempView('artists')

    # write artists table to parquet files
    artists_table.write.parquet(os.path.join(output_data, 'artists/artists.parquet'), 'overwrite')


In [5]:
    # Local folders are used for development. The S3 equivalents are:
    #   input_data  = "s3a://udacity-dend/"
    #   output_data = "s3a://lj_loaded_data/"
    input_data = "./data/"
    output_data = "./data/output/"

    # Session shared by the exploratory cells below
    spark = create_spark_session()

In [6]:
# Glob matching every JSON event log directly under <input_data>log_data/
log_data = input_data + 'log_data/*.json'

# Load the raw event log
df = spark.read.json(log_data)

# Keep only song-play events (page == 'NextSong') and the columns used later
df_actions = (
    df.where(df['page'] == 'NextSong')
      .select('ts', 'userId', 'level', 'song', 'artist', 'sessionId', 'length', 'location', 'userAgent')
)


In [7]:
 # Users table: distinct user attribute rows taken from the unfiltered log frame
users_table = df.select('userId', 'firstName', 'lastName', 'gender', 'level').dropDuplicates()
users_table.createOrReplaceTempView('users')

# Persist the users table as parquet, replacing any previous output
users_table.write.parquet(
    os.path.join(output_data, 'users/users.parquet'),
    mode='overwrite',
)

# 'timestamp': ts divided by 1000 and truncated, stored as a string
get_timestamp = udf(lambda x: str(int(int(x) / 1000)))
df_actions = df_actions.withColumn('timestamp', get_timestamp(df_actions['ts']))

# 'datetime': ts / 1000 converted with datetime.fromtimestamp, stored as a string
get_datetime = udf(lambda x: str(datetime.fromtimestamp(int(x)/1000)))
df_actions = df_actions.withColumn('datetime', get_datetime(df_actions['ts']))

# Time table: the datetime, a copy named start_time, and its calendar parts
time_table = df_actions.select(
    col('datetime'),
    col('datetime').alias('start_time'),
    hour('datetime').alias('hour'),
    dayofmonth('datetime').alias('day'),
    weekofyear('datetime').alias('week'),
    month('datetime').alias('month'),
    year('datetime').alias('year'),
    dayofweek('datetime').alias('weekday'),
).dropDuplicates()

# Persist the time table as parquet, partitioned by year and month
time_table.write.partitionBy('year', 'month').parquet(
    os.path.join(output_data, 'time/time.parquet'),
    mode='overwrite',
)

# Song metadata, needed to build the songplays table
df_songs = spark.read.json(input_data + 'song_data/*/*/*/*.json')

In [19]:
# dropDuplicates() returns a new DataFrame and leaves the receiver unchanged,
# so the result has to be rebound for the de-duplication to take effect.
df_songs = df_songs.dropDuplicates()
df_songs.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [8]:
# Show the column names and types of the filtered song-play events
df_actions.printSchema()


root
 |-- ts: long (nullable = true)
 |-- userId: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song: string (nullable = true)
 |-- artist: string (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- length: double (nullable = true)
 |-- location: string (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- datetime: string (nullable = true)



In [15]:
# Preview: pull at most 100 rows into a pandas DataFrame for display
df_actions.limit(100).toPandas()

Unnamed: 0,ts,userId,level,song,artist,sessionId,length,location,userAgent,timestamp,datetime
0,1542241826796,26,free,Sehr kosmisch,Harmonia,583,655.77751,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",1542241826,2018-11-14 18:30:26.796000
1,1542242481796,26,free,The Big Gundown,The Prodigy,583,260.07465,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",1542242481,2018-11-14 18:41:21.796000
2,1542242741796,26,free,Marry Me,Train,583,205.45261,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",1542242741,2018-11-14 18:45:41.796000
3,1542253449796,61,free,Blackbird,Sony Wonder,597,218.06975,"Houston-The Woodlands-Sugar Land, TX","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",1542253449,2018-11-14 21:44:09.796000
4,1542260935796,80,paid,Best Of Both Worlds (Remastered Album Version),Van Halen,602,289.38404,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",1542260935,2018-11-14 23:48:55.796000
...,...,...,...,...,...,...,...,...,...,...,...
95,1542281853796,16,paid,Suicide,Jedi Mind Tricks,575,232.88118,"Birmingham-Hoover, AL","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",1542281853,2018-11-15 05:37:33.796000
96,1542281872796,80,paid,Stella By Starlight,Miles Davis,611,285.20444,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",1542281872,2018-11-15 05:37:52.796000
97,1542281972796,30,paid,Nothin' On You [feat. Bruno Mars] (Album Version),B.o.B,324,269.63546,"San Jose-Sunnyvale-Santa Clara, CA",Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) G...,1542281972,2018-11-15 05:39:32.796000
98,1542282085796,16,paid,I And Love And You,The Avett Brothers,575,300.85179,"Birmingham-Hoover, AL","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",1542282085,2018-11-15 05:41:25.796000


In [9]:
# Inspect a single song record (returned as a list containing one Row)
df_songs.take(1)

[Row(artist_id='ARDR4AC1187FB371A1', artist_latitude=None, artist_location='', artist_longitude=None, artist_name='Montserrat Caballé;Placido Domingo;Vicente Sardinero;Judith Blegen;Sherrill Milnes;Georg Solti', duration=511.16363, num_songs=1, song_id='SOBAYLL12A8C138AF9', title='Sono andati? Fingevo di dormire', year=0)]

In [20]:
# Match each song play to its song record. A list of join expressions is
# combined with AND, so a row must agree on artist name, title and duration.
df_song_actions = df_actions.join(
    df_songs,
    on=[
        df_actions['artist'] == df_songs['artist_name'],
        df_actions['song'] == df_songs['title'],
        df_actions['length'] == df_songs['duration'],
    ],
    how='inner',
)

In [29]:
# Build the songplays table from the joined log/song rows.
# 'year' and 'month' are derived from the play's datetime because the write
# step partitions the parquet output by those two columns; without them the
# write fails on missing partition columns.
songplays_table = df_song_actions.select(
        col('datetime').alias('start_time'),
        col('userId').alias('user_id'),
        col('level'),
        col('song_id'),
        col('artist_id'),
        col('sessionId').alias('session_id'),
        col('location'),
        col('userAgent').alias('user_agent'),
        year('datetime').alias('year'),
        month('datetime').alias('month')
    ).withColumn('songplay_id', monotonically_increasing_id())  # generated row id
    


In [18]:

# Register songplays_table as a temporary view named 'songplays'.
# NOTE: songplays_table must already be defined when this cell runs; the
# recorded NameError below shows this cell being executed without it.
songplays_table.createOrReplaceTempView('songplays')



NameError: name 'songplays_table' is not defined

In [None]:
# Rebind time_table to the same DataFrame with the alias 'timetable'.
# NOTE(review): no later visible cell refers to this alias - confirm it is still needed.
time_table = time_table.alias('timetable')



In [None]:
# Persist songplays as parquet, partitioned by its 'year' and 'month' columns
# and replacing any previous output at the same path.
songplays_table.write.partitionBy('year', 'month').parquet(
    os.path.join(output_data, 'songplays/songplays.parquet'),
    mode='overwrite',
)

In [None]:
def process_log_data(spark, input_data, output_data):
    """
    Function: Read the JSON files under <input_data>/log_data and build the
    users and time tables. Join the song-play events with the song data to
    build the songplays table. All three tables are written as parquet files
    under output_data.

    Parameter list
    spark:        session, spark session has been created.
    input_data:   string of path, root folder (or bucket) holding log_data/
                  and song_data/. A trailing separator is optional.
    output_data:  string of path, root folder (or bucket) for the parquet output.

    Side effects: registers the temp views 'users' and 'songplays' and
    overwrites any existing parquet output at the target paths.
    Returns None.
    """
    # get filepath to log data file; os.path.join keeps the path correct
    # whether or not input_data ends with a separator
    log_data = os.path.join(input_data, 'log_data/*.json')

    # read log data file
    df = spark.read.json(log_data)

    # filter by actions for song plays; 'length' is kept because the join
    # below matches plays to songs by duration
    df_actions = df.filter(df.page == 'NextSong')\
                   .select('ts', 'userId', 'level', 'song', 'artist', 'sessionId', 'length', 'location', 'userAgent')

    # extract columns for users table
    users_table = df.select('userId', 'firstName', 'lastName', 'gender', 'level').dropDuplicates()

    users_table.createOrReplaceTempView('users')

    # write users table to parquet files
    users_table.write.parquet(os.path.join(output_data, 'users/users.parquet'), 'overwrite')

    # create datetime column from original timestamp column: ts is divided by
    # 1000 (milliseconds -> seconds) and converted with datetime.fromtimestamp,
    # which uses the local time zone of the machine running the UDF
    get_datetime = udf(lambda x: str(datetime.fromtimestamp(int(x)/1000)))
    df_actions = df_actions.withColumn('datetime', get_datetime(df_actions.ts))

    # extract columns to create time table
    time_table = df_actions.select('datetime')\
                           .withColumn('start_time', df_actions.datetime)\
                           .withColumn('hour', hour('datetime'))\
                           .withColumn('day', dayofmonth('datetime'))\
                           .withColumn('week', weekofyear('datetime'))\
                           .withColumn('month', month('datetime'))\
                           .withColumn('year', year('datetime'))\
                           .withColumn('weekday', dayofweek('datetime'))\
                           .dropDuplicates()

    # write time table to parquet files partitioned by year and month
    time_table.write.partitionBy('year', 'month')\
                    .parquet(os.path.join(output_data, 'time/time.parquet'), 'overwrite')

    # read in song data to use for songplays table
    df_songs = spark.read.json(os.path.join(input_data, 'song_data/*/*/*/*.json'))

    # join plays to songs on artist name, title AND duration; matching on the
    # artist alone would pair every play with every song by that artist
    df_song_actions = df_actions.join(
        df_songs,
        (df_actions['artist'] == df_songs['artist_name'])
        & (df_actions['song'] == df_songs['title'])
        & (df_actions['length'] == df_songs['duration']),
        'inner')

    # extract columns from joined song and log datasets to create songplays table.
    # Columns are referenced by plain name: the DataFrames were never given
    # aliases, so qualified names such as 'df_actions.userId' cannot be resolved.
    songplays_table = df_song_actions.select(
        col('datetime').alias('start_time'),
        col('userId').alias('user_id'),
        col('level'),
        col('song_id'),
        col('artist_id'),
        col('sessionId').alias('session_id'),
        col('location'),
        col('userAgent').alias('user_agent'),
        # partition columns come from the play's datetime, not the song's year
        year('datetime').alias('year'),
        month('datetime').alias('month')
    ).withColumn('songplay_id', monotonically_increasing_id())

    songplays_table.createOrReplaceTempView('songplays')

    # write songplays table to parquet files partitioned by year and month
    songplays_table.write.partitionBy(
        'year', 'month'
    ).parquet(os.path.join(output_data, 'songplays/songplays.parquet'), 'overwrite')


In [None]:
def main():
    '''
    Run the whole ETL pipeline.

    Steps:
    * Get or create a spark session
    * Process the song data found under input_data
    * Process the log data found under input_data
    * Each step writes its tables as parquet files under output_data
    '''
    # Local folders are used here. The S3 equivalents are:
    #   input_data  = "s3a://udacity-dend/"
    #   output_data = "s3a://lj_loaded_data/"
    input_data = "./data/"
    output_data = "./data/output/"

    spark = create_spark_session()

    # Song data is processed first, then the log data
    for step in (process_song_data, process_log_data):
        step(spark, input_data, output_data)
