In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, to_date, date_format
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, dayofweek, monotonically_increasing_id
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl,StringType as Str, IntegerType as Int, DateType as Date, TimestampType as TS

In [2]:
import pandas as pd

In [3]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [4]:
def create_spark_session():
    """ 
    Create and return a Spark Session 
    """
    
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    
    return spark

In [5]:
spark = create_spark_session()
input_data = "s3a://udacity-dend"
output_data = "s3a://sparkify-data-lake-udacity"

# process_song_data(spark, input_data, output_data)    
# process_log_data(spark, input_data, output_data)

In [6]:
song_data = f"{input_data}/song-data/A/A/A/*.json"


In [7]:
# defining song_data schema
songData_schema = R([
    Fld("num_songs", Int()),
    Fld("artist_id", Str()),
    Fld("artist_latitude", Dbl()),
    Fld("artist_longitude", Dbl()),
    Fld("artist_location", Str()),
    Fld("artist_name", Str()),
    Fld("song_id", Str()),
    Fld("title", Str()),
    Fld("duration", Dbl()),
    Fld("year", Int()),
])

df = spark.read.json(song_data, schema = songData_schema).dropDuplicates()


In [8]:
songs_table = df.select("song_id", "title", "artist_id", "year", "duration")


In [11]:
songs_table.write.partitionBy("year", "artist_id").parquet(f"{output_data}/songs/")


In [9]:
artists_table = df.select("artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude")


In [13]:
artists_table.write.parquet(f"{output_data}/artists/")

# Second function

In [10]:
# get filepath to log data file
log_data = f"{input_data}/log-data/*/*/*.json"

In [11]:
df = spark.read.json(log_data).dropDuplicates()


In [12]:
df = df.filter(df.page == "NextSong")


In [13]:
users_table = df.select("userId", "firstName", "lastName", "gender", "level")


In [18]:
users_table.write.parquet(f"{output_data}/users/")


In [14]:
# create timestamp column from original timestamp column
get_timestamp = udf(lambda x: datetime.fromtimestamp(x / 1000), TS())
df = df.withColumn("start_time", get_timestamp(col("ts")))

In [15]:
# extract columns to create time table
df = df.withColumn("hour", hour("start_time"))
df = df.withColumn("day", dayofmonth("start_time"))
df = df.withColumn("month", month("start_time"))
df = df.withColumn("year", year("start_time"))
df = df.withColumn("week", weekofyear("start_time"))
df = df.withColumn("weekday", dayofweek("start_time"))

In [16]:
time_table = df.select("start_time", "hour", "day", "month", "year", "week", "weekday")


In [85]:

# write time table to parquet files partitioned by year and month
time_table.write.partitionBy("year", "month").parquet(f"{output_data}/time/")

In [22]:
# read in song data to use for songplays table
df_songs = spark.read.parquet(f"{output_data}/songs/")

df_songs = df_songs.select('song_id', 'title', 'duration')

df_artists = spark.read.parquet(f"{output_data}/artists/")

songs_df = df.join(df_songs, ([df.song == df_songs.title,
                              df.length == df_songs.duration]), "inner")

In [23]:
df_songs.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)



In [24]:
songs_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- weekday: integer (nullable = true)
 |-- song_id: string (nullable = t

In [17]:
# read in song data to use for songplays table
df_songs = spark.read.parquet(f"{output_data}/songs/")

df_songs = df_songs.select('song_id', 'title', 'duration')

df_artists = spark.read.parquet(f"{output_data}/artists/")

songs_df = df.join(df_songs, (df.song == df_songs.title), "inner")

songs_df = songs_df.join(df_artists, (songs_df.artist == df_artists.artist_name), "inner")

In [108]:
# extract columns from joined song and log datasets to create songplays table 
songplays_table = songs_df.select("year", "month", "start_time", "userId", "level", 
                                  "song_id", "artist_id", "sessionId", "location", "userAgent")

songplays_table = songplays_table.withColumn("songplay_id", monotonically_increasing_id())

# write songplays table to parquet files partitioned by year and month
songplays_table.write.partitionBy("year", "month").parquet(f"{output_data}/songplays/")

In [91]:
# read in song data to use for songplays table
df_songs = spark.read.parquet(f"{output_data}/songs/")

df_artists = spark.read.parquet(f"{output_data}/artists/")

songs_df = df.join(df_songs, (df.song == df_songs.title), "inner")

songs_df = songs_df.join(df_artists, (songs_df.artist == df_artists.artist_name), "inner")

# extract columns from joined song and log datasets to create songplays table 
songplays_table = songs_df.select("year", "month", "start_time", "userId", "level", 
                                  "song_id", "artist_id", "sessionId", "location", "userAgent")

songplays_table = songplays_table.withColumn("songplay_id", monotonically_increasing_id())

# write songplays table to parquet files partitioned by year and month
songplays_table.write.partitionBy("year", "month").parquet(f"{output_data}/songplays/")

AnalysisException: "Reference 'artist_id' is ambiguous, could be: artist_id, artist_id.;"

In [None]:
# get filepath to log data file
log_data = f"{input_data}/log-data/*/*/*.json"

# read log data file
df = spark.read.json(log_data).dropDuplicates()

# filter by actions for song plays
df = df.filter(df.page == "NextSong")

# extract columns for users table    
users_table = df.select("userId", "firstName", "lastName", "gender", "level")

# write users table to parquet files
users_table.write.parquet(f"{output_data}/users/")

# create timestamp column from original timestamp column
get_timestamp = udf(lambda x: datetime.fromtimestamp(x / 1000), TS())
df = df.withColumn("timestamp", get_timestamp(col("ts")))

# create datetime column from original timestamp column
get_datetime = udf(lambda x: to_date(x), TS())
df = df.withColumn("start_time", get_datetime(col("timestamp")))

#     # create timestamp column from original timestamp column
#     get_timestamp = udf(lambda x: x / 1000, TS())
#     df = df.withColumn("timestamp", get_timestamp(col("ts")))

#     # create datetime column from original timestamp column
#     get_datetime = udf(lambda x: datetime.fromtimestamp(x), TS())
#     df = df.withColumn("start_time", get_datetime(col("timestamp")))

# extract columns to create time table
df = df.withColumn("hour", hour("start_time"))
df = df.withColumn("day", dayofmonth("start_time"))
df = df.withColumn("month", month("start_time"))
df = df.withColumn("year", year("start_time"))
df = df.withColumn("week", weekofyear("start_time"))
df = df.withColumn("weekday", dayofweek("start_time"))

time_table = df.select("start_time", "hour", "day", "month", "year", "week", "weekday")

# write time table to parquet files partitioned by year and month
time_table.write.partitionBy("year", "month").parquet(f"{output_data}/time/")

# read in song data to use for songplays table
df_songs = spark.read.parquet(f"{output_data}/songs/")

df_artists = spark.read.parquet(f"{output_data}/artists/")

songs_df = df.join(df_songs, (df.song == df_songs.title), "inner")

songs_df = songs_df.join(df_artists, (songs_df.artist == df_artists.artist_name), "inner")

# extract columns from joined song and log datasets to create songplays table 
songplays_table = songs_df.select("year", "month", "start_time", "userId", "level", 
                                  "song_id", "artist_id", "sessionId", "location", "userAgent")

songplays_table = songplays_table.withColumn("songplay_id", monotonically_increasing_id())

# write songplays table to parquet files partitioned by year and month
songplays_table.write.partitionBy("year", "month").parquet(f"{output_data}/songplays/")