In [2]:
import configparser
import os
import sys

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .appName("demo")\
                     .getOrCreate()

In [4]:
# option 1
config = configparser.ConfigParser()
config.read_file(open('dl.cfg'))
os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [5]:
# option 2
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.access.key", "AKIA3D75VCVQZIRUQ3UT")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "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")

In [30]:
song_df = spark.read.json("data/song-data/*")
# or
#song_df = spark.read.json("data/song-data/*/*.json")
#song_df = spark.read.json("s3a://udacity-dend/song_data/*/*/*/*.json")

In [None]:
song_df.show(50)

In [None]:
song_df.printSchema()

In [8]:
import pyspark.sql.functions as F
from pyspark.sql.functions import udf

In [31]:
songs_df = song_df.selectExpr("song_id", "title", "artist_id", "cast(year as int)", "duration").dropDuplicates()

In [None]:
songs_df.show(50)

In [13]:
songs_df.limit(5).toPandas()

Unnamed: 0,song_id,title,artist_id,year,duration
0,SONHOTT12A8C13493C,Something Girls,AR7G5I41187FB4CE6C,1982,233.40363
1,SOHKNRJ12A6701D1F8,Drop of Rain,AR10USD1187B99F3F1,0,189.57016
2,SOCIWDW12A8C13D406,Soul Deep,ARMJAGH1187FB546F3,1969,148.03546
3,SOUDSGM12AC9618304,Insatiable (Instrumental Version),ARNTLGG11E2835DDB9,0,266.39628
4,SOQHXMF12AB0182363,Young Boy Blues,ARGSJW91187B9B1D6B,0,218.77506


In [9]:
#songs_path = os.path.join('s3a://udacity-data-lakes-supratim/', 'songs')
songs_path = os.path.join('output', 'songs')
songs_df.write.parquet(songs_path, mode='overwrite', partitionBy=["year", "artist_id"])

In [58]:
# artist_id, name, location, latitude, longitude
artists_df = song_df.selectExpr("artist_id", 
                                   "artist_name as name", 
                                   "coalesce(nullif(artist_location, ''), 'N/A') as location",
                                   "coalesce(artist_latitude, 0.0) as latitude", 
                                   "coalesce(artist_longitude, 0.0) as longitude").dropDuplicates()

In [66]:
artists_df.limit(5).toPandas()

Unnamed: 0,artist_id,name,location,latitude,longitude
0,ARKFYS91187B98E58F,Jeff And Sheri Easter,,0.0,0.0
1,ARD7TVE1187B99BFB1,Casual,California - LA,0.0,0.0
2,ARGSJW91187B9B1D6B,JennyAnyKind,North Carolina,35.21962,-80.01955
3,ARD0S291187B9B7BF5,Rated R,Ohio,0.0,0.0
4,ARXR32B1187FB57099,Gob,,0.0,0.0


In [None]:
artists_path = os.path.join('output', 'artists')
artists_df.write.parquet(artists_path, mode='overwrite')

#### facts
**songplays**
songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
#### dims
**time**
start_time, hour, day, week, month, year, weekday  
**users**
user_id, first_name, last_name, gender, level

In [6]:
log_df = spark.read.json("data/log-data/*")

In [7]:
log_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [8]:
log_df.count()

8056

In [9]:
log_df_clean = log_df.filter(log_df.ts.isNotNull())

In [10]:
log_df.count()

8056

#### Sometimes timestamp can be in milli-seconds or micro-seconds - in that case you have to bring it into seconds (max 10 digits integer)

In [11]:
from datetime import datetime

In [12]:
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import TimestampType
# you can't use in udf sql.functions
@udf(TimestampType())
def parseDate(col_name):
    col_name_div = col_name/1000
    col_name_converted = datetime.fromtimestamp(col_name_div)
    return col_name_converted

In [29]:
# start_time, hour, day, week, month, year, weekday
clean_df = log_df_clean.withColumn("timestamp", parseDate(log_df["ts"]))
clean_df.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,timestamp
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:30:26.796
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:41:21.796
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:45:41.796
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9,2018-11-15 01:57:51.796
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12,2018-11-15 03:29:37.796


In [25]:
# time: start_time, hour, day, week, month, year, weekday
time_df = clean_df.selectExpr("timestamp as start_time", 
                    "hour(timestamp) as hour", 
                    "dayofmonth(timestamp) as day", 
                    "weekofyear(timestamp) as week", 
                    "month(timestamp) as month",
                    "year(timestamp) as year",
                    "date_format(timestamp, 'E') as weekday")

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-15 00:30:...|   0| 15|  46|   11|2018|    Thu|
|2018-11-15 00:41:...|   0| 15|  46|   11|2018|    Thu|
|2018-11-15 00:45:...|   0| 15|  46|   11|2018|    Thu|
|2018-11-15 01:57:...|   1| 15|  46|   11|2018|    Thu|
|2018-11-15 03:29:...|   3| 15|  46|   11|2018|    Thu|
+--------------------+----+---+----+-----+----+-------+



In [None]:
clean_df.printSchema()

In [None]:
log_df.withColumn("timestamp", F.to_timestamp(log_df["ts"]/1000)).toPandas()

In [26]:
log_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [28]:
#users: user_id, first_name, last_name, gender, level
user_df = clean_df.selectExpr("userId as user_id", 
                              "firstName as first_name", 
                              "lastName as last_name", 
                              "gender", 
                              "coalesce(level, 'N/A')"
                             )

In [84]:
#songplays: songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
songplays_df = clean_df.alias("lg")\
                       .filter(clean_df.page=="NextSong")\
                       .join(song_df.alias("sg"), clean_df.song==song_df.title, "leftouter")\
                       .join(artists_df.alias("ar"), clean_df.artist==artists_df.name, "leftouter")\
                       .selectExpr("lg.timestamp as start_time",
                                   "lg.userId as user_id",
                                   "lg.level as level",
                                   "sg.song_id as song_id",
                                   "coalesce(ar.artist_id, sg.artist_id) as artist_id",
                                   "lg.sessionId as session_id",
                                   "ar.location as location",
                                   "lg.userAgent as user_agent")\
                        .dropDuplicates()
songplays_df.limit(20).toPandas()

Unnamed: 0,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent
0,2018-11-15 06:21:33.796,15,paid,,,582,,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."
1,2018-11-15 10:40:30.796,80,paid,,,611,,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
2,2018-11-15 21:06:44.796,49,paid,,,630,,Mozilla/5.0 (Windows NT 5.1; rv:31.0) Gecko/20...
3,2018-11-21 13:44:45.796,97,paid,,,806,,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."
4,2018-11-14 08:54:23.796,58,paid,,,522,,"""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebK..."
5,2018-11-14 10:30:29.796,29,paid,,,559,,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
6,2018-11-14 11:07:07.796,15,paid,,,557,,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."
7,2018-11-14 16:44:59.796,80,paid,,,574,,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
8,2018-11-28 14:15:53.796,69,free,,,1019,,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
9,2018-11-28 14:27:54.796,69,free,,,1019,,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
