In [None]:
%conf
numRows=3

## 1. Extract NYC taxi data

In [None]:
{
  // Stage: read the green_tripdata 2013-08 CSV from the nyc-tlc bucket and
  // register it as the view "green_tripdata0_raw". The "trip*data" path
  // segment contains a wildcard.
  "type": "DelimitedExtract",
  "name": "extract monthly data from green_tripdata",
  "environments": ["dev", "test"],

  "inputURI": "s3a://nyc-tlc/trip*data/green_tripdata_2013-08.csv",
  "outputView": "green_tripdata0_raw",

  // CSV dialect: comma separated, double-quoted fields, first row is a header.
  "header": true,
  "delimiter": "Comma",
  "quote": "DoubleQuote",

  // Persist the raw view; it is read again by the typing stage below.
  "persist": true
}

## 1.1 Check source data schema

In [None]:
%printschema
green_tripdata0_raw

## 2. Convert Data Type

### Apply the original data types to the CSV text, based on the metadata provided.
For example: timestamp

In [None]:
{
  // Stage: apply the column types described in green_tripdata_meta0.json to
  // the raw text view and publish the result as "green_tripdata0".
  "type": "TypingTransform",
  "name": "apply green_tripdata initial schema 0",
  "environments": ["dev", "test"],

  "inputView": "green_tripdata0_raw",
  "outputView": "green_tripdata0",

  // The schema file lives under the bucket named by ETL_CONF_INPUT_LOC and is
  // fetched using IAM authentication.
  "schemaURI": "s3a://"${ETL_CONF_INPUT_LOC}"/meta/green_tripdata_meta0.json",
  "authentication": {
     "method": "AmazonIAM"
  },

  // The typed view is not persisted.
  "persist": false
}

## 2.1 Check typed data schema

In [None]:
%printschema 
green_tripdata0

## 3. SQL-based data validation

### Make sure the type conversion is error free. 
Note: if the result returns errors or reaches a certain threshold, you can decide to fail the ETL pipeline at an early stage.

In [None]:
%sqlvalidate name="sqlvalidate" description="ensure no errors exist after data typing transformation" environments=dev,test sqlParams=inputView=green_tripdata0 

-- Returns a single row:
--   valid   : true only when no row of the input view has a non-empty _errors array
--   message : JSON with the number of rows checked ('count') and the number
--             of rows that carry at least one typing error ('errors')
SELECT
  SUM(row_error) = 0 AS valid,
  TO_JSON(
    NAMED_STRUCT(
      'count', COUNT(row_error),
      'errors', SUM(row_error)
    )
  ) AS message
FROM (
  -- Flag each row: 1 when _errors is non-empty, otherwise 0.
  SELECT
    CASE WHEN SIZE(_errors) > 0 THEN 1 ELSE 0 END AS row_error
  FROM ${inputView}
) typed_rows

## 4. Output typed data to Data Lake
### Delta Lake is an optimized data lake with ACID transaction support

In [None]:
{
  // Stage: write the typed view "green_tripdata0" to the data lake in Delta
  // format, partitioned by vendor_id.
  "type": "DeltaLakeLoad",
  "name": "write green_tripdata0 to Data Lake",
  "environments": ["dev", "test"],

  "inputView": "green_tripdata0",
  "outputURI": "s3a://"${ETL_CONF_INPUT_LOC}"/output/green_tripdata0",
  "authentication": {
     "method": "AmazonIAM"
  },

  // NOTE(review): with Append, re-running this cell adds the same rows to the
  // target again - confirm that is intended for repeated notebook runs.
  "saveMode": "Append",
  "partitionBy": ["vendor_id"]
}

## 5. SQL-based: list areas with high pickup rates

In [None]:
%sql outputView="high_pickup" description="Group location coordinates by number of pickups" environments=dev,test sqlParams=inputView=green_tripdata0

