# Data Exploration

This notebook supports the understanding from the json files in order to develop the ETL process.

In [1]:
import os
import configparser
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [2]:
config = configparser.ConfigParser()
config.read_file(open('dl.cfg'))
    
# AWS parameters to use "Programmatic access"
key = config.get('AWS','aws_access_key_id')
secret = config.get('AWS','aws_secret_access_key')
    
# Set AWS credentions as enviroment variables
os.environ['AWS_ACCESS_KEY_ID'] = key 
os.environ['AWS_SECRET_ACCESS_KEY'] = secret

# Bucket source
!aws s3 ls udacity-dend

                           PRE data-pipelines/
                           PRE log-data/
                           PRE log_data/
                           PRE pagila/
                           PRE song-data/
                           PRE song_data/
                           PRE udac-data-pipelines/
2019-04-02 16:58:44        456 log_json_path.json


In [3]:
# Start Spark Session
spark = SparkSession \
            .builder \
            .appName("Data Exploration") \
            .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
            .getOrCreate()
spark

In [4]:
# Config Spark to read files from S3
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
hadoop_conf.set("fs.s3a.access.key", os.environ['AWS_ACCESS_KEY_ID'])
hadoop_conf.set("fs.s3a.secret.key", os.environ['AWS_SECRET_ACCESS_KEY'])

In [9]:
song = spark.read.parquet("s3a://udacity-de-output-data-lake/songs_table/")
artist = spark.read.parquet("s3a://udacity-de-output-data-lake/artists_table/")
song.createOrReplaceTempView('songs_table')
song.createOrReplaceTempView('artists_table')

In [12]:
song.show(2)

