# Spark Streaming and Senzing

This notebook shows you how to process streaming data through Apache Spark and send it to Senzing for entity resolution, simulating a real-time data processing pipeline. If you haven't already gone through the `senzing_quickstart` tutorial in this repository, we recommend starting with that one because it contains more detailed explanations for each of the steps.

### Steps in this tutorial

1. Set up the Senzing gRPC server, download the `customers.json` data file, and split its first 20 records into separate JSON files to simulate streaming data.
2. Configure the Senzing engine so it's ready to receive data.
3. Create a Spark session with streaming capabilities, define a schema and set up a streaming dataframe.
4. Implement a batch processing function that takes each streaming batch from Spark, sends individual records to Senzing for entity resolution, and tracks which entities are affected by each record addition.
5. Run a cleanup process to ensure the entities are as accurate as possible.


## Set up requirements

In this tutorial, we'll use the [`senzing`](https://garage.senzing.com/sz-sdk-python/index.html) and [`senzing_grpc`](https://garage.senzing.com/sz-sdk-python-grpc/) packages, in addition to PySpark. You can install all of these using the `requirements.txt` file in the repo folder containing this tutorial.

In [None]:
import grpc
from senzing import SzEngineFlags, SzError
from senzing_grpc import SzAbstractFactoryGrpc
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
import json
import os
import requests
import shutil
import time

We'll start our [Senzing gRPC server](https://github.com/senzing-garage/serve-grpc/tree/main) using Docker.

In a terminal window, run `docker run -it --publish 8261:8261 --rm senzing/serve-grpc`.

Then, we'll download the example data:

In [None]:

data_path = "./data/"
data_url_prefix = "https://raw.githubusercontent.com/Senzing/truth-sets/refs/heads/main/truthsets/demo/"
filename = "customers.json"

In [None]:
os.makedirs(data_path, exist_ok=True)

url = data_url_prefix + filename
filepath = data_path + filename
if not os.path.exists(filepath):
    response = requests.get(url, stream=True, timeout=10)
    response.raise_for_status()  # fail early instead of saving an error page as data
    response.raw.decode_content = True
    with open(filepath, "wb") as file:
        shutil.copyfileobj(response.raw, file)

## Create separate JSON files to simulate streaming

We'll use the `customers.json` dataset from the `spark_quickstart` tutorial, but this time we'll extract its first 20 records and save each one to a separate JSON file.

In [None]:
def create_streaming_files(input_file, output_dir, n_rows):
    
    os.makedirs(output_dir, exist_ok=True)
    
    with open(input_file, 'r') as f:
        for i, line in enumerate(f):
            if i >= n_rows:
                break
                
            try:
                record = json.loads(line)
                filename = f"{output_dir}/record_{record['RECORD_ID']}.json"
                
                with open(filename, 'w') as out_file:
                    json.dump(record, out_file)
                    
                print(f"Created {filename}")
                
            except json.JSONDecodeError as e:
                print(f"Error parsing line {i}: {e}")
                continue

In [None]:
create_streaming_files('data/customers.json', 'data/streaming', 20)
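
As a quick sanity check (not part of the original notebook), the sketch below reads every file in an output directory and confirms that each one parses as a single JSON object with a `RECORD_ID`. The helper name `check_streaming_files` is hypothetical.

```python
import json
from pathlib import Path


def check_streaming_files(output_dir):
    """Return the RECORD_ID of every JSON file in output_dir, sorted by file name.

    Raises json.JSONDecodeError if a file is not valid JSON and KeyError if a
    record has no RECORD_ID.
    """
    record_ids = []
    for path in sorted(Path(output_dir).glob("*.json")):
        with open(path, "r", encoding="utf-8") as f:
            record = json.load(f)
        record_ids.append(record["RECORD_ID"])
    return record_ids
```

In this notebook, `len(check_streaming_files('data/streaming'))` should equal the number of records you split out, provided the record IDs are unique and the directory held no files from an earlier run.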

## Configure Senzing

Next, we'll configure the Senzing engine to accept the `customers` data, in the same way as the `spark_quickstart` tutorial.

In [None]:
grpc_channel = grpc.insecure_channel("localhost:8261")
sz_abstract_factory = SzAbstractFactoryGrpc(grpc_channel)

We'll check connectivity by getting the Senzing version:

In [None]:
sz_product = sz_abstract_factory.create_product()
print(json.dumps(json.loads(sz_product.get_version()), indent=2))

In [None]:
sz_configmanager = sz_abstract_factory.create_configmanager()
sz_diagnostic = sz_abstract_factory.create_diagnostic()
sz_engine = sz_abstract_factory.create_engine()

In [None]:
config_id = sz_configmanager.get_default_config_id()
sz_config = sz_configmanager.create_config_from_config_id(config_id)

This time, we'll only use a single data source:

In [None]:
sz_config.register_data_source('CUSTOMERS')

In [None]:
new_json_config = sz_config.export()
new_config_id = sz_configmanager.register_config(new_json_config, "Spark Streaming")
sz_configmanager.replace_default_config_id(config_id, new_config_id)

In [None]:
sz_abstract_factory.reinitialize(new_config_id)

## Set up Spark streaming functions


We'll start a new Spark session, create a schema, and then set up a stream reader from Spark's [Structured Streaming](https://spark.apache.org/docs/latest/streaming/index.html) engine. In the next section, we'll use a stream writer to send the data from the Spark streaming dataframe to Senzing.

First, we'll create a Spark session:

In [None]:
spark = SparkSession.builder \
    .appName("Senzing Streaming") \
    .master("local[*]") \
    .config("spark.sql.streaming.checkpointLocation", "/tmp/checkpoint") \
    .getOrCreate()

Providing a schema up front ensures that every file is parsed with the same columns and types. Structured Streaming also expects a schema for file sources by default, so the stream reader doesn't have to infer one.

In [None]:
customers_schema = StructType([
    StructField("DATA_SOURCE", StringType(), True),
    StructField("RECORD_ID", StringType(), True),
    StructField("RECORD_TYPE", StringType(), True),
    StructField("PRIMARY_NAME_ORG", StringType(), True),
    StructField("SECONDARY_NAME_ORG", StringType(), True),
    StructField("PRIMARY_NAME_FULL", StringType(), True),
    StructField("NATIVE_NAME_FULL", StringType(), True),
    StructField("PRIMARY_NAME_LAST", StringType(), True),
    StructField("PRIMARY_NAME_FIRST", StringType(), True),
    StructField("PRIMARY_NAME_MIDDLE", StringType(), True),
    StructField("GENDER", StringType(), True),
    StructField("DATE_OF_BIRTH", StringType(), True),
    StructField("PASSPORT_NUMBER", StringType(), True),
    StructField("PASSPORT_COUNTRY", StringType(), True),
    StructField("DRIVERS_LICENSE_NUMBER", StringType(), True),
    StructField("DRIVERS_LICENSE_STATE", StringType(), True),
    StructField("SSN_NUMBER", StringType(), True),
    StructField("NATIONAL_ID_NUMBER", StringType(), True),
    StructField("NATIONAL_ID_COUNTRY", StringType(), True),
    StructField("ADDR_TYPE", StringType(), True),
    StructField("ADDR_FULL", StringType(), True),
    StructField("ADDR_LINE1", StringType(), True),
    StructField("ADDR_CITY", StringType(), True),
    StructField("ADDR_STATE", StringType(), True),
    StructField("ADDR_POSTAL_CODE", StringType(), True),
    StructField("ADDR_COUNTRY", StringType(), True),
    StructField("PHONE_TYPE", StringType(), True),
    StructField("PHONE_NUMBER", StringType(), True),
    StructField("EMAIL_ADDRESS", StringType(), True),
    StructField("DATE", StringType(), True),
    StructField("STATUS", StringType(), True),
    StructField("CATEGORY", StringType(), True),
    StructField("AMOUNT", StringType(), True)
])

The stream reader uses this schema to build a streaming dataframe. Setting `maxFilesPerTrigger` to 1 makes it read one file per micro-batch to simulate streaming, but you can replace the file source with your own input stream.

In [None]:
streaming_df = spark \
    .readStream \
    .schema(customers_schema) \
    .option("maxFilesPerTrigger", 1)  \
    .json('data/streaming')

## Add records to Senzing and to Spark dataframe

We'll use the `get_affected_entities` function from the `spark_quickstart` tutorial to track which entities have been changed or created in the Senzing repository:

In [None]:
def get_affected_entities(info_string):
    # helper function to extract the entity id
    info = json.loads(info_string)
    return [entity['ENTITY_ID'] for entity in info['AFFECTED_ENTITIES']]
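
To make the helper concrete, here is the same function applied to a hand-written info string. The payload is a made-up sample that only mimics the `AFFECTED_ENTITIES` structure the helper relies on; real responses from `add_record` typically carry additional fields.

```python
import json


def get_affected_entities(info_string):
    # Same helper as above: pull the entity IDs out of a with-info response
    info = json.loads(info_string)
    return [entity["ENTITY_ID"] for entity in info["AFFECTED_ENTITIES"]]


# Made-up sample payload for illustration only
sample_info = json.dumps({
    "DATA_SOURCE": "CUSTOMERS",
    "RECORD_ID": "1001",
    "AFFECTED_ENTITIES": [{"ENTITY_ID": 1}, {"ENTITY_ID": 7}],
})

affected = set()
affected.update(get_affected_entities(sample_info))
affected.update(get_affected_entities(sample_info))  # duplicates collapse in a set
print(sorted(affected))  # → [1, 7]
```

Collecting the IDs in a set is what lets the notebook call the helper once per record without counting the same entity twice.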

In [None]:
affected_entities = set()

And we'll use the code from the `spark_quickstart` tutorial to create a function that will send a streaming batch to the Senzing engine:

In [None]:
def process_streaming_batch(batch_df, batch_id):
    
    # Count once; each count() call runs a separate Spark job
    n_records = batch_df.count()
    if n_records == 0:
        return

    print(f"Processing batch {batch_id} with {n_records} records")
    
    for row in batch_df.rdd.toLocalIterator():
        record = {k: v for k, v in row.asDict().items() if v is not None}
        
        info = sz_engine.add_record(
            record['DATA_SOURCE'],
            record['RECORD_ID'], 
            json.dumps(record),  # pass the record definition as a JSON string
            SzEngineFlags.SZ_WITH_INFO,
        )
        
        affected_entities.update(get_affected_entities(info))
        print(f"Added record {record['RECORD_ID']}")
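
The dictionary comprehension in `process_streaming_batch` matters because the schema lists every possible column, so Spark fills the columns a record lacks with nulls, which appear as `None` in each row. A minimal sketch, using a plain dict with invented values in place of `row.asDict()`:

```python
import json

# Stand-in for row.asDict(); the values are invented for illustration
row_dict = {
    "DATA_SOURCE": "CUSTOMERS",
    "RECORD_ID": "1001",
    "PRIMARY_NAME_ORG": None,
    "PRIMARY_NAME_LAST": "Smith",
    "PASSPORT_NUMBER": None,
}

# Keep only the attributes that were actually present in the source file
record = {k: v for k, v in row_dict.items() if v is not None}

print(json.dumps(record))
# → {"DATA_SOURCE": "CUSTOMERS", "RECORD_ID": "1001", "PRIMARY_NAME_LAST": "Smith"}
```

Dropping the empty columns keeps the record sent to Senzing limited to the attributes the original file contained.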

Then, we'll stream the data from the Spark dataframe to Senzing using a Spark stream writer:

In [None]:
streaming_query = streaming_df \
    .writeStream \
    .foreachBatch(process_streaming_batch) \
    .outputMode("append") \
    .trigger(processingTime='10 seconds') \
    .start()
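
`start()` returns immediately and the query keeps running in the background. If you'd rather block until every file currently in `data/streaming` has been processed, one option is `StreamingQuery.processAllAvailable()`, which PySpark documents as intended for testing. The wrapper below is only a sketch: the name `drain_stream` is hypothetical, and it accepts any object that exposes `processAllAvailable()`, `stop()` and `isActive`.

```python
def drain_stream(query, stop=True):
    """Block until the query has processed all data currently available.

    `query` is expected to be a pyspark.sql.streaming.StreamingQuery.
    Returns True if the query is still active afterwards.
    """
    # Blocks until all available source data is processed and committed
    query.processAllAvailable()
    if stop:
        # Stop the background query so no further micro-batches run
        query.stop()
    return query.isActive
```

In this notebook you would call `drain_stream(streaming_query)`. With one file read per 10-second trigger, expect the call to block for a few minutes while the 20 files are processed.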

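Because the query runs in the background and processes one file every 10 seconds, the `affected_entities` set fills up gradually. Once a few batches have run, we can view it to confirm that entities have been created: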

In [None]:
affected_entities

## Process REDO records

As in the `spark_quickstart` tutorial, we'll run the Senzing [redo process](https://senzing.zendesk.com/hc/en-us/articles/360007475133-Processing-REDO) to clean up the entities in the Senzing repository, updating the `affected_entities` set as we go.

In [None]:
while True:
    redo_record = sz_engine.get_redo_record()
    if not redo_record:
        break
    info = sz_engine.process_redo_record(redo_record, flags=SzEngineFlags.SZ_WITH_INFO)
    affected_entities.update(get_affected_entities(info))
    print(info)

## Check the results

Finally, we'll run a search by attributes to confirm that the records were added to Senzing:

In [None]:
# check things were added to Senzing!

search_query = {
    "name_full": "robert smith",
    "date_of_birth": "11/12/1978",
}
search_result = sz_engine.search_by_attributes(json.dumps(search_query))
print(json.dumps(json.loads(search_result), indent=2))
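
The full search response can be long, so a compact summary is sometimes easier to read. The sketch below assumes the response nests each match as `RESOLVED_ENTITIES` → `ENTITY` → `RESOLVED_ENTITY` with `ENTITY_ID` and `ENTITY_NAME` keys. That layout is an assumption to check against your own output, and the helper name `summarize_search` is hypothetical.

```python
import json


def summarize_search(search_result):
    """List (entity_id, entity_name) pairs from a search response.

    Assumes the layout RESOLVED_ENTITIES -> ENTITY -> RESOLVED_ENTITY; missing
    keys are skipped rather than raising.
    """
    result = json.loads(search_result)
    summary = []
    for match in result.get("RESOLVED_ENTITIES", []):
        entity = match.get("ENTITY", {}).get("RESOLVED_ENTITY", {})
        if "ENTITY_ID" in entity:
            summary.append((entity["ENTITY_ID"], entity.get("ENTITY_NAME")))
    return summary


# Hand-written sample that mimics the assumed layout; not real Senzing output
sample = json.dumps({
    "RESOLVED_ENTITIES": [
        {"ENTITY": {"RESOLVED_ENTITY": {"ENTITY_ID": 1, "ENTITY_NAME": "Robert Smith"}}}
    ]
})
print(summarize_search(sample))  # → [(1, 'Robert Smith')]
```

In the notebook, `summarize_search(search_result)` would return an empty list if the response does not follow the assumed layout, so compare it with the printed JSON before relying on it.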

In [None]:
spark.stop()