-- Number of pickups and total passengers per pickup coordinate, busiest first.
-- Rows without a pickup longitude are ignored.
-- COUNT(*) replaces count("_index"): in Spark SQL a double-quoted token is a
-- string literal, so the old form counted a constant rather than a column.
-- The resulting pickup_cnt is the same.
SELECT
  COUNT(*) AS pickup_cnt,
  SUM(Passenger_count) AS Passenger_count,
  pickup_longitude,
  pickup_latitude
FROM ${inputView}
WHERE pickup_longitude IS NOT NULL
GROUP BY pickup_longitude, pickup_latitude
ORDER BY pickup_cnt DESC

## 6. SQL-based: average trip duration to JFK by pickup hour

In [None]:
%sql outputView="trip_duration" description="Average trip duration to JFK by pickup hour" environments=dev,test sqlParams=inputView=green_tripdata0 

-- Average trip duration in minutes per pickup hour, shortest average first.
-- Only trips whose drop-off point falls inside the latitude/longitude box
-- below (the JFK area, per the section heading) are considered.
SELECT
  CAST(AVG(trip_duration) AS DECIMAL(18,2)) AS avg_trip_duration,
  HOUR(lpep_pickup_datetime) AS pickup_hour
FROM (
  SELECT
    -- difference of the two epoch-second values, converted to minutes
    (UNIX_TIMESTAMP(lpep_dropoff_datetime) - UNIX_TIMESTAMP(lpep_pickup_datetime)) / 60 AS trip_duration,
    lpep_pickup_datetime,
    lpep_dropoff_datetime
  FROM ${inputView}
  WHERE dropoff_latitude BETWEEN 40.640668 AND 40.651381
    AND dropoff_longitude BETWEEN -73.794694 AND -73.776283
) jfk_dropoffs
GROUP BY pickup_hour
ORDER BY avg_trip_duration

## 7. Write reporting datasets to Data Lake

In [None]:
{
  // Stage: write the "high_pickup" view as Parquet under output/high_pickup,
  // replacing whatever is already there.
  "type": "ParquetLoad",
  "name": "Output high pickup rate dataset to Data Lake",
  "environments": ["dev", "test"],

  "inputView": "high_pickup",
  "outputURI": "s3a://"${ETL_CONF_INPUT_LOC}"/output/high_pickup",
  "authentication": {
     "method": "AmazonIAM"
  },

  // Overwrite on every run, using a single partition for the write.
  "saveMode": "Overwrite",
  "numPartitions": 1
}

In [None]:
{
  // Stage: write the "trip_duration" view as Parquet under
  // output/trip_duration, replacing whatever is already there.
  "type": "ParquetLoad",
  "name": "Output avg trip duration dataset to Data Lake",
  "environments": ["dev", "test"],

  "inputView": "trip_duration",
  "outputURI": "s3a://"${ETL_CONF_INPUT_LOC}"/output/trip_duration",
  "authentication": {
     "method": "AmazonIAM"
  },

  // Overwrite on every run, using a single partition for the write.
  "saveMode": "Overwrite",
  "numPartitions": 1
}

## 8. Build metadata for direct query (OPTIONAL)
Create the data catalog in Glue via Athena.
NOTE: Before running the following blocks, store the correct Athena credentials in your Secrets Manager, then delete the Jupyter task in the ECS console so that it restarts and picks up the refreshed access key pair.

In [None]:
{
  // Stage: run create_table_highpickup.sql over the Athena JDBC connection to
  // catalog the high_pickup output. Unlike the earlier stages, this one is
  // enabled for "production" and "test" only (not "dev").
  "type": "JDBCExecute",
  "name": "Create high pickup data catalog",
  "environments": ["production", "test"],

  // Region and query-result bucket are fixed in the URL; User and Password
  // are filled from the ATHENA_ETL_CONF_S3A_* variables.
  "jdbcURL": "jdbc:awsathena://AwsRegion=ap-southeast-2;S3OutputLocation=s3://aws-athena-query-results-"${ETL_CONF_INPUT_LOC}"/;User="${ATHENA_ETL_CONF_S3A_ACCESS_KEY}";Password="${ATHENA_ETL_CONF_S3A_SECRET_KEY},
  "inputURI": "s3a://"${ETL_CONF_INPUT_LOC}"/appcode/sql/create_table_highpickup.sql",

  // Values handed to the SQL file; datalake_loc keeps its single quotes so it
  // arrives as a quoted SQL string.
  "sqlParams": {
    "table_name": "default.test_high_pickup",
    "datalake_loc": "'s3://"${ETL_CONF_INPUT_LOC}"/output/high_pickup'"
  }
}

