# Exercise 3 - Data Lake on S3

### access S3 from Spark

In [1]:
from pyspark.sql import SparkSession
import os
import configparser

# Make sure that your AWS credentials are loaded as env vars

In [2]:
config = configparser.ConfigParser()

#Normally this file should be in ~/.aws/credentials
config.read_file(open('aws/credentials.cfg'))

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

# Create spark session with hadoop-aws package

In [3]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()

# Load data from S3

In [4]:
df = spark.read.csv("s3a://udacity-dend/pagila/payment/payment.csv")

In [5]:
df.printSchema()
df.show(5)

root
 |-- _c0: string (nullable = true)

+--------------------+
|                 _c0|
+--------------------+
|payment_id;custom...|
|16050;269;2;7;1.9...|
|16051;269;1;98;0....|
|16052;269;2;678;6...|
|16053;269;2;703;0...|
+--------------------+
only showing top 5 rows



# Infer schema, fix header and separator

In [None]:
# infer schema

In [6]:
df = spark.read.csv("s3a://udacity-dend/pagila/payment/payment.csv",sep=";", inferSchema=True, header=True)

In [9]:
df.printSchema()
df.show(5, truncate=False)

root
 |-- payment_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- staff_id: integer (nullable = true)
 |-- rental_id: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- payment_date: string (nullable = true)

+----------+-----------+--------+---------+------+-----------------------------+
|payment_id|customer_id|staff_id|rental_id|amount|payment_date                 |
+----------+-----------+--------+---------+------+-----------------------------+
|16050     |269        |2       |7        |1.99  |2017-01-24 21:40:19.996577+00|
|16051     |269        |1       |98       |0.99  |2017-01-25 15:16:50.996577+00|
|16052     |269        |2       |678      |6.99  |2017-01-28 21:44:14.996577+00|
|16053     |269        |2       |703      |0.99  |2017-01-29 00:58:02.996577+00|
|16054     |269        |1       |750      |4.99  |2017-01-29 08:10:06.996577+00|
+----------+-----------+--------+---------+------+-----------------------------+
only showing 

# Fix the data yourself 

In [11]:
import  pyspark.sql.functions as F
dfPayment = df.withColumn("payment_date", F.to_timestamp("payment_date"))
dfPayment.printSchema()
dfPayment.show(5, truncate=False)

root
 |-- payment_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- staff_id: integer (nullable = true)
 |-- rental_id: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- payment_date: timestamp (nullable = true)

+----------+-----------+--------+---------+------+--------------------------+
|payment_id|customer_id|staff_id|rental_id|amount|payment_date              |
+----------+-----------+--------+---------+------+--------------------------+
|16050     |269        |2       |7        |1.99  |2017-01-24 21:40:19.996577|
|16051     |269        |1       |98       |0.99  |2017-01-25 15:16:50.996577|
|16052     |269        |2       |678      |6.99  |2017-01-28 21:44:14.996577|
|16053     |269        |2       |703      |0.99  |2017-01-29 00:58:02.996577|
|16054     |269        |1       |750      |4.99  |2017-01-29 08:10:06.996577|
+----------+-----------+--------+---------+------+--------------------------+
only showing top 5 rows



# Extract the month

In [12]:
dfPayment = dfPayment.withColumn("month", F.month("payment_date"))
dfPayment.show(5)

+----------+-----------+--------+---------+------+--------------------+-----+
|payment_id|customer_id|staff_id|rental_id|amount|        payment_date|month|
+----------+-----------+--------+---------+------+--------------------+-----+
|     16050|        269|       2|        7|  1.99|2017-01-24 21:40:...|    1|
|     16051|        269|       1|       98|  0.99|2017-01-25 15:16:...|    1|
|     16052|        269|       2|      678|  6.99|2017-01-28 21:44:...|    1|
|     16053|        269|       2|      703|  0.99|2017-01-29 00:58:...|    1|
|     16054|        269|       1|      750|  4.99|2017-01-29 08:10:...|    1|
+----------+-----------+--------+---------+------+--------------------+-----+
only showing top 5 rows



