In [1]:
import os
import configparser
import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, to_timestamp, to_date
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, trunc
from pyspark.sql.functions import monotonically_increasing_id 
from pyspark.sql.types import IntegerType

In [2]:
config = configparser.ConfigParser()

#Normally this file should be in ~/.aws/credentials
config.read_file(open('aws.cfg'))

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

In [3]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()

## Read Data - Songs & Logs

In [4]:
#data_input = "s3a://udacity-dend/"
data_input = "data/"

In [21]:
data_output = "s3a://sparkify-dl/output/"

In [23]:
from boto.s3.connection import S3Connection

conn = S3Connection(config['AWS']['AWS_ACCESS_KEY_ID'],config['AWS']['AWS_SECRET_ACCESS_KEY'])
bucket = conn.get_bucket('sparkify-dl')
for key in bucket.list():
    print(key.name.encode('utf-8'))

b'Data+Scientist+Nanodegree+Syllabus.pdf'
b'output/artists.parquet/_SUCCESS'
b'output/artists.parquet/part-00000-500c5975-cbea-4f82-bcd4-83420e0c217b-c000.snappy.parquet'
b'output/artists.parquet/part-00002-500c5975-cbea-4f82-bcd4-83420e0c217b-c000.snappy.parquet'
b'output/artists.parquet/part-00006-500c5975-cbea-4f82-bcd4-83420e0c217b-c000.snappy.parquet'
b'output/artists.parquet/part-00008-500c5975-cbea-4f82-bcd4-83420e0c217b-c000.snappy.parquet'
b'output/artists.parquet/part-00009-500c5975-cbea-4f82-bcd4-83420e0c217b-c000.snappy.parquet'
b'output/artists.parquet/part-00010-500c5975-cbea-4f82-bcd4-83420e0c217b-c000.snappy.parquet'
b'output/artists.parquet/part-00011-500c5975-cbea-4f82-bcd4-83420e0c217b-c000.snappy.parquet'
b'output/artists.parquet/part-00014-500c5975-cbea-4f82-bcd4-83420e0c217b-c000.snappy.parquet'
b'output/artists.parquet/part-00018-500c5975-cbea-4f82-bcd4-83420e0c217b-c000.snappy.parquet'
b'output/artists.parquet/part-00020-500c5975-cbea-4f82-bcd4-83420e0c217b-c000

In [11]:
#data_log = data_input + "log-data/*/*/*.json"
#data_song = data_input + "song_data/*/*/*/*.json"

data_log = data_input + "log_data/*.json"
data_song = data_input + "song_data/*/*/*/*.json"

In [12]:
print(data_log)

