In [None]:
# !pip install FlagEmbedding

# !pip uninstall backports
# !pip install backports
# !pip install --force-reinstall -v "setuptools<70"

# !pip install pymilvus==2.4.4

# !pip install openai

import json
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr
import requests
from functools import wraps
from pymilvus import connections, utility, db, MilvusClient, FieldSchema, CollectionSchema, Collection, DataType
from openai import OpenAI

In [None]:
# KEYS

# Credentials are read from the environment so that real secrets are never
# saved inside the notebook. The literals are the original placeholder values
# and are only used when the corresponding variable is not set.
hf_token = os.environ.get("HF_TOKEN", "hf_")
open_ai_key = os.environ.get("OPENAI_API_KEY", "sk-")

In [None]:



# cache_dir = '/tmp/huggingface_cache'
# os.makedirs(cache_dir, exist_ok=True)
# os.chmod(cache_dir, 0o777)
# os.environ['TRANSFORMERS_CACHE'] = cache_dir

# Embedding model served through the Hugging Face inference endpoint.
model_id = "sentence-transformers/all-MiniLM-L6-v2"

# Feature-extraction URL for `model_id`, and the bearer-token header that is
# sent with every request to it.
api_url = "https://api-inference.huggingface.co/pipeline/feature-extraction/" + model_id
headers = {"Authorization": "Bearer " + hf_token}


# Build (or reuse) the Spark session; the Kafka SQL connector is pulled in
# through spark.jars.packages.
spark = (
    SparkSession.builder
    .appName("KafkaRawCSVReader")
    .config("spark.driver.memory", "10g")
    .config("spark.executor.pyspark.memory", "2g")
    .config("spark.driver.maxResultSize", "2g")
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0')
    .getOrCreate()
)

# Streaming source: subscribe to the "csv_stream" topic, starting from the
# earliest available offsets.
kafka_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "csv_stream")
    .option("startingOffsets", "earliest")
    .load()
)

# Keep only the Kafka 'value' column, cast to STRING.
kafka_string_df = kafka_df.selectExpr("CAST(value AS STRING)")


# Sentence template used to turn one CSV record into natural-language text
# before embedding; each {placeholder} is filled from the matching CSV field.
semantic_schema = " ".join([
    "customer aged {Age} is a {Gender}",
    "with marital status {Marital_Status} is a {Occupation}",
    "with monthly income of {Monthly_Income}",
    "has completed {Educational_Qualifications}",
    "with family size of {Family_size}",
    "belongs to latitude {latitude} and longitude {longitude}",
    "with pincode {Pin_code}",
    "has given {Feedback} feedback",
])


def encode(final_sentence, timeout=120):
    """Embed a sentence by POSTing it to the feature-extraction endpoint.

    Args:
        final_sentence: text to embed; sent as the ``inputs`` field.
        timeout: seconds to wait for the HTTP response before giving up.
            ``wait_for_model`` is requested, so the default is generous.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: if the endpoint answers with a 4xx/5xx status.
            Without this check an error payload would be returned as if it
            were an embedding and passed on to the vector store.
        requests.Timeout: if no response arrives within ``timeout`` seconds.
    """
    response = requests.post(
        api_url,
        headers=headers,
        json={"inputs": final_sentence, "options": {"wait_for_model": True}},
        timeout=timeout,
    )
    # Fail loudly instead of handing an error document to the caller.
    response.raise_for_status()
    return response.json()


# Column order of the incoming CSV records; the names match the placeholders
# used in `semantic_schema`.
_CSV_FIELDS = (
    "Age",
    "Gender",
    "Marital_Status",
    "Occupation",
    "Monthly_Income",
    "Educational_Qualifications",
    "Family_size",
    "latitude",
    "longitude",
    "Pin_code",
    "Feedback",
)


def _parse_csv_line(line):
    """Split one raw CSV record into a ``{field name: value}`` dict."""
    # Imported locally so the helper carries its own dependency.
    import csv

    # csv.reader honours quoting, so a quoted value that contains a comma
    # stays in one column and its surrounding quotes are removed.
    values = next(csv.reader([line or ""]), [])
    if len(values) < len(_CSV_FIELDS):
        raise IndexError(
            f"expected {len(_CSV_FIELDS)} CSV fields, got {len(values)}: {line!r}"
        )
    # Any extra trailing columns are ignored, as only the first 11 are used.
    return dict(zip(_CSV_FIELDS, values))


def process_row(row):
    """Embed one streamed CSV record and store it.

    The first column of ``row`` is taken as the raw CSV line. It is parsed
    into named fields, rendered through ``semantic_schema``, embedded with
    ``encode`` and handed to ``store_embeddings`` together with the sentence.

    Args:
        row: an object exposing ``asDict()`` whose first value is the CSV line.

    Raises:
        IndexError: if the line has fewer fields than expected.
    """
    raw_line = list(row.asDict().values())[0]
    data = _parse_csv_line(raw_line)

    final_sentence = semantic_schema.format(**data)
    embedded_sentence = encode(final_sentence)

    store_embeddings(embedded_sentence, final_sentence)
    
