In [3]:
import pandas as pd
import boto3
import json
import configparser
import psycopg2
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql import SQLContext
from pyspark import SparkContext

In [4]:
# AWS credentials are never hard-coded in the notebook: they are taken from the
# kernel's environment (export AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY before
# starting Jupyter). Previously this cell assigned '' to both variables, which
# silently wiped any credentials already present in the environment.
# Missing values fall back to '' so later cells that index os.environ[...]
# (e.g. the s3a Hadoop configuration) do not raise KeyError.
KEY = os.environ.get('AWS_ACCESS_KEY_ID', '')
SECRET = os.environ.get('AWS_SECRET_ACCESS_KEY', '')
os.environ['AWS_ACCESS_KEY_ID'] = KEY
os.environ['AWS_SECRET_ACCESS_KEY'] = SECRET

In [9]:
# S3 resource used to list the input buckets.
# An empty KEY/SECRET is passed as None rather than '' so boto3 falls back to
# its default credential chain (env vars, ~/.aws config, instance role).
# Explicit empty strings would otherwise be used as-is to sign every request.
s3 = boto3.resource('s3',
                    region_name="us-west-2",
                    aws_access_key_id=KEY or None,
                    aws_secret_access_key=SECRET or None)

In [10]:
# List every object key under the log-data prefix of the public course bucket
# and show the first ten.
bucket = s3.Bucket('udacity-dend')
log_data_files = [
    obj.key
    for obj in bucket.objects.filter(Prefix='log-data')
]
log_data_files[:10]

['log-data/',
 'log-data/2018/11/2018-11-01-events.json',
 'log-data/2018/11/2018-11-02-events.json',
 'log-data/2018/11/2018-11-03-events.json',
 'log-data/2018/11/2018-11-04-events.json',
 'log-data/2018/11/2018-11-05-events.json',
 'log-data/2018/11/2018-11-06-events.json',
 'log-data/2018/11/2018-11-07-events.json',
 'log-data/2018/11/2018-11-08-events.json',
 'log-data/2018/11/2018-11-09-events.json']

In [11]:
# Inspect the size of the song-data subset under the A/A prefix.
song_data_files = [o.key for o in bucket.objects.filter(Prefix='song-data/A/A')]
# S3 "folder" placeholder keys end with '/'. Count only real objects instead of
# assuming exactly one placeholder sits at index 0.
song_file_count = sum(1 for key in song_data_files if not key.endswith('/'))
print("Number of files in song-data in /A/A:", song_file_count)
# Rough extrapolation (26 x 26 second-level prefixes, ~600 files each), not an exact count.
print("26 * 26 * 600 = " + str(26*26*600) + " Files in song-data")
song_data_files[1:5]

Number of files in song-data in /A/A: 603
26 * 26 * 600 = 405600 Files in song-data


['song-data/A/A/A/TRAAAAV128F421A322.json',
 'song-data/A/A/A/TRAAABD128F429CF47.json',
 'song-data/A/A/A/TRAAACN128F9355673.json',
 'song-data/A/A/A/TRAAAEA128F935A30D.json']

In [5]:
# Build (or reuse) the Spark session. The hadoop-aws package supplies the
# s3a:// filesystem used for all reads and writes below.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)
print(spark)

<pyspark.sql.session.SparkSession object at 0x7f11e32547f0>


In [1]:
# S3 locations used by the song-data cells below.
# NOTE(review): with input_data = "s3a://" the first path segment, "song-data1",
# is parsed as the *bucket* name (a private copy of the song subset?). Confirm
# it is not meant to be "s3a://udacity-dend/song-data/A/A/A/*.json".
input_data = "s3a://"
song_data = input_data + 'song-data1/A/A/A/*.json'
# Defined here, before the first write cell, so the notebook survives
# Restart & Run All. The songs/artists writes would otherwise raise NameError.
output_data = "s3a://output-spark-proj/"

In [6]:
# Hand the AWS credentials from the environment to Hadoop's s3a connector.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
for hadoop_key, env_name in (("fs.s3a.access.key", 'AWS_ACCESS_KEY_ID'),
                             ("fs.s3a.secret.key", 'AWS_SECRET_ACCESS_KEY')):
    hadoop_conf.set(hadoop_key, os.environ[env_name])

In [7]:
# Load the raw song JSON files and register them for Spark SQL as song_data_table.
df = spark.read.format("json").load(song_data)
df.createOrReplaceTempView("song_data_table")

In [8]:
# songs dimension built from song_data_table, skipping rows without a song_id.
# DISTINCT drops identical records that appear in more than one input file,
# matching how the artists and users queries deduplicate their dimensions.
songs_table = spark.sql("""
                            SELECT DISTINCT sdt.song_id, sdt.title, sdt.artist_id, sdt.year, sdt.duration
                            FROM song_data_table sdt
                            WHERE sdt.song_id IS NOT NULL
                        """)

In [10]:
# Write the songs dimension as Parquet, partitioned by year and then artist_id.
(songs_table.write
    .mode('overwrite')
    .partitionBy("year", "artist_id")
    .parquet(output_data + 'songs_table/'))

