In [1]:
import os
import configparser

config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [2]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .config("spark.hadoop.fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.hadoop.fs.s3a.awsAccessKeyId", os.environ['AWS_ACCESS_KEY_ID']) \
        .config("spark.hadoop.fs.s3a.awsSecretAccessKey", os.environ['AWS_SECRET_ACCESS_KEY']) \
        .getOrCreate()
    return spark

In [26]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import monotonically_increasing_id

spark = create_spark_session()

log_data = spark.read.json("s3a://udacity-dend/log_data/*/*/*.json")
log_data = log_data[log_data.page == 'NextSong']  
log_data = log_data.withColumn('timestamp', F.to_timestamp(log_data.ts/1000))
log_data = log_data.withColumn('datetime', F.to_date(log_data.timestamp))
log_data = log_data.withColumn('uniqueId', monotonically_increasing_id())
log_data.createOrReplaceTempView("log_data_table")

song_data = spark.read.json("s3a://udacity-dend/song_data/*/*/*/*.json")
song_data.createOrReplaceTempView("song_data_table")

KeyboardInterrupt: 

In [7]:
# extract columns to create songs table
songs_table = song_data.select('song_id', 'title', 'artist_id', 'year', 'duration').limit(10).show()

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOAFBCP12A8C13CC7D|King Of Scurf (20...|ARTC1LV1187B9A4858|1972|301.40036|
|SOKTJDS12AF72A25E5|Drown In My Own T...|ARA23XO1187B9AF18F|   0|  192.522|
|SOEKAZG12AB018837E|I'll Slap Your Fa...|ARSVTNL1187B992A91|2001|129.85424|
|SOQPWCR12A6D4FB2A3|A Poor Recipe For...|AR73AIO1187B9AD57B|2005|118.07302|
|SOBRKGM12A8C139EF6|Welcome to the Pl...|ARXQBR11187B98A2CC|1985|821.05424|
|SORRNOC12AB017F52B|The Last Beat Of ...|ARSZ7L31187FB4E610|2004|337.81506|
|SOHKNRJ12A6701D1F8|        Drop of Rain|AR10USD1187B99F3F1|   0|189.57016|
|SOAPERH12A58A787DC|The One And Only ...|ARZ5H0P1187B98A1DD|   0|230.42567|
|SOSMJFC12A8C13DE0C|Is That All There...|AR1KTV21187B9ACD72|   0|343.87546|
|SOOVHYF12A8C134892|     I'll Be Waiting|ARCLYBR1187FB53913|1989|304.56118|
+-----------

In [8]:
# extract columns to create artists table
artists_table = song_data.select('artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude').limit(10).show()

+------------------+--------------------+--------------------+---------------+----------------+
|         artist_id|         artist_name|     artist_location|artist_latitude|artist_longitude|
+------------------+--------------------+--------------------+---------------+----------------+
|ARTC1LV1187B9A4858|  The Bonzo Dog Band|Goldsmith's Colle...|        51.4536|        -0.01802|
|ARA23XO1187B9AF18F|     The Smithereens|Carteret, New Jersey|       40.57885|       -74.21956|
|ARSVTNL1187B992A91|       Jonathan King|     London, England|       51.50632|        -0.12714|
|AR73AIO1187B9AD57B|   Western Addiction|   San Francisco, CA|       37.77916|      -122.42005|
|ARXQBR11187B98A2CC|Frankie Goes To H...|  Liverpool, England|           null|            null|
|ARSZ7L31187FB4E610|           Devotchka|          Denver, CO|       39.74001|      -104.99226|
|AR10USD1187B99F3F1|Tweeterfriendly M...|Burlington, Ontar...|           null|            null|
|ARZ5H0P1187B98A1DD|          Snoop Dogg

In [9]:
users_table = log_data.select('userId', 'firstName', 'lastName', 'gender', 'level').limit(10).show()

+------+----------+--------+------+-----+
|userId| firstName|lastName|gender|level|
+------+----------+--------+------+-----+
|    10|    Sylvie|    Cruz|     F| free|
|    53|   Celeste|Williams|     F| free|
|    53|   Celeste|Williams|     F| free|
|    53|   Celeste|Williams|     F| free|
|    29|Jacqueline|   Lynch|     F| paid|
|    29|Jacqueline|   Lynch|     F| paid|
|    29|Jacqueline|   Lynch|     F| paid|
|    29|Jacqueline|   Lynch|     F| paid|
|    29|Jacqueline|   Lynch|     F| paid|
|    29|Jacqueline|   Lynch|     F| paid|
+------+----------+--------+------+-----+



In [13]:
time_table = spark.sql("""
        select timestamp
        , datetime AS start_time
        , hour(timestamp) AS hour
        , day(timestamp) AS day
        , weekofyear(timestamp) AS week
        , month(timestamp) AS month
        , year(timestamp) AS year
        , weekday(timestamp) AS weekday
        from log_data_table
    """).limit(10).show()

+--------------------+----------+----+---+----+-----+----+-------+
|           timestamp|start_time|hour|day|week|month|year|weekday|
+--------------------+----------+----+---+----+-----+----+-------+
|2018-11-12 02:37:...|2018-11-12|   2| 12|  46|   11|2018|      0|
|2018-11-12 02:37:...|2018-11-12|   2| 12|  46|   11|2018|      0|
|2018-11-12 02:42:...|2018-11-12|   2| 12|  46|   11|2018|      0|
|2018-11-12 02:45:...|2018-11-12|   2| 12|  46|   11|2018|      0|
|2018-11-12 02:47:...|2018-11-12|   2| 12|  46|   11|2018|      0|
|2018-11-12 02:50:...|2018-11-12|   2| 12|  46|   11|2018|      0|
|2018-11-12 02:54:...|2018-11-12|   2| 12|  46|   11|2018|      0|
|2018-11-12 02:57:...|2018-11-12|   2| 12|  46|   11|2018|      0|
|2018-11-12 03:00:...|2018-11-12|   3| 12|  46|   11|2018|      0|
|2018-11-12 03:03:...|2018-11-12|   3| 12|  46|   11|2018|      0|
+--------------------+----------+----+---+----+-----+----+-------+



In [24]:
songplay_table = spark.sql("""
    SELECT DISTINCT
        stg.uniqueId AS songplay_id,
        stg.ts AS start_time,
        stg.userId,
        stg.level,
        stg2.song_id,
        stg2.artist_id,
        stg.sessionId,
        stg.location,
        stg.userAgent
    FROM log_data_table stg
    LEFT JOIN song_data_table stg2
        ON stg.artist = stg2.artist_name
        AND stg.song = stg2.title
        AND stg.length = stg2.duration
    WHERE stg.userId IS NOT NULL
""").limit(10).show()

#songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent

+-----------+-------------+------+-----+-------+---------+---------+--------------------+--------------------+
|songplay_id|   start_time|userId|level|song_id|artist_id|sessionId|            location|           userAgent|
+-----------+-------------+------+-----+-------+---------+---------+--------------------+--------------------+
|         12|1542012644796|   100| free|   null|     null|      428|New York-Newark-J...|"Mozilla/5.0 (Mac...|
|        110|1542049762796|    73| paid|   null|     null|      294|Tampa-St. Petersb...|"Mozilla/5.0 (Mac...|
|         91|1542046045796|    33| free|   null|     null|      399|          Eugene, OR|"Mozilla/5.0 (Win...|
|          9|1541991804796|    29| paid|   null|     null|      389|Atlanta-Sandy Spr...|"Mozilla/5.0 (Mac...|
|        160|1542063919796|    80| paid|   null|     null|      481|Portland-South Po...|"Mozilla/5.0 (Mac...|
|         61|1542039844796|    97| paid|   null|     null|      374|Lansing-East Lans...|"Mozilla/5.0 (X11...|
|