# Introduction
This notebook shows how to use the confluent-weaviate connector with an [embedded Weaviate](https://weaviate.io/developers/weaviate/installation/embedded) instance.

## Data Stream Source
The demonstration data stream is generated with the [Datagen Source Connector](https://docs.confluent.io/cloud/current/connectors/cc-datagen-source.html), using the `clickstream_users_schema.avro` template. You can find the template [here](https://github.com/confluentinc/kafka-connect-datagen/blob/master/src/main/resources/clickstream_users_schema.avro).

## Supported Message Properties
Currently, the connector supports messages with the following properties:
- **Key**: Must be a string.
- **Value**: Serialized using Avro.
- **Timestamps**: Unix timestamps in milliseconds.

Ensure your data stream adheres to these properties for successful integration.
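
To make the timestamp unit concrete, here is a minimal sketch that formats a Unix timestamp in milliseconds as an ISO 8601 UTC string, the same shape as the `_kafka_timestamp` values that appear in the stored objects. The helper name `ms_to_iso` is hypothetical and is not part of the connector; how the connector converts timestamps internally is not shown in this notebook.

```python
from datetime import datetime, timezone


def ms_to_iso(ms: int) -> str:
    """Format a Unix timestamp in milliseconds as an ISO 8601 UTC string."""
    seconds, millis = divmod(ms, 1000)
    # Build from whole seconds, then attach the milliseconds,
    # which avoids float rounding on the fractional part.
    dt = datetime.fromtimestamp(seconds, tz=timezone.utc).replace(
        microsecond=millis * 1000
    )
    return dt.isoformat(timespec="milliseconds").replace("+00:00", "Z")


print(ms_to_iso(1694172920668))  # 2023-09-08T11:35:20.668Z
```

A value given in seconds instead of milliseconds would land in January 1970, which is a quick way to spot a unit mismatch in the source data.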

# Imports

In [1]:
import json
import os
import time

import weaviate
from pyspark.sql import SparkSession

# Setup

Set up Weaviate (embedded):

In [2]:
client = weaviate.Client(embedded_options=weaviate.embedded.EmbeddedOptions())

client.schema.delete_all()
weaviate_url = client._connection.url
weaviate_host = weaviate_url.split("://")[1]

Started /home/vscode/.cache/weaviate-embedded: process ID 96296


{"action":"startup","default_vectorizer_module":"none","level":"info","msg":"the default vectorizer modules is set to \"none\", as a result all new schema classes without an explicit vectorizer setting, will use this vectorizer","time":"2023-09-08T11:34:55Z"}
{"action":"startup","auto_schema_enabled":true,"level":"info","msg":"auto schema enabled setting is set to \"true\"","time":"2023-09-08T11:34:55Z"}
{"action":"hnsw_vector_cache_prefill","count":3000,"index_id":"clickstream_cvECIvOhVPZT","level":"info","limit":1000000000000,"msg":"prefilled vector cache","time":"2023-09-08T11:34:55Z","took":83750}
{"action":"grpc_startup","level":"info","msg":"grpc server listening at [::]:50051","time":"2023-09-08T11:34:55Z"}
{"action":"restapi_management","level":"info","msg":"Serving weaviate at http://127.0.0.1:6666","time":"2023-09-08T11:34:55Z"}
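
The cell above derives `weaviate_host` by splitting the URL on `"://"`. As an alternative sketch, the standard library's `urllib.parse.urlsplit` can extract the same `host:port` part and makes a malformed URL easier to detect. The helper name `host_from_url` is hypothetical; the example URL is the one reported in the startup log.

```python
from urllib.parse import urlsplit


def host_from_url(url: str) -> str:
    """Return the host[:port] part of a URL such as http://127.0.0.1:6666."""
    netloc = urlsplit(url).netloc
    if not netloc:
        # urlsplit only fills netloc when the URL contains "//".
        raise ValueError(f"URL has no scheme://host part: {url!r}")
    return netloc


print(host_from_url("http://127.0.0.1:6666"))  # 127.0.0.1:6666
```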


Set up the Spark session:

In [3]:
jar_packages = [
    "org.apache.spark:spark-avro_2.12:3.4.1",
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1",
]

CONFLUENT_WEAVIATE_JAR = "../target/scala-2.12/confluent-connector_2.12-3.4.0_0.0.1.jar"

spark = (
    SparkSession.builder.appName("demo-confluent-weaviate-integration")
    .config("spark.jars.packages", ",".join(jar_packages))
    .config("spark.jars", CONFLUENT_WEAVIATE_JAR)
    .config("spark.streaming.stopGracefullyOnShutdown", "true")
    .getOrCreate()
)

:: loading settings :: url = jar:file:/home/vscode/.local/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/vscode/.ivy2/cache
The jars for the packages stored in: /home/vscode/.ivy2/jars
org.apache.spark#spark-avro_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-62054124-421a-4697-8dbd-f511ae6b720e;1.0
	confs: [default]
	found org.apache.spark#spark-avro_2.12;3.4.1 in central
	found org.tukaani#xz;1.9 in central
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.4.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.4.1 in central
	found org.apache.kafka#kafka-clients;3.3.2 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.1 in central
	found org.slf4j#slf4j-api;2.0.6 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.find

Read the Confluent credentials from environment variables:

In [4]:
confluentClusterName = os.environ.get("CONFLUENT_CLUSTER_NAME")
confluentBootstrapServers = os.environ.get("CONFLUENT_BOOTSTRAP_SERVERS")
confluentTopicName = os.environ.get("CONFLUENT_TOPIC_NAME")
schemaRegistryUrl = os.environ.get("CONFLUENT_SCHEMA_REGISTRY_URL")
confluentApiKey = os.environ.get("CONFLUENT_API_KEY")
confluentSecret = os.environ.get("CONFLUENT_SECRET")
confluentRegistryApiKey = os.environ.get("CONFLUENT_REGISTRY_API_KEY")
confluentRegistrySecret = os.environ.get("CONFLUENT_REGISTRY_SECRET")
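
Because `os.environ.get` returns `None` for an unset variable, a missing credential only shows up later as a confusing connection error. A minimal sketch of an early check follows; the helper `missing_env_vars` is hypothetical, and the variable names are the ones read in the cell above.

```python
import os

REQUIRED_VARS = [
    "CONFLUENT_CLUSTER_NAME",
    "CONFLUENT_BOOTSTRAP_SERVERS",
    "CONFLUENT_TOPIC_NAME",
    "CONFLUENT_SCHEMA_REGISTRY_URL",
    "CONFLUENT_API_KEY",
    "CONFLUENT_SECRET",
    "CONFLUENT_REGISTRY_API_KEY",
    "CONFLUENT_REGISTRY_SECRET",
]


def missing_env_vars(names, environ=os.environ):
    """Return the names that are unset or empty in the given mapping."""
    return [name for name in names if not environ.get(name)]


missing = missing_env_vars(REQUIRED_VARS)
if missing:
    print("Missing environment variables:", ", ".join(missing))
```

Whether to print a warning or raise an exception here is a design choice; raising is usually preferable when the notebook runs unattended.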

# Demo

Create the schema in Weaviate:

In [5]:
with open("../src/it/resources/schema.json", "r") as f:
    weaviate_schema = json.load(f)

client.schema.create_class(weaviate_schema)

{"action":"hnsw_vector_cache_prefill","count":1000,"index_id":"clickstream_luwwRQEGH2R3","level":"info","limit":1000000000000,"msg":"prefilled vector cache","time":"2023-09-08T11:34:59Z","took":42750}


Create a Spark Structured Streaming `DataFrame` to read streaming data from a Confluent Kafka topic:

In [6]:
clickstreamDF = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", confluentBootstrapServers)
    .option("subscribe", confluentTopicName)
    .option("startingOffsets", "latest")
    .option("kafka.security.protocol", "SASL_SSL")
    .option(
        "kafka.sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(
            confluentApiKey, confluentSecret
        ),
    )
    .option("kafka.ssl.endpoint.identification.algorithm", "https")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("failOnDataLoss", "false")
    .option("name", "clickStreamReadFromConfluent")
    .load()
)
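
The `kafka.sasl.jaas.config` value is the easiest option to get wrong, since it is a single string with its own quoting and a trailing semicolon. The sketch below isolates it in a small helper so it can be inspected on its own. The name `plain_jaas_config` is hypothetical; the string format is copied from the cell above.

```python
def plain_jaas_config(api_key: str, api_secret: str) -> str:
    """Build the JAAS line for SASL/PLAIN in the format used above."""
    return (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f"username='{api_key}' password='{api_secret}';"
    )


# Placeholder credentials, for illustration only.
print(plain_jaas_config("my-key", "my-secret"))
```

Note that this simple formatting assumes the key and secret contain no single quotes; credentials with quotes would need escaping.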

Define a function to run on each microbatch:

In [7]:
total_rows_processed = 0


def f(df, batch_id):
    global total_rows_processed
    row_count = df.count()
    total_rows_processed += row_count

    print(f"Number of rows in the batch with batch id {batch_id}: {row_count}")
    (
        df.write.format("io.weaviate.confluent.Weaviate")
        .option("batchsize", 200)
        .option("scheme", "http")
        .option("host", weaviate_host)
        .option("className", weaviate_schema["class"])
        .option("schemaRegistryUrl", schemaRegistryUrl)
        .option("schemaRegistryApiKey", confluentRegistryApiKey)
        .option("schemaRegistryApiSecret", confluentRegistrySecret)
        .mode("append")
        .save()
    )
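
The function above tracks its running total in a module-level global. This works because `foreachBatch` invokes the function on the driver, but a closure keeps the counter local and makes the counting logic testable without Spark. The following is a sketch of that alternative, not the notebook's method: `make_batch_handler` and `FakeBatch` are hypothetical names, and `write_batch` stands in for the Weaviate write shown above.

```python
def make_batch_handler(write_batch):
    """Return (handler, stats); handler counts rows, then delegates the write."""
    stats = {"rows": 0, "batches": 0}

    def handler(df, batch_id):
        row_count = df.count()
        stats["rows"] += row_count
        stats["batches"] += 1
        print(f"Number of rows in the batch with batch id {batch_id}: {row_count}")
        write_batch(df)

    return handler, stats


class FakeBatch:
    """Stand-in for a Spark DataFrame; it only implements count()."""

    def __init__(self, n):
        self.n = n

    def count(self):
        return self.n


written = []  # collects the batches passed to the stand-in writer
handler, stats = make_batch_handler(written.append)
for batch_id, n in enumerate([2, 10, 11]):
    handler(FakeBatch(n), batch_id)

print(stats)  # {'rows': 23, 'batches': 3}
```

With a real stream, `handler` would be passed to `foreachBatch` in place of the function defined above, and `stats["rows"]` would play the role of `total_rows_processed`.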

Start writing the stream:

In [8]:
query = (
    clickstreamDF.writeStream.foreachBatch(f)
    .queryName("write_stream_to_weaviate")
    .start()
)

23/09/08 11:35:00 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-b54d9113-db3a-4b85-b14b-19bdb66cf4d1. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/09/08 11:35:00 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


23/09/08 11:35:01 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

Number of rows in the batch with batch id 0: 2
Number of rows in the batch with batch id 1: 10
Number of rows in the batch with batch id 2: 11
Number of rows in the batch with batch id 3: 9
Number of rows in the batch with batch id 4: 1

Stop writing after 15 seconds:

In [9]:
# Note: query.stop() does not shut the stream down gracefully.
# The easiest way to get a graceful shutdown is to pause the source connector first.
time.sleep(15)
query.stop()
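
When pausing the source connector is not an option, one possible mitigation is to poll the query and stop it only once it reports no pending data and no active trigger. The sketch below assumes the `query.status` dictionary exposed by PySpark's `StreamingQuery`, with its `isDataAvailable` and `isTriggerActive` keys. The helper `stop_when_idle` is hypothetical, and it only reduces the chance of interrupting a micro-batch; it does not guarantee a graceful shutdown.

```python
import time


def stop_when_idle(query, timeout_s=60.0, poll_s=1.0):
    """Stop `query` once it looks idle, or when the timeout expires.

    Returns True if the query was idle when stopped, False otherwise.
    """
    deadline = time.monotonic() + timeout_s
    idle = False
    while time.monotonic() < deadline:
        status = query.status  # assumed keys: isDataAvailable, isTriggerActive
        if not status["isDataAvailable"] and not status["isTriggerActive"]:
            idle = True
            break
        time.sleep(poll_s)
    # Stop in either case so the query never outlives the timeout.
    query.stop()
    return idle
```

With a source that produces continuously, the query may never look idle, which is why the timeout is there and why pausing the connector remains the simpler approach.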

Compare the number of rows processed and the number of objects in Weaviate:

In [10]:
results = client.query.aggregate(weaviate_schema["class"]).with_meta_count().do()
total_objects_in_weaviate = results["data"]["Aggregate"][weaviate_schema["class"]][0][
    "meta"
]["count"]

assert (
    total_rows_processed == total_objects_in_weaviate
), f"Total rows processed {total_rows_processed} does not match total objects in weaviate {total_objects_in_weaviate}"
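
The nested indexing above is easier to read and reuse when wrapped in a helper that reports an unexpected response shape clearly. A minimal sketch follows; `aggregate_count` is a hypothetical helper, the response layout is the one indexed in the cell above, and the sample dictionary is made up for illustration rather than taken from a real run.

```python
def aggregate_count(results: dict, class_name: str) -> int:
    """Extract meta.count for `class_name` from an Aggregate query response."""
    try:
        return results["data"]["Aggregate"][class_name][0]["meta"]["count"]
    except (KeyError, IndexError, TypeError) as exc:
        raise ValueError(f"Unexpected aggregate response: {results!r}") from exc


# Made-up response with the same layout as the one indexed above.
sample = {"data": {"Aggregate": {"Clickstream": [{"meta": {"count": 3}}]}}}
print(aggregate_count(sample, "Clickstream"))  # 3
```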

Look at some of the objects in Weaviate:

In [11]:
client.data_object.get(class_name=weaviate_schema["class"], limit=3)

{'deprecations': [],
 'objects': [{'class': 'Clickstream',
   'creationTimeUnix': 1694172920668,
   'id': '01eafe7b-a2aa-4610-acb4-a0ff499b92ee',
   'lastUpdateTimeUnix': 1694172920668,
   'properties': {'_kafka_key': '200377',
    '_kafka_offset': 33454,
    '_kafka_partition': 0,
    '_kafka_schemaId': 100002,
    '_kafka_timestamp': '2023-09-08T11:35:15.241Z',
    '_kafka_timestampType': 0,
    '_kafka_topic': 'clickstreams-users',
    'city': 'Frankfurt',
    'first_name': 'Curran',
    'last_name': 'Tomini',
    'level': 'Gold',
    'registered_at': 1483222736170,
    'user_id': 200377,
    'username': 'Roberto_123'},
   'vectorWeights': None},
  {'class': 'Clickstream',
   'creationTimeUnix': 1694172923212,
   'id': '07497374-2bda-4ada-9cf3-d87107c3b2d2',
   'lastUpdateTimeUnix': 1694172923212,
   'properties': {'_kafka_key': '200389',
    '_kafka_offset': 33317,
    '_kafka_partition': 1,
    '_kafka_schemaId': 100002,
    '_kafka_timestamp': '2023-09-08T11:35:20.426Z',
    '_ka