# Data Lake S3 - Local

In [2]:
from pyspark.sql import SparkSession
import os
import configparser

## Make sure that your AWS credentials are loaded as env vars

In [3]:
config = configparser.ConfigParser()

# Resolve the shared-credentials file relative to the current user's home
# directory instead of hardcoding one particular user's absolute path.
credentials_path = os.path.expanduser('~/.aws/credentials')

# Use a context manager so the file handle is closed as soon as parsing is
# done (passing a bare open(...) to read_file leaves the handle open).
with open(credentials_path) as credentials_file:
    config.read_file(credentials_file)

# Export the [default] profile's key pair as environment variables for this
# process. configparser matches option names case-insensitively by default,
# so these lookups also find the usual lower-case entries in the file.
os.environ['AWS_ACCESS_KEY_ID'] = config.get('default', 'AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY'] = config.get('default', 'AWS_SECRET_ACCESS_KEY')

# Do not print the key values to check them: cell output is saved with the
# notebook and would leak the secrets to anyone the file is shared with.

## Create Spark session with `hadoop-aws` package

In [3]:
# Build (or reuse) the Spark session, asking Spark to fetch the hadoop-aws
# package so that s3a:// paths can be read.
spark = (
    SparkSession.builder
    .config('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:2.7.5')
    .getOrCreate()
)

# Alternative: pass the S3A settings explicitly instead of relying on the
# environment variables exported earlier in this notebook.
# NOTE(review): the S3A connector's credential options are documented as
# `fs.s3a.access.key` / `fs.s3a.secret.key`; the `awsAccessKeyId` /
# `awsSecretAccessKey` spelling belongs to the older s3n connector — confirm
# against the Hadoop version in use before enabling this.
# spark = (
#     SparkSession.builder
#     .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.5")
#     .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
#     .config("spark.hadoop.fs.s3a.access.key", os.environ['AWS_ACCESS_KEY_ID'])
#     .config("spark.hadoop.fs.s3a.secret.key", os.environ['AWS_SECRET_ACCESS_KEY'])
#     .getOrCreate()
# )

## Load data from Udacity S3

In [4]:
# First attempt: read the payment CSV with the reader's default options.
# The schema and preview that follow show each line landing in a single
# string column `_c0`, with the header line treated as a data row.
df = spark.read.csv("s3a://udacity-dend/pagila/payment/payment.csv")

In [5]:
# Check which columns and types the default read produced.
df.printSchema()

root
 |-- _c0: string (nullable = true)



In [6]:
# Preview up to 10 rows, converted to pandas for notebook display.
df.limit(10).toPandas()

Unnamed: 0,_c0
0,payment_id;customer_id;staff_id;rental_id;amou...
1,16050;269;2;7;1.99;2017-01-24 21:40:19.996577+00
2,16051;269;1;98;0.99;2017-01-25 15:16:50.996577+00
3,16052;269;2;678;6.99;2017-01-28 21:44:14.99657...
4,16053;269;2;703;0.99;2017-01-29 00:58:02.99657...
5,16054;269;1;750;4.99;2017-01-29 08:10:06.99657...
6,16055;269;2;1099;2.99;2017-01-31 12:23:14.9965...
7,16056;270;1;193;1.99;2017-01-26 05:10:14.99657...
8,16057;270;1;1040;4.99;2017-01-31 04:03:42.9965...
9,16058;271;1;1096;8.99;2017-01-31 11:59:15.9965...


In [7]:
# Re-read the same file, this time declaring the ';' delimiter, treating the
# first line as column names, and letting Spark infer the column types.
df = (
    spark.read
    .options(sep=";", header=True, inferSchema=True)
    .csv("s3a://udacity-dend/pagila/payment/payment.csv")
)

In [8]:
# Confirm the header names and inferred types; note in the output that
# payment_date is still a string after inference.
df.printSchema()

root
 |-- payment_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- staff_id: integer (nullable = true)
 |-- rental_id: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- payment_date: string (nullable = true)