# Start the streaming query, using process_row as the foreach sink callback.
# NOTE(review): process_row calls store_embeddings, which is defined in a later
# cell of this notebook - confirm it is defined before this cell is run on a
# fresh kernel.
query = kafka_string_df.writeStream.foreach(process_row).start()


In [None]:


DB_NAME = "csv_vec_db"
COLLECTION_NAME = "csv_vec_coll"
connections.connect(host="standalone", port=19530)

# Create the database on first run only.
databases = db.list_database()

if DB_NAME not in databases:
    db.create_database(DB_NAME)
    # Report success only after the call has actually returned.
    print("db created")


client = MilvusClient(
    uri="http://standalone:19530",
    db_name=DB_NAME
)

collections = client.list_collections()
print("existing collec",collections)

# Create the collection on first run only.
if COLLECTION_NAME not in collections:
    id_field = FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, description="primary id")
    # dim is expected to match the length of the vectors returned by encode().
    embedding_field = FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=384, description="vector")
    text_field = FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=300, description="text")

    schema = CollectionSchema(fields=[id_field, embedding_field,text_field], auto_id=True, enable_dynamic_field=True, description="desc of a collection")

    # The explicit schema defines the vector dimension; the former
    # `dimension=5` argument contradicted it and has been dropped.
    client.create_collection(
        collection_name=COLLECTION_NAME,
        schema=schema,
    )
    print("collection created")
    

In [None]:
# Describe the index for the "embedding" field, then create it on the collection.
index_params = MilvusClient.prepare_index_params()

vector_index_spec = dict(
    field_name="embedding",
    metric_type="COSINE",
    index_type="IVF_FLAT",
    index_name="vector_index",
    params={"nlist": 128},
)
index_params.add_index(**vector_index_spec)

client.create_index(collection_name=COLLECTION_NAME, index_params=index_params)


In [None]:
# Load the collection so it can be searched.
client.load_collection(collection_name=COLLECTION_NAME)


def store_embeddings(vec, text):
    """Insert one embedding and its source sentence into the Milvus collection.

    Args:
        vec: embedding vector, stored in the "embedding" field.
        text: the sentence that was embedded, stored in the "text" field.
    """
    # A dedicated client is opened per call (presumably because this runs
    # inside the streaming foreach callback - verify) and is always closed
    # again so that connections do not pile up, one per processed row.
    milvus = MilvusClient(
        uri="http://standalone:19530",
        db_name=DB_NAME
    )
    try:
        res = milvus.insert(
            collection_name=COLLECTION_NAME,
            data=[{"embedding": vec, "text": text}]
        )
        # Only the insert summary is printed; dumping the client object and
        # the full vector payload for every row flooded the output.
        print(res)
    finally:
        milvus.close()

  


In [None]:
def get_similar_docs(query_text, limit=130):
    """Return the stored sentences most similar to ``query_text``.

    The query is embedded with ``encode`` and searched against the Milvus
    collection; the "text" field of every hit is returned.

    Args:
        query_text: free-text query to embed and search for.
        limit: maximum number of search results to return (default 130,
            the value that was previously hard-coded).

    Returns:
        A list of the matching "text" values, in the order Milvus returned
        them; empty if the search produced no result set.
    """
    query_vector = encode(query_text)

    milvus = MilvusClient(
        uri="http://standalone:19530",
        db_name=DB_NAME
    )
    try:
        results = milvus.search(
            collection_name=COLLECTION_NAME,
            data=[query_vector],
            limit=limit,
            output_fields=["text"]
        )
        if not results:
            return []
        # One query vector was sent, so its hits are the first result set.
        return [hit["entity"]["text"] for hit in results[0]]
    finally:
        # Release the connection opened for this call.
        milvus.close()
    
    


In [None]:
# A dedicated name is used for the OpenAI client so that the module-level
# Milvus `client` used by the earlier cells is not overwritten; re-running
# those cells after this one would otherwise fail.
openai_client = OpenAI(
    api_key = open_ai_key,
)

question = "what is the average age of married men"

# Retrieve the stored sentences closest to the question and pass them to the
# model as the system message.
similar_results = get_similar_docs(question)

context = f"""
given {similar_results}
"""

chat_completion = openai_client.chat.completions.create(
    messages=[
        {
            "role": "system",
            "content": context,
        },
        # {
        #     "role": "system",
        #     "content": "you should respond with reasoning, and dont use context window information",
        # },
        {
            "role":"user",
            "content":question
        }

    ],
    # model="gpt-3.5-turbo",
    model="gpt-4o",
)

print(chat_completion.choices[0].message.content)