# Spark Streaming and Senzing

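This tutorial shows how to read JSON records with Spark Structured Streaming and add each record to Senzing through a Senzing gRPC server.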

### Steps in this tutorial

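1. Set up requirements
2. Create separate JSON files to simulate streaming
3. Configure Senzing
4. Set up Spark streaming functions
5. Add records to Senzing and to Spark dataframe
6. Process REDO records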

## Set up requirements

Start the Senzing gRPC server ([senzing/serve-grpc](https://github.com/senzing-garage/serve-grpc/tree/main)) with Docker. The notebook connects to it on port 8261:

`docker run -it --publish 8261:8261 --rm senzing/serve-grpc`

In [None]:
import grpc
from senzing import SzEngineFlags, SzError
from senzing_grpc import SzAbstractFactoryGrpc
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
import json
import os
import requests
import shutil
import time

In [None]:
# add data download code here, check if files exist

## Create separate JSON files to simulate streaming

In [None]:
def create_streaming_files(input_file, output_dir, n_rows=10):
    """Split a JSON-lines file into one JSON file per record.

    Reads at most the first ``n_rows`` lines of ``input_file`` and writes each
    successfully parsed record to ``{output_dir}/record_{RECORD_ID}.json``.
    Lines that are not valid JSON, or that carry no ``RECORD_ID`` key, are
    reported and skipped; they still count toward ``n_rows``.

    Args:
        input_file: Path to a file containing one JSON object per line.
        output_dir: Directory to write the per-record files to; it is created
            if it does not exist.
        n_rows: Maximum number of input lines to read.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Explicit encoding so the result does not depend on the platform default.
    with open(input_file, 'r', encoding='utf-8') as f:
        for i, line in enumerate(f):
            if i >= n_rows:
                break

            try:
                record = json.loads(line)
            except json.JSONDecodeError as e:
                print(f"Error parsing line {i}: {e}")
                continue

            try:
                record_id = record['RECORD_ID']
            except (KeyError, TypeError):
                # Valid JSON, but either not an object or missing RECORD_ID:
                # there is no file name to derive, so skip instead of aborting
                # the whole split.
                print(f"Skipping line {i}: no RECORD_ID")
                continue

            filename = f"{output_dir}/record_{record_id}.json"
            with open(filename, 'w', encoding='utf-8') as out_file:
                json.dump(record, out_file)

            print(f"Created {filename}")

In [None]:
# Split the first 20 lines of data/customers.json into one JSON file per
# record under data/streaming.
create_streaming_files('data/customers.json', 'data/streaming', 20)

## Configure Senzing

In [None]:
# Open an insecure (non-TLS) gRPC channel to localhost:8261 and build the
# Senzing abstract factory on top of it.
grpc_channel = grpc.insecure_channel("localhost:8261")
sz_abstract_factory = SzAbstractFactoryGrpc(grpc_channel)

In [None]:
# Fetch the product version (a JSON string) and pretty-print it.
sz_product = sz_abstract_factory.create_product()
print(json.dumps(json.loads(sz_product.get_version()), indent=2))

In [None]:
# Create the config manager, diagnostic and engine objects from the factory.
sz_configmanager = sz_abstract_factory.create_configmanager()
sz_diagnostic = sz_abstract_factory.create_diagnostic()
sz_engine = sz_abstract_factory.create_engine()

In [None]:
# Look up the ID of the current default configuration and load that
# configuration so it can be modified below.
config_id = sz_configmanager.get_default_config_id()
sz_config = sz_configmanager.create_config_from_config_id(config_id)

In [None]:
# Register the CUSTOMERS data source in the loaded configuration.
# NOTE(review): presumably this must match the DATA_SOURCE value carried by
# the streamed records — verify against the input data.
sz_config.register_data_source('CUSTOMERS')

In [None]:
# Export the modified configuration, register it with the label
# "Spark Streaming", and swap it in as the default in place of config_id.
new_json_config = sz_config.export()
new_config_id = sz_configmanager.register_config(new_json_config, "Spark Streaming")
sz_configmanager.replace_default_config_id(config_id, new_config_id)

In [None]:
# Re-initialize the factory with the newly registered configuration ID.
# NOTE(review): presumably this is what makes sz_engine pick up the new
# data source — confirm against the Senzing SDK documentation.
sz_abstract_factory.reinitialize(new_config_id)

## Set up Spark streaming functions


In [None]:
# Local Spark session (master "local[*]") for the streaming job.
# NOTE(review): the checkpoint directory /tmp/checkpoint survives between
# notebook runs; Spark may skip input files it has already recorded there —
# confirm whether it should be cleared before re-running.
spark = (
    SparkSession.builder
    .appName("Senzing Streaming")
    .master("local[*]")
    .config("spark.sql.streaming.checkpointLocation", "/tmp/checkpoint")
    .getOrCreate()
)

In [None]:
# Explicit schema for the customer records: every column is a nullable string.
# Supplying it up front means Spark does not have to infer it from the files.
customers_schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in (
        "DATA_SOURCE",
        "RECORD_ID",
        "RECORD_TYPE",
        "PRIMARY_NAME_ORG",
        "SECONDARY_NAME_ORG",
        "PRIMARY_NAME_FULL",
        "NATIVE_NAME_FULL",
        "PRIMARY_NAME_LAST",
        "PRIMARY_NAME_FIRST",
        "PRIMARY_NAME_MIDDLE",
        "GENDER",
        "DATE_OF_BIRTH",
        "PASSPORT_NUMBER",
        "PASSPORT_COUNTRY",
        "DRIVERS_LICENSE_NUMBER",
        "DRIVERS_LICENSE_STATE",
        "SSN_NUMBER",
        "NATIONAL_ID_NUMBER",
        "NATIONAL_ID_COUNTRY",
        "ADDR_TYPE",
        "ADDR_FULL",
        "ADDR_LINE1",
        "ADDR_CITY",
        "ADDR_STATE",
        "ADDR_POSTAL_CODE",
        "ADDR_COUNTRY",
        "PHONE_TYPE",
        "PHONE_NUMBER",
        "EMAIL_ADDRESS",
        "DATE",
        "STATUS",
        "CATEGORY",
        "AMOUNT",
    )
])

In [None]:
# Simulate a stream by reading the JSON files in data/streaming one file per
# trigger (maxFilesPerTrigger = 1), using the explicit customers schema.
streaming_df = (
    spark.readStream
    .schema(customers_schema)
    .option("maxFilesPerTrigger", 1)
    .json('data/streaming')
)

## Add records to Senzing and to Spark dataframe

In [None]:
def get_affected_entities(info_string):
    """Return the ENTITY_ID of each entry listed under AFFECTED_ENTITIES.

    Args:
        info_string: JSON text containing an ``AFFECTED_ENTITIES`` list whose
            items each have an ``ENTITY_ID`` key.

    Returns:
        The entity IDs as a list, in the order they appear in the document.
    """
    parsed = json.loads(info_string)
    entity_ids = []
    for affected in parsed['AFFECTED_ENTITIES']:
        entity_ids.append(affected['ENTITY_ID'])
    return entity_ids

In [None]:
# Set of entity IDs reported as affected; it is filled while records are added
# and while redo records are processed (same approach as in the other tutorial).
affected_entities = set()

In [None]:
def process_streaming_batch(batch_df, batch_id):
    """Add every record of one micro-batch to Senzing.

    Used as the ``foreachBatch`` callback of the streaming query. Each row is
    converted to a dict without its null columns, sent to the engine with
    ``SZ_WITH_INFO``, and the entity IDs reported back are merged into the
    module-level ``affected_entities`` set.

    Args:
        batch_df: DataFrame holding the rows of the current micro-batch.
        batch_id: Identifier of the micro-batch; only used in the log line.
    """
    # Cache the batch so that counting and iterating do not each recompute it
    # from the source files.
    batch_df.persist()
    try:
        n_records = batch_df.count()
        if n_records == 0:
            return

        #TODO this is not getting triggered

        print(f"Processing batch {batch_id} with {n_records} records")

        for row in batch_df.rdd.toLocalIterator():
            # Columns absent from the source JSON come back as None; drop
            # them so they are not sent to Senzing.
            record = {k: v for k, v in row.asDict().items() if v is not None}

            info = sz_engine.add_record(
                record['DATA_SOURCE'],
                record['RECORD_ID'],
                # Send the record definition as a JSON string, matching how
                # search_by_attributes is called elsewhere in this notebook.
                json.dumps(record),
                SzEngineFlags.SZ_WITH_INFO,
            )

            affected_entities.update(get_affected_entities(info))
            print(f"Added record {record['RECORD_ID']}")
    finally:
        batch_df.unpersist()

In [None]:
# Hand each micro-batch to process_streaming_batch, triggering every 10 seconds.
# NOTE(review): start() does not appear to block, so later cells may run
# before any batch has been processed — confirm.
streaming_query = (
    streaming_df.writeStream
    .foreachBatch(process_streaming_batch)
    .outputMode("append")
    .trigger(processingTime='10 seconds')
    .start()
)

In [None]:
# Display the entity IDs collected so far.
affected_entities

## Process REDO records

In [None]:
# Drain the redo queue: keep fetching redo records until none is returned,
# processing each one and recording the entities it affected.
redo_record = sz_engine.get_redo_record()
while redo_record:
    info = sz_engine.process_redo_record(redo_record, flags=SzEngineFlags.SZ_WITH_INFO)
    affected_entities.update(get_affected_entities(info))
    print(info)
    redo_record = sz_engine.get_redo_record()

## Show dashboard?

In [None]:
# Sanity check that records were added to Senzing: search by name and date of
# birth, then pretty-print the JSON result.

# The attributes are serialized to a JSON string before being sent to the engine.
search_query = {
    "name_full": "robert smith",
    "date_of_birth": "11/12/1978",
}
search_result = sz_engine.search_by_attributes(json.dumps(search_query))
print(json.dumps(json.loads(search_result), indent=2))

In [None]:
# Stop the streaming query first so it is not still running when the Spark
# session it depends on is shut down.
streaming_query.stop()
spark.stop()