In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID'] = config.get('keys', 'AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY'] = config.get('keys', 'AWS_SECRET_ACCESS_KEY')


def create_spark_session():
    """Creates the spark session"""
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

spark = create_spark_session()

In [2]:
#song_data = "/home/workspace/data/song-data.zip"
# source: https://www.bogotobogo.com/python/python_files.php - is this a better way than using string concatenation?

song_data = os.path.join("/home/workspace/data/", "song_data/*/*/*/*.json")

# input_data = "s3a://udacity-dend/"
# song_data = input_data + "song_data/*/*/*/*.json"
print(song_data)
print()
    
    # read song data file
#df = spark.read.csv(song_data)
df = spark.read.json(song_data)

print("complete")

/home/workspace/data/song_data/*/*/*/*.json

complete


In [3]:
df.show()

+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARDR4AC1187FB371A1|           null|                    |            null|Montserrat Caball...|511.16363|        1|SOBAYLL12A8C138AF9|Sono andati? Fing...|   0|
|AREBBGV1187FB523D2|           null|         Houston, TX|            null|Mike Jones (Featu...|173.66159|        1|SOOLYAZ12A6701F4A6|Laws Patrolling (...|   0|
|ARMAC4T1187FB3FA4C|       40.82624|   Morris Plains, NJ|       -74.47995|The Dillinger Esc...|207.77751|        1|SOBBUGU12A8C13E95D|Setting Fire to S...|2004|
|ARPBNLO1187FB3D52F|       40.7145

In [4]:
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [5]:
    # extract columns to create songs table: song_id, title, artist_id, year, duration
songs_table = df.select(["song_id","title","artist_id","year","duration"])
songs_table.show(3)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOBAYLL12A8C138AF9|Sono andati? Fing...|ARDR4AC1187FB371A1|   0|511.16363|
|SOOLYAZ12A6701F4A6|Laws Patrolling (...|AREBBGV1187FB523D2|   0|173.66159|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
+------------------+--------------------+------------------+----+---------+
only showing top 3 rows



In [6]:
    # write songs table to parquet files partitioned by year and artist_id
songs_table.write.mode("overwrite").partitionBy("year","artist_id").parquet("/home/workspace/data/output/song")
print("parquet done")
songs_table.show(5)
songs_table.printSchema()

parquet done
+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOBAYLL12A8C138AF9|Sono andati? Fing...|ARDR4AC1187FB371A1|   0|511.16363|
|SOOLYAZ12A6701F4A6|Laws Patrolling (...|AREBBGV1187FB523D2|   0|173.66159|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
|SOAOIBZ12AB01815BE|I Hold Your Hand ...|ARPBNLO1187FB3D52F|2000| 43.36281|
|SONWXQJ12A8C134D94|The Ballad Of Sle...|ARNF6401187FB57032|1994|  305.162|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



In [7]:
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [8]:
    # extract columns to create artists table -artist_id, name, location, latitude, longitude
artists_table = df.select(["artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude"])
artists_table.show(3)

+------------------+--------------------+-----------------+---------------+----------------+
|         artist_id|         artist_name|  artist_location|artist_latitude|artist_longitude|
+------------------+--------------------+-----------------+---------------+----------------+
|ARDR4AC1187FB371A1|Montserrat Caball...|                 |           null|            null|
|AREBBGV1187FB523D2|Mike Jones (Featu...|      Houston, TX|           null|            null|
|ARMAC4T1187FB3FA4C|The Dillinger Esc...|Morris Plains, NJ|       40.82624|       -74.47995|
+------------------+--------------------+-----------------+---------------+----------------+
only showing top 3 rows



In [9]:
    # write artists table to parquet files
artists_table.write.mode('overwrite').parquet("/home/workspace/data/output/artist")
print("parquet done")
artists_table.show(5)
artists_table.printSchema()

parquet done
+------------------+--------------------+--------------------+---------------+----------------+
|         artist_id|         artist_name|     artist_location|artist_latitude|artist_longitude|
+------------------+--------------------+--------------------+---------------+----------------+
|ARDR4AC1187FB371A1|Montserrat Caball...|                    |           null|            null|
|AREBBGV1187FB523D2|Mike Jones (Featu...|         Houston, TX|           null|            null|
|ARMAC4T1187FB3FA4C|The Dillinger Esc...|   Morris Plains, NJ|       40.82624|       -74.47995|
|ARPBNLO1187FB3D52F|            Tiny Tim|        New York, NY|       40.71455|       -74.00712|
|ARNF6401187FB57032|   Sophie B. Hawkins|New York, NY [Man...|       40.79086|       -73.96644|
+------------------+--------------------+--------------------+---------------+----------------+
only showing top 5 rows

root
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- arti

In [None]:
# Log file


In [67]:
# get filepath to log data file
log_data ="/home/workspace/data/log_data/*.json"

    # read log data file
df = spark.read.json(log_data)
    
df.show(3)
df.printSchema()

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|The Big Gundown|

In [68]:
    # filter by actions for song plays
# df = df.filter('page="NextSong"')
df = df.filter(~col('page').isin(['NextSong']))
df.show(3)

    # extract columns for users table    
# user_id, first_name, last_name, gender, level
    users_table = df.select(["userId", "firstName", "lastName", "gender", "level"])
    
    # write users table to parquet files
users_table.write.mode('overwrite').parquet("/home/workspace/data/output/users")
users_table.printSchema()
users_table.show(3)


+------+---------+---------+------+-------------+--------+------+-----+--------------------+------+-----+-----------------+---------+----+------+-------------+--------------------+------+
|artist|     auth|firstName|gender|itemInSession|lastName|length|level|            location|method| page|     registration|sessionId|song|status|           ts|           userAgent|userId|
+------+---------+---------+------+-------------+--------+------+-----+--------------------+------+-----+-----------------+---------+----+------+-------------+--------------------+------+
|  null|Logged In|    Wyatt|     M|            0|   Scott|  null| free|Eureka-Arcata-For...|   GET| Home|1.540872073796E12|      563|null|   200|1542247071796|Mozilla/5.0 (Wind...|     9|
|  null|Logged In|   Austin|     M|            0| Rosales|  null| free|New York-Newark-J...|   GET| Home|1.541059521796E12|      521|null|   200|1542252577796|Mozilla/5.0 (Wind...|    12|
|  null|Logged In|   Samuel|     M|            1|Gonzalez|  

In [69]:
#this timestamp issue thing

# create timestamp column from original timestamp column
get_timestamp = udf(lambda value: datetime.fromtimestamp(value / 1000.0).strftime('%Y-%m-%d %H:%M:%S'))
df = df.withColumn("timestamp", get_timestamp(df.ts))

df.printSchema()
df.show(3)


root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: string (nullable = true)

+------+---------+---------+------+-------------+--------+------+-----+--------------------+------+-----+-----------------+---------+----+------+-------------+--------------------+------+-------------------+
|artist|     auth|firstName|gender|itemInSession|lastN

In [70]:
# extract columns to create time table
# https://stackoverflow.com/questions/53285032/how-do-i-convert-timestamp-to-unix-format-with-pyspark
# used a selectexpr and a function to convert timestamp into unix timestamp - maybe it will work for month, day, year etc.
# Also, exercise 2 made use of this function (Exercise 2 - Advanced Analytics NLP)

time_table = df.selectExpr("timestamp as start_time"
                           ,"hour(timestamp) as hour"
                           ,"dayofyear(timestamp) as day"
                           ,"weekofyear(timestamp) as week"
                           ,"month(timestamp) as month"
                           ,"year(timestamp) as year"
                           ,"weekday(timestamp) as weekday")

time_table.show(3)

 # write time table to parquet files partitioned by year and month
time_table.write.mode('overwrite').partitionBy("year","month").parquet("/home/workspace/data/output/time")

time_table.printSchema()
time_table.show(3)

+-------------------+----+---+----+-----+----+-------+
|         start_time|hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-15 01:57:51|   1|319|  46|   11|2018|      3|
|2018-11-15 03:29:37|   3|319|  46|   11|2018|      3|
|2018-11-15 03:44:20|   3|319|  46|   11|2018|      3|
+-------------------+----+---+----+-----+----+-------+
only showing top 3 rows

root
 |-- start_time: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)

+-------------------+----+---+----+-----+----+-------+
|         start_time|hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-15 01:57:51|   1|319|  46|   11|2018|      3|
|2018-11-15 03:29:37|   3|319|  46|   11|2018|      3|
|2018-11-15 03:44:20|   3|319|  46|   11|2

In [71]:
#     read in song data to use for songplays table
# song_df = spark.read.parquet("/home/workspace/data/output/song/year=1961/artist_id=ARH4Z031187B9A71F2/part-00000-d1db7576-6841-4d99-92bd-e026115a586f.c000.snappy.parquet")
song_df = spark.read.json(song_data)
song_df = song_df.select(["song_id","title","artist_id","artist_name"])

song_df.printSchema()

song_df.show(3)

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)

+------------------+--------------------+------------------+--------------------+
|           song_id|               title|         artist_id|         artist_name|
+------------------+--------------------+------------------+--------------------+
|SOBAYLL12A8C138AF9|Sono andati? Fing...|ARDR4AC1187FB371A1|Montserrat Caball...|
|SOOLYAZ12A6701F4A6|Laws Patrolling (...|AREBBGV1187FB523D2|Mike Jones (Featu...|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|The Dillinger Esc...|
+------------------+--------------------+------------------+--------------------+
only showing top 3 rows



In [72]:
    # extract columns from joined song (song_df) and log datasets (df) to create songplays table 
    #songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
    #page=NextSong
# df = spark.read.json(log_data)    
# df.printSchema()
# df.show(3)
    
# keep only the columns needed from the log file to join to the song file
log_df = df.filter(~col('page').isin(['NextSong']))    

log_df = df.selectExpr("timestamp as start_time"
                       ,"userId"
                       ,"level"
                       ,"sessionId"
                       ,"location"
                       ,"userAgent"
                       ,"song"
                       ,"artist")

# log_df.printSchema()
# log_df.sample(0.5).show()


In [80]:
# Join song df and log df to create songplays
table_join = log_df.join(song_df,
                              (song_df.title == log_df.song)
                              & (song_df.artist_name == log_df.artist)
                              ,"left")

table_join.show(3)
table_join.printSchema()

+-------------------+------+-----+---------+--------------------+--------------------+----+------+-------+-----+---------+-----------+
|         start_time|userId|level|sessionId|            location|           userAgent|song|artist|song_id|title|artist_id|artist_name|
+-------------------+------+-----+---------+--------------------+--------------------+----+------+-------+-----+---------+-----------+
|2018-11-15 01:57:51|     9| free|      563|Eureka-Arcata-For...|Mozilla/5.0 (Wind...|null|  null|   null| null|     null|       null|
|2018-11-15 03:29:37|    12| free|      521|New York-Newark-J...|Mozilla/5.0 (Wind...|null|  null|   null| null|     null|       null|
|2018-11-15 03:44:20|    61| free|      597|Houston-The Woodl...|"Mozilla/5.0 (Mac...|null|  null|   null| null|     null|       null|
+-------------------+------+-----+---------+--------------------+--------------------+----+------+-------+-----+---------+-----------+
only showing top 3 rows

root
 |-- start_time: string (

In [82]:
    # extract columns from joined song (song_df) and log datasets (df) to create songplays table 
    #songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
from pyspark.sql.functions import monotonically_increasing_id

# songplays_table = songplays_table.withColumn("songplay_id", monotonically_increasing_id())

songplays_table = table_join.selectExpr("monotonically_increasing_id() as songplay_id"
                                        ,"start_time as timestamp"
                                            ,"userId as user_id"
                                            ,"level"
                                            ,"song_id"
                                            ,"artist_id"
                                            ,"sessionId as session_id"
                                            ,"location"
                                            ,"userAgent as user_agent")


songplays_table.printSchema()
songplays_table.show(3)

root
 |-- songplay_id: long (nullable = false)
 |-- timestamp: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)

+-----------+-------------------+-------+-----+-------+---------+----------+--------------------+--------------------+
|songplay_id|          timestamp|user_id|level|song_id|artist_id|session_id|            location|          user_agent|
+-----------+-------------------+-------+-----+-------+---------+----------+--------------------+--------------------+
|          0|2018-11-15 01:57:51|      9| free|   null|     null|       563|Eureka-Arcata-For...|Mozilla/5.0 (Wind...|
|          1|2018-11-15 03:29:37|     12| free|   null|     null|       521|New York-Newark-J...|Mozilla/5.0 (Wind...|
|          2|2018-11-15 03:44:20|     

In [None]:
    # write songplays table to parquet files partitioned by year and month
songplays_table.write.mode('overwrite').partitionBy("year","month").parquet("/home/workspace/data/output/songplays")