# Data Lake - Project

# Load Required Packages

In [9]:
import configparser
import boto3
from datetime import datetime
import os
import glob
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import udf, col, lit, concat
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date, LongType as Long 

# Load config file

In [3]:
config = configparser.ConfigParser()
config.read_file(open('dl.cfg'))

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

input_data = config['AWS']['INPUT_DATA']
output_data = config['LOCAL']['OUTPUT_DATA']

# Create spark session

In [4]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()
spark.sql("set spark.sql.parquet.compression.codec=gzip")
sc = spark.sparkContext

# Create schemas

In [5]:
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date, LongType as Long
logdataSchema = R([
    Fld("artist",Str()),
    Fld("auth",Str()),
    Fld("firstName",Str()),
    Fld("gender",Str()),
    Fld("itemInSession",Long()),
    Fld("lastName",Str()),
    Fld("length",Dbl()),
    Fld("level",Str()),
    Fld("location",Str()),
    Fld("method",Str()),
    Fld("page",Str()),
    Fld("registration",Dbl()),
    Fld("sessionId",Long()),
    Fld("song",Str()),
    Fld("status",Long()),
    Fld("ts",Long()),
    Fld("userAgent",Str()),
    Fld("userId",Str()),
])

songdataSchema = R([
    Fld("artist_id",Str()),
    Fld("artist_latitude",Dbl()),
    Fld("artist_location",Str()),
    Fld("artist_longitude",Dbl()),
    Fld("artist_name",Str()),
    Fld("duration",Dbl()),
    Fld("num_songs",Long()),
    Fld("song_id",Str()),
    Fld("title",Str()),
    Fld("year",Long()),
])

log_data = spark.createDataFrame(sc.emptyRDD(), logdataSchema)
song_data = spark.createDataFrame(sc.emptyRDD(), songdataSchema)

In [6]:
def getFilepaths(prefix):
    
    s3 = boto3.resource('s3',
                       region_name="us-west-2",
                       aws_access_key_id=config['AWS']['AWS_ACCESS_KEY_ID'],
                       aws_secret_access_key=config['AWS']['AWS_SECRET_ACCESS_KEY']
                   )
    
    filepaths=[]
    if 's3a://' in input_data:
        my_bucket = s3.Bucket('udacity-dend')
        objects = my_bucket.objects.filter(Prefix=prefix)
        for obj in objects:
            path, filename = os.path.split(obj.key)
            if '.json' in filename:
                filespaths = filepaths.append(input_data+'/'+obj.key)
    else:
        for root, dirs, files in os.walk(input_data+'/'+prefix):
            files = glob.glob(os.path.join(root,'*.json'))
            for f in files:
                filespaths = filepaths.append(os.path.abspath(f))
    return filepaths

# Load data

In [7]:
# get all filepaths - not used in final script.
start = datetime.now()
print('Extracting .JSON filepaths - In Progress...')
logFilepaths = getFilepaths('log_data')
songFilepaths = getFilepaths('song_data')
stop = datetime.now()
total = stop - start
print('Extracting .JSON filepaths - Completed, Time Taken: {}'.format(total))

# get total number of filepaths to be loaded - not used in final script.
start = datetime.now()
print('Extracting .JSON log_data - In Progress...')
log_data = spark.read.json(logFilepaths, schema=logdataSchema)
stop = datetime.now()
total = stop - start
print('Extracting .JSON log_data - Completed, Time Taken: {}'.format(total))

# loading song logs, only 50 / 14896 filepaths used - not used in final script.
start = datetime.now()
print('Extracting .JSON song_data - In Progress...')
song_data = spark.read.json(songFilepaths[:50], schema=songdataSchema)
stop = datetime.now()
total = stop - start
print('Extracting .JSON song_data - Completed, Time Taken: {}'.format(total))

Extracting .JSON filepaths - In Progress...
Extracting .JSON filepaths - Completed, Time Taken: 0:00:13.390887
Extracting .JSON log_data - In Progress...
Extracting .JSON log_data - Completed, Time Taken: 0:00:05.224602
Extracting .JSON song_data - In Progress...
Extracting .JSON song_data - Completed, Time Taken: 0:00:10.466239


In [None]:
log_data.limit(1).toPandas().T.to_dict()

In [14]:
print(song_data.rdd.getNumPartitions())

2


In [None]:
song_data.limit(1).toPandas().T.to_dict()

# Process Song Data

In [11]:
start = datetime.now()
print('Transforming .JSON log_data files - In Progress, Count of: "{}" files...'.format(log_data.count()))

# get filepath to song data file
df = song_data

# extract columns to create songs table
songs_table = df.select('song_id','title','artist_id','year','duration')
songs_table = songs_table.dropDuplicates()