+------------------+--------------------+---------+----+------------------+
|           song_id|               title| duration|year|         artist_id|
+------------------+--------------------+---------+----+------------------+
|SOBTCUI12A8AE48B70|Faust: Ballet Mus...| 94.56281|   0|ARSUVLW12454A4C8B8|
|SOVNKJI12A8C13CB0D|Take It To Da Hou...|227.10812|2001|ARWUNH81187FB4A3E0|
|SOYVBGZ12A6D4F92A8|Piano Sonata No. ...|221.70077|   0|ARLRWBW1242077EB29|
|SODBHKO12A58A77F36|Fingers Of Love (...|335.93424|   0|ARKGS2Z1187FB494B5|
|SOGXFIF12A58A78CC4|Hanging On (Mediu...|204.06812|   0|AR5LZJD1187FB4C5E5|
|SOZCRVP12A81C21F40|Welcome To The Do...| 46.94159|2008|AR4503S1187FB43199|
|SOOBEML12A8C138C91|Johnny Leary's Po...|  197.642|   0|ARP4O0W1187FB5A06B|
|SOUOPFM12AB0185809|You'd Be So Nice ...|405.41995|   0|ARSXDJO1269FCD9405|
|SOVJXVJ12A8C13517D|Where The Thunder...|298.84036|   0|ARCCRTI11F4C845308|
|SOKTJDS12AF72A25E5|Drown In My Own T...|  192.522|   0|ARA23XO1187B9AF18F|
|SOHHANU12A5

In [13]:
artist.show(2)

+------------------+-----------------+--------------------+---------------+----------------+
|         artist_id|      artist_name|     artist_location|artist_latitude|artist_longitude|
+------------------+-----------------+--------------------+---------------+----------------+
|ARA870V1187FB3C4B7|Hank Williams Jr.|           Paris, TN|       36.30188|       -88.32588|
|ARCZC791187B991DF4|      Chris Isaak|Stockton, California|           null|            null|
+------------------+-----------------+--------------------+---------------+----------------+
only showing top 2 rows



In [15]:
artist_song_df = spark.sql(
        """
            select
                a.artist_id,
                s.song_id,
                a.artist_name,
                s.title,
                s.duration
            from 
                songs_table s
            inner join 
                artists_table a
            on s.artist_id = a.artist_id
        """)
artist_song_df.createOrReplaceTempView("artists_songs")

AnalysisException: "cannot resolve '`a.artist_name`' given input columns: [a.duration, s.year, a.title, s.song_id, a.artist_id, a.year, s.duration, s.artist_id, s.title, a.song_id]; line 5 pos 16;\n'Project [artist_id#102, song_id#36, 'a.artist_name, title#37, duration#38]\n+- Join Inner, (artist_id#40 = artist_id#102)\n   :- SubqueryAlias `s`\n   :  +- SubqueryAlias `songs_table`\n   :     +- Relation[song_id#36,title#37,duration#38,year#39,artist_id#40] parquet\n   +- SubqueryAlias `a`\n      +- SubqueryAlias `artists_table`\n         +- Relation[song_id#98,title#99,duration#100,year#101,artist_id#102] parquet\n"

## Songs Data

In [6]:
## Read a sample
songs = spark.read.json("s3a://udacity-dend/song-data/A/A/*")

In [7]:
songs.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [8]:
songs.show(10)

+------------------+---------------+--------------------+----------------+--------------------+----------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name|  duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+----------+---------+------------------+--------------------+----+
|ARSUVLW12454A4C8B8|       35.83073|           Tennessee|       -85.97874|Royal Philharmoni...|  94.56281|        1|SOBTCUI12A8AE48B70|Faust: Ballet Mus...|   0|
|ARXQC081187FB4AD42|       54.31407|                  UK|        -2.23001|William Shatner_ ...|1047.71873|        1|SOXRPUH12AB017F769|Exodus: Part I: M...|   0|
|ARWUNH81187FB4A3E0|           null|     Miami , Florida|            null|         Trick Daddy| 227.10812|        1|SOVNKJI12A8C13CB0D|Take It To Da Hou...|2001|
|ARTC1LV1187B9A4858|        

The songs will be splited in 2 tables, one with the songs and another with the artists.

The song table schema:
- song_id
- title
- artist_id
- year
- duration

The artist table schema:
- artist_id
- name
- location
- latitude
- longitude

In [9]:
songs.createOrReplaceTempView("song_table")

In [10]:
song_table = spark.sql(
        """
        select 
            song_id,
            title,
            artist_id,
            year,
            duration
        from 
            song_table
        where
            song_id is not null
        """
)

In [11]:
song_table.show(5)

+------------------+--------------------+------------------+----+----------+
|           song_id|               title|         artist_id|year|  duration|
+------------------+--------------------+------------------+----+----------+
|SOBTCUI12A8AE48B70|Faust: Ballet Mus...|ARSUVLW12454A4C8B8|   0|  94.56281|
|SOXRPUH12AB017F769|Exodus: Part I: M...|ARXQC081187FB4AD42|   0|1047.71873|
|SOVNKJI12A8C13CB0D|Take It To Da Hou...|ARWUNH81187FB4A3E0|2001| 227.10812|
|SOAFBCP12A8C13CC7D|King Of Scurf (20...|ARTC1LV1187B9A4858|1972| 301.40036|
|SOKTJDS12AF72A25E5|Drown In My Own T...|ARA23XO1187B9AF18F|   0|   192.522|
+------------------+--------------------+------------------+----+----------+
only showing top 5 rows



In [12]:
artist_table = spark.sql(
        """
        select 
            distinct artist_id,
            artist_name name,
            artist_location,
            artist_latitude,
            artist_longitude
        from 
            song_table
        where
            artist_id is not null
        """
)

In [13]:
artist_table.show(5)

+------------------+---------------+----------------+---------------+----------------+
|         artist_id|           name| artist_location|artist_latitude|artist_longitude|
+------------------+---------------+----------------+---------------+----------------+
|ARB57BN1187B9B5EAF|  36 Crazyfists|   Anchorage, AK|       61.21756|      -149.85776|
|ARMI4NV1187B99D55D|        Man Man|Philadelphia, PA|       39.95227|       -75.16237|
|ARKCTSM11F4C83C839|Igor Stravinsky|                |           null|            null|
|AR8YYNB1187B9A4BB3|  Assemblage 23|                |           null|            null|
|ARDDQKN1187FB50651|         Rednex|                |           null|            null|
+------------------+---------------+----------------+---------------+----------------+
only showing top 5 rows



## Logs Data

In [14]:
## Read a sample of data to process
logs = spark.read.json("s3a://udacity-dend/log_data/*/*/")

In [15]:
logs.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [16]:
logs.show(10)

+-----------+----------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|     artist|      auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+-----------+----------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|   Harmonia| Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy| Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12| 

In [17]:
logs.createOrReplaceTempView('log_table')

The log data will be splited in users, songplay, and time tables:
 
The users schema:
- user id
- first_name
- last_name
- gender
- level

The songplay schema:
- songplay_id
- start_time
- user_id
- level
- song_id
- artist_id
- session_id
- location
- user_agent

The time schema:
- start_time
- hour
- day
- week
- month
- year
- weekDay

In [18]:
spark.sql("""select * from log_table limit 5""").toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12


In [19]:
user = spark.sql(
    """            
    select 
        distinct  userId user_id,
        firstName first_name,
        lastName last_name,
        gender,
        level 
    from 
        log_table    
    where 
        page = 'NextSong'
        and userId is not null
    """)

In [60]:
user.show()

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     98|    Jordyn|   Powell|     F| free|
|     34|    Evelin|    Ayala|     F| free|
|     85|   Kinsley|    Young|     F| paid|
|     38|    Gianna|    Jones|     F| free|
|     85|   Kinsley|    Young|     F| free|
|     63|      Ayla|  Johnson|     F| free|
|     37|    Jordan|    Hicks|     F| free|
|      6|   Cecilia|    Owens|     F| free|
|     15|      Lily|     Koch|     F| paid|
|     27|    Carlos|   Carter|     M| free|
|     89|   Kynnedi|  Sanchez|     F| free|
|     57| Katherine|      Gay|     F| free|
|     74|    Braden|   Parker|     M| free|
|     29|Jacqueline|    Lynch|     F| paid|
|     75|    Joseph|Gutierrez|     M| free|
|     61|    Samuel| Gonzalez|     M| free|
|     88|  Mohammad|Rodriguez|     M| free|
|     64|    Hannah|  Calhoun|     F| free|
|     15|      Lily|     Koch|     F| free|
|     95|      Sara|  Johnson|  

In [87]:
song_play = spark.sql(
    """
    select  
        to_timestamp(l.ts/1000) as start_time,
        l.userId as user_id,
        l.level,
        s.song_id,
        s.artist_id,
        l.sessionId as session_id,
        l.location,
        l.userAgent as user_agent
    from 
        log_table l
                           
    inner join
        song_table s
    on l.song = s.title and l.artist = s.artist_name and l.length = s.duration
                           
    where 
        l.page = 'NextSong'   
    """
)

In [103]:
song_play.show()
song_play.createOrReplaceTempView("song_play")

+--------------------+------+-----+------------------+------------------+---------+--------------------+--------------------+
|          start_time|userId|level|           song_id|         artist_id|sessionId|            location|           userAgent|
+--------------------+------+-----+------------------+------------------+---------+--------------------+--------------------+
|2018-11-21 21:56:...|    15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|      818|Chicago-Napervill...|"Mozilla/5.0 (X11...|
|2018-11-05 17:49:...|    73| paid|SOHDWWH12A6D4F7F6A|ARC0IOF1187FB3F6E6|      255|Tampa-St. Petersb...|"Mozilla/5.0 (Mac...|
|2018-11-13 22:39:...|    55| free|SOXQYSC12A6310E908|AR0L04E1187B9AE90C|      415|Minneapolis-St. P...|"Mozilla/5.0 (Mac...|
|2018-11-16 14:21:...|    85| paid|SOLRYQR12A670215BF|ARNLO5S1187B9B80CC|      436|       Red Bluff, CA|"Mozilla/5.0 (Mac...|
|2018-11-20 17:46:...|    49| paid|SOCHRXB12A8AE48069|ARTDQRC1187FB4EFD4|      758|San Francisco-Oak...|Mozilla/5.0 (W

In [104]:
time_table = spark.sql(
    """
    select 
      start_time,
       extract(hour from start_time) as hour,
       extract(day from start_time) as day,
       extract(week from start_time) as week, 
       extract(month from start_time) as month,
       extract(year from start_time) as year, 
       extract(dayofweek from start_time) as day_of_week
    from 
        song_play
    """
)

In [105]:
time_table.show()

+--------------------+----+---+----+-----+----+-----------+
|          start_time|hour|day|week|month|year|day_of_week|
+--------------------+----+---+----+-----+----+-----------+
|2018-11-21 21:56:...|  21| 21|  47|   11|2018|          4|
|2018-11-05 17:49:...|  17|  5|  45|   11|2018|          2|
|2018-11-13 22:39:...|  22| 13|  46|   11|2018|          3|
|2018-11-16 14:21:...|  14| 16|  46|   11|2018|          6|
|2018-11-20 17:46:...|  17| 20|  47|   11|2018|          3|
|2018-11-24 12:43:...|  12| 24|  47|   11|2018|          7|
|2018-11-29 21:00:...|  21| 29|  48|   11|2018|          5|
|2018-11-27 18:09:...|  18| 27|  48|   11|2018|          3|
|2018-11-09 17:55:...|  17|  9|  45|   11|2018|          6|
|2018-11-09 19:57:...|  19|  9|  45|   11|2018|          6|
|2018-11-26 08:33:...|   8| 26|  48|   11|2018|          2|
|2018-11-26 15:33:...|  15| 26|  48|   11|2018|          2|
|2018-11-26 18:25:...|  18| 26|  48|   11|2018|          2|
|2018-11-08 15:01:...|  15|  8|  45|   1

In [74]:
df = spark.read.json("s3a://udacity-dend/log_data/*/*/")

In [79]:
# create timestamp column from original timestamp column
get_timestamp = udf(lambda x: x/1000.0)
df = df.withColumn('timestamp', get_timestamp(df.ts))

# create datetime column from original timestamp column
get_datetime = udf(lambda x: datetime.utcfromtimestamp(x).strftime('%Y-%m-%d %H:%M:%S'))
df = df.withColumn("datetime", get_datetime(df.timestamp))

In [80]:
df.show()

+--------------------+----------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+----------------+-------------------+
|              artist|      auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|       timestamp|           datetime|
+--------------------+----------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+----------------+-------------------+
|            Harmonia| Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11.