In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.functions import monotonically_increasing_id

config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [2]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [3]:
input_data = "s3a://udacity-dend/"
output_data = "s3a://rfranco-udacity-nd/"

In [4]:
# get filepath to song data file
# song_data = input_data + 'song_data/*/*/*/*.json'
song_data = input_data + 'song_data/A/A/A/*.json'
print(song_data)

s3a://udacity-dend/song_data/A/A/A/*.json


In [5]:
# read song data file
df = spark.read.json(song_data)
df.printSchema()
df.show(5)

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARTC1LV1187B9A4858|        51.4536|Goldsmith's Colle...|        -0.01802|  The Bonzo Dog Band|3

In [6]:
# extract columns to create songs table
songs_table = df['song_id', 'title', 'artist_id', 'year', 'duration']
songs_table.show(5)

# write songs table to parquet files partitioned by year and artist
songs_table.write.partitionBy('year', 'artist_id').parquet(os.path.join(output_data, 'songs.parquet'), 'overwrite')

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOAFBCP12A8C13CC7D|King Of Scurf (20...|ARTC1LV1187B9A4858|1972|301.40036|
|SOKTJDS12AF72A25E5|Drown In My Own T...|ARA23XO1187B9AF18F|   0|  192.522|
|SOEKAZG12AB018837E|I'll Slap Your Fa...|ARSVTNL1187B992A91|2001|129.85424|
|SOQPWCR12A6D4FB2A3|A Poor Recipe For...|AR73AIO1187B9AD57B|2005|118.07302|
|SOBRKGM12A8C139EF6|Welcome to the Pl...|ARXQBR11187B98A2CC|1985|821.05424|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [7]:
# extract columns to create artists table
artists_table = df['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']
artists_table.show(5)

# write artists table to parquet files
artists_table.write.parquet(os.path.join(output_data, 'artists.parquet'), 'overwrite')

+------------------+--------------------+--------------------+---------------+----------------+
|         artist_id|         artist_name|     artist_location|artist_latitude|artist_longitude|
+------------------+--------------------+--------------------+---------------+----------------+
|ARTC1LV1187B9A4858|  The Bonzo Dog Band|Goldsmith's Colle...|        51.4536|        -0.01802|
|ARA23XO1187B9AF18F|     The Smithereens|Carteret, New Jersey|       40.57885|       -74.21956|
|ARSVTNL1187B992A91|       Jonathan King|     London, England|       51.50632|        -0.12714|
|AR73AIO1187B9AD57B|   Western Addiction|   San Francisco, CA|       37.77916|      -122.42005|
|ARXQBR11187B98A2CC|Frankie Goes To H...|  Liverpool, England|           null|            null|
+------------------+--------------------+--------------------+---------------+----------------+
only showing top 5 rows



In [8]:
# get filepath to log data file
log_data = input_data + 'log_data/*/*/*.json'
print(log_data)

s3a://udacity-dend/log_data/*/*/*.json


In [9]:
 # read log data file
df = spark.read.json(log_data)
df.printSchema()
df.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            lo

In [10]:
# filter by actions for song plays
df = df.filter(df.page == 'NextSong')
df.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      

In [11]:
# extract columns for users table    
users_table = df['userId', 'firstName', 'lastName', 'gender', 'level']
users_table.show(5)

# write users table to parquet files
users_table.write.parquet(os.path.join(output_data, 'users.parquet'), 'overwrite')

In [12]:
# create timestamp column from original timestamp column
get_timestamp = udf(lambda x: str(int(int(x)/1000)))
df = df.withColumn('timestamp', get_timestamp(df.ts))
df.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+----------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId| timestamp|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+----------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|1542241826|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval

In [13]:
# create datetime column from original timestamp column
get_datetime = udf(lambda x: str(datetime.fromtimestamp(int(x) / 1000.0)))
df = df.withColumn("datetime", get_datetime(df.ts))
df.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+----------+--------------------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId| timestamp|            datetime|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+----------+--------------------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|1542241826|2018-11-15 00:30:...|
|The Prodigy

In [16]:
# extract columns to create time table
time_columns = ["datetime as start_time",
                "hour(datetime) as hour",
                "day(datetime) as day",
                "weekofyear(datetime) as week",
                "month(datetime) as month",
                "year(datetime) as year",
                "weekday(datetime) as weekday"]
time_table = df.selectExpr(time_columns)
time_table.show(5)

# write time table to parquet files partitioned by year and month
time_table.write.partitionBy('year', 'month').parquet(os.path.join(output_data, 'time.parquet'), 'overwrite')

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-15 00:30:...|   0| 15|  46|   11|2018|      3|
|2018-11-15 00:41:...|   0| 15|  46|   11|2018|      3|
|2018-11-15 00:45:...|   0| 15|  46|   11|2018|      3|
|2018-11-15 03:44:...|   3| 15|  46|   11|2018|      3|
|2018-11-15 05:48:...|   5| 15|  46|   11|2018|      3|
+--------------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [17]:
# read in song data to use for songplays table
song_df = spark.read.parquet(os.path.join(output_data, 'songs.parquet')) 
song_df.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+----------+--------------------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId| timestamp|            datetime|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+----------+--------------------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|1542241826|2018-11-15 00:30:...|
|The Prodigy

In [21]:
# extract columns from joined song and log datasets to create songplays table 
df = df.join(song_df, df.song == song_df.title)
songplays_table = df['ts', 'userId', 'level', 'song_id', 'artist_id', 'sessionId', 'location', 'userAgent']
songplays_table.select(monotonically_increasing_id().alias('songplay_id')).collect()
songplays_table.show(5)

# write songplays table to parquet files partitioned by year and month
songplays_table.write.parquet(os.path.join(output_data, 'songplays.parquet'), 'overwrite')

AnalysisException: "Reference 'song_id' is ambiguous, could be: song_id, song_id, song_id, song_id.;"