In [9]:
# Preview up to 10 rows of the re-read data.
df.limit(10).toPandas()

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date
0,16050,269,2,7,1.99,2017-01-24 21:40:19.996577+00
1,16051,269,1,98,0.99,2017-01-25 15:16:50.996577+00
2,16052,269,2,678,6.99,2017-01-28 21:44:14.996577+00
3,16053,269,2,703,0.99,2017-01-29 00:58:02.996577+00
4,16054,269,1,750,4.99,2017-01-29 08:10:06.996577+00
5,16055,269,2,1099,2.99,2017-01-31 12:23:14.996577+00
6,16056,270,1,193,1.99,2017-01-26 05:10:14.996577+00
7,16057,270,1,1040,4.99,2017-01-31 04:03:42.996577+00
8,16058,271,1,1096,8.99,2017-01-31 11:59:15.996577+00
9,16059,272,1,33,0.99,2017-01-25 02:47:17.996577+00


In [10]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, TimestampType

# Explicit schema: one nullable field per CSV column, in file order, with
# payment_date declared as a timestamp instead of the inferred string.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('payment_id', IntegerType()),
        ('customer_id', IntegerType()),
        ('staff_id', IntegerType()),
        ('rental_id', IntegerType()),
        ('amount', DoubleType()),
        ('payment_date', TimestampType()),
    )
])
# Equivalent DDL-string form of the same schema:
# schema = "payment_id INT, customer_id INT, staff_id INT, rental_id INT, amount DOUBLE, payment_date TIMESTAMP"

# NOTE(review): the pattern below has no element for the trailing "+00"
# offset present in the raw values. The preview of df2 shows the clock times
# unshifted, whereas the to_timestamp() conversion later in the notebook shows
# them shifted by five hours — the offset may be ignored here; confirm which
# interpretation is intended.
df2 = spark.read.csv(
    "s3a://udacity-dend/pagila/payment/payment.csv",
    sep=";",
    header=True,
    schema=schema,
    timestampFormat='yyyy-MM-dd HH:mm:ss.SSSSSS',
)

In [11]:
# Verify that payment_date now carries the timestamp type from the explicit schema.
df2.printSchema()

root
 |-- payment_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- staff_id: integer (nullable = true)
 |-- rental_id: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- payment_date: timestamp (nullable = true)



In [12]:
# Preview up to 10 rows of the explicit-schema read.
df2.limit(10).toPandas()

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date
0,16050,269,2,7,1.99,2017-01-24 21:40:19.996577
1,16051,269,1,98,0.99,2017-01-25 15:16:50.996577
2,16052,269,2,678,6.99,2017-01-28 21:44:14.996577
3,16053,269,2,703,0.99,2017-01-29 00:58:02.996577
4,16054,269,1,750,4.99,2017-01-29 08:10:06.996577
5,16055,269,2,1099,2.99,2017-01-31 12:23:14.996577
6,16056,270,1,193,1.99,2017-01-26 05:10:14.996577
7,16057,270,1,1040,4.99,2017-01-31 04:03:42.996577
8,16058,271,1,1096,8.99,2017-01-31 11:59:15.996577
9,16059,272,1,33,0.99,2017-01-25 02:47:17.996577


## Fix the timestamp column type manually

In [13]:
import pyspark.sql.functions as F

# Start from the inferred-schema frame (payment_date is a string there) and
# replace that column with its timestamp conversion.
# NOTE(review): the preview of dfPayment shows times five hours earlier than
# the raw strings — presumably the "+00" offset is honoured and values are
# rendered in the session's local time zone; confirm.
dfPayment = df.withColumn(
    'payment_date',
    F.to_timestamp('payment_date'),
)

In [14]:
# Verify that payment_date was converted from string to timestamp.
dfPayment.printSchema()

root
 |-- payment_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- staff_id: integer (nullable = true)
 |-- rental_id: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- payment_date: timestamp (nullable = true)