In [12]:
# artists dimension: exactly one row per artist_id.
# A SELECT DISTINCT over all five columns returns several rows for the same
# artist_id whenever its songs disagree on name, location or coordinates.
# ROW_NUMBER keeps one row per id, chosen deterministically by song_id.
artists_table = spark.sql("""
                                SELECT artist_id, artist_name, artist_location, artist_latitude, artist_longitude
                                FROM (
                                    SELECT art.artist_id, art.artist_name, art.artist_location,
                                           art.artist_latitude, art.artist_longitude,
                                           ROW_NUMBER() OVER (PARTITION BY art.artist_id
                                                              ORDER BY art.song_id) AS rn
                                    FROM song_data_table art
                                    WHERE art.artist_id IS NOT NULL
                                ) ranked
                                WHERE ranked.rn = 1
                            """)

In [13]:
# Persist the artists dimension as (unpartitioned) Parquet.
artists_target = output_data + 'artists_table/'
artists_table.write.mode('overwrite').parquet(artists_target)


In [15]:
# Load a single day of event logs, keep only song-play events and register
# the result for Spark SQL as log_data_table.
input_data = "s3a://udacity-dend/"
log_data = input_data + 'log_data/2018/11/2018-11-01-events.json'
raw_events = spark.read.json(log_data)
df_log = raw_events.where(raw_events.page == 'NextSong')
df_log.createOrReplaceTempView("log_data_table")

In [30]:
# users dimension: one row per user_id.
# The log repeats the same user on every event, and `level` is carried per event,
# so a DISTINCT over all columns emits two rows for one user_id if their level
# changes within the log. Keep the most recent event (highest ts) per user.
# Empty user ids are excluded along with NULLs.
users_table = spark.sql("""
                            SELECT user_id, first_name, last_name, gender, level
                            FROM (
                                SELECT userT.userId AS user_id,
                                       userT.firstName AS first_name,
                                       userT.lastName AS last_name,
                                       userT.gender AS gender,
                                       userT.level AS level,
                                       ROW_NUMBER() OVER (PARTITION BY userT.userId
                                                          ORDER BY userT.ts DESC) AS rn
                                FROM log_data_table userT
                                WHERE userT.userId IS NOT NULL
                                  AND userT.userId <> ''
                            ) latest
                            WHERE latest.rn = 1
                        """)

In [9]:
# Destination bucket for every Parquet table written by this notebook.
OUTPUT_BUCKET = "output-spark-proj"
output_data = "s3a://" + OUTPUT_BUCKET + "/"

In [33]:
# Persist the users dimension as (unpartitioned) Parquet.
users_writer = users_table.write.mode('overwrite')
users_writer.parquet(output_data + 'users_table/')

In [37]:
#df_test_log = spark.read.parquet(output_data + 'users_table/*.parquet')

In [38]:
# time dimension: one row per distinct event timestamp, broken into calendar parts.
# ts is divided by 1000 before conversion, i.e. treated as epoch milliseconds.
# DISTINCT in the inner query keeps start_time unique even when several
# events share the same ts.
time_table = spark.sql("""
                            SELECT
                            A.start_time_sub as start_time,
                            hour(A.start_time_sub) as hour,
                            dayofmonth(A.start_time_sub) as day,
                            weekofyear(A.start_time_sub) as week,
                            month(A.start_time_sub) as month,
                            year(A.start_time_sub) as year,
                            dayofweek(A.start_time_sub) as weekday
                            FROM
                            (SELECT DISTINCT to_timestamp(timeSt.ts/1000) as start_time_sub
                            FROM log_data_table timeSt
                            WHERE timeSt.ts IS NOT NULL
                            ) A
                        """)

# Written as Parquet partitioned by year, then month.
time_table.write.mode('overwrite').partitionBy("year", "month").parquet(output_data + 'time_table/')

In [16]:
# songplays fact table: each NextSong event joined to the song whose artist name
# and title match. start_time is computed once in the subquery and reused for
# the month/year partition columns.
songplays_table = spark.sql("""
    SELECT monotonically_increasing_id() AS songplay_id,
           ev.start_time AS start_time,
           month(ev.start_time) AS month,
           year(ev.start_time) AS year,
           ev.userId AS user_id,
           ev.level AS level,
           s.song_id AS song_id,
           s.artist_id AS artist_id,
           ev.sessionId AS session_id,
           ev.location AS location,
           ev.userAgent AS user_agent
    FROM (
        SELECT to_timestamp(ts/1000) AS start_time,
               userId, level, sessionId, location, userAgent, artist, song
        FROM log_data_table
    ) ev
    JOIN song_data_table s
      ON ev.artist = s.artist_name AND ev.song = s.title
""")

In [17]:
# Write the fact table as Parquet, partitioned by year and then month.
(songplays_table.write
    .mode('overwrite')
    .partitionBy("year", "month")
    .parquet(output_data + 'songplays_table/'))