# Compute aggregate revenue per month

In [13]:
dfPayment.createOrReplaceTempView("payment")
spark.sql("""
    SELECT month, sum(amount) as revenue
    FROM payment
    GROUP by month
    order by revenue desc
""").show()

+-----+------------------+
|month|           revenue|
+-----+------------------+
|    4|28559.460000003943|
|    3|23886.560000002115|
|    2| 9631.879999999608|
|    1| 4824.429999999856|
|    5|  514.180000000001|
+-----+------------------+



# Fix the schema

In [14]:
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date
paymentSchema = R([
    Fld("payment_id",Int()),
    Fld("customer_id",Int()),
    Fld("staff_id",Int()),
    Fld("rental_id",Int()),
    Fld("amount",Dbl()),
    Fld("payment_date",Date()),
])

In [None]:
# give explicit schema

In [15]:
dfPaymentWithSchema = spark.read.csv("s3a://udacity-dend/pagila/payment/payment.csv",sep=";", schema=paymentSchema, header=True)


In [16]:
dfPaymentWithSchema.printSchema()
df.show(5)

root
 |-- payment_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- staff_id: integer (nullable = true)
 |-- rental_id: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- payment_date: date (nullable = true)

+----------+-----------+--------+---------+------+--------------------+
|payment_id|customer_id|staff_id|rental_id|amount|        payment_date|
+----------+-----------+--------+---------+------+--------------------+
|     16050|        269|       2|        7|  1.99|2017-01-24 21:40:...|
|     16051|        269|       1|       98|  0.99|2017-01-25 15:16:...|
|     16052|        269|       2|      678|  6.99|2017-01-28 21:44:...|
|     16053|        269|       2|      703|  0.99|2017-01-29 00:58:...|
|     16054|        269|       1|      750|  4.99|2017-01-29 08:10:...|
+----------+-----------+--------+---------+------+--------------------+
only showing top 5 rows



In [17]:
dfPaymentWithSchema.createOrReplaceTempView("payment")
spark.sql("""
    SELECT month(payment_date) as m, sum(amount) as revenue
    FROM payment
    GROUP by m
    order by revenue desc
""").show()

+---+------------------+
|  m|           revenue|
+---+------------------+
|  4|28559.460000003943|
|  3|23886.560000002115|
|  2| 9631.879999999608|
|  1| 4824.429999999856|
|  5|  514.180000000001|
+---+------------------+



In [4]:
song_data = "s3a://udacity-dend/song_data/A/A/A/*.json"
df_song = spark.read.json(song_data)

In [8]:
df_song.printSchema()
df_song.show(1)

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: integer (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)

+------------------+---------------+--------------------+----------------+------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|       artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+------------------+---------+---------+------------------+--------------------+----+
|ARTC1LV1187B9A4858|        51.4536|Goldsmith's Colle...|        -0.01802|The Bonzo Dog Band|301

In [5]:
from pyspark.sql.functions import expr
df_song = df_song.withColumn("num_songs", expr("cast(num_songs as int)"))
df_song = df_song.withColumn("year", expr("cast(year as int)"))

In [6]:
songs_table = df_song.select("song_id", "title", "artist_id", "year", "duration").dropDuplicates()
songs_table.limit(1).show()

+------------------+------------+------------------+----+---------+
|           song_id|       title|         artist_id|year| duration|
+------------------+------------+------------------+----+---------+
|SOHKNRJ12A6701D1F8|Drop of Rain|AR10USD1187B99F3F1|   0|189.57016|
+------------------+------------+------------------+----+---------+



In [14]:
songs_table.write.partitionBy("year", "artist_id").parquet("songs")

In [7]:
artists_table = df_song.selectExpr("artist_id as artist_id", "artist_name as name", "artist_location as location", "artist_latitude as latitude", "artist_longitude as longitude").dropDuplicates()
artists_table.limit(1).show()

