# Spark Structured Streaming
![alt text](pics/demo2.png )

The motivation for this project is to provide the ability to process data in **real time** from various sources such as OpenMRS, EID, etc.

https://spark.apache.org/docs/2.3.3/structured-streaming-kafka-integration.html#deploying

https://mtpatter.github.io/bilao/notebooks/html/01-spark-struct-stream-kafka.html

http://www.adaltas.com/en/2019/04/18/spark-streaming-data-pipelines-with-structured-streaming/

## Set up Spark Session

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
            .appName("Spark Structured Streaming from Kafka") \
            .config("spark.jars.packages","org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.3") \
            .config('spark.executor.memory', '10G')\
            .config('spark.driver.memory', '10G')\
            .config('spark.driver.maxResultSize', '10G')\
            .getOrCreate()
 
spark

## Connection to Kafka
A Kafka topic can be viewed as an infinite stream in which data is retained for a configurable amount of time. Because the stream is unbounded, a new query must first decide which data to read and where in time to begin. At a high level, there are three choices for the `startingOffsets` option:

- `earliest`: start reading at the beginning of the stream. This excludes data that has already been deleted from Kafka because it was older than the retention period ("aged out" data).
- `latest`: start now, processing only new data that arrives after the query has started.
- per-partition assignment: specify the exact offset to start from for every partition, passed as a JSON string.
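
The starting-offset choices can be sketched in plain Python. The helper name `starting_offsets`, the two-partition layout, and the offset `23` are hypothetical and only illustrate the shape of the value. The JSON form and the sentinel values `-2` (earliest) and `-1` (latest) follow the Spark Kafka integration guide linked above.

```python
import json

# Sentinel offsets understood by the Kafka source inside a per-partition spec.
EARLIEST = -2
LATEST = -1

def starting_offsets(spec):
    """Build a value for .option("startingOffsets", ...).

    spec is either "earliest" / "latest", or a dict of the form
    {topic: {partition_number: offset}} (hypothetical helper, not a Spark API).
    """
    if isinstance(spec, str):
        if spec not in ("earliest", "latest"):
            raise ValueError("expected 'earliest' or 'latest', got %r" % spec)
        return spec
    # JSON object keys must be strings, so partition numbers are stringified.
    return json.dumps(
        {topic: {str(p): int(o) for p, o in parts.items()}
         for topic, parts in spec.items()},
        sort_keys=True,
        separators=(",", ":"),
    )

# Assumed layout: partition 0 resumes at offset 23, partition 1 from the start.
per_partition = starting_offsets(
    {"dbserver1.openmrs.patient": {0: 23, 1: EARLIEST}}
)
print(per_partition)
```

The resulting string would be passed as `.option("startingOffsets", per_partition)` in place of `"earliest"` on a `readStream` query.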

<img src="https://databricks.com/wp-content/uploads/2017/04/kafka-topic.png" width="300">

In [2]:
from pyspark.sql.types import (StructType, StructField, LongType, BooleanType,
                               IntegerType, TimestampType, StringType)
import pyspark.sql.functions as f
obs_schema = StructType([
    StructField('obs_id', LongType(), True),
    StructField('voided', BooleanType(), True),
    StructField('concept_id', IntegerType(), True),
    StructField('obs_datetime', TimestampType(), True),
    StructField('value', StringType(), True),
    StructField('value_type', StringType(), True),
    StructField('obs_group_id', IntegerType(), True),
    StructField('parent_concept_id', IntegerType(), True)
])


### Patient Stream

In [3]:
patient= StructType([
    StructField('patient_id', LongType(), True),
    StructField('date_created', LongType(), True),
    StructField('allergy_status', StringType(), True),
    StructField('creator', LongType(), True)
])

patientSchema= StructType([
                StructField('schema', StringType()),
                StructField('payload', 
                           StructType([
                                StructField('before', StringType()),
                                StructField('after', patient)
                           ])
                           )
            ])

jsonOptions = { "timestampFormat": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" }  # SSS = milliseconds

patientDF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "dbserver1.openmrs.patient") \
    .option("startingOffsets", "earliest") \
    .load()\
    .select(f.from_json(f.col("value").cast("string"), patientSchema, jsonOptions).alias("parsed_value"),\
            f.col("timestamp").alias("patient_timestamp"))\
    .withWatermark("patient_timestamp", "10 seconds")   # tolerate events arriving up to 10 seconds late
    
patientDF.createOrReplaceTempView("patients")

patientDF.printSchema()  # prints the schema itself and returns None

# display on console
patientDF.select("parsed_value.payload.after.*", "patient_timestamp").writeStream\
    .outputMode("append")\
    .format("console")\
    .option("truncate", "false")\
    .start()



root
 |-- parsed_value: struct (nullable = true)
 |    |-- schema: string (nullable = true)
 |    |-- payload: struct (nullable = true)
 |    |    |-- before: string (nullable = true)
 |    |    |-- after: struct (nullable = true)
 |    |    |    |-- patient_id: long (nullable = true)
 |    |    |    |-- date_created: long (nullable = true)
 |    |    |    |-- allergy_status: string (nullable = true)
 |    |    |    |-- creator: long (nullable = true)
 |-- patient_timestamp: timestamp (nullable = true)


<pyspark.sql.streaming.StreamingQuery at 0x7f48305030f0>

![alt text](pics/patient.png )

### Person Stream

In [4]:
person= StructType([
    StructField('person_id', LongType(), True),
    StructField('gender', StringType(), True),
    StructField('birthdate', LongType(), True)
])

personSchema= StructType([
                StructField('schema', StringType()),
                StructField('payload', 
                           StructType([
                                StructField('before', StringType()),
                                StructField('after', person)
                           ])
                           )
            ])

jsonOptions = { "timestampFormat": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" }  # SSS = milliseconds

personDF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "dbserver1.openmrs.person") \
    .option("startingOffsets", "earliest") \
    .load()\
    .select(f.from_json(f.col("value").cast("string"), personSchema, jsonOptions).alias("parsed_value"),\
            f.col("timestamp").alias("person_timestamp"))\
    .withWatermark("person_timestamp", "10 seconds")   # tolerate events arriving up to 10 seconds late
    
personDF.createOrReplaceTempView("person")

personDF.printSchema()  # prints the schema itself and returns None

# display on console
personDF.select("parsed_value.payload.after.*", "person_timestamp").writeStream\
    .outputMode("append")\
    .format("console")\
    .option("truncate", "false")\
    .start()


root
 |-- parsed_value: struct (nullable = true)
 |    |-- schema: string (nullable = true)
 |    |-- payload: struct (nullable = true)
 |    |    |-- before: string (nullable = true)
 |    |    |-- after: struct (nullable = true)
 |    |    |    |-- person_id: long (nullable = true)
 |    |    |    |-- gender: string (nullable = true)
 |    |    |    |-- birthdate: long (nullable = true)
 |-- person_timestamp: timestamp (nullable = true)


<pyspark.sql.streaming.StreamingQuery at 0x7f4830516b00>

![alt text](pics/person.png )

### Join the 2 streams with watermark
![alt text](pics/join-streams.png )

Watermarking bounds how late a record may arrive and defines the point after which late records can be dropped. For example, with a 30-minute watermark, records whose event time is more than 30 minutes behind the latest event time seen so far may be dropped or ignored. When you inner join two streaming datasets, watermarks and time constraints are optional. If neither is specified, Spark keeps the rows of both streams in state indefinitely. Setting a watermark on both sides, together with a time constraint in the join condition, lets Spark clean up old state. Ref: https://dzone.com/articles/spark-structured-streaming-joins
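
The watermark rule can be illustrated with a small plain-Python simulation. This is a simplified model, not Spark's implementation: it assumes the watermark is recomputed once per micro-batch as the maximum event time seen so far minus the delay, and that every row older than the watermark is dropped. In Spark itself only stateful operators (joins, aggregations) discard late rows, and rows later than the threshold are not guaranteed to be dropped. The function name `apply_watermark` and the sample timestamps are hypothetical.

```python
from datetime import datetime, timedelta

def apply_watermark(batches, delay):
    """Split event times into (kept, dropped) under a simplified watermark model."""
    max_seen = None
    watermark = None
    kept, dropped = [], []
    for batch in batches:
        for event_time in batch:
            # A row is late if it is older than the watermark from earlier batches.
            if watermark is not None and event_time < watermark:
                dropped.append(event_time)
            else:
                kept.append(event_time)
        if batch:
            # Advance the watermark only after the whole batch has been processed.
            batch_max = max(batch)
            max_seen = batch_max if max_seen is None else max(max_seen, batch_max)
            watermark = max_seen - delay
    return kept, dropped

t0 = datetime(2019, 1, 1, 12, 0, 0)
sec = lambda n: t0 + timedelta(seconds=n)

batches = [
    [sec(0), sec(5)],    # watermark afterwards: t0 - 5s
    [sec(30)],           # watermark afterwards: t0 + 20s
    [sec(15), sec(25)],  # sec(15) is behind the watermark, sec(25) is not
]
kept, dropped = apply_watermark(batches, delay=timedelta(seconds=10))
print("kept:", [t.time().isoformat() for t in kept])
print("dropped:", [t.time().isoformat() for t in dropped])
```

With the 10-second delay used in the cells above, the row stamped 15 seconds after `t0` arrives once the watermark has already moved to 20 seconds after `t0`, so it is the only row treated as too late.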

In [5]:
personPatientDF = personDF.select("parsed_value.payload.after.*", "person_timestamp").join(
  patientDF.select("parsed_value.payload.after.*", "patient_timestamp"),
  f.expr(""" 
   person_id = patient_id AND 
    person_timestamp >= patient_timestamp AND 
    person_timestamp <= patient_timestamp + interval 10 minutes    
    """
  )
)

personPatientDF.createOrReplaceTempView("personPatient")

personPatientDF.printSchema()  # prints the schema itself and returns None

result = personPatientDF.writeStream\
    .outputMode("append")\
    .format("console")\
    .option("truncate", "false")\
    .start()


root
 |-- person_id: long (nullable = true)
 |-- gender: string (nullable = true)
 |-- birthdate: long (nullable = true)
 |-- person_timestamp: timestamp (nullable = true)
 |-- patient_id: long (nullable = true)
 |-- date_created: long (nullable = true)
 |-- allergy_status: string (nullable = true)
 |-- creator: long (nullable = true)
 |-- patient_timestamp: timestamp (nullable = true)


![alt text](pics/join.png )
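
To make the join condition concrete, the following plain-Python sketch applies the same predicate (matching ids, and a person event no earlier than the patient event and at most 10 minutes after it) to a few hand-written rows. The sample rows and the helper name `matches` are hypothetical. The sketch only mirrors the predicate and does not model Spark's state handling.

```python
from datetime import datetime, timedelta

MAX_GAP = timedelta(minutes=10)

def matches(person, patient):
    """Same predicate as the streaming join: equal ids and a bounded time range."""
    return (
        person["person_id"] == patient["patient_id"]
        and patient["patient_timestamp"]
        <= person["person_timestamp"]
        <= patient["patient_timestamp"] + MAX_GAP
    )

t0 = datetime(2019, 1, 1, 12, 0, 0)

# Hypothetical change events; only the fields used by the predicate are shown.
patients = [{"patient_id": i, "patient_timestamp": t0} for i in (1, 2, 3)]
persons = [
    {"person_id": 1, "person_timestamp": t0 + timedelta(minutes=2)},   # inside the window
    {"person_id": 2, "person_timestamp": t0 + timedelta(minutes=15)},  # too late
    {"person_id": 3, "person_timestamp": t0 - timedelta(minutes=1)},   # before the patient event
]

joined = [
    (person["person_id"], patient["patient_id"])
    for person in persons
    for patient in patients
    if matches(person, patient)
]
print(joined)
```

Because the time range is bounded on both sides, a buffered row can only ever match rows from a finite window of the other stream, which is what allows old state to be discarded once the watermarks move past that window.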


### Perform Some Aggregations

In [15]:
sql="""
        SELECT
          -- WINDOW(FROM_UNIXTIME(date_created/1000), "1 hour", "1 hour") AS eventWindow,
          WINDOW(patient_timestamp, "1 hour", "1 hour") AS eventWindow,
          creator,
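          COUNT(patient_id) AS num_events,
          approx_count_distinct(patient_id) AS num_dist_patients,
          -- NOTE: the *Age aliases are placeholders; these aggregates run on patient_id, not on an age column
          AVG(patient_id) AS avgAge,
          MIN(patient_id) AS minAge,
          MAX(patient_id) AS maxAge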
        FROM
          personPatient
        GROUP BY
          eventWindow,
          creator
  """
    
query = spark.sql(sql)

# show results
result = query\
    .writeStream\
    .outputMode("append")\
    .format("console")\
    .option("truncate", "false")\
    .start()


#result.awaitTermination()

![alt text](pics/console.png )