In [1]:
# initialize findspark for local
import findspark
findspark.init('/Users/johnrick/opt/spark-2.4.7-bin-hadoop2.7')

In [2]:
import configparser
from datetime import datetime
# import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [3]:
# Configure the necessary Spark environment
import os
import sys
import boto3

In [4]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.1 pyspark-shell'

In [5]:
pyspark_submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "")
if not "pyspark-shell" in pyspark_submit_args: pyspark_submit_args += " pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

spark_home = os.environ.get('SPARK_HOME', None)
sys.path.insert(0, spark_home + "/python")

# Add the py4j to the path.
# You may need to change the version number to match your install
sys.path.insert(0, os.path.join(spark_home, "/Users/johnrick/opt/spark-2.4.7-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip"))

# Initialize PySpark
exec(open(os.path.join(spark_home, "/Users/johnrick/Downloads/spark-3.0.0-preview2-bin-hadoop2.7/python/pyspark/python/pyspark/shell.py")).read())

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.7
      /_/

Using Python version 3.7.4 (default, Aug 13 2019 15:17:50)
SparkSession available as 'spark'.


In [6]:
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, DateType, TimestampType, LongType

In [7]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
spark.conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")

In [8]:
song_schema = StructType([
        StructField("artist_id", StringType(), False),
        StructField("artist_latitude", StringType(), True),
        StructField("artist_longitude", StringType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), False),
        StructField("song_id", StringType(), False),
        StructField("title", StringType(), False),
        StructField("duration", DoubleType(), False),
        StructField("year", IntegerType(), False)
])

In [17]:
input_data = "s3a://udacity-dend/"
output_data = "s3a://ud1-s3datalake/"

In [9]:
song_data = "data/song_data/*/*/*/*.json"

In [10]:
df = spark.read.json(song_data, schema=song_schema)

In [13]:
from pyspark.sql.functions import monotonically_increasing_id

In [14]:
# extract columns to create songs table
songs_table = df.select("title", 
                       "artist_id",
                       "year",
                       "duration").dropDuplicates().withColumn("song_id", monotonically_increasing_id())

In [15]:
songs_table.limit(10).toPandas()

Unnamed: 0,title,artist_id,year,duration,song_id
0,Intro,AR558FS1187FB45658,2003,75.67628,51539607552
1,Setting Fire to Sleeping Giants,ARMAC4T1187FB3FA4C,2004,207.77751,68719476736
2,Kutt Free (DJ Volume Remix),ARNNKDK1187B98BBD5,0,407.37914,68719476737
3,Get Your Head Stuck On Your Neck,AREDL271187FB40F44,0,45.66159,77309411328
4,Amor De Cabaret,ARKRRTF1187B9984DA,0,177.47546,94489280512
5,The Urgency (LP Version),ARC43071187B990240,0,245.21098,103079215104
6,Ten Tonne,AR62SOJ1187FB47BB5,2005,337.68444,103079215105
7,Spanish Grease,AROUOZZ1187B9ABE51,1997,168.25424,146028888064
8,City Slickers,AR8IEZO1187B99055E,2008,149.86404,154618822656
9,A Higher Place (Album Version),ARBEBBY1187B9B43DB,1994,236.17261,180388626432


In [18]:
songs_table.write.parquet(output_data+"songs.parquet", mode="overwrite", partitionBy=["year","artist_id"])

In [29]:
songs_table.write.partitionBy("year", "artist_id").mode("overwrite").format("parquet").save(output_data+"songs.parquet")

In [33]:
#version 2
songs_table.write.partitionBy("year", "artist_id").format("parquet").save(output_data+"songs")

In [35]:
# version3
songs_table.write.partitionBy("year", "artist_id").parquet(output_data+"songs/")

In [19]:
# test to see if it uploaded correctly

In [20]:
from time import time

In [30]:
def load_table(input_data, sub_data):
    print(f"Start loading dataframe from {sub_data}:")
    t0 = time()
    df = spark.read.parquet(input_data+sub_data)
    load_time = time()-t0
    print("=== DONE IN: {0:.2f} sec\n".format(load_time))
    return df

In [31]:
song_df = load_table(output_data, "songs/*/*/*")

Start loading dataframe from songs/*/*/*:
=== DONE IN: 41.62 sec



In [32]:
song_df.limit(5).toPandas()

Unnamed: 0,title,duration,song_id
0,I'll Slap Your Face (Entertainment USA Theme),129.85424,68719476736
1,A Poor Recipe For Civic Cohesion,118.07302,463856467968
2,It's About Time,246.9873,515396075520
3,I'll Be Waiting,304.56118,386547056640
4,Drop of Rain,189.57016,429496729600


In [34]:
# version 2
song_df = load_table(output_data, "songs/*/*/*")
song_df.limit(5).toPandas()

Start loading dataframe from songs/*/*/*:
=== DONE IN: 101.63 sec



