# ETL with PySpark using DataFrame API

In [1]:
import configparser
from datetime import datetime, date
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, from_unixtime

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [3]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [4]:
input_data = "s3a://udacity-dend/"

In [5]:
spark

In [6]:
sc = spark.sparkContext
#sc._jsc.hadoopConfiguration().set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.environ['AWS_ACCESS_KEY_ID'])
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", os.environ['AWS_SECRET_ACCESS_KEY'])

In [10]:
song_data = os.path.join(input_data, 'song_data/A/A/A/*.json')
df = spark.read.json(song_data)

In [7]:
log_data = os.path.join(input_data, 'log_data/2018/*/*.json')
df_log = spark.read.json(log_data)

In [9]:
df.limit(5).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARTC1LV1187B9A4858,51.4536,"Goldsmith's College, Lewisham, Lo",-0.01802,The Bonzo Dog Band,301.40036,1,SOAFBCP12A8C13CC7D,King Of Scurf (2007 Digital Remaster),1972
1,ARA23XO1187B9AF18F,40.57885,"Carteret, New Jersey",-74.21956,The Smithereens,192.522,1,SOKTJDS12AF72A25E5,Drown In My Own Tears (24-Bit Digitally Remast...,0
2,ARSVTNL1187B992A91,51.50632,"London, England",-0.12714,Jonathan King,129.85424,1,SOEKAZG12AB018837E,I'll Slap Your Face (Entertainment USA Theme),2001
3,AR73AIO1187B9AD57B,37.77916,"San Francisco, CA",-122.42005,Western Addiction,118.07302,1,SOQPWCR12A6D4FB2A3,A Poor Recipe For Civic Cohesion,2005
4,ARXQBR11187B98A2CC,,"Liverpool, England",,Frankie Goes To Hollywood,821.05424,1,SOBRKGM12A8C139EF6,Welcome to the Pleasuredome,1985


In [10]:
df_log.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12


In [11]:
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [12]:
df_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [13]:
df.count()

24

In [9]:
df_log.count()

8056

# Song Data

**Create `songs` DataFrame**

In [15]:
songs_df = df.select('song_id', 'title', 'artist_id', 'year', 'duration').dropDuplicates()

In [16]:
songs_df.show(5)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SODZYPO12A8C13A91E|Burn My Body (Alb...|AR1C2IX1187B99BF74|   0|177.99791|
|SOIGHOD12A8C13B5A1|        Indian Angel|ARY589G1187B9A9F4E|2004|171.57179|
|SOOVHYF12A8C134892|     I'll Be Waiting|ARCLYBR1187FB53913|1989|304.56118|
|SOAPERH12A58A787DC|The One And Only ...|ARZ5H0P1187B98A1DD|   0|230.42567|
|SOHKNRJ12A6701D1F8|        Drop of Rain|AR10USD1187B99F3F1|   0|189.57016|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



**Create `Artist` DataFrame**

In [17]:
artist = df.select('artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude').dropDuplicates()

In [18]:
artist.show(5)

+------------------+-------------+---------------+---------------+----------------+
|         artist_id|  artist_name|artist_location|artist_latitude|artist_longitude|
+------------------+-------------+---------------+---------------+----------------+
|ARC1IHZ1187FB4E920| Jamie Cullum|               |           null|            null|
|ARZKCQM1257509D107|   Dataphiles|               |           null|            null|
|AREWD471187FB49873|     Son Kite|               |           null|            null|
|ARGE7G11187FB37E05| Cyndi Lauper|   Brooklyn, NY|           null|            null|
|ARSVTNL1187B992A91|Jonathan King|London, England|       51.50632|        -0.12714|
+------------------+-------------+---------------+---------------+----------------+
only showing top 5 rows



# Log Data

Filter data for `NextSong` to indicate song plays

In [28]:
df_log = df_log.filter(df_log.page == 'NextSong')

In [19]:
@udf(TimestampType())
def conv_ts(ts):
    return datetime.fromtimestamp(ts/1000)

In [20]:
df_log_2 = df_log.withColumn('datetime', conv_ts('ts'))

In [21]:
df_log_2.select(
    'ts',
    'datetime',
    date_format('datetime','hh:mm:ss').alias('start_time'),
    year('datetime').alias('year'),
    month('datetime').alias('month'),
    dayofmonth('datetime').alias('dayofmonth'),
    weekofyear('datetime').alias('weekofyear')
).dropDuplicates().show(5)

+-------------+--------------------+----------+----+-----+----------+----------+
|           ts|            datetime|start_time|year|month|dayofmonth|weekofyear|
+-------------+--------------------+----------+----+-----+----------+----------+
|1542310460796|2018-11-15 19:34:...|  07:34:20|2018|   11|        15|        46|
|1542800446796|2018-11-21 11:40:...|  11:40:46|2018|   11|        21|        47|
|1542183016796|2018-11-14 08:10:...|  08:10:16|2018|   11|        14|        46|
|1542186667796|2018-11-14 09:11:...|  09:11:07|2018|   11|        14|        46|
|1542213072796|2018-11-14 16:31:...|  04:31:12|2018|   11|        14|        46|
+-------------+--------------------+----------+----+-----+----------+----------+
only showing top 5 rows



**`User` Query**

In [38]:
df_log_2.createOrReplaceTempView("log_data")

In [39]:
user = spark.sql("""
SELECT distinct
cast(f.userid as smallint) as id,
f.firstname,
f.lastname,
f.gender,
f.level
from
    (SELECT
    se.userid,
    se.firstname ,
    se.lastname ,
    se.gender,
    se.level,
    se.ts
    from
    log_data se
    join(
        SELECT
        userid,
        max(ts) as mts
        from
        log_data
        where
        userid != ''
        group by 1
    ) level_latest on
    se.userid = level_latest.userid
    and se.ts = level_latest.mts
) f
""").show(5)

+---+---------+--------+------+-----+
| id|firstname|lastname|gender|level|
+---+---------+--------+------+-----+
| 26|     Ryan|   Smith|     M| free|
| 15|     Lily|    Koch|     F| paid|
|  8|   Kaylee| Summers|     F| free|
| 17| Makinley|   Jones|     F| free|
| 82|    Avery|Martinez|     F| paid|
+---+---------+--------+------+-----+
only showing top 5 rows



**`songplays` Query**

In [41]:
df_log_2.filter(df_log.userId != '')\
        .join(df, [df_log.artist == df.artist_name, df_log.length == df.duration , df_log.song == df.title], how='left')\
        .dropDuplicates()\
        .select(
        col('userId').alias('id'),
        date_format('datetime','hh:mm:ss').alias('start_time'),
        'level', 'song_id', 'artist_id', 'sessionId', 'location', 'useragent')\
        .show(5)

+---+----------+-----+-------+---------+---------+--------------------+--------------------+
| id|start_time|level|song_id|artist_id|sessionId|            location|           useragent|
+---+----------+-----+-------+---------+---------+--------------------+--------------------+
| 44|  11:07:30| paid|   null|     null|      619|Waterloo-Cedar Fa...|Mozilla/5.0 (Maci...|
| 36|  09:23:55| paid|   null|     null|      728|Janesville-Beloit...|"Mozilla/5.0 (Win...|
| 88|  09:33:37| paid|   null|     null|      744|Sacramento--Rosev...|"Mozilla/5.0 (Mac...|
| 15|  11:11:22| free|   null|     null|      764|Chicago-Napervill...|"Mozilla/5.0 (X11...|
| 16|  06:53:13| paid|   null|     null|      479|Birmingham-Hoover...|"Mozilla/5.0 (Mac...|
+---+----------+-----+-------+---------+---------+--------------------+--------------------+
only showing top 5 rows

