# Streaming ETL on Healthcare Payment Data using Structured Streaming

While batched data can provide powerful insights by identifying short- and long-term trends, healthcare providers can combine streaming data with real-time processing to create actionable insights on a minute-by-minute basis.

However, building production-grade continuous applications can be challenging, as developers need to overcome many obstacles, including:

1. Providing end-to-end reliability

The systems must be resilient to failures by ensuring that outputs are consistent with results processed in batch. Additionally, unusual activities (e.g., failures in upstream components, traffic spikes, etc.) must be continuously monitored and automatically mitigated to ensure highly available insights are delivered in real time.

2. Performing complex transformations

Data arrives in various formats (CSV, JSON, etc.). We often must restructure and transform the data before it is consumed. Such restructuring requires that all the traditional tools from batch processing systems be available, but without adding latency.

3. Handling late or out-of-order data

Payment data can arrive late or out of order. As a result, aggregations and other complex computations must be continuously (and accurately) revised as new information arrives.

4. Integrating with other systems

Information originates from a variety of sources (Kafka, HDFS, S3, etc.), which must be integrated to see the complete picture.

## 1. Where is your input and your final Parquet table?

### 1. Specify the location of the input JSON files.

### 2. Specify the output path of the Parquet table.


In [3]:
InputPath = "s3n://MY_INPUT_PATH"
parquetOutputPath = "/MY_OUTPUT_PATH"  # DBFS or S3 path 

## 2. What is the schema of the data?
EMR data are JSON files.
Each file is essentially an array (named payment) of records with fields related to payment information, some of which are nested structures.


In [4]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime

ETLPaymentSchema = StructType() \
  .add("payment", ArrayType(StructType() \
    .add("date_payment", StringType()) \
    .add("record_id", StringType()) \
    .add("payer", StringType()) \
    .add("amount", DoubleType()) \
    .add("physician_id", StringType()) \
    .add("physician_specialty", StringType()) \
    .add("eventName", StringType()) \
    .add("eventSource", StringType()) \
    .add("eventTime", StringType()) \
    .add("eventType", StringType()) \
    .add("eventVersion", StringType()) \
    .add("readOnly", BooleanType()) \
    .add("recipientAccountId", StringType()) \
    .add("requestID", StringType()) \
    .add("requestParameters", MapType(StringType(), StringType())) \
    .add("resources", ArrayType(StructType() \
      .add("ARN", StringType()) \
      .add("accountId", StringType()) \
      .add("type", StringType()) \
    )) \
    .add("responseElements", MapType(StringType(), StringType())) \
    .add("sharedEventID", StringType()) \
    .add("sourceIPAddress", StringType()) \
    .add("serviceEventDetails", MapType(StringType(), StringType())) \
    .add("userAgent", StringType()) \
    .add("userIdentity", StructType() \
      .add("accessKeyId", StringType()) \
      .add("accountId", StringType()) \
      .add("arn", StringType()) \
      .add("invokedBy", StringType()) \
      .add("principalId", StringType()) \
      .add("sessionContext", StructType() \
        .add("attributes", StructType() \
          .add("creationDate", StringType()) \
          .add("mfaAuthenticated", StringType()) \
        ) \
        .add("sessionIssuer", StructType() \
          .add("accountId", StringType()) \
          .add("arn", StringType()) \
          .add("principalId", StringType()) \
          .add("type", StringType()) \
          .add("userName", StringType()) \
        )
      ) \
      .add("type", StringType()) \
      .add("userName", StringType()) \
      .add("webIdFederationData", StructType() \
        .add("federatedProvider", StringType()) \
        .add("attributes", MapType(StringType(), StringType())) \
      )
    ) \
    .add("vpcEndpointId", StringType())))

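Note: importing ProcessingTime from pyspark.sql.streaming fails with "ImportError: cannot import name ProcessingTime". The import is not needed here, because the trigger interval is passed to the stream writer as trigger(processingTime=...).

The schema is a new structure whose single field, payment, is an array. Each element of the array is itself a new structure, with simple fields such as date_payment, which is a string type, and nested fields such as resources, which is an ArrayType of a new StructType.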
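
To make the shape of the schema concrete, the sketch below builds a hypothetical input document with Python's json module. Every value is invented for illustration, and only a handful of the schema's fields are filled in; when a schema is supplied, Spark reads fields that are missing from a file as null.

```python
import json

# Hypothetical sample document: every value below is made up for illustration.
sample_document = {
    "payment": [
        {
            "date_payment": "2017-01-05",
            "record_id": "rec-0001",
            "payer": "Example Payer",
            "amount": 125.50,
            "physician_id": "phys-42",
            "physician_specialty": "Cardiology",
            "eventTime": "2017-01-05T21:22:54Z",
            # Nested array of structs, matching the "resources" field
            "resources": [
                {"ARN": "example-arn", "accountId": "000000000000", "type": "example"}
            ],
        }
    ]
}

# Spark's JSON source reads one JSON document per line by default,
# so the document is serialized without line breaks.
sample_line = json.dumps(sample_document)

# Round-trip the line to show how the nested fields are addressed.
parsed = json.loads(sample_line)
first = parsed["payment"][0]
print(first["payer"], first["amount"], len(first["resources"]))
```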

## 3. Streaming ETL
### 1. Create the streaming DataFrame that represents the raw records in the files

Set maxFilesPerTrigger to get earlier access to the final Parquet data, as this limits the number of log files processed and written out in every trigger.
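
Conceptually, the option caps how many new files go into each micro-batch, so a backlog is drained in several small batches rather than one large one. The sketch below is a simplified plain-Python model of that behaviour, not Spark's actual scheduling logic; plan_micro_batches and the file names are hypothetical.

```python
def plan_micro_batches(pending_files, max_files_per_trigger):
    """Split a backlog of file names into batches of at most max_files_per_trigger."""
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be at least 1")
    return [
        pending_files[i:i + max_files_per_trigger]
        for i in range(0, len(pending_files), max_files_per_trigger)
    ]

# Hypothetical backlog of 250 input files, processed 100 at a time.
backlog = ["part-{:03d}.json".format(i) for i in range(250)]
batches = plan_micro_batches(backlog, 100)
print([len(batch) for batch in batches])
```

With a smaller cap, the first batch finishes sooner, so the first Parquet files become queryable earlier, at the cost of more triggers to work through the same backlog.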

### 2. Transform the data
Explode (split) the array of records loaded from each file into separate records.

Parse the event time string in each record to Spark’s timestamp type.

Flatten out the nested columns for easier querying.

Function reference: https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/sql/functions.html
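
The three steps above can be pictured with a plain-Python analogue. This is not Spark code: explode_payments and flatten_record are hypothetical helpers that mimic what explode, the timestamp cast, and the record.* selection do to a single parsed document, assuming eventTime looks like 2017-01-05T21:22:54Z.

```python
from datetime import datetime

def explode_payments(document):
    """Yield one record per element of the 'payment' array (like explode)."""
    for record in document.get("payment") or []:
        yield record

def flatten_record(record):
    """Parse eventTime and lift the record's top-level fields into one row."""
    # %H is the 24-hour field; a 12-hour field (%I) would reject hour 21.
    # Only the first 19 characters are parsed, so the trailing "Z" is ignored.
    event_time = datetime.strptime(record["eventTime"][:19], "%Y-%m-%dT%H:%M:%S")
    row = {"timestamp": event_time}
    row.update(record)  # analogous to selecting "record.*"
    return row

# Hypothetical document with two payment records.
document = {"payment": [
    {"record_id": "a", "eventTime": "2017-01-05T21:22:54Z", "amount": 10.0},
    {"record_id": "b", "eventTime": "2017-01-06T08:00:00Z", "amount": 20.0},
]}
rows = [flatten_record(record) for record in explode_payments(document)]
```

The same 12-hour versus 24-hour distinction applies to Spark's pattern letters, where HH is the hour of day (0-23) and hh is the clock hour (1-12).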

### 3. Output the data and start the streaming query

Write the data out in the Parquet format.

Define the date column from that timestamp and partition the Parquet data by date for efficient time-slice queries.

Define the trigger to be every 10 seconds.

Define the checkpoint location.

Finally, start the query.

In [None]:
# data frame for the raw records
rawRecords = spark.readStream \
  .option("maxFilesPerTrigger", "100") \
  .schema(ETLPaymentSchema) \
  .json(InputPath)

# Essentially, rawRecords is a DataFrame that behaves like an unbounded table:
# new records added to the stream are like rows being appended to that table.
# This allows us to treat both batch and streaming data as tables.

In [None]:
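# Transform the raw data
# The array field in ETLPaymentSchema is named "payment", so that is the column to explode
PaymentData = rawRecords \
  .select(explode("payment").alias("record")) \
  .select(
    # HH is the 24-hour field; hh would only accept hours 1-12
    unix_timestamp("record.eventTime", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").alias("timestamp"),
    "record.*")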


In [None]:
checkpointPath = "/cloudtrail.checkpoint/"
# The checkpoint lets the query recover from failures with exactly-once fault-tolerance guarantees


streamingETLQuery = PaymentData \
  .withColumn("date", PaymentData.timestamp.cast("date")) \
  .writeStream \
  .format("parquet") \
  .option("path", parquetOutputPath) \
  .partitionBy("date") \
  .trigger(processingTime="10 seconds") \
  .option("checkpointLocation", checkpointPath) \
  .start()

## Query up-to-the-minute data from Parquet Table
While the streamingETLQuery is continuously converting the data to Parquet, we can already start running ad-hoc queries on the Parquet table. Your queries will always pick up the latest written files while ensuring data consistency.

In [None]:
# running example query
parquetData = sql("select * from parquet.`{}`".format(parquetOutputPath))
display(parquetData)

In [None]:
sql("select * from parquet.`{}`".format(parquetOutputPath)).count()

Structured Streaming makes it easy to convert periodic batch ETL jobs into a real-time data pipeline. Such jobs typically perform the following steps:

1. Filter, transform, and clean up data 

Raw data is naturally messy and needs to be cleaned up to fit into a well-defined structured format. For example, parsing timestamp strings to date/time types for faster comparisons, filtering corrupted data, nesting/unnesting/flattening complex structures to better organize important columns, etc.

2. Convert to a more efficient storage format

Text, JSON and CSV data are easy to generate and are human readable, but are very expensive to query. Converting it to more efficient formats like Parquet can reduce file size and improve processing speed.

3. Partition data by important columns 

By partitioning the data based on the value of one or more columns, common queries can be answered more efficiently by reading only the relevant fraction of the total dataset.
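
As a rough illustration of why partitioning helps: when Parquet output is partitioned by a date column, Spark writes one date=<value> directory per day, so a filter on date only has to touch the matching directories. The sketch below imitates that layout in plain Python. partition_dir and prune are hypothetical helpers, not Spark APIs, and the output path is a placeholder.

```python
from datetime import datetime

def partition_dir(output_path, timestamp):
    """Return the Hive-style directory a row with this timestamp would land in."""
    return "{}/date={}".format(output_path.rstrip("/"), timestamp.date().isoformat())

def prune(directories, wanted_date):
    """Keep only the directories whose partition value equals wanted_date."""
    suffix = "date={}".format(wanted_date)
    return [d for d in directories if d.endswith(suffix)]

# Hypothetical event timestamps spread over two days.
timestamps = [
    datetime(2017, 1, 5, 21, 22, 54),
    datetime(2017, 1, 5, 23, 0, 0),
    datetime(2017, 1, 6, 8, 0, 0),
]
directories = sorted({partition_dir("/MY_OUTPUT_PATH", ts) for ts in timestamps})

# A query for a single day only needs one of the two directories.
selected = prune(directories, "2017-01-05")
print(selected)
```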