+------------------+-------------+---------------+--------+---------+
|         artist_id|         name|       location|latitude|longitude|
+------------------+-------------+---------------+--------+---------+
|ARSVTNL1187B992A91|Jonathan King|London, England|51.50632| -0.12714|
+------------------+-------------+---------------+--------+---------+



In [18]:
artists_table.write.partitionBy("artist_id").parquet("artists")

In [8]:
log_data = 's3a://udacity-dend/log_data/2018/11/2018-11-3*.json'
df_log = spark.read.json(log_data)

In [68]:
df_log.printSchema()
df_log.show(1)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: integer (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- new_ts: string (nullable = true)
 |-- start_time: string (nullable = true)

+-------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+-----

In [9]:
df_log = df_log.where(df_log.page == "NextSong")

from pyspark.sql.functions import expr
df_log = df_log.withColumn("itemInSession", expr("cast(itemInSession as int)"))
df_log = df_log.withColumn("sessionId", expr("cast(sessionId as int)"))
df_log = df_log.withColumn("status", expr("cast(status as int)"))

In [10]:
users_table = df_log.selectExpr("userId as user_id", "firstName as first_name", "lastName as last_name", "gender as gender", "level as level").dropDuplicates()
users_table.limit(1).show()

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     26|      Ryan|    Smith|     M| free|
+-------+----------+---------+------+-----+



In [11]:
users_table.write.partitionBy("user_id").parquet("users")

In [7]:
from datetime import datetime
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, dayofweek, date_format

In [8]:
get_timestamp = udf(lambda x: datetime.fromtimestamp(x / 1000.0).strftime('%Y-%m-%d %H:%M:%S.%f')) 
df_log = df_log.withColumn("new_ts", get_timestamp(df_log.ts))

In [9]:
get_datetime = udf(lambda x: datetime.fromtimestamp(x / 1000.0).strftime('%Y-%m-%d %H:%M:%S'))
df_log = df_log.withColumn("start_time", get_datetime(df_log.ts))

In [58]:
time_table = df_log.selectExpr("start_time as start_time", "hour(start_time) as hour", "dayofmonth(start_time) as day", "weekofyear(start_time) as week", "month(start_time) as month", "year(start_time) as year", "dayofweek(start_time) as weekday")
time_table.limit(1).show()

+-------------------+----+---+----+-----+----+-------+
|         start_time|hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-05 00:33:12|   0|  5|  45|   11|2018|      2|
+-------------------+----+---+----+-----+----+-------+



In [59]:
time_table.write.partitionBy("year", "month").parquet("time")

In [10]:
from pyspark.sql.functions import monotonically_increasing_id

In [11]:
songplays_table = df_log \
                .join(df_song, [df_log.artist == df_song.artist_name, df_log.song == df_song.title], 'left') \
                .select(df_log.start_time, \
                        df_log.userId, \
                        df_log.level, \
                        df_song.song_id, \
                        df_song.artist_id, \
                        df_log.sessionId, \
                        df_log.location, \
                        df_log.userAgent)
songplays_table = songplays_table.withColumn("songplay_id", monotonically_increasing_id())
songplays_table.limit(1).show()

+-------------------+------+-----+-------+---------+---------+--------------------+--------------------+-----------+
|         start_time|userId|level|song_id|artist_id|sessionId|            location|           userAgent|songplay_id|
+-------------------+------+-----+-------+---------+---------+--------------------+--------------------+-----------+
|2018-11-30 00:22:07|    91| free|   null|     null|      829|Dallas-Fort Worth...|Mozilla/5.0 (comp...|          0|
+-------------------+------+-----+-------+---------+---------+--------------------+--------------------+-----------+



In [81]:
songplays_table = songplays_table.selectExpr("songplay_id as songplay_id", \
                                             "start_time as start_time", \
                                             "userId as user_id", \
                                             "level as level", \
                                             "song_id as song_id", \
                                             "artist_id as artist_id", \
                                             "sessionId as session_id", \
                                             "location as location", \
                                             "userAgent as user_agent", \
                                             "year(start_time) as year", \
                                             "month(start_time) as month")

In [82]:
songplays_table.write.partitionBy("year", "month").parquet("songplays")