In [None]:
{
  // Stage: run create_table_duration.sql over the Athena JDBC connection to
  // catalog the trip_duration output. Enabled for "production" and "test"
  // only (not "dev").
  "type": "JDBCExecute",
  "name": "Create avg duration data catalog",
  "environments": ["production", "test"],

  // Region and query-result bucket are fixed in the URL; User and Password
  // are filled from the ATHENA_ETL_CONF_S3A_* variables.
  "jdbcURL": "jdbc:awsathena://AwsRegion=ap-southeast-2;S3OutputLocation=s3://aws-athena-query-results-"${ETL_CONF_INPUT_LOC}"/;User="${ATHENA_ETL_CONF_S3A_ACCESS_KEY}";Password="${ATHENA_ETL_CONF_S3A_SECRET_KEY},
  "inputURI": "s3a://"${ETL_CONF_INPUT_LOC}"/appcode/sql/create_table_duration.sql",

  // Values handed to the SQL file; datalake_loc keeps its single quotes so it
  // arrives as a quoted SQL string.
  "sqlParams": {
    "table_name": "default.test_trip_duration",
    "datalake_loc": "'s3://"${ETL_CONF_INPUT_LOC}"/output/trip_duration'"
  }
}

In [None]:
{
  // Stage: run create_table.sql over the Athena JDBC connection to catalog
  // the green_tripdata0 output. Enabled for "production" and "test" only
  // (not "dev").
  "type": "JDBCExecute",
  "name": "Create green trip data catalog",
  "environments": ["production", "test"],

  // Region and query-result bucket are fixed in the URL; User and Password
  // are filled from the ATHENA_ETL_CONF_S3A_* variables.
  "jdbcURL": "jdbc:awsathena://AwsRegion=ap-southeast-2;S3OutputLocation=s3://aws-athena-query-results-"${ETL_CONF_INPUT_LOC}"/;User="${ATHENA_ETL_CONF_S3A_ACCESS_KEY}";Password="${ATHENA_ETL_CONF_S3A_SECRET_KEY},
  "inputURI": "s3a://"${ETL_CONF_INPUT_LOC}"/appcode/sql/create_table.sql",

  // datalake_loc points at the _symlink_format_manifest folder under the
  // green_tripdata0 output rather than at the data folder itself.
  // NOTE(review): presumably this manifest is produced by the Delta load
  // stage - confirm it exists before this stage runs.
  "sqlParams": {
    "table_name": "default.test_green_tripdata0",
    "datalake_loc": "'s3://"${ETL_CONF_INPUT_LOC}"/output/green_tripdata0/_symlink_format_manifest'"
  }
}

In [None]:
{
  // Stage: run repair_table.sql over the Athena JDBC connection for
  // default.test_green_tripdata0 (the stage name says this refreshes the
  // table's partitions). Enabled for "production" and "test" only (not "dev").
  "type": "JDBCExecute",
  "name": "refresh table partitions",
  "environments": ["production", "test"],

  // Region and query-result bucket are fixed in the URL; User and Password
  // are filled from the ATHENA_ETL_CONF_S3A_* variables.
  "jdbcURL": "jdbc:awsathena://AwsRegion=ap-southeast-2;S3OutputLocation=s3://aws-athena-query-results-"${ETL_CONF_INPUT_LOC}"/;User="${ATHENA_ETL_CONF_S3A_ACCESS_KEY}";Password="${ATHENA_ETL_CONF_S3A_SECRET_KEY},
  "inputURI": "s3a://"${ETL_CONF_INPUT_LOC}"/appcode/sql/repair_table.sql",

  // Only the table name is handed to the SQL file.
  "sqlParams": {
    "table_name": "default.test_green_tripdata0"
  }
}