In [None]:
import configparser
import os
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, LongType, TimestampType

In [None]:
# AWS credentials for the s3a reads/writes in this notebook. Never hard-code
# keys in the notebook: values already present in the environment are kept,
# otherwise they are loaded from a local, untracked config file. Blank values
# are never written (writing '' would clobber working credentials).
AWS_CONFIG_PATH = 'dl.cfg'  # NOTE(review): assumed file name / [AWS] section layout — TODO confirm it matches etl.py

aws_config = configparser.ConfigParser()
aws_config.read(AWS_CONFIG_PATH)  # a missing file is silently skipped

for aws_key in ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'):
    if os.environ.get(aws_key):
        continue
    aws_value = aws_config.get('AWS', aws_key, fallback='')
    if aws_value:
        os.environ[aws_key] = aws_value
    else:
        print('{} is not set; S3 access may fail.'.format(aws_key))

In [None]:
def create_spark_session():
    """Build (or reuse) the SparkSession used throughout this notebook.

    The hadoop-aws package is added to ``spark.jars.packages`` so the
    ``s3a://`` paths used below can be accessed.

    Returns:
        SparkSession: the active session, created on first call.
    """
    builder = SparkSession.builder.config(
        "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
    )
    return builder.getOrCreate()

In [None]:
# S3 glob for the song JSON files. Only files under song_data/A/A/ are
# matched; widen SONG_DATA_SUBSET (e.g. '*/*/*') to read more of the dataset.
SONG_DATA_SUBSET = 'A/A/*'
song_data = 's3a://udacity-dend/song_data/{}/*.json'.format(SONG_DATA_SUBSET)

In [None]:
# Explicit schema for the song JSON files, passed to spark.read so Spark does
# not have to infer it. Field order matches the original definition.
_song_fields = [
    ("num_songs", IntegerType()),
    ("artist_id", StringType()),
    ("artist_latitude", DoubleType()),
    ("artist_longitude", DoubleType()),
    ("artist_location", StringType()),
    ("artist_name", StringType()),
    ("song_id", StringType()),
    ("title", StringType()),
    ("duration", DoubleType()),
    ("year", IntegerType()),
]
song_schema = StructType([StructField(field_name, field_type) for field_name, field_type in _song_fields])

In [None]:
spark = create_spark_session()

# Load the song files using the explicit song_schema.
df = spark.read.schema(song_schema).json(song_data)

In [None]:
# Quick look at the loaded song data: schema first, then a handful of rows.
df.printSchema()
df.show(n=5)


In [None]:
# Expose the raw song records to Spark SQL; the artists query also reads this view.
df.createOrReplaceTempView("songs_view")

songs_query = """
                        SELECT song_id, title, artist_id, year, duration
                        FROM songs_view
                        WHERE song_id IS NOT NULL
                        """
songs_table = spark.sql(songs_query)

In [None]:
# Sanity-check the songs dimension's column contract before it is written out
# (replaces a leftover debug print of the object's type).
assert songs_table.columns == ['song_id', 'title', 'artist_id', 'year', 'duration'], songs_table.columns

In [None]:
songs_table.show(n=5)  # preview the songs dimension

In [None]:
# Songs on disk: one directory per year, then per artist_id, replacing any previous output.
(songs_table.write
    .mode('overwrite')
    .partitionBy("year", "artist_id")
    .parquet("songs.parquet"))

In [None]:
# Artist dimension: one row per distinct (artist_id, name, location, latitude,
# longitude) combination found in the song data.
# NOTE(review): an artist whose attributes differ across songs appears more than once.
artists_query = """
                          SELECT DISTINCT artist_id, artist_name AS name, artist_location AS location, artist_latitude AS latitude, artist_longitude AS longitude
                          FROM songs_view
                          WHERE artist_id IS NOT NULL
                          """
artists_table = spark.sql(artists_query)
artists_table.show(5)

In [None]:
artists_table.write.parquet("artists.parquet", mode='overwrite')  # unpartitioned, replaces previous output

In [None]:
# S3 glob matching every JSON event log two directory levels below log_data/.
log_data = '/'.join(['s3a://udacity-dend', 'log_data', '*', '*', '*.json'])

