In [None]:
from pyspark.sql import SparkSession

# Maven coordinates of the connector JARs handed to Spark via spark.jars.packages.
# Every artifact with a Scala suffix must use the same Scala build, so the
# suffix is derived from scala_version everywhere instead of being repeated.
scala_version = '2.12'  # TODO: Ensure this is correct
spark_version = '3.5.5'
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.2.0',
    f'org.mongodb.spark:mongo-spark-connector_{scala_version}:10.4.1',
]

# Local-mode session for the ingestion pipeline; getOrCreate() returns an
# already-running session if one exists.
spark = (
    SparkSession.builder
    .master("local")
    .appName("realtime-rag-ingestion-pipeline")
    .config("spark.jars.packages", ",".join(packages))
    .getOrCreate()
)

spark

In [None]:
from pyspark.sql.functions import col, concat, lit, from_json
from pyspark.sql.types import ArrayType, FloatType, StringType, StructType, StructField, IntegerType, LongType
from pyspark.sql.functions import from_json, schema_of_json

kafka_topic_name = "product_events"
kafka_bootstrap_servers = "0.0.0.0:29092"

# Options for the Spark Kafka source (batch read).
# Plain consumer settings such as enable.auto.commit, auto.offset.reset and
# max.poll.records are not options of this source, so they are not listed here;
# the range of records to read is controlled with startingOffsets/endingOffsets.
# The values below are the batch-query defaults, spelled out so that the
# read range is visible to the reader.
kafka_options = {
    "kafka.bootstrap.servers": kafka_bootstrap_servers,
    "subscribe": kafka_topic_name,
    "startingOffsets": "earliest",
    "endingOffsets": "latest",
}

# Batch-read the topic; key and value arrive as raw bytes and are decoded below
kafkaDf = (
    spark.read.format("kafka")
    .options(**kafka_options)
    .load()
)


# Expected layout of the JSON payload carried in each record's value
schema_product = StructType([
    StructField("store_id", IntegerType()),
    StructField("product_id", IntegerType()),
    StructField("count", IntegerType()),
    StructField("price", FloatType()),
    StructField("size", StringType()),
    StructField("ageGroup", StringType()),
    StructField("gender", StringType()),
    StructField("season", StringType()),
    StructField("fashionType", StringType()),
    StructField("brandName", StringType()),
    StructField("baseColor", StringType()),
    StructField("articleType", StringType())
])

# Decode key/value to strings, then parse the value (assumed to hold JSON)
# into a struct that follows schema_product
jsonDf = kafkaDf.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
jsonDf_new = jsonDf.withColumn("value", from_json(col("value"), schema_product))
jsonDf_new.printSchema()
jsonDf_new.show(truncate=False)

In [None]:
# Flatten the parsed payload: keep the Kafka key and lift each product
# attribute out of the `value` struct into its own top-level column.
product_field_names = (
    "store_id",
    "product_id",
    "count",
    "price",
    "size",
    "ageGroup",
    "gender",
    "season",
    "fashionType",
    "brandName",
    "baseColor",
    "articleType",
)
queryableDf = jsonDf_new.select(
    col("key"),
    *(col(f"value.{field_name}") for field_name in product_field_names)
)

# Inspect the flattened schema and rows
queryableDf.printSchema()
queryableDf.show(truncate=False)

In [None]:
# Serialize the rows to JSON strings and print the first one as a sample
rows_as_json = queryableDf.toJSON()
json_data = rows_as_json.first()
print(json_data)

In [None]:
# Same flattening in one pass: decode the value, parse it with
# schema_product, and expand the parsed struct into top-level columns.
raw_payload = kafkaDf.selectExpr("CAST(value AS STRING)")
parsed_payload = raw_payload.select(
    from_json(col("value"), schema_product).alias("data")
)
jsonDf_transformed = parsed_payload.select("data.*")

jsonDf_transformed.printSchema()
jsonDf_transformed.show(truncate=False)


In [None]:
from pyspark.sql.functions import initcap, concat_ws

# Build the text that will be embedded: the descriptive attributes joined by
# spaces, followed by labelled price / store / product id fragments, with the
# whole string passed through initcap.
attribute_columns = [
    col(attribute_name)
    for attribute_name in (
        "size",
        "ageGroup",
        "gender",
        "season",
        "fashionType",
        "brandName",
        "baseColor",
        "articleType",
    )
]
labelled_columns = [
    concat_ws('', lit(label), col(column_name).cast("string"))
    for label, column_name in (
        (', price: ', "price"),
        (', store number: ', "store_id"),
        (', product id: ', "product_id"),
    )
]
content_column = initcap(concat_ws(' ', *attribute_columns, *labelled_columns))

