# **QAI Agents Framework demo**

In [1]:
# Colab-ready demo of QAI_Agents_Framework (dummy components)
# Paste this whole cell into Google Colab and run.
# It simulates: Planner -> AutoCircuit -> Simulation Sandbox -> QPU Broker -> QCA -> Aggregator
# Artifacts are written to /content/qai_demo_artifacts/

from dataclasses import dataclass, field, asdict
from typing import List, Dict, Any
import random, time, json, uuid, statistics, os

# -----------------------------
# Data models
# -----------------------------
@dataclass
class JobDescriptor:
    job_id: str
    user_id: str
    problem_type: str            # e.g., "optimization", "simulation", "classification"
    objective: str               # human readable objective
    maturity: str                # Concept | Prototype | Validated | Certified
    constraints: Dict[str, Any] = field(default_factory=dict)
    params: Dict[str, Any] = field(default_factory=dict)

@dataclass
class SubTask:
    task_id: str
    task_type: str               # classical | quantum_subroutine | qca_task | simulation_task | auto_circuit | hybrid_loop
    payload: Dict[str, Any] = field(default_factory=dict)
    assigned_agent: str = None
    result: Any = None

# -----------------------------
# Policy Agent (Maturity enforcement)
# -----------------------------
class PolicyAgent:
    def __init__(self):
        # simple maturity policy map to allowed backends and budgets
        self.policy = {
            "Concept": {"allow_qpu": False, "allow_simulator": True, "qpu_budget_shots": 0},
            "Prototype": {"allow_qpu": True, "allow_simulator": True, "qpu_budget_shots": 2000},
            "Validated": {"allow_qpu": True, "allow_simulator": True, "qpu_budget_shots": 20000},
            "Certified": {"allow_qpu": True, "allow_simulator": True, "qpu_budget_shots": 100000},
        }

    def enforce(self, job: JobDescriptor):
        rule = self.policy.get(job.maturity, self.policy["Concept"])
        return rule

# -----------------------------
# Planner (simple heuristics)
# -----------------------------
class Planner:
    def __init__(self, policy_agent: PolicyAgent):
        self.policy_agent = policy_agent

    def decompose(self, job: JobDescriptor) -> List[SubTask]:
        tasks = []
        if job.problem_type == "optimization":
            tasks.append(SubTask(task_id=str(uuid.uuid4()), task_type="auto_circuit", payload={"spec":job.objective}))
            tasks.append(SubTask(task_id=str(uuid.uuid4()), task_type="hybrid_loop", payload={"params":job.params}))
        elif job.problem_type == "simulation":
            tasks.append(SubTask(task_id=str(uuid.uuid4()), task_type="simulation_task", payload={"scenarios":10}))
        else:
            tasks.append(SubTask(task_id=str(uuid.uuid4()), task_type="classical", payload={"model":"baseline"}))
        return tasks

# -----------------------------
# AutoCircuit Designer (HVA / QAOA template generator - dummy)
# -----------------------------
class AutoCircuitDesigner:
    def __init__(self):
        pass

    def propose_ansatz(self, spec: str, backend_profile: Dict[str,Any]) -> Dict[str,Any]:
        # returns a dummy ansatz description (later replaced with real quantum circuit objects)
        depth = backend_profile.get("max_depth", 3)
        ansatz = {"name":"HVA-like", "depth":depth, "params":[random.random() for _ in range(depth*2)], "notes":"dummy ansatz"}
        return ansatz

# -----------------------------
# Simulation Sandbox (batched rollouts)
# -----------------------------
class SimulationSandbox:
    def __init__(self):
        pass

    def run_rollouts(self, ansatz: Dict[str,Any], scenarios:int=10) -> Dict[str,Any]:
        # Simulate performance metric distribution (dummy)
        samples = [max(0, 1.0 - 0.1*ansatz["depth"] + random.gauss(0, 0.05)) for _ in range(scenarios)]
        return {"samples":samples, "mean":statistics.mean(samples), "std":statistics.pstdev(samples)}

