# Test Data Lake project in Local

In [1]:
# Import necessary libraries
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import *
from pyspark.sql import functions as F

In [4]:
# Create spark session with hadoop-aws package
spark = SparkSession.builder.config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0").getOrCreate()

### Test song data

In [41]:
# Read data files
song_path_local = "./data/song-data/*/*/*/*.json"
df_song = spark.read.json(song_path_local)

In [42]:
# Show schema
df_song.printSchema()
df_song.show(3)

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

+------------------+---------------+--------------------+----------------+-----------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|      artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+-----------------+---------+---------+------------------+--------------------+----+
|ARNF6401187FB57032|       40.79086|New York, NY [Man...|       -73.96644|Sophie B. Hawkins|  305.162|   

In [43]:
songSchema = StructType([
        StructField("artist_id", StringType()),
        StructField("artist_latitude", DoubleType()),
        StructField("artist_location", StringType()),
        StructField("artist_longitude", DoubleType()),
        StructField("artist_name", StringType()),
        StructField("duration", DoubleType()),
        StructField("num_songs", IntegerType()),
        StructField("song_id", StringType()),
        StructField("title", StringType()),
        StructField("year", IntegerType())
])

dfSongWithSchema = spark.read.json(song_path_local, schema=songSchema)

In [44]:
dfSongWithSchema.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: integer (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)



In [45]:
dfSongWithSchema.show(3)

+------------------+---------------+--------------------+----------------+-----------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|      artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+-----------------+---------+---------+------------------+--------------------+----+
|ARNF6401187FB57032|       40.79086|New York, NY [Man...|       -73.96644|Sophie B. Hawkins|  305.162|        1|SONWXQJ12A8C134D94|The Ballad Of Sle...|1994|
|ARLTWXK1187FB5A3F8|       32.74863|      Fort Worth, TX|       -97.32925|      King Curtis|326.00771|        1|SODREIN12A58A7F2E5|A Whiter Shade Of...|   0|
|ARPFHN61187FB575F6|       41.88415|         Chicago, IL|       -87.63241|      Lupe Fiasco|279.97995|        1|SOWQTQZ12A58A7B63E|Streets On Fire (...|   0|
+------------------+---------------+----------------

In [46]:
song_fields = ["song_id", "title", "artist_id", "year", "duration"]

In [53]:
# songs_table = dfSongWithSchema.select(song_fields).dropDuplicates()
songs_table = dfSongWithSchema.select(song_fields)
# songs_table = df.select(song_fields).dropDuplicates().withColumn("song_id", monotonically_increasing_id())

In [54]:
songs_table.count()

12

In [28]:
# output path
output_data = "./output/"

In [59]:
# output data
songs_table.write.partitionBy("year", "artist_id").parquet(output_data_local + 'songs/')

In [62]:
# artist table
artists_fields = ["artist_id", "artist_name as name", "artist_location as location", "artist_latitude as latitude", "artist_longitude as longitude"]
artists_table = dfSongWithSchema.selectExpr(artists_fields)

In [63]:
artists_table.show(3)

+------------------+-----------------+--------------------+--------+---------+
|         artist_id|             name|            location|latitude|longitude|
+------------------+-----------------+--------------------+--------+---------+
|ARNF6401187FB57032|Sophie B. Hawkins|New York, NY [Man...|40.79086|-73.96644|
|ARLTWXK1187FB5A3F8|      King Curtis|      Fort Worth, TX|32.74863|-97.32925|
|ARPFHN61187FB575F6|      Lupe Fiasco|         Chicago, IL|41.88415|-87.63241|
+------------------+-----------------+--------------------+--------+---------+
only showing top 3 rows



In [65]:
artists_table.write.parquet(output_data_local + 'artists/')

### Test log data

In [46]:
log_path_local = "./data/log-data/*.json"

In [54]:
df = spark.read.json(log_path_local)
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [94]:
dfUser = df.withColumn("registration_cast", df["registration"].cast(DecimalType()))

In [95]:
dfUser.where(col("registration_cast").isNull()).show()

+--------------------+---------+---------+------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+--------------------+--------------------+------+-----------------+
|              artist|     auth|firstName|gender|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|                  ts|           userAgent|userId|registration_cast|
+--------------------+---------+---------+------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+--------------------+--------------------+------+-----------------+
|N.E.R.D. FEATURIN...|Logged In|   Jayden|     M|     Fox| 288.9922| free|New Orleans-Metai...|   PUT|NextSong|1.541033612796E12|      184|Am I High (Feat. ...|   200|50806-03-18 10:19...|"Mozilla/5.0 (Win...|   101|             null|
|                null|Logged In|  Stefany|     F|   White|  

In [96]:
logSchema = StructType([
        StructField("artist", StringType()),
        StructField("auth", StringType()),
        StructField("firstName", StringType()),
        StructField("gender", StringType()),
        StructField("lastName", StringType()),
        StructField("length", DoubleType()),
        StructField("level", StringType()),
        StructField("location", StringType()),
        StructField("method", StringType()),
        StructField("page", StringType()),
        StructField("registration", StringType()), # should be DecimalType
        StructField("sessionId", IntegerType()),
        StructField("song", StringType()),
        StructField("status", IntegerType()),
        StructField("ts", TimestampType()),
        StructField("userAgent", StringType()),
        StructField("userId", StringType()) # should be Integer
    ])
df = spark.read.json(log_path_local, schema=logSchema)

In [102]:
# df.printSchema()
df = spark.read.json(log_path_local)
df.show(3)

+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|              artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|N.E.R.D. FEATURIN...|Logged In|   Jayden|     M|            0|     Fox| 288.9922| free|New Orleans-Metai...|   PUT|NextSong|1.541033612796E12|      184|Am I High (Feat. ...|   200|1541121934796|"Mozilla/5.0 (Win...|   101|
|                null|Logged In|  Stefany|     F|            0|   White|     null| free|         Lubbock

In [97]:
df = df.withColumn("userId", df["userId"].cast(IntegerType()))

In [99]:
df.printSchema()
df.show(3)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: string (nullable = true)
 |-- sessionId: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- ts: timestamp (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: integer (nullable = true)

+--------------------+---------+---------+------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+--------------------+--------------------+------+
|              artist|     auth|firstName|gender|lastName|   length|level|            location|method|    page|     r

In [120]:
dfNextSong = df.filter(df.page == 'NextSong')
dfNextSong.show(3)

+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|              artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|N.E.R.D. FEATURIN...|Logged In|   Jayden|     M|            0|     Fox| 288.9922| free|New Orleans-Metai...|   PUT|NextSong|1.541033612796E12|      184|Am I High (Feat. ...|   200|1541121934796|"Mozilla/5.0 (Win...|   101|
| Death Cab for Cutie|Logged In|  Stefany|     F|            1|   White|216.42404| free|         Lubbock

In [121]:
dfNextSong = dfNextSong.withColumn("userId", dfNextSong["userId"].cast(IntegerType()))

In [114]:
users_fields = ["userId as user_id", "firstName as first_name", "lastName as last_name", "gender", "level"]
users_table = dfNextSong.selectExpr(users_fields)
users_table.count()

266

In [109]:
users_table.show(3)
users_table.where(col("user_id").isNull()).show()

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|    101|    Jayden|      Fox|     M| free|
|     83|   Stefany|    White|     F| free|
|     83|   Stefany|    White|     F| free|
+-------+----------+---------+------+-----+
only showing top 3 rows

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
+-------+----------+---------+------+-----+



In [110]:
users_table.na.drop()

DataFrame[user_id: int, first_name: string, last_name: string, gender: string, level: string]

In [112]:
users_table.show(3)
users_table.where(col("user_id").isNull()).show()
users_table.count()

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|    101|    Jayden|      Fox|     M| free|
|     83|   Stefany|    White|     F| free|
|     83|   Stefany|    White|     F| free|
+-------+----------+---------+------+-----+
only showing top 3 rows

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
+-------+----------+---------+------+-----+



266

In [33]:
users_table.write.parquet(output_data + "users/")

In [122]:
# convert date time
from datetime import datetime

get_timestamp = udf(lambda x: datetime.fromtimestamp(x/1000.0), TimestampType())
dfNextSong = dfNextSong.withColumn("timestamp", get_timestamp('ts'))

In [123]:
dfNextSong.show(3)

+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------------------+
|              artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|           timestamp|
+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------------------+
|N.E.R.D. FEATURIN...|Logged In|   Jayden|     M|            0|     Fox| 288.9922| free|New Orleans-Metai...|   PUT|NextSong|1.541033612796E12|      184|Am I High (Feat. ...|   200|1541121934796|"Mozilla/5.0 (Win...|   101|2018-11-02 01:25:...|
| Death Cab for Cuti

In [132]:
# create datetime column from original timestamp column
get_datetime = udf(lambda x: datetime.fromtimestamp(x/1000.0), DateType())
dfNextSong = dfNextSong.withColumn("start_time", get_datetime('ts'))

In [133]:
dfNextSong.show(3)

+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------------------+----------+----------+
|              artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|           timestamp|  datetime|start_time|
+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------------------+----------+----------+
|N.E.R.D. FEATURIN...|Logged In|   Jayden|     M|            0|     Fox| 288.9922| free|New Orleans-Metai...|   PUT|NextSong|1.541033612796E12|      184|Am I High (Feat. ...|   200|1541121934796|"Moz

In [136]:
time_table = dfNextSong.select("start_time").dropDuplicates() \
        .withColumn("hour", hour(col("start_time"))).withColumn("day", dayofmonth(col("start_time"))) \
        .withColumn("week", weekofyear(col("start_time"))).withColumn("month", month(col("start_time"))) \
        .withColumn("year", year(col("start_time"))).withColumn("weekday", date_format(col("start_time"), 'E'))

In [137]:
time_table.show(5)

+----------+----+---+----+-----+----+-------+
|start_time|hour|day|week|month|year|weekday|
+----------+----+---+----+-----+----+-------+
|2018-11-02|   0|  2|  44|   11|2018|    Fri|
|2018-11-01|   0|  1|  44|   11|2018|    Thu|
|2018-11-03|   0|  3|  44|   11|2018|    Sat|
+----------+----+---+----+-----+----+-------+



In [138]:
time_table.write.partitionBy("year", "month").parquet(output_data + 'time_local/')

In [141]:
# read in song data to use for songplays table
song_df = spark.read.parquet(output_data + 'songs/*/*/*')

In [142]:
song_df.show(3)

+------------------+--------------------+---------+
|           song_id|               title| duration|
+------------------+--------------------+---------+
|SODREIN12A58A7F2E5|A Whiter Shade Of...|326.00771|
|SOWQTQZ12A58A7B63E|Streets On Fire (...|279.97995|
|SONWXQJ12A8C134D94|The Ballad Of Sle...|  305.162|
+------------------+--------------------+---------+
only showing top 3 rows



In [143]:
artists_df = spark.read.parquet(output_data + 'artists/*')

In [144]:
artists_df.show(3)

+------------------+-----------------+--------------------+--------+---------+
|         artist_id|             name|            location|latitude|longitude|
+------------------+-----------------+--------------------+--------+---------+
|ARNF6401187FB57032|Sophie B. Hawkins|New York, NY [Man...|40.79086|-73.96644|
|ARLTWXK1187FB5A3F8|      King Curtis|      Fort Worth, TX|32.74863|-97.32925|
|ARPFHN61187FB575F6|      Lupe Fiasco|         Chicago, IL|41.88415|-87.63241|
+------------------+-----------------+--------------------+--------+---------+
only showing top 3 rows



In [146]:
songs_logs = dfNextSong.join(song_df, (dfNextSong.song == song_df.title))
songs_logs.show(2)

+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+----+------+---+---------+------+---------+--------+----------+-------+-----+--------+
|artist|auth|firstName|gender|itemInSession|lastName|length|level|location|method|page|registration|sessionId|song|status| ts|userAgent|userId|timestamp|datetime|start_time|song_id|title|duration|
+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+----+------+---+---------+------+---------+--------+----------+-------+-----+--------+
+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+----+------+---+---------+------+---------+--------+----------+-------+-----+--------+



In [158]:
artists_songs_logs = songs_logs.join(artists_df, (songs_logs.artist == artists_df.name)).drop(artists_df.location)

In [159]:
artists_songs_logs.show(2)

+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+----+------+---+---------+------+---------+--------+----------+-------+-----+--------+---------+----+--------+---------+
|artist|auth|firstName|gender|itemInSession|lastName|length|level|location|method|page|registration|sessionId|song|status| ts|userAgent|userId|timestamp|datetime|start_time|song_id|title|duration|artist_id|name|latitude|longitude|
+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+----+------+---+---------+------+---------+--------+----------+-------+-----+--------+---------+----+--------+---------+
+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+----+------+---+---------+------+---------+--------+----------+-------+-----+--------+---------+----+--------+---------+



In [176]:
songplays = artists_songs_logs.join(time_table, (artists_songs_logs.start_time == time_table.start_time), 'left').drop(artists_songs_logs.start_time)

In [161]:
songplays.show(2)

+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+----+------+---+---------+------+---------+--------+-------+-----+--------+---------+----+--------+---------+----------+----+---+----+-----+----+-------+
|artist|auth|firstName|gender|itemInSession|lastName|length|level|location|method|page|registration|sessionId|song|status| ts|userAgent|userId|timestamp|datetime|song_id|title|duration|artist_id|name|latitude|longitude|start_time|hour|day|week|month|year|weekday|
+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+----+------+---+---------+------+---------+--------+-------+-----+--------+---------+----+--------+---------+----------+----+---+----+-----+----+-------+
+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+----+------+---+---------+------+---------+--------+-------+-----+--------+---------+----+--------

In [181]:
songplays = songplays.withColumn("songplay_id", F.monotonically_increasing_id())

In [186]:
songplays_table = songplays.select(
        col("songplay_id"),
        col('start_time'),
        col('userId').alias('user_id'),
        col('level'),
        col('song_id'),
        col('artist_id'),
        col('sessionId'),
        col('location'),
        col('userAgent').alias('user_agent'),
        col('year').alias('year'),
        col('month').alias('month')
    )

In [187]:
songplays_table.show(3)
# songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent

+-----------+----------+-------+-----+-------+---------+---------+--------+----------+----+-----+
|songplay_id|start_time|user_id|level|song_id|artist_id|sessionId|location|user_agent|year|month|
+-----------+----------+-------+-----+-------+---------+---------+--------+----------+----+-----+
+-----------+----------+-------+-----+-------+---------+---------+--------+----------+----+-----+



In [188]:
songplays_table.write.partitionBy("year", "month").parquet(output_data + 'songplays_local/')

# Test with S3 data

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['KEYS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['KEYS']['AWS_SECRET_ACCESS_KEY']

In [3]:
spark = SparkSession.builder \
                    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
                    .config("spark.hadoop.fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem") \
                    .config("spark.hadoop.fs.s3a.awsAccessKeyId", os.environ['AWS_ACCESS_KEY_ID']) \
                    .config("spark.hadoop.fs.s3a.awsSecretAccessKey", os.environ['AWS_SECRET_ACCESS_KEY']) \
                    .getOrCreate()

In [4]:
song_data = "s3a://udacity-dend/song_data/A/B/C/*.json"

In [5]:
# dfSong = spark.read.json(song_data)
dfSong = spark.read.format("json").load(song_data)

In [7]:
dfSong.printSchema()
dfSong.show(3)
dfSong.count()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

+------------------+---------------+---------------+----------------+-------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|artist_location|artist_longitude|  artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+---------------+----------------+-------------+---------+---------+------------------+--------------------+----+
|ARLTWXK1187FB5A3F8|       32.74863| Fort Worth, TX|       -97.32925|  King Curtis|326.00771|        1|SODREIN12A58A7F2E5|A Whiter S

23