# Task 1: Build & Test an ETL Job Declaratively
### Instructions:
### 1. Run each cell in order by clicking the triangular `Run` button in the toolbar above.
### 2. Complete the missing cell in section 2.1.
### 3. Query the final output table in Athena and submit your answer on the challenge page.

In [None]:
%conf 
numRows=12
showLog=true

# 1. Initial Table Load (ETL Job 1)

## 1.1 Extract the Initial Contact Data

In [None]:
{
  "type": "DelimitedExtract",
  "name": "extract initial contact table",
  "environments": ["dev", "test"],
  "inputURI": "s3a://"${ETL_CONF_DATALAKE_LOC}"/app_code/data/initial_contacts.csv",
  "outputView": "initial_raw",            
  "delimiter": "Comma",
  "header": false,
  "quote": "None",
  "authentication": {
     "method": "AmazonIAM"
  }
}

## 1.2 Check Original Data Schema

In [None]:
%printschema 
initial_raw

## 1.3 Apply Data Type

In [None]:
{
  "type": "TypingTransform",
  "name": "apply table schema to CSV",
  "environments": ["dev", "test"],
  "schemaURI": "s3a://"${ETL_CONF_DATALAKE_LOC}"/app_code/meta/contact_meta_0.json",
  "inputView": "initial_raw",            
  "outputView": "initial_typed",
  "authentication": {
     "method": "AmazonIAM"
  }
}
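`TypingTransform` turns the all-string columns produced by the headerless `DelimitedExtract` into typed columns. The metadata format of `contact_meta_0.json` is not shown in this notebook, so the following is only a sketch: it assumes a hypothetical, simplified schema of `(name, converter)` pairs, matches raw columns to schema fields by position, and on a failed conversion sets the value to `None` and records the field name. Arc's own typing and error handling may differ.

```python
from typing import Any, Callable

# Hypothetical, simplified schema: (column name, converter) pairs in file order.
# The real contact_meta_0.json uses Arc's metadata format, which is not shown here.
SCHEMA: list[tuple[str, Callable[[str], Any]]] = [
    ("id", int),
    ("name", str),
    ("email", str),
    ("state", str),
]

def apply_schema(raw_row: list[str]) -> tuple[dict[str, Any], list[str]]:
    """Pair raw string values with schema fields by position and convert them.

    Returns the typed row plus the names of fields that failed to convert.
    """
    if len(raw_row) != len(SCHEMA):
        raise ValueError(f"expected {len(SCHEMA)} columns, got {len(raw_row)}")
    typed: dict[str, Any] = {}
    errors: list[str] = []
    for (name, convert), value in zip(SCHEMA, raw_row):
        try:
            typed[name] = convert(value)
        except ValueError:
            typed[name] = None  # keep the row, but flag the field that failed
            errors.append(name)
    return typed, errors

# Hypothetical sample row shaped like a headerless contacts CSV line.
row, errs = apply_schema(["12", "Ann Lee", "ann@example.com", "NY"])
print(row, errs)
```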

## 1.4 Check Typed Table Schema

In [None]:
%printschema 
initial_typed

## 1.5 Add Calculated Fields for SCD Type 2
### Pass `ETL_CONF_CURRENT_TIMESTAMP` in as a parameter when the ETL job is triggered

In [None]:
%env
ETL_CONF_CURRENT_TIMESTAMP=current_timestamp()

In [None]:
%sql outputView="initial_load" name="add calc field for SCD" environments=dev,test sqlParams=table_name=initial_typed,ETL_CONF_CURRENT_TIMESTAMP=${ETL_CONF_CURRENT_TIMESTAMP}

SELECT id,name,email,state,${ETL_CONF_CURRENT_TIMESTAMP} AS valid_from,CAST(null AS timestamp) AS valid_to
,1 AS iscurrent,md5(concat(name,email,state)) AS checksum 
FROM ${table_name}
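The `checksum` column lets the later merge detect a changed record by comparing one hash instead of every attribute. The plain-Python sketch below mirrors the SQL for a single row under two assumptions about Spark semantics: `md5` returns the lowercase hex digest of the UTF-8 bytes, and `concat` returns NULL when any argument is NULL, so a row with a missing field gets a NULL checksum. Concatenating without a separator also means `("ab", "c")` and `("a", "bc")` hash identically; adding a delimiter (for example with Spark's `concat_ws`) reduces that risk.

```python
import hashlib
from datetime import datetime, timezone
from typing import Optional

def spark_like_md5_concat(*values: Optional[str]) -> Optional[str]:
    """Mimic md5(concat(...)): None if any input is None, else lowercase hex MD5."""
    if any(v is None for v in values):
        return None
    return hashlib.md5("".join(values).encode("utf-8")).hexdigest()

def add_scd_fields(row: dict, load_time: datetime) -> dict:
    """Append the SCD Type 2 columns that the SQL cell adds to every incoming row."""
    return {
        **row,
        "valid_from": load_time,
        "valid_to": None,   # open-ended: this version is still current
        "iscurrent": 1,     # the SQL uses the integer literal 1
        "checksum": spark_like_md5_concat(row["name"], row["email"], row["state"]),
    }

now = datetime.now(timezone.utc)
# Hypothetical sample row.
loaded = add_scd_fields({"id": 12, "name": "Ann Lee", "email": "ann@example.com", "state": "NY"}, now)
print(loaded["checksum"])
```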

## 1.6 Full table load to Delta Lake
### [Delta Lake](https://delta.io/) is an open-source storage layer that brings ACID transactions and time travel to a data lake

In [None]:
{
  "type": "DeltaLakeLoad",
  "name": "Initial full table load to S3 in Delta Lake format",
  "environments": ["dev", "test"],
  "inputView": "initial_load",
  "outputURI": "s3a://"${ETL_CONF_DATALAKE_LOC}"/app_code/output/contact/",
  "numPartitions": 2,
  "saveMode": "Overwrite",
  "authentication": {
     "method": "AmazonIAM"
  }
}

# Process Incremental Data Changes (ETL Job 2)

## 2. Ingest a CDC CSV file containing the incremental changes
### Note that the `state` of record ID 12 has changed

In [None]:
{
  "type": "DelimitedExtract",
  "name": "extract incremental data change",
  "environments": ["dev", "test"],
  "inputURI": "s3a://"${ETL_CONF_DATALAKE_LOC}"/app_code/data/update_contacts.csv",
  "outputView": "cdc_raw",            
  "delimiter": "Comma",
  "header": false,
  "authentication": {
     "method": "AmazonIAM"
  }
}

## 2.1 Apply Data Type (build your transformation here)
### ======== TASK #1 ==========
### Tip: copy the cell from section 1.3, then change its `inputView` and `outputView` to the view names used in section 2 and section 2.2.

In [None]:
{
  `Fill in this block with your JSON definition`
  ?????????
  ?????????
  ?????????
}

## 2.2 Add Calculated Fields (same SQL script as section 1.5)

In [None]:
%env
ETL_CONF_CURRENT_TIMESTAMP=current_timestamp()

In [None]:
%sql outputView="cdc_load" name="add calc field for CDC" environments=dev,test sqlParams=table_name=cdc_typed,ETL_CONF_CURRENT_TIMESTAMP=${ETL_CONF_CURRENT_TIMESTAMP}

SELECT id,name,email,state, ${ETL_CONF_CURRENT_TIMESTAMP} AS valid_from, CAST(null AS timestamp) AS valid_to
,1 AS iscurrent, md5(concat(name,email,state)) AS checksum 
FROM ${table_name}

# SCD Type 2 Merge Operation (ETL Job 3)

## 3.1 Prepare the Dataset for the SCD Type 2 Insert

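Each changed record is staged twice: once with a `NULL` mergeKey, which can never match a target row and is therefore inserted as the new current version, and once with `mergeKey = id`, which matches the existing row so the merge can expire it. Unchanged and brand-new records are staged once, keyed by `id`.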

In [None]:
%sql outputView="staged_update" name="generate extra rows for SCD" environments=dev,test

SELECT NULL AS mergeKey, new.*
FROM initial_load old
INNER JOIN cdc_load new
ON old.id = new.id
WHERE old.iscurrent=true
AND old.checksum<>new.checksum

UNION

SELECT id AS mergeKey, *
FROM cdc_load
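The `UNION` above emits two rows for each changed record and one row for every other CDC record. The sketch below reproduces that logic in plain Python over hypothetical lists of dicts (`old_rows` standing in for `initial_load`, `new_rows` for `cdc_load`), which makes it easier to see why a changed record appears twice. It returns the rows in a fixed order for readability; the SQL `UNION` guarantees no order.

```python
from typing import Any, Optional

Row = dict[str, Any]

def stage_updates(old_rows: list[Row], new_rows: list[Row]) -> list[tuple[Optional[int], Row]]:
    """Return (mergeKey, row) pairs, mirroring the staged_update SQL."""
    current_old = {r["id"]: r for r in old_rows if r["iscurrent"]}
    staged: list[tuple[Optional[int], Row]] = []
    # Part 1: changed records get a NULL mergeKey so the MERGE inserts them as new rows.
    for new in new_rows:
        old = current_old.get(new["id"])
        if old is not None and old["checksum"] != new["checksum"]:
            staged.append((None, new))
    # Part 2: every CDC record keyed by id, so the MERGE can match (and expire) the old row.
    for new in new_rows:
        staged.append((new["id"], new))
    return staged

# Hypothetical data: id 12 changed, id 13 is brand new, id 1 is absent from the CDC file.
old = [{"id": 1, "iscurrent": 1, "checksum": "a"}, {"id": 12, "iscurrent": 1, "checksum": "b"}]
new = [{"id": 12, "iscurrent": 1, "checksum": "c"}, {"id": 13, "iscurrent": 1, "checksum": "d"}]
staged = stage_updates(old, new)
print([key for key, _ in staged])  # [None, 12, 13]
```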

## 3.2 Perform the MERGE operation for SCD Type 2
- source: the staged CDC view (`staged_update`)
- target: the Delta Lake table written by the initial load in section 1.6

In [None]:
{
  "type": "DeltaLakeMergeLoad",
  "name": "insert/update existing data and output to target Delta Lake",
  "environments": ["dev","test"],
  "inputView": "staged_update",
  "numPartitions": 2,
  "outputURI": "s3a://"${ETL_CONF_DATALAKE_LOC}"/app_code/output/contact/",
  "condition": "source.mergeKey = target.id",
  "whenMatchedUpdate": {
    "condition": "target.iscurrent = true AND source.checksum <> target.checksum",
    "values": {
      "valid_to": ${ETL_CONF_CURRENT_TIMESTAMP},
      "iscurrent": false
    }
  },
  // a NULL mergeKey never matches, so those rows (and brand-new ids) are inserted
  "whenNotMatchedByTargetInsert": {}
}
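To see what the `DeltaLakeMergeLoad` settings do to the target table, the sketch below applies the same three rules to an in-memory list: match on `mergeKey = id`, expire a matched row only when it is current and its checksum differs, and insert every staged row that matched nothing. It is a plain-Python model of MERGE semantics under simplifying assumptions (no concurrency, at most one staged row matching any target row), not Arc's or Delta Lake's implementation. The config sets `iscurrent` to `false`; the sketch writes `0` to stay consistent with the integer column created by `1 AS iscurrent`.

```python
from datetime import datetime
from typing import Any, Optional

Row = dict[str, Any]
Staged = list[tuple[Optional[int], Row]]

def scd2_merge(target: list[Row], staged: Staged, now: datetime) -> list[Row]:
    """Model the MERGE: expire changed current rows, then insert unmatched staged rows."""
    result = [dict(r) for r in target]  # work on copies so the input is untouched
    matched: set[int] = set()           # indexes of staged rows that matched some target row
    for t in result:
        for i, (merge_key, src) in enumerate(staged):
            # condition: source.mergeKey = target.id (a NULL key never compares equal)
            if merge_key is None or merge_key != t["id"]:
                continue
            matched.add(i)
            # whenMatchedUpdate: only a current row whose checksum changed is expired
            if t["iscurrent"] and src["checksum"] != t["checksum"]:
                t["valid_to"] = now
                t["iscurrent"] = 0
    # whenNotMatchedByTargetInsert: insert every staged row that matched nothing
    result.extend(dict(src) for i, (_, src) in enumerate(staged) if i not in matched)
    return result

# Hypothetical data: record 12 moves from NY to CA.
now = datetime(2024, 1, 2)
target = [{"id": 12, "state": "NY", "checksum": "old", "iscurrent": 1, "valid_to": None}]
changed = {"id": 12, "state": "CA", "checksum": "new", "iscurrent": 1, "valid_to": None}
merged = scd2_merge(target, [(None, changed), (12, changed)], now)
for row in merged:
    print(row)
```

The old NY row ends up expired with `valid_to` set, and the CA row is appended as the only current version of id 12.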

## 3.3 Make the output table queryable in Athena
- Create the table in the Glue Data Catalog by running a DDL statement through Athena.
- Authenticate to Athena with a web identity token (`WebIdentityTokenCredentialsProvider`).

In [None]:
{
  "type": "JDBCExecute",
  "name": "Create glue data catalog",
  "environments": [
    "dev",
    "test"
  ],
  "inputURI": "s3a://"${ETL_CONF_DATALAKE_LOC}"/app_code/sql/create_table_contact.sql",
  "jdbcURL": "jdbc:awsathena://AwsRegion="${AWS_DEFAULT_REGION}";S3OutputLocation=s3://"${ETL_CONF_DATALAKE_LOC}"/athena-query-result;AwsCredentialsProviderClass=com.amazonaws.auth.WebIdentityTokenCredentialsProvider",
  "sqlParams":{
    "datalake_loc": "'s3://"${ETL_CONF_DATALAKE_LOC}"/app_code/output/contact/_symlink_format_manifest/'",
    "table_name": "default.deltalake_contact_jupyter"
  }
}
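`sqlParams` fills the `${...}` placeholders in the SQL file before the statement is sent over JDBC. The contents of `create_table_contact.sql` are not shown in this notebook, so the sketch below uses a hypothetical DDL template (column types included) and Python's `string.Template`, which happens to use the same `${name}` syntax, purely to illustrate the substitution. The bucket name `my-bucket` stands in for `ETL_CONF_DATALAKE_LOC`. Note that the `datalake_loc` value already carries its own single quotes, so the template must not add another pair.

```python
from string import Template

# Hypothetical DDL template; the real create_table_contact.sql is not shown in this notebook.
# The SerDe/input format pair is the usual way Athena reads a Delta symlink manifest.
DDL_TEMPLATE = Template("""CREATE EXTERNAL TABLE IF NOT EXISTS ${table_name} (
  id int, name string, email string, state string,
  valid_from timestamp, valid_to timestamp, iscurrent int, checksum string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION ${datalake_loc}""")

# Values shaped like the sqlParams above.
params = {
    "datalake_loc": "'s3://my-bucket/app_code/output/contact/_symlink_format_manifest/'",
    "table_name": "default.deltalake_contact_jupyter",
}

# substitute() raises KeyError if the template references a parameter that was not supplied.
ddl = DDL_TEMPLATE.substitute(params)
print(ddl)
```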

### ======== TASK #1 ==========
### How many records have expired?
### Query the new table in [Athena](https://console.aws.amazon.com/athena/home), submit your answer to complete the task.

## 4. (OPTIONAL) Query Delta Lake from S3
The following test cells are tagged with a placeholder environment, `uat`, so that an automated ETL run in `dev` or `test` skips them.

In [None]:
{
  "type": "DeltaLakeExtract",
  "name": "read contact Delta Lake table",
  "description": "read contact table",
  "environments": [
    "uat"
  ],
  "inputURI": "s3a://"${ETL_CONF_DATALAKE_LOC}"/app_code/output/contact/",
  "outputView": "contact"
}
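Every stage carries an `environments` list, and a run executes only the stages whose list contains the environment it was started with; that is why tagging these cells `uat` keeps them out of `dev` and `test` runs. The sketch below models that filtering over hypothetical stage dicts whose names and environments are copied from cells in this notebook. How the environment is supplied to Arc at run time is not shown here, and treating a stage without an `environments` key as never running is an assumption of this sketch.

```python
from typing import Any

Stage = dict[str, Any]

def stages_for(env: str, stages: list[Stage]) -> list[str]:
    """Return the names of the stages that would run in `env`, in notebook order."""
    # Assumption: a stage without an "environments" key never runs.
    return [s["name"] for s in stages if env in s.get("environments", [])]

pipeline = [
    {"name": "extract initial contact table", "environments": ["dev", "test"]},
    {"name": "insert/update existing data and output to target Delta Lake", "environments": ["dev", "test"]},
    {"name": "read contact Delta Lake table", "environments": ["uat"]},
    {"name": "validate_type2", "environments": ["uat"]},
]

print(stages_for("test", pipeline))
```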

## 4.1 (OPTIONAL) View one of the changed records

In [None]:
%sql outputView="validate_type2" name="validate_type2" environments=uat
SELECT * FROM contact WHERE id=12
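After the merge, the history of a changed id such as 12 should contain exactly one current row, every expired row should have a `valid_to`, and no current row should have one. The sketch below checks those SCD Type 2 invariants over rows represented as Python dicts; the sample history is hypothetical, and loading the real `contact` view into Python is not shown.

```python
from collections import Counter
from typing import Any

Row = dict[str, Any]

def scd2_violations(rows: list[Row]) -> list[str]:
    """Return human-readable violations of basic SCD Type 2 invariants."""
    problems: list[str] = []
    current_per_id = Counter(r["id"] for r in rows if r["iscurrent"])
    for id_ in sorted({r["id"] for r in rows}):
        if current_per_id[id_] != 1:
            problems.append(f"id {id_}: {current_per_id[id_]} current rows")
    for r in rows:
        if not r["iscurrent"] and r["valid_to"] is None:
            problems.append(f"id {r['id']}: expired row has no valid_to")
        if r["iscurrent"] and r["valid_to"] is not None:
            problems.append(f"id {r['id']}: current row has a valid_to")
    return problems

# Hypothetical history for record 12 after one change.
history = [
    {"id": 12, "state": "NY", "iscurrent": 0, "valid_to": "2024-01-02"},
    {"id": 12, "state": "CA", "iscurrent": 1, "valid_to": None},
]
print(scd2_violations(history))  # []
```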