# -----------------------------
# QPU Broker (adapter-based, dummy submit)
# -----------------------------
class QPUBroker:
    def __init__(self, adapters:Dict[str,Dict[str,Any]]):
        # adapters: mapping name -> capability profile
        self.adapters = adapters

    def select_backend(self, job: JobDescriptor, policy_rule:Dict[str,Any]) -> str:
        # simplistic selection: prefer 'internal_qai' if budget allows and maturity high enough
        if policy_rule["allow_qpu"] and job.maturity in ("Prototype", "Validated", "Certified"):
            if "internal_qai" in self.adapters:
                return "internal_qai"
            return next(iter(self.adapters.keys()))
        else:
            return "simulator"

    def submit(self, backend_name:str, ansatz:Dict[str,Any], shots:int=1024) -> Dict[str,Any]:
        # dummy submission that returns simulated measurement statistics
        if backend_name == "simulator":
            samples = [max(0, 1.0 - 0.08*ansatz["depth"] + random.gauss(0, 0.03)) for _ in range(min(100, shots))]
        else:
            quality_boost = 0.02 if backend_name=="internal_qai" else 0.0
            actual_shots = min(shots, self.adapters.get(backend_name, {}).get("max_shots", 1000))
            samples = [max(0, 1.0 - 0.08*ansatz["depth"] + quality_boost + random.gauss(0, 0.025)) for _ in range(actual_shots)]
        return {"backend":backend_name, "shots":len(samples), "samples":samples, "mean":statistics.mean(samples)}

# -----------------------------
# Aggregator and Result Composer
# -----------------------------
class Aggregator:
    def __init__(self, store_path="/content/qai_demo_artifacts"):
        self.store_path = store_path
        os.makedirs(self.store_path, exist_ok=True)

    def compose(self, job: JobDescriptor, subtasks:List[SubTask]) -> Dict[str,Any]:
        # collect results, compute provenance, and persist a JSON artifact
        record = {
            "job": asdict(job),
            "subtasks": [{ "task_id":t.task_id, "type":t.task_type, "assigned_agent":t.assigned_agent, "result_summary": self._summarize(t.result)} for t in subtasks],
            "provenance": {"artifact_id": str(uuid.uuid4()), "timestamp": time.time()}
        }
        fname = os.path.join(self.store_path, f"{job.job_id}.json")
        with open(fname, "w") as f:
            json.dump(record, f, indent=2)
        return {"artifact_path": fname, "summary": record}

    def _summarize(self, result):
        if result is None: return None
        if isinstance(result, dict):
            return {k: (result[k] if k in ("mean","shots","backend") else "...") for k in result.keys()}
        return str(type(result))

# -----------------------------
# Simple QCA Agent (agent with internal quantum call)
# -----------------------------
class QCAAgent:
    def __init__(self, name:str, qpu_broker:QPUBroker):
        self.name = name
        self.qpu_broker = qpu_broker

    def act(self, observation:Dict[str,Any], policy_circuit:Dict[str,Any], policy_rule:Dict[str,Any]):
        # For demo: use policy circuit to request a cheap QPU evaluation and make a decision
        backend = self.qpu_broker.select_backend(JobDescriptor(job_id=str(uuid.uuid4()), user_id="qca", problem_type="internal", objective="", maturity=observation.get("maturity","Concept")), policy_rule)
        qres = self.qpu_broker.submit(backend, policy_circuit, shots=policy_rule.get("qpu_budget_shots", 100))
        # decision: if mean > threshold, accept else reject
        decision = "accept" if qres["mean"] > 0.9 else "reject"
        return {"agent":self.name, "backend_used":backend, "decision":decision, "qpu_result_mean":qres["mean"]}

