In [None]:
import sys
!conda install --yes --prefix {sys.prefix} boto3
!{sys.executable} -m pip install git+git://github.com/moj-analytical-services/etl_manager.git#egg=etl_manager

In [2]:
import boto3
import os
# from dataengineeringutils.utils import read_json

import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, functions as F
from pyspark.storagelevel import StorageLevel

# Import own function library
from pyspark.sql.types import *

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk:1.10.34,org.apache.hadoop:hadoop-aws:2.6.0 pyspark-shell'

session = boto3.Session()
credentials = session.get_credentials()


spark = SparkSession \
    .builder \
    .master("local[*]") \
    .config("hive.exec.dynamic.partition", "true") \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem") \
    .config("spark.hadoop.fs.s3.awsAccessKeyId", credentials.access_key) \
    .config("spark.hadoop.fs.s3.awsSecretAccessKey", credentials.secret_key) \
    .enableHiveSupport() \
    .getOrCreate()
    
os.getcwd()

'/home/jovyan/Documents/projects/spark_testing'

Example: 

Want to create a dataset that has SCD2 using spark. Creating a dataset where each record has a start and end date making it easy to query the data based on a date :

```sql
SELECT * FROM database.table where start_date >= 2018-01-01 AND end_date < 2018-01-01
```
In my example below we have 3 days worth of data. The data itself has rows that update over time rather than a row being created. We will ingest new data each day and recalculate the start and end date for each record in a partition.

Our dummy in data has an id (our primary key) and a dummy_contents. The dummy_content will always be a when the row is new and then iterates through the alphabet each time it is updated. This is just to keep track of how many times each record has changed. 

In [4]:
# Test new update method
df_partitions = ['partition_bin']
in_schema = StructType([StructField('id', IntegerType(), True), StructField('dummy_contents', StringType())])

# Day one we get our first set of data
delta_day_1 = [
    (1, 'a'),
    (2, 'a'),
    (3, 'a'),
    (4, 'a'),
    (5, 'a'),
    (6, 'a'),
    (7, 'a'),
    (8, 'a'),
    (9, 'a')]

# Data 2 we mostly get new data with some old data and updated records
delta_day_2 = [
    (7, 'b'),
    (8, 'b'),
    (9, 'b'),
    (10, 'a'),
    (11, 'a'),
    (12, 'a'),
    (13, 'a')]

# Day 3 we get loads of updated old data 
delta_day_3 = [
    (1, 'b'),
    (2, 'b'),
    (3, 'b'),
    (4, 'b'),
    (5, 'b'),
    (6, 'c'),
    (7, 'c'),
    (8, 'c'),
    (9, 'c'),
    (10, 'b'),
    (11, 'b'),
    (12, 'b'),
    (13, 'b'),
    (14, 'a'),
    (15, 'a')
]

## Day 1 (2018-01-01)
Dataset is created and day1 is written into dataset

In [7]:
spark.createDataFrame(delta_day_1, in_schema).createOrReplaceTempView('df_1')

# Create a view of our df_1 data with added start_date, end_date and partition_bin columns
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW df_1
AS SELECT *,
to_date('2018-01-01') AS start_date,
to_date('2999-01-01') AS end_date,
FLOOR(id/5) AS partition_bin
FROM df_1
""")

# spark.sql("""
# SELECT *, to_date('2018-01-01')
# """)
# df_1.withColumn('start_date', '2018-01-01')

DataFrame[]

In [None]:
spark.sql("""
CREATE TABLE IF NOT EXISTS scd2 (
    id INT,
    dummy_contents STRING,
    start_date DATE,
    end_date DATE,
    partition_bin INT
)
USING HIVE OPTIONS(fileFormat 'PARQUET')
PARTITIONED BY (partition_bin)
LOCATION 'data/scd2'
""")

In [12]:
spark.sql("""
INSERT OVERWRITE TABLE scd2 PARTITION (partition_bin)
SELECT * FROM df_1
""")

DataFrame[]

In [14]:
spark.sql("SELECT * from scd2").show()

+---+--------------+----------+----------+-------------+
| id|dummy_contents|start_date|  end_date|partition_bin|
+---+--------------+----------+----------+-------------+
|  7|             a|2018-01-01|2999-01-01|            1|
|  8|             a|2018-01-01|2999-01-01|            1|
|  9|             a|2018-01-01|2999-01-01|            1|
|  3|             a|2018-01-01|2999-01-01|            0|
|  4|             a|2018-01-01|2999-01-01|            0|
|  1|             a|2018-01-01|2999-01-01|            0|
|  2|             a|2018-01-01|2999-01-01|            0|
|  5|             a|2018-01-01|2999-01-01|            1|
|  6|             a|2018-01-01|2999-01-01|            1|
+---+--------------+----------+----------+-------------+



