# Introduction

This notebook consumes newly added questions from a Kafka topic and streams them into Weaviate.


# Prerequisites


1. A PostgreSQL instance
2. A Kafka cluster
3. Change data capture (CDC) set up between PostgreSQL and a Kafka topic, so that newly added questions are published to the topic
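To make the CDC requirement concrete: the schema used later in this notebook assumes each Kafka message is a JSON change event with `before`, `after` and `op` fields, the envelope emitted by Debezium-based connectors. The sketch below shows an insert event and an update event side by side. The event contents are illustrative, and `OP_CODES` and `describe_change` are hypothetical names. Only the insert has `op == "c"` and a null `before`, which is exactly what the filters in the Steps section select on.

```python
import json
from typing import Any, Dict

# Debezium-style op codes (assumption: the CDC connector uses Debezium's envelope)
OP_CODES: Dict[str, str] = {"c": "create", "u": "update", "d": "delete", "r": "snapshot read"}


def describe_change(event: Dict[str, Any]) -> str:
    """Return a readable label for a CDC change event, or "unknown"."""
    return OP_CODES.get(event.get("op"), "unknown")


# Illustrative insert event: "before" is null because the row did not exist yet
insert_event = {
    "before": None,
    "after": {"question_text": "Can I try on this shirt?", "difficulty_rating": 2},
    "op": "c",
}

# Illustrative update event: both the old and the new row images are present
update_event = {
    "before": {"question_text": "Can I try on this shirt?", "difficulty_rating": 2},
    "after": {"question_text": "Can I try on this shirt?", "difficulty_rating": 3},
    "op": "u",
}

# On the topic, each event arrives as a JSON-encoded message value
for event in (insert_event, update_event):
    print(describe_change(event), json.dumps(event))
```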

# Steps

Stream from Confluent into Spark:

In [0]:
# connect to the topic

confluentBootstrapServers = ""
confluentTopicName = "" # kafka topic where CDC on questions are published
confluentApiKey = ""
confluentSecret = ""

df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", confluentBootstrapServers)
    .option("subscribe", confluentTopicName)
    .option("startingOffsets", "latest")  # Change to "earliest" to read from the earliest offset still retained on the topic
    .option("kafka.security.protocol", "SASL_SSL")
    .option(
        "kafka.sasl.jaas.config",
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(
            confluentApiKey, confluentSecret
        ),
    )
    .option("kafka.ssl.endpoint.identification.algorithm", "https")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("failOnDataLoss", "false")
    # .option("name", "productsFromConfluent")
    .load()
)
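The same reader configuration can be kept in a plain dictionary, which makes the options easy to inspect without a running cluster. This is a sketch: `confluent_kafka_options` is a hypothetical helper that returns the options used in the cell above, and it would be applied with `DataStreamReader.options(**opts)`. The `kafkashaded.` prefix in the JAAS class name matches the shaded Kafka client bundled with Databricks runtimes; on a non-Databricks Spark cluster the class would normally be `org.apache.kafka.common.security.plain.PlainLoginModule`.

```python
from typing import Dict


def confluent_kafka_options(
    bootstrap_servers: str,
    topic: str,
    api_key: str,
    secret: str,
    starting_offsets: str = "latest",
) -> Dict[str, str]:
    """Build the Kafka source options used above for a Confluent topic (SASL/PLAIN over TLS)."""
    jaas_config = (
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
        "username='{}' password='{}';".format(api_key, secret)
    )
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        # "latest" reads only messages that arrive after the stream first starts;
        # "earliest" replays everything still retained on the topic
        "startingOffsets": starting_offsets,
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.jaas.config": jaas_config,
        "kafka.ssl.endpoint.identification.algorithm": "https",
        "kafka.sasl.mechanism": "PLAIN",
        "failOnDataLoss": "false",
    }


# Usage inside the notebook (requires the Spark session):
# opts = confluent_kafka_options(confluentBootstrapServers, confluentTopicName,
#                                confluentApiKey, confluentSecret)
# df = spark.readStream.format("kafka").options(**opts).load()
```

Note that `startingOffsets` only applies the first time a query runs; once a checkpoint exists, Structured Streaming resumes from the offsets recorded there.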

In [0]:
# define message schema
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define the nested structure for both 'before' and 'after' since they have the same structure
nestedStructure = StructType([
    StructField("question_id", StringType(), True),
    StructField("question_text", StringType(), True),
    StructField("image_1", StringType(), True),
    StructField("image_2", StringType(), True),
    StructField("image_3", StringType(), True),
    StructField("image_4", StringType(), True),
    StructField("answer", StringType(), True),
    StructField("created_at", StringType(), True),
    StructField("question_type_id", StringType(), True),
    StructField("question_type", StringType(), True),
    StructField("question", StringType(), True),
    StructField("file_path", StringType(), True),
    StructField("difficulty_rating", IntegerType(), True),
    StructField("category", StringType(), True),
    StructField("instruction", StringType(), True),
])

# Define the schema with 'before' and 'after' having the same structure and being nullable
schema = StructType([
    StructField("before", nestedStructure, True),
    StructField("after", nestedStructure, True),
    StructField("op", StringType()),
])

# Kafka delivers the message value as binary; cast it to a string
newQuestionsDF = df.withColumn("value", col("value").cast("string"))

# Parse the JSON string into a struct using the CDC schema defined above
newQuestionsDF = newQuestionsDF.withColumn("value", from_json(col("value"), schema))

# Keep only create events (op == "c"); updates, deletes and other change types are dropped
newQuestionsDF = newQuestionsDF.filter(col("value.op") == "c")

# Keep only rows with a null "before" image, i.e. newly inserted rows
newQuestionsDF = newQuestionsDF.filter("value.before is null")

# Flatten the fields of the "after" image into top-level columns
newQuestionsDF = newQuestionsDF.select(
    col("value.after.question_type").alias("question_type"),
    col("value.after.question_text").alias("question"),  # the question text is exposed as "question"
    col("value.after.answer").alias("answer"),
    col("value.after.file_path").alias("file_path"),
    col("value.after.difficulty_rating").alias("difficulty_rating"),
    col("value.after.category").alias("category"),
    col("value.after.instruction").alias("instruction"),
)
display(newQuestionsDF)

| question_type | question | answer | file_path | difficulty_rating | category | instruction |
|---|---|---|---|---|---|---|
| standard question | I would like to buy some white roses. | Vorrei comprare delle rose bianche. | | 2 | Buy things in a store | Remember to use 'comprare' for 'to buy' and 'delle' as the partitive article for 'some'. |
| standard question | Could you wrap the white roses as a gift? | Potrebbe incartare le rose bianche come un regalo? | | 3 | Buy things in a store | Use 'Potrebbe' for formal 'Could you' and 'incartare' for 'to wrap'. |
| standard question | I want to buy a blue shirt. | Voglio comprare una camicia blu. | | 2 | Buy things in a store | Remember to use 'voglio' for 'I want' and 'comprare' for 'to buy'. 'Una camicia' is 'a shirt' and 'blu' is 'blue'. |
| standard question | Can I try on this shirt? | Posso provare questa camicia? | | 2 | Buy things in a store | Use 'posso' for 'can I' and 'provare' for 'try on'. 'Questa camicia' means 'this shirt'. |
| standard question | I want to order a pizza with pepperoni. | Vorrei ordinare una pizza con il pepperoni. | | 2 | Order in a restaurant | Remember to use 'vorrei' for 'I want' and 'ordinare' for 'to order'. The word for 'with' is 'con'. |
| standard question | Can I have a pizza with mushrooms? | Posso avere una pizza con funghi? | | 2 | Order in a restaurant | Use 'posso avere' for 'can I have' and remember that 'mushrooms' is 'funghi' in Italian. |
| standard question | I would like to order a pizza to go. | Vorrei ordinare una pizza da asporto. | | 3 | Order in a restaurant | Use 'vorrei ordinare' for 'I would like to order' and 'da asporto' for 'to go'. |
| standard question | I want to order a pizza with mushrooms. | Vorrei ordinare una pizza con funghi. | | 2 | Order in a restaurant | Use 'vorrei ordinare' for 'I want to order' and 'con funghi' for 'with mushrooms'. |
| standard question | Can I have a pizza with extra cheese? | Posso avere una pizza con extra formaggio? | | 3 | Order in a restaurant | Use 'posso avere' for 'Can I have' and 'con extra formaggio' for 'with extra cheese'. |
| standard question | I would like to customize my pizza toppings. | Vorrei personalizzare gli ingredienti della mia pizza. | | 4 | Order in a restaurant | Use 'vorrei personalizzare' for 'I would like to customize' and 'ingredienti della mia pizza' for 'my pizza toppings'. |
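To see what the transformation cell does to each message, here is a plain-Python sketch of the same logic that runs without Spark: decode each message value, parse the JSON, keep only insert events, and flatten the `after` image into the seven output columns. It mirrors the `filter` and `select` calls above but is not how Spark executes them; `extract_new_questions`, `COLUMN_MAP` and the sample messages are hypothetical, and this sketch simply skips malformed JSON.

```python
import json
from typing import Any, Dict, Iterable, List, Union

# Output column -> field of the CDC "after" image, mirroring the select() above.
# As in the Spark cell, the "question" column comes from "question_text".
COLUMN_MAP = {
    "question_type": "question_type",
    "question": "question_text",
    "answer": "answer",
    "file_path": "file_path",
    "difficulty_rating": "difficulty_rating",
    "category": "category",
    "instruction": "instruction",
}


def extract_new_questions(raw_values: Iterable[Union[bytes, str]]) -> List[Dict[str, Any]]:
    """Keep insert events only and flatten their "after" image into output rows."""
    rows = []
    for raw in raw_values:
        # Kafka message values are bytes; the Spark cell casts them to string
        text = raw.decode("utf-8") if isinstance(raw, bytes) else raw
        try:
            event = json.loads(text)
        except json.JSONDecodeError:
            continue  # skip messages that are not valid JSON
        if not isinstance(event, dict):
            continue
        # Same two filters as the Spark cell: op == "c" and no "before" image
        if event.get("op") != "c" or event.get("before") is not None:
            continue
        after = event.get("after") or {}
        # Missing fields become None, much like null struct fields in Spark
        rows.append({column: after.get(field) for column, field in COLUMN_MAP.items()})
    return rows


# Hypothetical sample messages: one insert (kept), one update and one malformed value (dropped)
sample_messages = [
    json.dumps({"before": None, "op": "c", "after": {
        "question_type": "standard question",
        "question_text": "Can I try on this shirt?",
        "answer": "Posso provare questa camicia?",
        "difficulty_rating": 2,
        "category": "Buy things in a store",
    }}).encode("utf-8"),
    json.dumps({"before": {"question_text": "old"}, "after": {"question_text": "new"}, "op": "u"}).encode("utf-8"),
    b"not json",
]
rows = extract_new_questions(sample_messages)
print(rows)
```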


Stream from Spark to Weaviate:

In [0]:
import weaviate
from weaviate.classes.init import Auth

weaviate_url = ""
weaviate_api_key = ""
openai_api_key = ""

client = weaviate.connect_to_weaviate_cloud(
    cluster_url=weaviate_url,
    auth_credentials=Auth.api_key(weaviate_api_key),
    headers={"X-OpenAI-Api-Key": openai_api_key},  # OpenAI key passed to Weaviate's OpenAI integration
)

collection = client.collections.get("ConversationalStatements")

# Count the objects already in the collection, as a baseline before streaming
collection.aggregate.over_all(total_count=True)



AggregateReturn(properties={}, total_count=88)
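The write cell derives the connector's `host` option with `weaviate_url.replace("https://", "")`, which assumes the URL starts with `https://` and has no trailing slash or path. A slightly more defensive way to split a Weaviate URL into the connector's `scheme` and `host` options is sketched below with the standard library; `weaviate_scheme_and_host` is a hypothetical helper and the example URLs are made up.

```python
from typing import Tuple
from urllib.parse import urlparse


def weaviate_scheme_and_host(url: str) -> Tuple[str, str]:
    """Split a Weaviate URL into (scheme, host[:port]) for the Spark connector options."""
    url = url.strip()
    # Assume https when only a bare hostname is given
    if "://" not in url:
        url = "https://" + url
    parsed = urlparse(url)
    if not parsed.netloc:
        raise ValueError(f"no host found in Weaviate URL {url!r}")
    # netloc excludes any path (such as a trailing slash) but keeps an explicit port
    return parsed.scheme, parsed.netloc


# Usage in the write cell:
# scheme, host = weaviate_scheme_and_host(weaviate_url)
# ... .option("scheme", scheme).option("host", host) ...
```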

In [0]:
(
    newQuestionsDF
    .writeStream
    .format("io.weaviate.spark.Weaviate")
    .option("scheme", "https")
    .option("host", weaviate_url.replace("https://", ""))  # the connector takes the host without the scheme
    .option("apiKey", weaviate_api_key)
    .option("className", "ConversationalStatements")
    .option("batchSize", 100)
    .queryName("stream_new_questions_to_weaviate")
    .option("checkpointLocation", "dbfs:/tmp/product_updates_checkpoint")  # tracks processed Kafka offsets across restarts
    .option("header:X-OpenAI-Api-Key", openai_api_key)  # OpenAI key forwarded to Weaviate as a request header
    .start()
)

<pyspark.sql.streaming.query.StreamingQuery at 0x7f24e4ca52a0>
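The write cell starts the stream but does not keep a handle to it. Assigning the result of `.start()` to a variable (for example `query`) makes it possible to monitor and stop the stream through the standard `StreamingQuery` API (`lastProgress`, `status`, `stop()`). In PySpark 3.x, `lastProgress` is a dict, or `None` before the first micro-batch completes; the hypothetical helper below condenses it into one line. The sample progress dict in the test is illustrative, not output from this notebook.

```python
from typing import Any, Dict, Optional


def summarize_progress(progress: Optional[Dict[str, Any]]) -> str:
    """One-line summary of a StreamingQuery.lastProgress dict."""
    if progress is None:
        return "no micro-batch has completed yet"
    rows = progress.get("numInputRows", 0)
    rate = progress.get("processedRowsPerSecond") or 0.0
    return f"batch {progress.get('batchId')}: {rows} rows in, {rate:.1f} rows/s processed"


# Usage in the notebook (requires the running stream):
# query = (newQuestionsDF.writeStream ... .start())  # same options as the cell above
# print(summarize_progress(query.lastProgress))
# query.stop()  # stop writing to Weaviate when done
```

Re-running the `collection.aggregate.over_all(total_count=True)` cell after a few batches is a simple way to confirm that new questions have reached the collection.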