In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import (year, month, dayofmonth, hour, weekofyear, 
                                   date_format, udf, col)


config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [4]:
# import boto3
# s3 = boto3.resource('s3', aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'], aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'])
# my_bucket = s3.Bucket('udacity-dend')

# i=0
# for my_bucket_object in my_bucket.objects.filter(Prefix='log_data/').all(): #.all()
#     i+=1
#     print(my_bucket_object)


s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-01-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-02-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-03-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-04-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-05-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-06-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-07-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-08-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-09-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-10-events.json')
s3.ObjectSummary(b

In [3]:
spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()

# Song Table

In [4]:
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, LongType, TimestampType

song_schema = StructType([
    StructField("num_songs",        IntegerType()),
    StructField("artist_id",        StringType()),
    StructField("artist_latitude",  DoubleType()),
    StructField("artist_longitude", DoubleType()),
    StructField("artist_location",  StringType()),
    StructField("artist_name",      StringType()),
    StructField("song_id",          StringType()),
    StructField("title",            StringType()),
    StructField("duration",         DoubleType()),
    StructField("year",             IntegerType()),
])

song_data_df = spark.read.json("data/song_data/*/*/*/*.json", schema=song_schema)
song_data_df.printSchema()
song_data_df.show(5)

root
 |-- num_songs: integer (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- year: integer (nullable = true)

+---------+------------------+---------------+----------------+-----------------+--------------------+------------------+--------------------+---------+----+
|num_songs|         artist_id|artist_latitude|artist_longitude|  artist_location|         artist_name|           song_id|               title| duration|year|
+---------+------------------+---------------+----------------+-----------------+--------------------+------------------+--------------------+---------+----+
|        1|ARDR4AC1187FB371A1|           null|            null|                 |Montserrat Caball.

In [9]:
from pyspark.sql.functions import from_unixtime, col

log_schema = StructType([
    StructField("artist",        StringType()),
    StructField("auth",          StringType()),
    StructField("firstName",     StringType()),
    StructField("gender",        StringType()),
    StructField("itemInSession", LongType()),
    StructField("lastName",      IntegerType()),
    StructField("length",        DoubleType()),
    StructField("level",         StringType()),
    StructField("location",      StringType()),
    StructField("method",        StringType()),
    StructField("page",          StringType()),
    StructField("registration",  DoubleType()),
    StructField("sessionId",     DoubleType()),
    StructField("song",          StringType()),
    StructField("status",        IntegerType()),
    StructField("ts",            TimestampType()),
    StructField("userAgent",     IntegerType()),
    StructField("userId",        LongType()),
])

log_data_df = spark.read.json("data/log_data/*/*/*.json")
log_data_df = log_data_df.where(col("page")=="NextSong")
log_data_df = log_data_df.withColumn("datetime", from_unixtime(col("ts")/1000))
log_data_df.printSchema()
log_data_df.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- datetime: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+-------------------+
|     artist|     auth|first

# Tables for SQL in Spark

In [10]:
song_data_df.createOrReplaceTempView("song_data")

In [11]:
log_data_df.createOrReplaceTempView("log_data")

### Songs

In [12]:
# songs = song_data_df.select("song_id", "title", "artist_id", "year", "duration") \
#                     .where("song_id is not null") \
#                     .dropDuplicates() \
#                     .show(5)

In [13]:
songs = spark.sql("""
    SELECT DISTINCT
           s.song_id   as song_id
         , s.title     as title
         , s.artist_id as artist_id
         , s.year      as year
         , s.duration  as duration
    FROM song_data s
    WHERE s.song_id IS NOT NULL
""")
songs.show(5)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOBBXLX12A58A79DDA|Erica (2005 Digit...|AREDBBQ1187B98AFF5|   0|138.63138|
|SOUDSGM12AC9618304|Insatiable (Instr...|ARNTLGG11E2835DDB9|   0|266.39628|
|SOBCOSW12A8C13D398|  Rumba De Barcelona|AR7SMBG1187B9B9066|   0|218.38322|
|SOZCTXZ12AB0182364|      Setanta matins|AR5KOSW1187FB35FF4|   0|269.58322|
|SOBZBAZ12A6D4F8742|      Spanish Grease|AROUOZZ1187B9ABE51|1997|168.25424|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



### Artists

In [14]:
# artists = song_data_df.select("artist_id", F.col("artist_name").alias("name"), F.col("artist_location").alias("location"), 
#                               F.col("artist_latitude").alias("latitude"), F.col("artist_longitude").alias("longitude")) \
#                       .where("artist_id is not null") \
#                       .dropDuplicates() \
#                       .show(5)

In [15]:
artists = spark.sql("""
    SELECT DISTINCT
           s.artist_id        as artist_id
         , s.artist_name      as name
         , s.artist_location  as location
         , s.artist_latitude  as latitude
         , s.artist_longitude as longitude
    FROM song_data s
    WHERE s.artist_id IS NOT NULL
""")
artists.show(5)

+------------------+------------+---------------+--------+----------+
|         artist_id|        name|       location|latitude| longitude|
+------------------+------------+---------------+--------+----------+
|ARPBNLO1187FB3D52F|    Tiny Tim|   New York, NY|40.71455| -74.00712|
|ARBEBBY1187B9B43DB|   Tom Petty|Gainesville, FL|    null|      null|
|AR0IAWL1187B9A96D0|Danilo Perez|         Panama|  8.4177| -80.11278|
|ARMBR4Y1187B9990EB|David Martin|California - SF|37.77916|-122.42005|
|ARD0S291187B9B7BF5|     Rated R|           Ohio|    null|      null|
+------------------+------------+---------------+--------+----------+
only showing top 5 rows



### Users

In [16]:
users = spark.sql("""
    SELECT DISTINCT 
           e.userId    as user_id
         , e.firstName as first_name
         , e.lastName  as last_name
         , e.gender    as gender
         , e.level     as level
    FROM log_data e
    WHERE e.userId IS NOT NULL
""")
users.show(5)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     98|    Jordyn|   Powell|     F| free|
|     34|    Evelin|    Ayala|     F| free|
|     85|   Kinsley|    Young|     F| paid|
|     38|    Gianna|    Jones|     F| free|
|     85|   Kinsley|    Young|     F| free|
+-------+----------+---------+------+-----+
only showing top 5 rows



### Time

In [17]:
from pyspark.sql.functions import hour, dayofmonth, weekofyear, month, year, dayofweek

time = spark.sql("""
    SELECT DISTINCT
           e.datetime           as start_time
         , hour(datetime)       as hour
         , day(datetime)        as day
         , weekofyear(datetime) as week
         , dayofmonth(datetime) as month
         , year(datetime)       as year
         , CASE WHEN dayofweek(datetime) in (1,7) THEN FALSE ELSE TRUE END as weekday
    FROM log_data e
    WHERE e.ts IS NOT NULL
""")
time.show(5)

+-------------------+----+---+----+-----+----+-------+
|         start_time|hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-15 11:39:32|  11| 15|  46|   15|2018|   true|
|2018-11-15 15:16:39|  15| 15|  46|   15|2018|   true|
|2018-11-15 17:37:32|  17| 15|  46|   15|2018|   true|
|2018-11-15 22:10:08|  22| 15|  46|   15|2018|   true|
|2018-11-21 07:21:43|   7| 21|  47|   21|2018|   true|
+-------------------+----+---+----+-----+----+-------+
only showing top 5 rows



### Songplay

In [18]:
from pyspark.sql.functions import lower, monotonically_increasing_id

songplay = spark.sql("""
    SELECT DISTINCT
           row_number() over (order by monotonically_increasing_id()) as songplay_id
         , e.datetime  as start_time
         , e.userId    as user_id
         , e.level     as level
         , s.song_id   as song_id
         , s.artist_id as artist_id
         , e.sessionId as session_id
         , e.location  as location
         , e.userAgent as user_agent
    FROM log_data e
    JOIN song_data s ON (
        lower(e.artist) = lower(s.artist_name)
        AND lower(e.song) = lower(s.title)
        AND e.length = s.duration
    )
""")
songplay.show()

+-----------+-------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|songplay_id|         start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|
+-----------+-------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|          1|2018-11-21 21:56:47|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|
+-----------+-------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+



In [19]:
# songplay.withColumn("year", year("start_time")) \
#         .withColumn("month", month("start_time")) \
#         .write.partitionBy("year", "month") \
#         .mode("overwrite") \
#         .parquet("output/")

# ETL.py

In [None]:
!spark-submit etl.py

Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/opt/spark-2.4.3-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3be7a5a2-c478-4cab-9da5-6e25c136427e;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;2.7.0 in central
	found org.apache.hadoop#hadoop-common;2.7.0 in central
	found org.apache.hadoop#hadoop-annotations;2.7.0 in central
	found com.google.guava#guava;11.0.2 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found commons-cli#commons-cli;1.2 in central
	found org.apache.commons#commons-math3;3.1.1 in central
	found xmlenc#xmlenc;0.52 in central
	found commons-httpclient#commons-httpclient;3.1 in central
	found commons-logging#commons-logging;1.1.3 in central
	found commons-codec#commons-codec;1.4 in central
	found commo

In [209]:
for table in os.listdir("output"):
    print(120*"-")
    print("TABLE: ",table)
    df_etl = spark.read.parquet("output/" + str(table))
    df_etl.printSchema()
    df_etl.show(10)
    
    print(120*"-")


------------------------------------------------------------------------------------------------------------------------
TABLE:  users
root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     88|  Mohammad|Rodriguez|     M| free|
|     88|  Mohammad|Rodriguez|     M| paid|
|     11| Christian|   Porter|     F| free|
|     75|    Joseph|Gutierrez|     M| free|
|     77| Magdalene|   Herman|     F| free|
|     69|  Anabelle|  Simpson|     F| free|
|     53|   Celeste| Williams|     F| free|
|     61|    Samuel| Gonzalez|     M| free|
|      2|   Jizelle| Benjamin|     F| free|
|     45|  Dominick|   Norris|     M| free|
+-------+----------+---------+------+-----+
only showing top 10 rows

--------------------