data/log_data/*.json


In [13]:
df_log = spark.read.json(data_log)
df_song = spark.read.json(data_song)

In [14]:
# check schema & count
df_log.printSchema()
df_log.count()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



8056

In [15]:
#a better show?
df_log.show(2, truncate=False)

+-----------+---------+---------+------+-------------+--------+---------+-----+----------------------------------+------+--------+-----------------+---------+---------------+------+-------------+-----------------------------------------------------------------------------------------------------------------------------------------+------+
|artist     |auth     |firstName|gender|itemInSession|lastName|length   |level|location                          |method|page    |registration     |sessionId|song           |status|ts           |userAgent                                                                                                                                |userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+----------------------------------+------+--------+-----------------+---------+---------------+------+-------------+---------------------------------------------------------------------------------------------------------------------------

In [9]:
#pandas to the rescue
pd.set_option('max_colwidth', 300)
df_log.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36""",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36""",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36""",26
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko,9
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20100101 Firefox/31.0,12


In [10]:
# check schema & count
df_song.printSchema()
df_song.count()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



71

In [11]:
#a better show?
df_song.show(2, truncate=False)

+------------------+---------------+---------------+----------------+----------------------------------------------------------------------------------------------+---------+---------+------------------+-------------------------------+----+
|artist_id         |artist_latitude|artist_location|artist_longitude|artist_name                                                                                   |duration |num_songs|song_id           |title                          |year|
+------------------+---------------+---------------+----------------+----------------------------------------------------------------------------------------------+---------+---------+------------------+-------------------------------+----+
|ARDR4AC1187FB371A1|null           |               |null            |Montserrat Caballé;Placido Domingo;Vicente Sardinero;Judith Blegen;Sherrill Milnes;Georg Solti|511.16363|1        |SOBAYLL12A8C138AF9|Sono andati? Fingevo di dormire|0   |
|AREBBGV1187FB523D2|null           |

In [12]:
#pandas to the rescue
pd.set_option('max_colwidth', 300)
df_song.limit(5).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sardinero;Judith Blegen;Sherrill Milnes;Georg Solti,511.16363,1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,0
1,AREBBGV1187FB523D2,,"Houston, TX",,Mike Jones (Featuring CJ_ Mello & Lil' Bran),173.66159,1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),0
2,ARMAC4T1187FB3FA4C,40.82624,"Morris Plains, NJ",-74.47995,The Dillinger Escape Plan,207.77751,1,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,2004
3,ARPBNLO1187FB3D52F,40.71455,"New York, NY",-74.00712,Tiny Tim,43.36281,1,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert Hall],2000
4,ARNF6401187FB57032,40.79086,"New York, NY [Manhattan]",-73.96644,Sophie B. Hawkins,305.162,1,SONWXQJ12A8C134D94,The Ballad Of Sleeping Beauty,1994


## Process Data - Songs

In [16]:
songs_table = df_song.select('song_id', 'title', 'artist_id', 'year', 'duration').dropDuplicates()

In [14]:
songs_table.write.partitionBy("year","artist_id").parquet("output/songs.parquet")

AnalysisException: 'path file:/home/workspace/output/songs.parquet already exists.;'

In [17]:
artists_table = df_song.select('artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude').dropDuplicates()

In [22]:
artists_table.write.parquet(data_output + "artists.parquet")

## Process Data - Logs

In [45]:
df_log_filtered = df_log.filter(df_log.page == 'NextSong')

In [49]:
df_log_filtered = df_log_filtered.withColumn("songplay_id",monotonically_increasing_id())

In [17]:
users_table = df_log_filtered.select('userId', 'firstName', 'lastName', 'gender', 'level').dropDuplicates()

In [None]:
users_table.write.parquet("output/users.parquet")

In [51]:
get_timestamp = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).strftime('%H:%M:%S'))

In [52]:
get_datetime = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).strftime('%Y-%m-%d'))

In [53]:
df_log_filtered = df_log_filtered.withColumn("timestamp", get_timestamp(df_log_filtered.ts))
df_log_filtered = df_log_filtered.withColumn("datetime", get_datetime(df_log_filtered.ts))

In [54]:
df_log_filtered.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- songplay_id: long (nullable = false)
 |-- timestamp: string (nullable = true)
 |-- datetime: string (nullable = true)



In [None]:
time_table = df_log_filtered.select(
    'timestamp',
    hour(to_timestamp('timestamp', 'HH:mm:ss')).alias('hour'),
    dayofmonth(to_date('datetime', 'yyyy-MM-dd')).alias('day'),
    weekofyear(to_date('datetime', 'yyyy-MM-dd')).alias('week'),
    month(to_date('datetime', 'yyyy-MM-dd')).alias('month'),
    year(to_date('datetime', 'yyyy-MM-dd')).alias('year'),
    date_format('datetime', 'u').alias('day_number')
).dropDuplicates()

In [22]:
get_weekday = udf(lambda day_number: 1 if day_number < 6 else 0)

In [23]:
time_table = df_log_filtered.select(
    'timestamp',
    hour(to_timestamp('timestamp', 'HH:mm:ss')).alias('hour'),
    dayofmonth(to_date('datetime', 'yyyy-MM-dd')).alias('day'),
    weekofyear(to_date('datetime', 'yyyy-MM-dd')).alias('week'),
    month(to_date('datetime', 'yyyy-MM-dd')).alias('month'),
    year(to_date('datetime', 'yyyy-MM-dd')).alias('year'),
    date_format('datetime', 'u').cast(IntegerType()).alias('day_number')
).dropDuplicates()

In [24]:
time_table = time_table.withColumn("weekday", get_weekday(time_table.day_number))

In [25]:
time_table = time_table.withColumnRenamed("timestamp", "start_time")

In [26]:
time_table = time_table.drop('day_number')

In [None]:
time_table.write.partitionBy("year","month").parquet("output/time.parquet")

In [27]:
df_log_filtered.createOrReplaceTempView("tbl_log")

In [28]:
songs_table.createOrReplaceTempView("tbl_songs")

In [186]:
songplays_table = spark.sql(
    "SELECT * \
       FROM ( \
    SELECT row_number() over (order by userId) as songplay_id, \
           timestamp as start_time, userId as user_id, level, song, artist, \
           sessionId as session_id, location, userAgent as user_agent \
      FROM tbl_log) AS tbl_log \
      LEFT JOIN ( \
    SELECT song_id, title, artist_id \
      FROM tbl_songs) AS tbl_songs \
        ON tbl_log.song = tbl_songs.title \ 
      LEFT JOIN ( \
    SELECT start_time, year, month \
      FROM time) AS time \
        ON tbl_log.start_time = time.start_time"
)

SyntaxError: EOL while scanning string literal (<ipython-input-186-4ad5697d6eeb>, line 11)

In [179]:
songplays_table = songplays_table.drop('song','artist','title')

In [180]:
songplays_table.show(5)

+-----------+----------+-------+-----+----------+--------------------+--------------------+------------------+------------------+
|songplay_id|start_time|user_id|level|session_id|            location|          user_agent|           song_id|         artist_id|
+-----------+----------+-------+-----+----------+--------------------+--------------------+------------------+------------------+
|          1|  04:53:36|     10| free|       484|Washington-Arling...|"Mozilla/5.0 (Mac...|              null|              null|
|          2|  04:58:37|     10| free|       484|Washington-Arling...|"Mozilla/5.0 (Mac...|              null|              null|
|          3|  05:04:00|     10| free|       484|Washington-Arling...|"Mozilla/5.0 (Mac...|              null|              null|
|          4|  05:06:03|     10| free|       484|Washington-Arling...|"Mozilla/5.0 (Mac...|SOGDBUF12A8C140FAA|AR558FS1187FB45658|
|          5|  05:08:07|     10| free|       484|Washington-Arling...|"Mozilla/5.0 (Mac...

In [None]:
songplays_table.write.partitionBy("year","month").parquet("output/songplays.parquet")

In [194]:
query = "SELECT row_number() over (order by userId) as songplay_id, "\
"tbl_log.timestamp as start_time, tbl_log.userId as user_id, tbl_log.level, "\
"tbl_log.song, df1.artist, tbl_log.sessionId as session_id, tbl_log.location, "\
"tbl_log.userAgent as user_agent, "\
"tbl_songs.song_id, tbl_songs.title, tbl_songs.artist_id, "\
"time.start_time, time.year, time.month "\
"FROM tbl_log "\
"LEFT JOIN tbl_songs ON tbl_log.song = tbl_songs.title "\ 
"LEFT JOIN time ON tbl_log.start_time = time.start_time"

SyntaxError: unexpected character after line continuation character (<ipython-input-194-fcc0b38e7d92>, line 1)

In [None]:
joined_df = spark.sql(query)

In [56]:
df_log_filtered = df_log_filtered.alias('df_log_filtered')
songs_table = songs_table.alias('songs_table')
time_table = time_table.alias('time_table')

In [68]:
songplays_table = df_log_filtered.join(songs_table, col('df_log_filtered.song') == col('songs_table.title'), 'left') \
.join(time_table, col('df_log_filtered.timestamp') == col('time_table.start_time'), 'left') \
.select('df_log_filtered.songplay_id', 
        'df_log_filtered.timestamp', 
        'df_log_filtered.userId', 
        'df_log_filtered.level', 
        'songs_table.song_id', 
        'songs_table.artist_id', 
        'df_log_filtered.sessionId', 
        'df_log_filtered.location', 
        'df_log_filtered.userAgent', 
        'time_table.year', 
        'time_table.month')

In [70]:
songplays_table = (songplays_table.withColumnRenamed("timestamp", "start_time")
.withColumnRenamed("userId", "user_id")
.withColumnRenamed("sessionId", "session_id")
.withColumnRenamed("userAgent", "user_agent"))

In [72]:
songplays_table.write.partitionBy("year","month").parquet("output/songplays.parquet")