In [None]:
# Explicit schema for the event-log JSON files.
# `ts` is an epoch timestamp in milliseconds (it is divided by 1000 further
# down). Epoch-millisecond values do not fit in a 32-bit IntegerType, so the
# column must be LongType or every timestamp would fail to parse.
# NOTE: this schema is not currently passed to the spark.read.json call that loads log_data.
log_schema = StructType([
                        StructField("artist", StringType()),
                        StructField("auth", StringType()),
                        StructField("firstName", StringType()),
                        StructField("gender", StringType()),
                        StructField("itemInSession", IntegerType()),
                        StructField("lastName", StringType()),
                        StructField("length", DoubleType()),
                        StructField("level", StringType()),
                        StructField("location", StringType()),
                        StructField("method", StringType()),
                        StructField("page", StringType()),
                        StructField("registration", DoubleType()),
                        StructField("sessionId", IntegerType()),
                        StructField("song", StringType()),
                        StructField("status", IntegerType()),
                        StructField("ts", LongType()),
                        StructField("userAgent", StringType()),
                        # NOTE(review): confirm userId is numeric (not a quoted string) in the raw JSON
                        StructField("userId", IntegerType())
                        ])

In [None]:
# Read the raw event logs. The schema is inferred from the JSON here;
# log_schema is not applied.
# TODO: this DataFrame was renamed from `df` to `df_log`; mirror the rename in etl.py if this version is kept.
df_log = spark.read.format("json").load(log_data)


In [None]:
# Keep only song-play events, then expose them to Spark SQL as log_view.
df_log = df_log.where(df_log.page == 'NextSong')
df_log.createOrReplaceTempView("log_view")

In [None]:
# Preview the filtered events, then their (inferred) schema.
df_log.show(n=10)
df_log.printSchema()

In [None]:
# User dimension: exactly one row per user_id. A plain DISTINCT over all
# columns emits several rows for the same user whenever `level` (or any other
# attribute) differs between that user's events, so instead keep the
# attributes from each user's most recent event (highest ts).
users_table = spark.sql("""
                        SELECT user_id, first_name, last_name, gender, level
                        FROM (
                            SELECT userId AS user_id, firstName AS first_name, lastName AS last_name, gender, level,
                                   ROW_NUMBER() OVER (PARTITION BY userId ORDER BY ts DESC) AS row_num
                            FROM log_view
                            WHERE userId IS NOT NULL
                        ) AS latest_events
                        WHERE row_num = 1
                        """)

In [None]:
users_table.show(n=5)  # preview the users dimension

In [None]:
users_table.write.parquet("users.parquet", mode='overwrite')  # unpartitioned, replaces previous output

In [None]:
from pyspark.sql.functions import from_unixtime

# Inspect the raw `ts` column before converting it. The first row is fetched
# once: every .first() call launches a separate Spark job, and two unordered
# calls are not guaranteed to return the same row.
first_log_row = df_log.first()

print('Here is the type of ts col:')
print(type(first_log_row['ts']))

print('\n Here is the first value of the ts col:')
print(first_log_row['ts'])

In [None]:
def get_timestamp_no_milliseconds(ts):
    """Return a Column converting epoch milliseconds to whole epoch seconds.

    Drop-in replacement for the previous Python UDF of the same name (same call
    shape, same IntegerType result, truncating like ``int(x / 1000)``). The
    native cast is null-safe (the UDF raised on a null ``ts`` via ``int(None)``)
    and runs in the JVM instead of shipping every row to a Python worker.

    Args:
        ts: Column holding epoch timestamps in milliseconds.

    Returns:
        Column of IntegerType epoch seconds (null where ``ts`` is null).
    """
    return (ts / 1000).cast(IntegerType())


df_log = df_log.withColumn('timestamp_no_milliseconds', get_timestamp_no_milliseconds(df_log.ts))

# Fetch one row once instead of launching a Spark job per print.
first_log_row = df_log.first()

print('Here is the type of timestamp_no_milliseconds col:')
print(type(first_log_row['timestamp_no_milliseconds']))

print('\n Here is the first value of the timestamp_no_milliseconds col:')
print(first_log_row['timestamp_no_milliseconds'])


