# 1. INITIALIZE CONFIGURATION VALUES

In [1]:
import os
import configparser
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

# 2. CREATE SPARK SESSION

In [2]:
from datetime import datetime
from pyspark.sql import SparkSession

In [3]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark


In [4]:
spark = create_spark_session()

# 3. READ INPUT DATA

### 3a. EXTRACT LOCAL DATA

In [5]:
from zipfile import ZipFile

In [6]:
for file in os.listdir('data'):
    if file.endswith('.zip'):
        file_name = file.split('.')[0]
        with ZipFile('data/' + file) as zp:
            zp.extractall('extracted_data/' + file_name)
    print(file)

log-data.zip
song-data.zip


### 3b. READ LOCAL INPUT DATA FOR TESTING

In [5]:
local_song_path = config['LOCAL']['SONG_DATA_PATH']
local_log_path = config['LOCAL']['LOG_DATA_PATH']
local_song_data = spark.read.json(local_song_path)
local_log_data = spark.read.json(local_log_path)

In [6]:
print("Count: ", local_song_data.count())
local_song_data.printSchema()
print(local_song_data.take(1)), print()
local_song_data.show(10)

Count:  71
root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

[Row(artist_id='ARDR4AC1187FB371A1', artist_latitude=None, artist_location='', artist_longitude=None, artist_name='Montserrat Caballé;Placido Domingo;Vicente Sardinero;Judith Blegen;Sherrill Milnes;Georg Solti', duration=511.16363, num_songs=1, song_id='SOBAYLL12A8C138AF9', title='Sono andati? Fingevo di dormire', year=0)]

+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|       

In [7]:
print("Count: ", local_log_data.count())
local_log_data.printSchema()
print(local_log_data.take(1)), print()
local_log_data.show(10)

Count:  8056
root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

[Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status

### 3c. READ DATA FROM S3 FOR PRODUCTION

In [46]:
config.read('dl.cfg')
aws_song_path = config['AWS']['SONG_DATA_PATH']
aws_log_path = config['AWS']['LOG_DATA_PATH']
config['AWS']['SONG_DATA_PATH'], config['AWS']['LOG_DATA_PATH']

('s3a://udacity-dend/song_data/A/A/A/TRAAAAW128F429D538.json',
 's3a://udacity-dend/log-data/*/*/*.json')

In [17]:
import time
start = time.time()
aws_log_data = spark.read.format("json").load(config['AWS']['LOG_DATA_PATH'])
aws_song_data = spark.read.format("json").load(config['AWS']['SONG_DATA_PATH'])
print(time.time() - start, 'seconds')


KeyboardInterrupt: 

In [None]:
print("Count: ", aws_song_data.count())
aws_song_data.printSchema()
print(aws_song_data.take(1)), print()
aws_song_data.show(10)

In [None]:
print("Count: ", aws_log_data.count())
aws_log_data.printSchema()
print(aws_log_data.take(1)), print()
aws_log_data.show(10)

# 4. CREATE TABLES

In [8]:
# SWITCH LINES FOR LOCAL VS AWS USE

song_data = local_song_data
log_data = local_log_data
output_path = config['LOCAL']['OUTPUT_DATA_PATH']

# song_data = aws_song_data
# log_data = aws_log_data
# output_path = config['AWS']['OUTPUT_DATA_PATH']

### 4a. CREATE SONGS TABLE FROM SONG DATA

In [None]:
song_data.createOrReplaceTempView("song_data")

In [10]:
songs_table = spark.sql("""
    SELECT song_id, title, artist_id, year, duration
    FROM song_data
    ORDER BY song_id
""")
print("Count: ", songs_table.count())
songs_table.printSchema()
print(songs_table.take(1)), print()
songs_table.show(10)


Count:  71
root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)