# extract columns to create artists table
artists_table = df.select('artist_id','artist_name','artist_location','artist_latitude','artist_longitude')
artists_table = artists_table.dropDuplicates()

stop = datetime.now()
total = stop - start
print('Transforming .JSON log_data - Completed, Time Taken: {}'.format(total))

start = datetime.now()
print('Loading/Writing .JSON log_data files - In Progress, Count of: "{}" files...'.format(log_data.count()))

# write songs table to parquet files partitioned by year and artist
songs_tablewrite=songs_table.repartition('year','artist_id')
songs_tablewrite.write.mode("overwrite").partitionBy('year','artist_id').parquet(output_data+'/'+'songs_table.parquet')

# write artists table to parquet files
artists_table.write.mode("overwrite").parquet(output_data+'/'+'artists_table')

stop = datetime.now()
total = stop - start
print('Loading/Writing .JSON log_data - Completed, Time Taken: {}'.format(total))

Transforming .JSON log_data files - In Progress, Count of: "8056" files...
Transforming .JSON log_data - Completed, Time Taken: 0:00:05.256818
Loading/Writing .JSON log_data files - In Progress, Count of: "8056" files...
Loading/Writing .JSON log_data - Completed, Time Taken: 0:00:29.289841


# Process Log Data

In [None]:
start = datetime.now()
print('Transforming .JSON song_data - In Progress, Count of: "{}" files...'.format(song_data.count()))


# get filepath to log data file


# read log data file
df = log_data

# filter by actions for song plays
df=df.where(col("page") == "NextSong")

# extract columns for users table   
users_table = df.withColumn("row_number",F.row_number()\
                            .over(Window.partitionBy(df.userId, df.firstName, df.lastName, df.gender)\
                            .orderBy(df.ts.desc())))\
                            .filter(F.col("row_number")==1)\
                            .drop("row_number")\
                            .select('userId','firstName','lastName','gender','level')



# create timestamp / datetime column from original timestamp column
df = df.withColumn("timestamp", F.to_timestamp(df['ts']/1000))

# read in song data to use for songplays table
col_list = ['song','artist','length']

song_df = df.select(col("timestamp").alias("start_time"), 
                    col("userid").alias("user_id"),
                    col("level"),
                    col("sessionid").alias("session_id"),
                    col("location"),
                    col("useragent").alias("user_agent"),
                    col("song"),
                    col("artist"),
                    col("length"),
                    col("ts"))
song_df = song_df.withColumn('concat_id',concat(*col_list))

col_list = ['title','artist_name','duration']
song_df2 = songs_table.withColumn("artist_id", col("artist_id"))\
    .join(artists_table.withColumn("artist_id", col("artist_id")), on="artist_id")\
    .select("song_id","title","artist_id","artist_name","duration")
song_df2 = song_df2.withColumn('concat_id',concat(*col_list))

# extract columns from joined song and log datasets to create songplays table 
songplays_table = song_df.join(song_df2, song_df["concat_id"] == song_df2["concat_id"], "left")\
    .select("start_time","user_id","level","song_id","artist_id","session_id","location","user_agent")\
    .dropDuplicates()
songplays_table = songplays_table.withColumn("month", F.month("start_time"))
songplays_table = songplays_table.withColumn("year", F.year("start_time"))

# extract columns to create time table
time_table = songplays_table.select(col("start_time").alias("time_stamp"))
time_table = time_table.withColumn("hour", F.hour("time_stamp"))
time_table = time_table.withColumn("day", F.dayofyear("time_stamp"))
time_table = time_table.withColumn("weekofyear", F.weekofyear("time_stamp"))
time_table = time_table.withColumn("month", F.month("time_stamp"))
time_table = time_table.withColumn("year", F.year("time_stamp"))
time_table = time_table.withColumn("weekday", F.dayofweek("time_stamp"))

stop = datetime.now()
total = stop - start
print('Transforming .JSON song_data - Completed, Time Taken: {}'.format(total))


# start of loading/writing users / songplays and time tables
start = datetime.now()
print('Loading/Writing .JSON song_data - In Progress, Count of: "{}" files...'.format(song_data.count()))

# write users table to parquet files
users_table.write.mode("overwrite").parquet(output_data+'/'+'users_table')

# write songplays table to parquet files partitioned by year and month?
songplays_table.write.mode("overwrite").partitionBy("year", "month").parquet(output_data+'/'+'songplays_table')

# write time table to parquet files partitioned by year and month
time_table.write.mode("overwrite").partitionBy("year", "month").parquet(output_data+'/'+'time_table')

stop = datetime.now()
total = stop - start
print('Loading/Writing .JSON song_data - Completed, Time Taken: {}'.format(total))