In [None]:
# df_log = df_log.withColumn('nonMilli3', col('nonMilli').cast("int"))

# print('Here is the nonMilli3 type, should be int:')
# print(type(df_log.first()['nonMilli3']))

# print('\n Here is the first value of nonMilli3:')
# print(df_log.first()['nonMilli3'])

In [None]:
def get_datetime(seconds):
    """Return a Column converting whole epoch seconds to a Spark timestamp.

    Drop-in replacement for the previous Python UDF around
    ``datetime.fromtimestamp`` (same call shape, TimestampType result). The
    native cast is null-safe (the UDF raised on null input) and avoids the
    round-trip through the Python worker's local time zone, which is ambiguous
    around DST changes.

    Args:
        seconds: Column holding epoch timestamps in seconds.

    Returns:
        Column of TimestampType (null where ``seconds`` is null).
    """
    return seconds.cast(TimestampType())


df_log = df_log.withColumn('start_time', get_datetime(df_log.timestamp_no_milliseconds))

# Fetch one row once instead of launching a Spark job per print.
first_log_row = df_log.first()

print('Here is the type of start_time col:')
print(type(first_log_row['start_time']))

print('\n Here is the first value of the start_time col:')
print(first_log_row['start_time'])

In [None]:
# print(type(df_log.first()['timestamp5']))

In [None]:
# Preview the log rows now that start_time has been added.
df_log.show(n=5)

In [None]:
# Derive the calendar fields (used by the time and songplays tables) from
# start_time; 'E' renders the abbreviated weekday name.
start_time_col = df_log.start_time
df_log = (
    df_log
    .withColumn('year', year(start_time_col))
    .withColumn('month', month(start_time_col))
    .withColumn('week', weekofyear(start_time_col))
    .withColumn('weekday', date_format(start_time_col, 'E'))
    .withColumn('day', dayofmonth(start_time_col))
    .withColumn('hour', hour(start_time_col))
)



In [None]:
df_log.show(n=5)  # preview the derived calendar fields

In [None]:
# Register the enriched log data (start_time + calendar fields) for the SQL queries that follow.
df_log.createOrReplaceTempView(name="log_view_2")


In [None]:
# Confirm the enriched schema before building the time and songplays tables.
df_log.printSchema()

In [None]:
# Time dimension: each distinct start_time with its calendar breakdown.
time_query = """
                       SELECT DISTINCT start_time, hour, day, week, weekday, year, month
                       FROM log_view_2
                       """
time_table = spark.sql(time_query)

In [None]:
time_table.show(n=10)  # preview the time dimension

In [None]:
time_table.write.parquet('time.parquet', mode='overwrite')  # unpartitioned, replaces previous output

In [None]:
# Reuse the song DataFrame already loaded from the same path with the same
# schema instead of scanning S3 a second time.
song_df = df
song_df.createOrReplaceTempView("songs_view_2")


In [None]:

# Fact table: each song-play event matched to a song on (artist name, title).
# Events without a matching song are dropped by the inner JOIN; songplay_id is
# unique but not consecutive (monotonically_increasing_id).
songplays_query = """
                            SELECT monotonically_increasing_id() AS songplay_id, l.start_time AS start_time, l.userId AS user_id,
                                   l.level AS level, s.song_id AS song_id, s.artist_id AS artist_id, l.sessionId AS session_id,
                                   l.location AS location, l.userAgent AS user_agent, l.year AS year, l.month AS month
                            FROM songs_view_2 s
                            JOIN log_view_2 l ON s.artist_name = l.artist AND s.title = l.song
                            """
songplays_table = spark.sql(songplays_query)

In [None]:
songplays_table.show(n=5)  # preview the songplays fact table

In [None]:
# Songplays on disk: one directory per year, then per month, replacing any previous output.
(songplays_table.write
    .mode('overwrite')
    .partitionBy("year", "month")
    .parquet("songplays.parquet"))

In [None]:
run etl.py

In [None]:
# s3://data-lake-project-delaguila/test/  (S3 location kept as a note; NOTE(review): presumably the intended output path — confirm)

In [None]:
# https://data-lake-project-delaguila.s3.us-west-2.amazonaws.com/test/  (HTTPS form of the same bucket and prefix)