# Exercise 3 - Data Lake on S3

In [1]:
from pyspark.sql import SparkSession
import os 
import configparser

# Make sure that your AWS credentials are loaded as env vars

In [6]:
config = configparser.ConfigParser()

config.read_file(open('/Users/yerxiong/.aws/credentials'))

os.environ["AWS_ACCESS_KEY_ID"] = config['default']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"] = config['default']['AWS_SECRET_ACCESS_KEY']

# Create spark session with hadoop-aws package

In [8]:
spark = SparkSession.builder\
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")\
    .getOrCreate()
spark 

In [9]:
# Load data from S3
df = spark.read.csv("s3a://udacity-dend/pagila/payment/payment.csv")

In [10]:
df.printSchema()
df.show(5)

root
 |-- _c0: string (nullable = true)

+--------------------+
|                 _c0|
+--------------------+
|payment_id;custom...|
|16050;269;2;7;1.9...|
|16051;269;1;98;0....|
|16052;269;2;678;6...|
|16053;269;2;703;0...|
+--------------------+
only showing top 5 rows



# Infer schema, fix header and separator

In [12]:
df = spark.read.csv("s3a://udacity-dend/pagila/payment/payment.csv",sep=";", inferSchema=True, header=True)

In [13]:
df.printSchema()
df.show(5)

root
 |-- payment_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- staff_id: integer (nullable = true)
 |-- rental_id: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- payment_date: string (nullable = true)

+----------+-----------+--------+---------+------+--------------------+
|payment_id|customer_id|staff_id|rental_id|amount|        payment_date|
+----------+-----------+--------+---------+------+--------------------+
|     16050|        269|       2|        7|  1.99|2017-01-24 21:40:...|
|     16051|        269|       1|       98|  0.99|2017-01-25 15:16:...|
|     16052|        269|       2|      678|  6.99|2017-01-28 21:44:...|
|     16053|        269|       2|      703|  0.99|2017-01-29 00:58:...|
|     16054|        269|       1|      750|  4.99|2017-01-29 08:10:...|
+----------+-----------+--------+---------+------+--------------------+
only showing top 5 rows



# Fix the data 

In [14]:
import pyspark.sql.functions as F
# convert the payment date which is currently a string to a timestamp
dfPayment = df.withColumn("payment_date", F.to_timestamp("payment_date"))
dfPayment.printSchema()
dfPayment.show(5)

root
 |-- payment_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- staff_id: integer (nullable = true)
 |-- rental_id: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- payment_date: timestamp (nullable = true)

+----------+-----------+--------+---------+------+--------------------+
|payment_id|customer_id|staff_id|rental_id|amount|        payment_date|
+----------+-----------+--------+---------+------+--------------------+
|     16050|        269|       2|        7|  1.99|2017-01-24 16:40:...|
|     16051|        269|       1|       98|  0.99|2017-01-25 10:16:...|
|     16052|        269|       2|      678|  6.99|2017-01-28 16:44:...|
|     16053|        269|       2|      703|  0.99|2017-01-28 19:58:...|
|     16054|        269|       1|      750|  4.99|2017-01-29 03:10:...|
+----------+-----------+--------+---------+------+--------------------+
only showing top 5 rows



# Extract the month 

In [16]:
dfPayment = dfPayment.withColumn("month", F.month("payment_date"))
dfPayment.show(5)

+----------+-----------+--------+---------+------+--------------------+-----+
|payment_id|customer_id|staff_id|rental_id|amount|        payment_date|month|
+----------+-----------+--------+---------+------+--------------------+-----+
|     16050|        269|       2|        7|  1.99|2017-01-24 16:40:...|    1|
|     16051|        269|       1|       98|  0.99|2017-01-25 10:16:...|    1|
|     16052|        269|       2|      678|  6.99|2017-01-28 16:44:...|    1|
|     16053|        269|       2|      703|  0.99|2017-01-28 19:58:...|    1|
|     16054|        269|       1|      750|  4.99|2017-01-29 03:10:...|    1|
+----------+-----------+--------+---------+------+--------------------+-----+
only showing top 5 rows



# Computer aggregate revenue per month

In [17]:
dfPayment.createOrReplaceTempView("payment")
spark.sql("""
SELECT month, sum(amount) as revenue 
FROM payment 
GROUP BY month
ORDER BY revenue desc 
""").show()

+-----+------------------+
|month|           revenue|
+-----+------------------+
|    4|28559.460000003943|
|    3| 23237.11000000187|
|    2|10281.329999999574|
|    1| 4824.429999999856|
|    5|  514.180000000001|
+-----+------------------+



# Fix the schema 

In [19]:
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date
paymentSchema = R([
    Fld("payment_id", Int()),
    Fld("customer_id", Int()),
    Fld("staff_id", Int()),
    Fld("rental_id", Int()),
    Fld("amount", Dbl()),
    Fld("payment_date", Date()),
])

In [20]:
dfPaymentWithSchema = spark.read.csv("s3a://udacity-dend/pagila/payment/payment.csv",sep=";",schema=paymentSchema, header=True)

In [21]:
dfPaymentWithSchema.printSchema()
df.show(5)

root
 |-- payment_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- staff_id: integer (nullable = true)
 |-- rental_id: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- payment_date: date (nullable = true)

+----------+-----------+--------+---------+------+--------------------+
|payment_id|customer_id|staff_id|rental_id|amount|        payment_date|
+----------+-----------+--------+---------+------+--------------------+
|     16050|        269|       2|        7|  1.99|2017-01-24 21:40:...|
|     16051|        269|       1|       98|  0.99|2017-01-25 15:16:...|
|     16052|        269|       2|      678|  6.99|2017-01-28 21:44:...|
|     16053|        269|       2|      703|  0.99|2017-01-29 00:58:...|
|     16054|        269|       1|      750|  4.99|2017-01-29 08:10:...|
+----------+-----------+--------+---------+------+--------------------+
only showing top 5 rows



In [22]:
dfPaymentWithSchema.createOrReplaceTempView("payment")
spark.sql("""
    SELECT month(payment_date) as m, sum(amount) as revenue
    FROM payment 
    GROUP BY m
    ORDER BY revenue desc 
""").show()

+---+------------------+
|  m|           revenue|
+---+------------------+
|  4|28559.460000003943|
|  3|23886.560000002115|
|  2| 9631.879999999608|
|  1| 4824.429999999856|
|  5|  514.180000000001|
+---+------------------+

