# Udacity DEND Project-4: Data Lake

In [17]:
# findspark.init() locates the local Spark installation and makes its pyspark
# importable, so it must run before the `from pyspark.sql import ...` line below.
import findspark
findspark.init() 
from pyspark.sql import SparkSession
import os
import configparser
# datetime is used to build timestamped output directory names for each table.
from datetime import datetime

## Load config into environment variables and constants

In [18]:
config = configparser.ConfigParser()

# Normally these credentials would live in ~/.aws/credentials.
# The file handle comes from a `with` block so it is closed right after parsing.
with open('dl.cfg') as cfg_file:
    config.read_file(cfg_file)

# Credentials: kept as constants and exported as environment variables
# (presumably so the hadoop-aws S3A connector can find them — confirm).
AWS_ACCESS_KEY_ID = config['AWS']['AWS_ACCESS_KEY_ID']
AWS_SECRET_ACCESS_KEY = config['AWS']['AWS_SECRET_ACCESS_KEY']
os.environ["AWS_ACCESS_KEY_ID"] = AWS_ACCESS_KEY_ID
os.environ["AWS_SECRET_ACCESS_KEY"] = AWS_SECRET_ACCESS_KEY

# S3 input/output locations.
INPUT_DATA = config['AWS']['INPUT_DATA']
INPUT_DATA_SD = config['AWS']['INPUT_DATA_SD']
INPUT_DATA_LD = config['AWS']['INPUT_DATA_LD']
OUTPUT_DATA = config['AWS']['OUTPUT_DATA']

# Local input/output locations (the defaults used by the cells below).
SONG_DATA_LOCAL = config['LOCAL']['INPUT_DATA_SD_LOCAL']
LOG_DATA_LOCAL = config['LOCAL']['INPUT_DATA_LD_LOCAL']
OUTPUT_DATA_LOCAL = config['LOCAL']['OUTPUT_DATA_LOCAL']

# Show the resolved locations only. The credentials are deliberately NOT printed:
# cell outputs are saved inside the notebook and would leak the secret key.
print(INPUT_DATA)
print(OUTPUT_DATA)
print(SONG_DATA_LOCAL)
print(LOG_DATA_LOCAL)
print(OUTPUT_DATA_LOCAL)

<AWS_ACCESS_KEY_ID redacted>
<AWS_SECRET_ACCESS_KEY redacted>
s3a://udacity-dend/
s3a://sundog-spark12/kenny_out/
data/song_data/A/A/A/TRAAAAW128F429D538.json
data/log_data/2018/11/2018-11-01-events.json
C:/Users/Kenny/farzer/project_4/outlaw/


## Create spark session with hadoop-aws package

In [19]:
# The hadoop-aws package supplies the s3a:// filesystem used by the S3 paths in dl.cfg.
# NOTE(review): the saved output shows a Py4JError (getPythonAuthSocketTimeout), which
# usually indicates the pip-installed pyspark and the Spark install found by findspark
# are different versions — confirm they match.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.0")
    .getOrCreate()
)

Py4JError: org.apache.spark.api.python.PythonUtils.getPythonAuthSocketTimeout does not exist in the JVM

## Load song_data (from JSON to Spark)

In [12]:
# Song metadata comes from the local sample by default.
# To read the full song_data from S3, assign INPUT_DATA_SD here instead.
song_data_path = SONG_DATA_LOCAL
# song_data_path = INPUT_DATA_SD

df_sd = spark.read.json(
    song_data_path
)

In [14]:
# Inspect the inferred schema and a handful of raw song records.
df_sd.printSchema()
df_sd.show(n=5, truncate=False)

# Create (songs_table + artists_table) Tables

## Create Songs table + write it to parquet file

In [15]:
# songs_table: the song-level columns of df_sd, ordered by song_id.
df_sd.createOrReplaceTempView("songs_table_DF")
songs_query = """
    SELECT song_id,
           title,
           artist_id,
           year,
           duration
    FROM songs_table_DF
    ORDER BY song_id
"""
songs_table = spark.sql(songs_query)
songs_table.printSchema()
songs_table.show()

In [16]:
# Timestamped S3 destination for songs_table. The following cell recomputes
# songs_table_path for the actual write (local output directory by default).
now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
songs_table_path = f"{OUTPUT_DATA}songs_table_{now}"
print(songs_table_path)

In [17]:
# Write songs_table to parquet, partitioned by year and artist_id.
# Refs:
#   https://spark.apache.org/docs/latest/sql-data-sources-parquet.html
#   https://stackoverflow.com/questions/43731679/how-to-save-a-partitioned-parquet-file-in-spark-2-1
# (Single-file JSON output was also explored:
#   https://stackoverflow.com/questions/29908892/save-a-large-spark-dataframe-as-a-single-json-file-in-s3)
now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')

# Local output by default; to write to S3 build the path from OUTPUT_DATA instead.
songs_table_path = f"{OUTPUT_DATA_LOCAL}songs_table.parquet_{now}"