In [15]:
# Preview up to 10 rows to inspect the converted timestamps.
dfPayment.limit(10).toPandas()

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date
0,16050,269,2,7,1.99,2017-01-24 16:40:19.996577
1,16051,269,1,98,0.99,2017-01-25 10:16:50.996577
2,16052,269,2,678,6.99,2017-01-28 16:44:14.996577
3,16053,269,2,703,0.99,2017-01-28 19:58:02.996577
4,16054,269,1,750,4.99,2017-01-29 03:10:06.996577
5,16055,269,2,1099,2.99,2017-01-31 07:23:14.996577
6,16056,270,1,193,1.99,2017-01-26 00:10:14.996577
7,16057,270,1,1040,4.99,2017-01-30 23:03:42.996577
8,16058,271,1,1096,8.99,2017-01-31 06:59:15.996577
9,16059,272,1,33,0.99,2017-01-24 21:47:17.996577


## Extract the month

In [16]:
# Derive the calendar month (integer) from payment_date and store it in a
# `month` column; re-running this cell just recomputes the same column.
dfPayment = dfPayment.withColumn(
    'month',
    F.month('payment_date'),
)

In [17]:
# Verify that the integer `month` column was added.
dfPayment.printSchema()

root
 |-- payment_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- staff_id: integer (nullable = true)
 |-- rental_id: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- payment_date: timestamp (nullable = true)
 |-- month: integer (nullable = true)



In [18]:
# Preview up to 10 rows including the new `month` column.
dfPayment.limit(10).toPandas()

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date,month
0,16050,269,2,7,1.99,2017-01-24 16:40:19.996577,1
1,16051,269,1,98,0.99,2017-01-25 10:16:50.996577,1
2,16052,269,2,678,6.99,2017-01-28 16:44:14.996577,1
3,16053,269,2,703,0.99,2017-01-28 19:58:02.996577,1
4,16054,269,1,750,4.99,2017-01-29 03:10:06.996577,1
5,16055,269,2,1099,2.99,2017-01-31 07:23:14.996577,1
6,16056,270,1,193,1.99,2017-01-26 00:10:14.996577,1
7,16057,270,1,1040,4.99,2017-01-30 23:03:42.996577,1
8,16058,271,1,1096,8.99,2017-01-31 06:59:15.996577,1
9,16059,272,1,33,0.99,2017-01-24 21:47:17.996577,1


## Compute aggregate revenue by month

In [19]:
# Total payment amount per month, highest total first. The grouped sum keeps
# Spark's generated column name `sum(amount)`, which the sort refers to.
(
    dfPayment
    .select('amount', 'month')
    .groupBy('month')
    .sum('amount')
    .orderBy(F.desc('sum(amount)'))
    .toPandas()
)

Unnamed: 0,month,sum(amount)
0,4,28559.46
1,3,23237.11
2,2,10281.33
3,1,4824.43
4,5,514.18


In [20]:
# Same aggregation, but through agg() so the summed column can be given the
# readable alias `revenue` and sorted on by that name.
(
    dfPayment
    .select('amount', 'month')
    .groupBy('month')
    .agg(F.sum('amount').alias('revenue'))
    .orderBy(F.desc('revenue'))
    .toPandas()
)

Unnamed: 0,month,revenue
0,4,28559.46
1,3,23237.11
2,2,10281.33
3,1,4824.43
4,5,514.18


In [21]:
# SQL approach
# SQL approach: expose the DataFrame as the temp view `payment`, then run the
# same revenue-per-month aggregation as a query against it.
dfPayment.createOrReplaceTempView('payment')

revenue_by_month_query = """
    select month, sum(amount) as revenue
    from payment
    group by month 
    order by revenue desc
    """

spark.sql(revenue_by_month_query).toPandas()

Unnamed: 0,month,revenue
0,4,28559.46
1,3,23237.11
2,2,10281.33
3,1,4824.43
4,5,514.18


In [22]:
# Variant of the SQL approach that derives the month inside the query from
# payment_date (aliased `mo`) rather than using the precomputed column.
# The view is registered again so this cell can run on its own.
dfPayment.createOrReplaceTempView('payment')

revenue_by_derived_month_query = """
    select month(payment_date) as mo, sum(amount) as revenue
    from payment
    group by mo
    order by revenue desc
    """

spark.sql(revenue_by_derived_month_query).toPandas()

Unnamed: 0,mo,revenue
0,4,28559.46
1,3,23237.11
2,2,10281.33
3,1,4824.43
4,5,514.18


In [23]:
# Stop the Spark session at the end of the notebook.
spark.stop()