In [3]:

import pandas as pd
import random
import time
import uuid
from kafka import KafkaProducer
import numpy as np
import json

# =========================================================
# CONFIGURATION

INPUT_CSV = r"C:\Users\mikha\OneDrive\Documents\Demos\AI Observability\OpenAI_MockResponses.csv" 
NUM_MODELS = [
    "gpt-4.1",
    "gpt-4o-mini",
    "gpt-3.5-turbo",
    "llama2-13b",
    "mistral-7b-instruct"
]

RESPONSE_QUALITY = ["Good", "Average", "Poor"]
RESPONSE_TYPE = ["Informative", "Creative", "Actionable"]
COMPLEXITY_LEVELS = ["low","medium","high"]

LATENCY_RANGE = (0.1, 1.5)
INPUT_TOKENS_RANGE = (10, 80)
OUTPUT_TOKENS_RANGE = (30, 200)
READABILITY_RANGE = (30, 90)
COMPLEXITY_RANGE = (1, 10)
RANDOM_SEED = 42
random.seed(RANDOM_SEED)

# Embedding centers for fake attack vs normal
NORMAL_CENTER = [0.5, 0.5]
ATTACK_CENTER = [0.9, 0.9]

# Kafka producer
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8")
)
TOPIC_KAFKA = "llm_events"

# =========================================================
# HELPER FUNCTIONS

def mock_metrics(model_output):
    input_tokens = random.randint(*INPUT_TOKENS_RANGE)
    output_tokens = random.randint(*OUTPUT_TOKENS_RANGE)
    total_tokens = input_tokens + output_tokens
    latency = round(random.uniform(*LATENCY_RANGE), 3)
    
    num_sentences = len(model_output.split("."))
    num_words = len(model_output.split())
    
    return {
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "total_tokens": total_tokens,
        "latency_sec": latency,
        "estimated_cost_usd": round((input_tokens/1000)*0.0002 + (output_tokens/1000)*0.0008,6),
        "num_sentences": num_sentences,
        "num_words": num_words,
        "num_special_tokens": random.randint(0,5),
        "prompt_complexity_score": random.randint(*COMPLEXITY_RANGE),
        "output_readability_score": random.randint(*READABILITY_RANGE),
        "hallucination_score": round(random.uniform(0,1),3),
        "repetition_score": round(random.uniform(0,1),3),
        "sentiment_score": round(random.uniform(-1,1),3),
        "response_variation_score": round(random.uniform(0,1),3),
        "toxicity_score": round(random.uniform(0,1),3),
        "model_version": f"v{random.randint(1,3)}.{random.randint(0,9)}.{random.randint(0,9)}",
        "num_edits_required": random.randint(0,5)
    }

def generate_embeddings(event_type):
    if event_type == "jailbreak_attempt":
        x, y = np.random.normal(ATTACK_CENTER, 0.05, 2)
    else:
        x, y = np.random.normal(NORMAL_CENTER, 0.2, 2)
    return float(np.clip(x,0,1)), float(np.clip(y,0,1))

# =========================================================
# LOAD INPUT CSV

df_input = pd.read_csv(INPUT_CSV)
print(f"Loaded {len(df_input)} rows from {INPUT_CSV}")

# =========================================================
# STREAM EVENTS TO KAFKA

def stream_to_kafka():
    while True:
        for idx, row_input in df_input.iterrows():
            for model in NUM_MODELS:
                # Deterministic per row + model
                seed_val = RANDOM_SEED + idx + hash(model) % 10000
                random.seed(seed_val)
                
                # Randomly assign event_type
                event_type = "jailbreak_attempt" if random.random() > 0.95 else "llm_generation"
                
                # Generate embeddings
                embedding_x, embedding_y = generate_embeddings(event_type)
                
                # Generate metrics
                metrics = mock_metrics(row_input["mock_response"])
                
                record = row_input.to_dict()  # preserve all input CSV columns
                record.update({
                    "trace_id": str(uuid.uuid5(uuid.NAMESPACE_DNS, f"{idx}-{model}")),
                    "model": model,
                    "event_type": event_type,
                    "embedding_x": embedding_x,
                    "embedding_y": embedding_y,
                    "generation_timestamp": time.time(),
                })
                record.update(metrics)
                
                # Send to Kafka
                producer.send(TOPIC_KAFKA, record)
                
                # Optional: print to console
                print(record)
                
                # Control speed
                time.sleep(0.001)

# Start streaming
stream_to_kafka()


Loaded 110 rows from C:\Users\mikha\OneDrive\Documents\Demos\AI Observability\OpenAI_MockResponses.csv
{'topic': 'cloud computing', 'topic_category': 'Technology', 'user_prompt': 'How can I optimize my cloud infrastructure costs?', 'mock_response': 'You can optimize cloud costs by reviewing resource utilization, using reserved instances, and automating scaling based on demand.', 'trace_id': 'f62e4d22-916f-5155-85b3-b4d96e924c5f', 'model': 'gpt-4.1', 'event_type': 'llm_generation', 'embedding_x': 0.7185417595557928, 'embedding_y': 0.3420528401595899, 'generation_timestamp': 1764960366.9161189, 'input_tokens': 35, 'output_tokens': 44, 'total_tokens': 79, 'latency_sec': 0.901, 'estimated_cost_usd': 4.2e-05, 'num_sentences': 2, 'num_words': 18, 'num_special_tokens': 4, 'prompt_complexity_score': 5, 'output_readability_score': 84, 'hallucination_score': 0.566, 'repetition_score': 0.829, 'sentiment_score': -0.486, 'response_variation_score': 0.68, 'toxicity_score': 0.968, 'model_version': 'v

KeyboardInterrupt: 