Unnamed: 0,title,duration,song_id
0,I Hold Your Hand In Mine [Live At Royal Albert...,43.36281,816043786240
1,I Think My Wife Is Running Around On Me (Taco ...,186.48771,919123001344
2,A Whiter Shade Of Pale (Live @ Fillmore West),326.00771,1211180777472
3,The Moon And I (Ordinary Day Album Version),267.7024,231928233984
4,Streets On Fire (Explicit Album Version),279.97995,1374389534720


In [36]:
# version 3
song_df = load_table(output_data, "songs/*/*/*")
song_df.limit(5).toPandas()

Start loading dataframe from songs/*/*/*:
=== DONE IN: 80.56 sec



Unnamed: 0,title,duration,song_id
0,I Hold Your Hand In Mine [Live At Royal Albert...,43.36281,816043786240
1,I Think My Wife Is Running Around On Me (Taco ...,186.48771,919123001344
2,A Whiter Shade Of Pale (Live @ Fillmore West),326.00771,1211180777472
3,The Moon And I (Ordinary Day Album Version),267.7024,231928233984
4,Streets On Fire (Explicit Album Version),279.97995,1374389534720


In [27]:
song_df = (spark.read
    .format("parquet")
    .option("mergeSchema", "true")
    .load(output_data + "songs/*/*/*/"))

Unnamed: 0,title,duration,song_id
0,I'll Slap Your Face (Entertainment USA Theme),129.85424,68719476736
1,A Poor Recipe For Civic Cohesion,118.07302,463856467968
2,It's About Time,246.9873,515396075520
3,I'll Be Waiting,304.56118,386547056640
4,Drop of Rain,189.57016,429496729600


In [None]:
# extract column to get artists_table
artists_table = df.select("artist_id",
                         col("artist_name").alias("name"),
                         col("artist_location").alias("location"),
                         col("artist_latitude").alias("latitude"),
                         col("artist_longitude").alias("longitude")).dropDuplicates()

# Process Log Data

In [None]:
log_schema = StructType([
        StructField("artist", StringType(), True),
        StructField("auth", StringType(), False),
        StructField("firstName", StringType(), True),
        StructField("gender", StringType(), True),
        StructField("itemInSession", IntegerType(), False),
        StructField("lastName", StringType(), True),
        StructField("length", DoubleType(), True),
        StructField("level", StringType(), False),
        StructField("location", StringType(), True),
        StructField("method", StringType(), False),
        StructField("page", StringType(), False),
        StructField("registration", DoubleType(), True),
        StructField("sessionId", IntegerType(), False),
        StructField("song", StringType(), True),
        StructField("status", IntegerType(), False),
        StructField("ts", DoubleType(), False),
        StructField("userAgent", StringType(), True),
        StructField("userId", StringType(), True)
    ])

In [None]:
log_data = "data/log-data/*.json"

In [None]:
# read log data file
log_df = spark.read.json(log_data, schema = log_schema)

In [None]:
log_df = log_df.where(log_df.page == "NextSong")

In [None]:
users_table = log_df.select(col("userId").alias("user_id"),
                       col("firstName").alias("first_name"),
                       col("lastName").alias("last_name"),
                       "gender",
                       "level").dropDuplicates()

In [None]:
users_table.limit(5).toPandas()

## time_stamp

In [None]:
# create timestamp column from original timestamp column
get_timestamp = udf(lambda x: datetime.fromtimestamp(x/1000).strftime('%Y-%m-%d %H:%M:%S'))

In [None]:
log_df = log_df.withColumn("timestamp", get_timestamp(log_df.ts))

In [None]:
log_df.limit(5).toPandas()

In [None]:
# create datetime column from original timestamp column
get_datetime = udf(lambda x: datetime.fromtimestamp(x/1000).strftime('%Y-%m-%d'))
log_df = log_df.withColumn("datetime", get_datetime(log_df.ts))

In [None]:
log_df.limit(5).toPandas()

In [None]:
# extract columns to create time table
time_table = log_df.select(
    log_df.timestamp.alias('start_time'),
    hour(log_df.datetime).alias('hour'),
    dayofmonth(log_df.datetime).alias('day'),
    weekofyear(log_df.datetime).alias('week'),
    month(log_df.datetime).alias('month'),
    year(log_df.datetime).alias('year'),
    date_format(log_df.datetime, 'u').alias('weekday')).dropDuplicates().limit(10)

In [None]:
time_table.limit(5).toPandas()

In [None]:
# songs_table,
# artists_table,
# users_table,
# time_table

In [None]:
songs_logs = log_df.join(songs_table, (log_df.song == songs_table.title))

In [None]:
artists_songs_logs = songs_logs.join(artists_table, (songs_logs.artist == artists_table.name))

In [None]:
time_table.limit(5).toPandas()

In [None]:
songplays = artists_songs_logs.join(
        time_table, artists_songs_logs.timestamp == time_table.start_time, 'left')

In [None]:
songplays.limit(5).toPandas()

In [None]:
artists_songs_logs.limit(5).toPandas().timestamp