df = jsonDf_transformed.withColumn("content", content_column)

In [None]:
# Inspect the frame after adding the `content` column: schema first, then
# the rows with full (untruncated) cell contents.
df.printSchema()
df.show(truncate=False)

### Embedding Model
In RAG, embeddings are dense vector representations of data (such as text or images) that capture its semantic meaning. They allow the system to find and retrieve information based on similarity, even when the search terms aren’t exact matches.

Adding extra data, like keywords, tags, or content summaries, can improve these embeddings. Combining this additional context with the main text increases the chances of retrieving relevant information when users ask questions.

**For example:**
If a user searches for “pain relief,” an embedding that includes related keywords—even if the original text doesn’t mention them—can help them find the correct information. This method ensures that the embeddings reflect a broader context, improving the RAG system's ability to provide accurate and relevant answers.


***Using OpenAI API to generate embeddings***
Embedding API: https://platform.openai.com/docs/guides/embeddings

In [None]:
!pip install --upgrade openai

In [None]:
from pyspark.sql.functions import udf
import openai
import os

print(openai.__version__)

# Read the API key from the environment so it never has to be written into
# the notebook. Fail here, with a clear message, rather than later inside the
# embedding UDF when the key turns out to be missing.
openai_api_key = os.getenv("OPENAI_API_KEY")
if not openai_api_key:
    raise RuntimeError(
        "OPENAI_API_KEY is not set; export it before starting the notebook kernel."
    )
openai.api_key = openai_api_key

In [None]:
def generate_embedding(text: str) -> list:
    """Return the OpenAI embedding vector for ``text``.

    Args:
        text: The text to embed.

    Returns:
        The embedding from the first item of the API response, or ``None``
        when ``text`` is ``None`` or empty (no API call is made in that case,
        so such rows end up with a null embedding).
    """
    # Nothing to embed: skip the API call instead of sending a null/empty input
    if not text:
        return None

    response = openai.embeddings.create(input=text, model="text-embedding-ada-002")
    # A single input was sent, so the vector is on the first result item
    return response.data[0].embedding

# Wrap the embedding function as a Spark UDF that returns an array of floats
get_embeddings_udf = udf(generate_embedding, ArrayType(FloatType()))

# Add the embedding column. The frame is cached because every action on it
# would otherwise re-run the UDF, i.e. call the embedding API again for rows
# that were already embedded (once for the preview, once more for each write).
df_with_embeddings = df.withColumn("embedding", get_embeddings_udf(col("content"))).cache()
df_with_embeddings.printSchema()
# Preview a handful of rows with truncated cells; the full vectors are too
# long to be readable in the notebook output
df_with_embeddings.show(5, truncate=80)

### Write to MongoDB

https://www.mongodb.com/docs/spark-connector/current/streaming-mode/streaming-write-config/


In [None]:
import os
from urllib.parse import quote_plus

# Save the data to the vector database - MongoDB Atlas.
# Credentials are read from the environment so that no secret is stored in the
# notebook. NOTE(review): any password that was ever saved in this notebook in
# plain text should be treated as compromised and rotated.
mongo_db_user = os.environ.get("MONGO_DB_USER", "dbuser_genai")
mongo_db_password = os.environ.get("MONGO_DB_PASSWORD")
if not mongo_db_password:
    raise RuntimeError(
        "MONGO_DB_PASSWORD is not set; export it before running this cell."
    )
mongo_db_host = "cluster-genai.wcknb.mongodb.net"
mongo_db_options = "?retryWrites=true&w=majority&appName=Cluster-GenAI"
# Percent-encode user and password so reserved characters (':', '@', '/', ...)
# cannot break the connection string
mongo_db_uri = (
    f"mongodb+srv://{quote_plus(mongo_db_user)}:{quote_plus(mongo_db_password)}"
    f"@{mongo_db_host}/{mongo_db_options}"
)
mongo_db_name = "retail"
mongo_collection_name = "product"

# Batch-mode write: append the embedded rows to the target collection.
# To update existing documents instead of inserting, set the connector's
# write operationType to "update".
(
    df_with_embeddings.write
    .format("mongodb")
    .mode("append")
    .option("spark.mongodb.write.connection.uri", mongo_db_uri)
    .option("spark.mongodb.write.database", mongo_db_name)
    .option("spark.mongodb.write.collection", mongo_collection_name)
    .save()
)