[Row(song_id='SOAOIBZ12AB01815BE', title='I Hold Your Hand In Mine [Live At Royal Albert Hall]', artist_id='ARPBNLO1187FB3D52F', year=2000, duration=43.36281)]

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOAOIBZ12AB01815BE|I Hold Your Hand ...|ARPBNLO1187FB3D52F|2000| 43.36281|
|SOBAYLL12A8C138AF9|Sono andati? Fing...|ARDR4AC1187FB371A1|   0|511.16363|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
|SOBBXLX12A58A79DDA|Erica (2005 Digit...|AREDBBQ1187B98AFF5|   0|138.63138|
|SOBCOSW12A8C13D398|  Rumba De Barcelona|AR7SMBG1187B9B9066|   0|218.38322|
|SOBEBDG12A58A76D60|   

In [53]:
songs_table.write.mode("overwrite")\
                 .partitionBy("year", "artist_id")\
                 .parquet(output_path + 'songs/')

### 4b. CREATE ARTISTS TABLE FROM SONG DATA

In [11]:
artists_table = spark.sql("""
    SELECT DISTINCT artist_id, 
                    artist_name AS name, 
                    artist_location AS location, 
                    artist_latitude AS latitude, 
                    artist_longitude AS longitude
    FROM song_data
    ORDER BY artist_id desc
""")
print("Count: ", artists_table.count())
artists_table.printSchema()
print(artists_table.take(1)), print()
artists_table.show(10)


Count:  69
root
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_longitude: double (nullable = true)

[Row(artist_id='ARYKCQI1187FB3B18F', artist_name='Tesla', artist_location='', artist_latitude=None, artist_longitude=None)]

+------------------+------------------+---------------+---------------+----------------+
|         artist_id|       artist_name|artist_location|artist_latitude|artist_longitude|
+------------------+------------------+---------------+---------------+----------------+
|ARYKCQI1187FB3B18F|             Tesla|               |           null|            null|
|ARXR32B1187FB57099|               Gob|               |           null|            null|
|ARWB3G61187FB49404|       Steve Morse| Hamilton, Ohio|           null|            null|
|ARVBRGZ1187FB4675A|      Gwen Stefani|               |           null|            null|
|ARULZCI1

In [52]:
artists_table.write.mode("overwrite").parquet(output_path + 'artists/')

### 4c. CREATE TIME TABLE FROM LOG DATA

In [57]:
from datetime import datetime
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, to_date
from pyspark.sql import types as t

In [58]:
MILLISECONDS = 1000

In [13]:
extract_timestamp = udf(lambda x: datetime.fromtimestamp(x / MILLISECONDS), t.TimestampType())
log_data = log_data.withColumn("timestamp", extract_timestamp(log_data.ts))

In [14]:
log_data.createOrReplaceTempView("log_data")

In [15]:
time_table = spark.sql("""
    SELECT DISTINCT  timestamp AS start_time,
                     hour(timestamp) AS hour,
                     day(timestamp)  AS day,
                     weekofyear(timestamp) AS week,
                     month(timestamp) AS month,
                     year(timestamp) AS year,
                     dayofweek(timestamp) AS weekday
    FROM log_data
    ORDER BY datetime
""")
print("Count: ", time_table.count())
time_table.printSchema()
print(time_table.take(1)), print()
time_table.show(10)


Count:  8023
root
 |-- datetime: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)

[Row(datetime=datetime.datetime(2018, 11, 1, 20, 57, 10, 796000), hour=20, day=1, week=44, month=11, year=2018, weekday=5)]

+--------------------+----+---+----+-----+----+-------+
|            datetime|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-01 20:57:...|  20|  1|  44|   11|2018|      5|
|2018-11-01 21:01:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:02:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:05:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:08:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:11:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:17:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:24:...|  21| 

In [51]:
time_table.write.mode("overwrite")\
                .partitionBy("year", "month")\
                .parquet(output_path + 'time/')

### 4d. CREATE USERS TABLE FROM LOG DATA

In [55]:
users_table = spark.sql("""
    SELECT  DISTINCT userId    AS user_id, 
                     firstName AS first_name, 
                     lastName  AS last_name, 
                     gender, 
                     level
    FROM log_data
    ORDER BY last_name
""")

print("Count: ", users_table.count())
users_table.printSchema()
print(users_table.take(1)), print()
users_table.show(10)

Count:  107
root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)

[Row(user_id='', first_name=None, last_name=None, gender=None, level='free')]

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|       |      null|     null|  null| free|
|       |      null|     null|  null| paid|
|     66|     Kevin| Arellano|     M| free|
|     34|    Evelin|    Ayala|     F| free|
|     99|       Ann|    Banks|     F| free|
|    100|     Adler|  Barrera|     M| free|
|     42|    Harper|  Barrett|     M| paid|
|     91|    Jayden|     Bell|     M| free|
|      2|   Jizelle| Benjamin|     F| free|
|     58|     Emily|   Benson|     F| paid|
+-------+----------+---------+------+-----+
only showing top 10 rows



In [56]:
users_table.write.mode("overwrite").parquet(output_path + 'users/')

### 4e. CREATE SONGPLAYS TABLE FROM LOG DATA

In [44]:
songplays_table = spark.sql("""
    SELECT  log_data.timestamp AS start_time, 
            log_data.userId AS user_id, 
            log_data.level, 
            song_data.song_id, 
            song_data.artist_id ,
            log_data.sessionId AS session_id, 
            log_data.location, 
            log_data.userAgent AS user_agent

    FROM log_data 
    INNER JOIN song_data 
    ON song_data.artist_name = log_data.artist 
    WHERE log_data.page = 'NextSong'
""")
songplays_table = songplays_table.withColumn("songplay_id", monotonically_increasing_id())
songplays_table = songplays_table.select('songplay_id', *songplays_table.columns[:len(songplays_table.columns)-1])

print("Count: ", songplays_table.count())
songplays_table.printSchema()
print(songplays_table.take(1)), print()
songplays_table.show(10)

Count:  21
root
 |-- songplay_id: long (nullable = false)
 |-- start_time: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)

[Row(songplay_id=0, start_time=datetime.datetime(2018, 11, 15, 20, 32, 47, 796000), user_id='44', level='paid', song_id='SOBONFF12A6D4F84D8', artist_id='ARIK43K1187B9AE54C', session_id=619, location='Waterloo-Cedar Falls, IA', user_agent='Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:31.0) Gecko/20100101 Firefox/31.0')]

+-----------+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|songplay_id|          start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|
+-----------+

In [50]:
songplays_table.write.mode("overwrite").parquet(output_path + 'song_plays/')