(
    songs_table.write
    .partitionBy("year", "artist_id")
    .parquet(songs_table_path)
)

## Create Artists table + write it to parquet file

In [19]:
# artists_table: artist attributes taken from the song metadata. df_sd presumably
# holds one row per song (songs_table is keyed by song_id from it), so an artist with
# several songs would otherwise be repeated; DISTINCT emits identical artist rows once.
# NOTE(review): rows for one artist_id that differ in location/latitude/longitude
# across song files are still kept separately — confirm against the data.
df_sd.createOrReplaceTempView("artists_table_DF")
artists_table = spark.sql("""
    SELECT  DISTINCT artist_id        AS artist_id,
                     artist_name      AS name,
                     artist_location  AS location,
                     artist_latitude  AS latitude,
                     artist_longitude AS longitude
    FROM artists_table_DF
    ORDER BY artist_id DESC
""")
artists_table.printSchema()
artists_table.show(5, truncate=False)

In [21]:
# Persist artists_table as a timestamped parquet directory.
# Ref: https://spark.apache.org/docs/latest/sql-data-sources-parquet.html
now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
artists_table_path = OUTPUT_DATA_LOCAL + "artists_table.parquet" + "_" + now

# Use this instead if you want to store output to S3.
#artists_table_path = OUTPUT_DATA + "artists_table.parquet" + "_" + now

print(artists_table_path)
# Write artists_table itself (not songs_table) to the artists path.
artists_table.write.parquet(artists_table_path)

## Load log_data (from JSON to Spark)

In [24]:
# Read the local log_data sample.
# To read the full log_data from S3, use INPUT_DATA_LD as the source instead.
log_data_source = LOG_DATA_LOCAL
df_ld = spark.read.json(log_data_source)

In [26]:
# Inspect the inferred schema and the first few raw log events.
df_ld.printSchema()
df_ld.show(n=5)

In [27]:
# Keep only 'NextSong' page events; every table below is built from these.
df_ld_filtered = df_ld.where(df_ld["page"] == "NextSong")
df_ld_filtered.show(n=20)

# Create (users_table + time_table + songplays_table) Tables

## Create Users table + write it to parquet file

In [28]:
# users_table: one row per user_id. A DISTINCT over all five columns would emit
# two rows for a user whose `level` changed within the log period, so instead keep
# each user's attributes from their most recent event (highest ts).
df_ld_filtered.createOrReplaceTempView("users_table_DF")
users_table = spark.sql("""
    SELECT user_id, first_name, last_name, gender, level
    FROM (
        SELECT  userId    AS user_id,
                firstName AS first_name,
                lastName  AS last_name,
                gender,
                level,
                ROW_NUMBER() OVER (PARTITION BY userId ORDER BY ts DESC) AS row_num
        FROM users_table_DF
    ) AS ranked_users
    WHERE row_num = 1
    ORDER BY last_name
""")
users_table.printSchema()
users_table.show(20)

In [29]:
# Persist users_table as a timestamped parquet directory under OUTPUT_DATA_LOCAL.
# Ref: https://spark.apache.org/docs/latest/sql-data-sources-parquet.html
now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
users_table_path = f"{OUTPUT_DATA_LOCAL}users_table.parquet_{now}"
print(users_table_path)
users_table.write.parquet(users_table_path)

## Create Time table + write it to parquet file

### Create timestamp column

In [30]:
# Add a `timestamp` column (TimestampType) derived from the epoch-milliseconds `ts`.
import pyspark.sql.functions as f
from pyspark.sql.functions import udf
from pyspark.sql import types as t


def get_timestamp(ts):
    """Convert epoch milliseconds to a naive datetime in the process's local time zone."""
    return datetime.fromtimestamp(ts / 1000.0)


# Wrap as a Spark UDF (equivalent to decorating with @udf(t.TimestampType())).
get_timestamp = udf(get_timestamp, t.TimestampType())

df_ld_filtered = df_ld_filtered.withColumn("timestamp", get_timestamp("ts"))

df_ld_filtered.printSchema()
df_ld_filtered.show(5, truncate=False)

### Create datetime column

In [31]:
# Add a `datetime` string column ('%Y-%m-%d %H:%M:%S', second precision) from `ts`.
# Ref: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=dateformat#pyspark.sql.functions.from_unixtime
def get_datetime(ts):
    """Format epoch milliseconds as a local-time 'YYYY-mm-dd HH:MM:SS' string."""
    as_datetime = datetime.fromtimestamp(ts / 1000.0)
    return as_datetime.strftime('%Y-%m-%d %H:%M:%S')


# Wrap as a Spark UDF (equivalent to decorating with @udf(t.StringType())).
get_datetime = udf(get_datetime, t.StringType())

df_ld_filtered = df_ld_filtered.withColumn("datetime", get_datetime("ts"))
df_ld_filtered.printSchema()
df_ld_filtered.show(5, truncate=False)