## Day 2 (2018-01-02)
We add new day_2 data and apply SCD to current table

In [24]:
spark.createDataFrame(delta_day_2, in_schema).createOrReplaceTempView('df_2')

# Create a view of our df_1 data with added start_date, end_date and partition_bin columns
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW df_2
AS SELECT *,
to_date('2018-02-01') AS start_date,
to_date('2999-01-01') AS end_date,
FLOOR(id/5) AS partition_bin
FROM df_2
""")

spark.sql("SELECT * FROM df_2").show()

+---+--------------+----------+----------+-------------+
| id|dummy_contents|start_date|  end_date|partition_bin|
+---+--------------+----------+----------+-------------+
|  7|             b|2018-02-01|2999-01-01|            1|
|  8|             b|2018-02-01|2999-01-01|            1|
|  9|             b|2018-02-01|2999-01-01|            1|
| 10|             a|2018-02-01|2999-01-01|            2|
| 11|             a|2018-02-01|2999-01-01|            2|
| 12|             a|2018-02-01|2999-01-01|            2|
| 13|             a|2018-02-01|2999-01-01|            2|
+---+--------------+----------+----------+-------------+



In [35]:
# Append df_2 with any data in scd2 that matches df_2 partitions
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW df_2_update AS
SELECT scd2.*
FROM scd2,
(
    SELECT DISTINCT partition_bin
    FROM df_2
) as p
WHERE scd2.partition_bin = p.partition_bin
UNION ALL
SELECT df_2.*
FROM df_2
""")

# RECALCULATE SCD2 FOR OUR UPDATE TABLE
spark.sql("""
CREATE OR REPLACE TEMPORARY TABLE df_2_update AS
SELECT id, dummy_contents, start_date,
to_date(lead(start_date, 1, '2999-01-01') OVER (PARTITION BY id ORDER BY start_date)) AS end_date,
partition_bin
FROM df_2_update
""")
          
spark.sql("SELECT * FROM df_2_update").show()

+---+--------------+----------+----------+-------------+
| id|dummy_contents|start_date|  end_date|partition_bin|
+---+--------------+----------+----------+-------------+
| 12|             a|2018-02-01|2999-01-01|            2|
| 13|             a|2018-02-01|2999-01-01|            2|
|  6|             a|2018-01-01|2999-01-01|            1|
|  5|             a|2018-01-01|2999-01-01|            1|
|  9|             a|2018-01-01|2018-02-01|            1|
|  9|             b|2018-02-01|2999-01-01|            1|
|  8|             a|2018-01-01|2018-02-01|            1|
|  8|             b|2018-02-01|2999-01-01|            1|
|  7|             a|2018-01-01|2018-02-01|            1|
|  7|             b|2018-02-01|2999-01-01|            1|
| 10|             a|2018-02-01|2999-01-01|            2|
| 11|             a|2018-02-01|2999-01-01|            2|
+---+--------------+----------+----------+-------------+



In [37]:
spark.sql("""
CREATE TABLE scd2_update
USING HIVE OPTIONS(fileFormat 'PARQUET')
PARTITIONED BY (partition_bin)
LOCATION 'data/scd2_update'
SELECT * FROM df_2_update
""")

DataFrame[]

In [38]:
spark.sql("""
INSERT OVERWRITE TABLE scd2 PARTITION (partition_bin)
SELECT * FROM scd2_update
""")

DataFrame[]

In [39]:
spark.sql('SELECT * FROM scd2').show()

+---+--------------+----------+----------+-------------+
| id|dummy_contents|start_date|  end_date|partition_bin|
+---+--------------+----------+----------+-------------+
|  9|             a|2018-01-01|2018-02-01|            1|
|  9|             b|2018-02-01|2999-01-01|            1|
|  8|             a|2018-01-01|2018-02-01|            1|
|  8|             b|2018-02-01|2999-01-01|            1|
|  7|             a|2018-01-01|2018-02-01|            1|
|  7|             b|2018-02-01|2999-01-01|            1|
| 11|             a|2018-02-01|2999-01-01|            2|
| 13|             a|2018-02-01|2999-01-01|            2|
| 12|             a|2018-02-01|2999-01-01|            2|
|  3|             a|2018-01-01|2999-01-01|            0|
|  4|             a|2018-01-01|2999-01-01|            0|
|  1|             a|2018-01-01|2999-01-01|            0|
|  2|             a|2018-01-01|2999-01-01|            0|
|  5|             a|2018-01-01|2999-01-01|            1|
|  6|             a|2018-01-01|