# -----------------------------
# Demo flow that ties everything
# -----------------------------
def run_demo():
    print("=== QAI Agents Framework Demo (Dummy) ===")
    # Create job: an optimization problem
    job = JobDescriptor(job_id="job_"+str(uuid.uuid4())[:8], user_id="vijay", problem_type="optimization", objective="Maximize portfolio score", maturity="Prototype", params={"shots":1024})

    print(f"\nJob submitted: {job.job_id} (maturity={job.maturity})")
    policy = PolicyAgent()
    policy_rule = policy.enforce(job)
    print("Policy rule applied:", policy_rule)

    planner = Planner(policy)
    subtasks = planner.decompose(job)
    print(f"Planner decomposed job into {len(subtasks)} subtasks. Types:", [t.task_type for t in subtasks])

    # Setup components (dummy adapter profiles)
    adapters = {
        "simulator": {"max_shots": 2000, "max_depth": 5},
        "internal_qai": {"max_shots": 1500, "max_depth": 6, "note":"on-prem QAI Processor (dummy)"}
    }
    qpu_broker = QPUBroker(adapters=adapters)
    autocirc = AutoCircuitDesigner()
    sandbox = SimulationSandbox()
    aggregator = Aggregator()

    # Execute subtasks
    for t in subtasks:
        print(f"\n--- Executing subtask {t.task_id} ({t.task_type}) ---")
        if t.task_type == "auto_circuit":
            ansatz = autocirc.propose_ansatz(t.payload.get("spec",""), backend_profile=adapters["internal_qai"])
            print("AutoCircuit proposed ansatz:", ansatz)
            t.assigned_agent = "auto_circuit_service"
            t.result = {"ansatz": ansatz}
        elif t.task_type == "hybrid_loop":
            # get ansatz from prior step (simple chaining in this demo)
            ansatz = {"name":"HVA-like","depth":adapters["internal_qai"]["max_depth"], "params":[0.1,0.2,0.3]}
            print("Hybrid loop using ansatz:", ansatz)
            rollout = sandbox.run_rollouts(ansatz, scenarios=20)
            print("Simulation Sandbox rollouts -> mean:", round(rollout["mean"],4), "std:", round(rollout["std"],4))
            backend = qpu_broker.select_backend(job, policy_rule)
            print("Selected backend:", backend)
            qres = qpu_broker.submit(backend, ansatz, shots=job.params.get("shots",1024))
            print("QPU result mean:", round(qres["mean"],4), "shots:", qres["shots"], "backend:", qres["backend"])
            t.assigned_agent = f"qpu_broker:{backend}"
            t.result = qres
        else:
            t.assigned_agent = "classical_worker"
            t.result = {"status":"ok", "value": random.random()}

    # Demonstrate a QCA agent making an internal decision using a policy circuit
    qca = QCAAgent("qca_1", qpu_broker)
    policy_circuit = {"name":"policy_circuit_v1","depth":2,"params":[0.2,0.1]}
    qca_decision = qca.act({"maturity":job.maturity}, policy_circuit, policy_rule)
    print("\nQCA Agent decision:", qca_decision)

    # Aggregate results
    artifact = aggregator.compose(job, subtasks)
    print("\nAggregated artifact saved to:", artifact["artifact_path"])
    print("Artifact summary keys:", list(artifact["summary"].keys()))

    print("\n=== Demo complete ===")
    return artifact["artifact_path"]

# Run demo in Colab
if __name__ == "__main__":
    artifact_path = run_demo()
    print("\nYou can inspect the saved artifact JSON file at:", artifact_path)


=== QAI Agents Framework Demo (Dummy) ===

Job submitted: job_110ea1ee (maturity=Prototype)
Policy rule applied: {'allow_qpu': True, 'allow_simulator': True, 'qpu_budget_shots': 2000}
Planner decomposed job into 2 subtasks. Types: ['auto_circuit', 'hybrid_loop']

--- Executing subtask b190e746-d679-41cc-bd14-a7f385d285fb (auto_circuit) ---
AutoCircuit proposed ansatz: {'name': 'HVA-like', 'depth': 6, 'params': [0.8577001422812935, 0.2634036639192151, 0.4923346081731056, 0.5582831206277122, 0.5769622338566144, 0.6429995396040105, 0.5170103632590275, 0.17082877144723907, 0.20016165019493104, 0.6753068492881912, 0.8774592754078487, 0.8953840625086684], 'notes': 'dummy ansatz'}

--- Executing subtask 783b8629-02f8-4ccb-b118-51e30b2e0f0b (hybrid_loop) ---
Hybrid loop using ansatz: {'name': 'HVA-like', 'depth': 6, 'params': [0.1, 0.2, 0.3]}
Simulation Sandbox rollouts -> mean: 0.4086 std: 0.0422
Selected backend: internal_qai
QPU result mean: 0.5391 shots: 1024 backend: internal_qai

QCA Age

**Concept and execution by Bhadale IT, code generated by ChatGPT**