In [32]:
# time_table: one row per distinct play start time, broken into calendar parts.
# start_time is taken from the `timestamp` column (TimestampType, millisecond
# precision) so it has the same type and value as songplays_table.start_time.
# The second-precision `datetime` string drops the milliseconds and would not
# match songplays rows when the two tables are joined on start_time.
# Ref: https://docs-snaplogic.atlassian.net/wiki/spaces/SD/pages/2458071/Date+Functions+and+Properties+Spark+SQL

df_ld_filtered.createOrReplaceTempView("time_table_DF")
time_table = spark.sql("""
    SELECT  DISTINCT timestamp             AS start_time,
                     hour(timestamp)       AS hour,
                     day(timestamp)        AS day,
                     weekofyear(timestamp) AS week,
                     month(timestamp)      AS month,
                     year(timestamp)       AS year,
                     dayofweek(timestamp)  AS weekday
    FROM time_table_DF
    ORDER BY start_time
""")
time_table.printSchema()
time_table.show(5, truncate=False)

In [33]:
# Persist time_table as a timestamped parquet directory under OUTPUT_DATA_LOCAL.
# Ref: https://spark.apache.org/docs/latest/sql-data-sources-parquet.html
now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
time_table_path = f"{OUTPUT_DATA_LOCAL}time_table.parquet_{now}"
print(time_table_path)
time_table.write.parquet(time_table_path)

## Create songplays table + write it to parquet file

### Join song_data and log_data

In [34]:
# Match each play event to song metadata on (artist name, song title).
# Default inner join: plays with no matching song in df_sd are dropped.
# Ref: https://stackoverflow.com/questions/33745964/how-to-join-on-multiple-columns-in-pyspark
join_condition = [
    df_ld_filtered.artist == df_sd.artist_name,
    df_ld_filtered.song == df_sd.title,
]
df_ld_sd_joined = df_ld_filtered.join(df_sd, join_condition)
df_ld_sd_joined.printSchema()
df_ld_sd_joined.show(5, truncate=False)

In [35]:
# Surrogate key per play: monotonically_increasing_id() gives unique but not
# consecutive ids, assigned before the ORDER BY below.
df_ld_sd_joined = df_ld_sd_joined.withColumn("songplay_id", f.monotonically_increasing_id())


df_ld_sd_joined.createOrReplaceTempView("songplays_table_DF")
# Sort by a plain column list; a parenthesized `(user_id, session_id)` would
# build a struct and sort on that instead.
songplays_table = spark.sql("""
    SELECT  songplay_id AS songplay_id,
            timestamp   AS start_time,
            userId      AS user_id,
            level       AS level,
            song_id     AS song_id,
            artist_id   AS artist_id,
            sessionId   AS session_id,
            location    AS location,
            userAgent   AS user_agent
    FROM songplays_table_DF
    ORDER BY user_id, session_id
""")

songplays_table.printSchema()
songplays_table.show(5, truncate=False)

In [36]:
# Persist songplays_table as a timestamped parquet directory under OUTPUT_DATA_LOCAL.
# Ref: https://spark.apache.org/docs/latest/sql-data-sources-parquet.html
now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
songplays_table_path = OUTPUT_DATA_LOCAL + "songplays_table.parquet" + "_" + now
print(songplays_table_path)
# Write songplays_table itself (not time_table) to the songplays path.
songplays_table.write.parquet(songplays_table_path)

# Example queries

### Check files in S3

In [37]:
import boto3
from urllib.parse import urlparse

# Bucket holding the raw song_data/ and log_data/ prefixes, derived from INPUT_DATA
# (e.g. "s3a://udacity-dend/" -> "udacity-dend") so it is configured only in dl.cfg.
# A dedicated name is used so the earlier song_data_path (a JSON path) is not clobbered.
input_bucket_name = urlparse(INPUT_DATA).netloc
log_data_path = INPUT_DATA + "log_data/"
print(input_bucket_name)
print(log_data_path)

s3 = boto3.resource(
    's3',
    region_name="us-west-2",
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
)
input_data_bucket = s3.Bucket(input_bucket_name)

In [38]:
# List every object under the song_data prefix, then report how many there were.
count_sd = 0
song_objects = input_data_bucket.objects.filter(Prefix="song_data")
for count_sd, s3_object in enumerate(song_objects, start=1):
    print(s3_object)
print(count_sd)

In [39]:
# List every object under the log_data prefix, then report how many there were.
count_ld = 0
log_objects = input_data_bucket.objects.filter(Prefix="log_data")
for count_ld, s3_object in enumerate(log_objects, start=1):
    print(s3_object)
print(count_ld)

## Read data from parquet files

In [None]:
# Read songs_table back from the parquet directory written earlier. Every write
# appends "_<timestamp>" to the directory name, so reuse the exact path stored in
# songs_table_path rather than a suffix-less name (which is never written).
# A new variable is used so the raw song data in df_sd is not overwritten.
songs_table_parquet = spark.read.parquet(songs_table_path)
songs_table_parquet.printSchema()
