rubelw/OSSS

!!! warning "Project status: active development"
    OSSS is still being developed. Community input and assistance are very welcome!

    - Share feedback and ideas via issues or discussions.
    - Open PRs for bug fixes and small improvements.
    - As of 7/11/2026: working on the step machine and the logic for workflows, gates, boundaries, etc.

Open Source School Software (OSSS)

License Python 3.11+ FastAPI Keycloak SQLAlchemy PostgreSQL Docs

Open Source School Software (K-12 SIS): FastAPI + Keycloak + SQLAlchemy + Datalake + Ollama + MetaGPT + A2A; governance + student info + accounting + activities + transportation.

A community-driven, modular suite of applications for K-12 districts.

📚 Live documentation: https://rubelw.github.io/OSSS/

This repository is a polyglot monorepo with a Next.js frontend (src/osss-web) and a FastAPI backend (src/OSSS). Documentation is built with MkDocs Material, with API references generated from source:

  • Frontend (TypeScript) → TypeDoc → Markdown (docs/api/web/*)
  • Backend (Python) → mkdocstrings renders code objects from src/OSSS
  • REST (OpenAPI) → exported JSON rendered with ReDoc
  • AI (Ollama + MetaGPT + A2A)

Network diagram

OSSS network architecture (draw.io)


OSSS AI Query Flow

High-level Flow

```mermaid
sequenceDiagram
  autonumber
  participant C as Client UI
  participant API as /api/query route
  participant O as LangGraphOrchestrator
  participant G as GraphFactory
  participant LG as LangGraph
  participant A as Agents

  C->>API: POST /api/query
  API->>API: create AgentContext + workflow_id + correlation_id
  API->>O: run(context)
  O->>G: create graph + compile
  G-->>O: compiled graph
  O->>LG: invoke(graph, initial_state)

  loop each node
    LG->>A: execute node wrapper
    A->>A: run_with_retry + update context
    A-->>LG: updated state
  end

  LG-->>O: final state
  O-->>API: result (outputs + meta)
  API-->>C: HTTP response
```
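
The `run_with_retry` step inside the node loop can be pictured as a small wrapper around each node. This is a minimal sketch, not the OSSS implementation: the signature, the `attempts` and `base_delay` parameters, and the exponential backoff policy are all assumptions made for illustration.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Optional

State = Dict[str, Any]


async def run_with_retry(
    node: Callable[[State], Awaitable[State]],
    state: State,
    attempts: int = 3,         # hypothetical default
    base_delay: float = 0.01,  # hypothetical default, in seconds
) -> State:
    """Call an async node, retrying with exponential backoff on failure."""
    last_error: Optional[Exception] = None
    for attempt in range(attempts):
        try:
            return await node(state)
        except Exception as exc:  # a real wrapper would likely catch narrower errors
            last_error = exc
            if attempt < attempts - 1:
                await asyncio.sleep(base_delay * (2 ** attempt))
    raise RuntimeError(f"node failed after {attempts} attempts") from last_error


# Demo: a node that fails twice and then returns an updated state.
calls = {"n": 0}


async def flaky_node(state: State) -> State:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ValueError("transient failure")
    return {**state, "refined": True}


result = asyncio.run(run_with_retry(flaky_node, {"query": "query consents"}))
```

The wrapper returns a new state rather than mutating its input, which keeps a failed attempt from leaving partial updates behind.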

---
# Screenshots

The static site is output to `./documentation/`.

![Example Web View](docs/img/web_view.png)


Consul example:

![Example Consul](docs/img/consul_example.png)

Vault example:

![Example Vault](docs/img/vault_example.png)

Keycloak example:

![Example Keycloak](docs/img/keycloak_example.png)

Kibana example:
![Example Kibana](docs/img/kibana_example.png)

Zulip (Chat) example:
![Example Zulip](docs/img/zulip_example.png)

Taiga (Task/Project Management) example:
![Example Taiga](docs/img/taiga_example.png)


---

Rasa Mentor example:
![Example Rasa Mentor](docs/img/rasa_mentor_example.png)

AI Tutor Example:
![Math Tutor](docs/img/math_tutor.png)

AI Guardrail Example:
![Mentor Guardrail](docs/img/mentor_guardrail.png)

A2A Dashboard Example:
![A2A Dashboard](docs/img/a2a_dashboard.png)

General AI Chat Example:

![General AI Chat](docs/img/general_ai_chat.png)

AI Slot-Agent Example:

![AI Slot-Agent](docs/img/ai_slot_agent.png)

AI Query Agent Example:

![AI Query Agent](docs/img/ai_query_agent.png)


## Why This Is Important

When Artificial General Intelligence (AGI) starts to emerge—potentially by 2030—districts will need to adjust governance, safety filters, and curricula rapidly. That kind of agility is exactly what community-maintained, open-source software delivers—without waiting on a vendor roadmap. Today, many incumbent systems are tied to legacy architectures and slow release cycles. While AI is already reshaping mainstream apps, most school platforms haven’t meaningfully evolved to leverage it.

We are building the next generation of school software as an open, participatory project. Administrators, staff, students, and families will be able to propose enhancements, contribute code, and ship improvements together—so the platform keeps pace with classroom needs and policy changes.

---
# Development Environment Configuration
```commandline
docker-compose version 1.29.2, build 5becea4c
docker-py version: 5.0.0
CPython version: 3.9.0
OpenSSL version: OpenSSL 1.1.1h  22 Sep 2020
```

---
# Minimum System Requirements

- **OS:** Linux or macOS (Windows via Docker Desktop + WSL2)
- **CPU:** 4 cores (minimum), 8 cores recommended (Elasticsearch + Keycloak + dev servers)
- **RAM:** 12 GB usable for Docker (minimum), 16 GB+ recommended
- **Disk free:** ~50–60 GB (images + ES/Kibana data + two Postgres volumes)
- **Docker:** Engine 24+ with Compose v2; cgroup v2 enabled on modern Linux
- **Ports:**  
  - 8081 (API)  
  - 3000 (web)  
  - 8080 (Keycloak)  
  - 5433/5434 (Postgres)  
  - 5601 (Kibana)  
  - 9200 (Elasticsearch)  
  - 8200 (Vault)  
  - 8500 (Consul)
  - 8444 (Trino)
  - 8088 (Superset)
  - 8083 (Airflow)
  - 8585 (OpenMetadata)
  - 8082 (OpenMetadata Ingestion)
  - 5005 (Rasa Mentor)
  - 8086 (A2A Server)
  - 9000 (A2A Agent)
  - 8111 (Zulip Chat)
  - 15672 (Zulip RabbitMQ Management UI)
  - 5672 (Zulip RabbitMQ AMQP)
  - 6383 (Zulip Redis)
  - 5438 (Zulip DB)
  - 8120 (Taiga-Gateway)
  - 8103 (Taiga-Protected)
  - 8188 (Taiga-Events)
  - 8100 (Taiga-Back)
  - 8161 (Taiga-RabbitMQ Management UI) 
  - 8162 (Taiga-RabbitMQ AMQP)
  - 5439 (Taiga-DB)
  
  
---

## 📖 Documentation Quick Start

> Run all commands from the **repo root**. Create and activate a Python venv first.  
> Live docs are published at **https://rubelw.github.io/OSSS/**.

### Quick start
```bash
# clone
git clone https://github.com/rubelw/OSSS.git
cd OSSS

# (optional) copy environment examples
cp .env.example .env || true

# create a venv in a folder named .venv (inside your project)
python3 -m venv .venv
source .venv/bin/activate

# build + run local stack (database, API, web)
./start_osss.sh

# to run the cli
osss <TAB>

# Keycloak: http://localhost:8080 username 'admin' and password 'admin'
# Keycloak login to OSSS realm: https://keycloak.local:8443/realms/OSSS/account
# FastAPI: http://localhost:8081/docs username 'activities_director@osss.local' and password 'password'
# Web: http://localhost:3000 username 'activities_director@osss.local' and password 'password'
# Vault: http://localhost:8200 username 'chief_technology_officer@osss.local' and password 'password'
# Consul: http://localhost:8500
# Kibana: http://localhost:5601
# Elasticsearch: http://localhost:9200
# Airflow: http://localhost:8083
# OpenMetadata: http://localhost:8585
# Superset: http://localhost:8088
```

Build the static site to ./documentation/:

```bash
# Optional: regenerate TypeDoc first if code changed
npx typedoc --options typedoc.frontend.json
mkdocs build --clean
```

📁 Docs Layout (MkDocs)

```text
docs/
├─ index.md                      # Landing page
├─ frontend/
│  └─ overview.md                # Next.js app overview
├─ backend/
│  └─ overview.md                # FastAPI app overview
├─ api/
│  ├─ web/                       # (generated) TypeDoc markdown for Next.js
│  └─ openapi/                   # (generated) openapi.json for ReDoc
└─ api/python/
   ├─ index.md                   # (generated) landing for Python API
   └─ OSSS.md                    # (generated) mkdocstrings page for OSSS package
```

The pages under docs/api/python/ and docs/api/openapi/ are created during the MkDocs build by small helper scripts (see below). TypeDoc output is generated before the build runs.


Demo

OSSS demo

🧠 LangGraph Flow

OSSS uses LangGraph to orchestrate multiple AI “agents” as a DAG (directed acyclic graph). This lets OSSS separate concerns (guardrails, intent, retrieval, formatting) while keeping the system observable and debuggable end-to-end.

At a glance, a request typically flows through:

  • FastAPI route (request entry)
  • Orchestration API (workflow boundary)
  • Orchestrator (planning + execution)
  • Guard (always first, may short-circuit)
  • LangGraph DAG execution (parallel where possible)
  • Formatting + response (UI-friendly output + trace metadata)
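
The guard-first ordering in the list above can be sketched in a few lines. This is an illustrative stand-in rather than OSSS code: `BLOCKED_TERMS`, `execute_graph`, `handle_query`, and the response shape are hypothetical, and the real guard also supports a `needs_clarification` outcome that is left out here.

```python
import asyncio
from typing import Any, Dict

State = Dict[str, Any]

BLOCKED_TERMS = ("password dump",)  # hypothetical guard rule


async def guard(state: State) -> State:
    """Decide allow/block before anything else runs."""
    blocked = any(term in state["query"].lower() for term in BLOCKED_TERMS)
    return {**state, "guard_decision": "block" if blocked else "allow"}


async def execute_graph(state: State) -> State:
    """Stand-in for the LangGraph DAG execution step."""
    return {**state, "answer": f"results for: {state['query']}"}


def format_response(state: State) -> Dict[str, Any]:
    """Shape the final state into a UI-friendly payload with trace metadata."""
    return {
        "answer": state.get("answer", "Sorry, I can't help with that request."),
        "meta": {"guard_decision": state["guard_decision"]},
    }


async def handle_query(query: str) -> Dict[str, Any]:
    state = await guard({"query": query})
    if state["guard_decision"] != "allow":
        return format_response(state)  # short-circuit: the graph never runs
    return format_response(await execute_graph(state))
```

Calling `asyncio.run(handle_query("query consents"))` takes the allow path; a query containing a blocked term returns the safe response without touching `execute_graph`.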

### High-level diagram (request → response)

```mermaid
flowchart TD
  U[User / UI] --> R[FastAPI: /api/query]
  R --> OA[Orchestration API]
  OA --> ORCH[LangGraphOrchestrator.run]

  ORCH --> CORR[Correlation + workflow context\n(correlation_id, workflow_id)]
  CORR --> GUARD[Guard agent (always first)\nallow | block | needs_clarification]

  GUARD -->|halt| HALT[Return safe response\n(no graph execution)]
  GUARD -->|allow| PLAN[Planner: build_execution_plan\nselect agents + routing metadata]

  PLAN --> META[Preflight state injection\n_query_profile, _routing, effective_queries]
  META --> RAG{RAG prefetch enabled?}
  RAG -->|yes| PREFETCH[rag_prefetch_jsonl()\nrag_context + hits]
  RAG -->|no| GRAPH

  PREFETCH --> GRAPH[GraphFactory.compile(spec)\nCompiled LangGraph StateGraph]
  META --> GRAPH

  GRAPH --> EXEC[compiled_graph.ainvoke(initial_state)]
  EXEC --> FINAL[Final state\nsuccessful_agents / failed_agents / errors]
  FINAL --> BRIDGE[State bridge\nState → AgentContext]
  BRIDGE --> RESP[HTTP response payload\n(answer + sources + debug)]
  RESP --> U
```


### DAG diagram (agents + dependencies)

This shows the *agent-level* flow LangGraph executes once the orchestrator has selected an execution plan.
Nodes can run in parallel when their dependencies are satisfied.

```mermaid
flowchart LR
  %% --- Entry ---
  Q[Query / Initial State] --> G[guard]

  %% --- Guard outcomes ---
  G -->|halt| STOP[halt + safe_response]
  G -->|allow| P[planner-selected pipeline]

  %% --- Canonical "full" pipeline ---
  P --> R[refiner]

  R --> C[critic]
  R --> H[historian]

  %% Parallel fan-in
  C --> S[synthesis]
  H --> S

  %% Terminal
  S --> F[format_response]
  F --> OUT[final response\n(answer + sources + debug)]

  %% Optional / conditional formatting nodes
  F -. optional .-> FB[format_block]
  F -. optional .-> FC[format_requires_confirmation]
```
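
To make "nodes can run in parallel when their dependencies are satisfied" concrete, here is a small scheduler sketch built on the standard library instead of LangGraph. The dependency map mirrors the canonical pipeline in the diagram; `run_node` and `run_dag` are hypothetical helpers, and LangGraph's own scheduling may differ in detail.

```python
import asyncio
from graphlib import TopologicalSorter
from typing import Any, Dict, List, Set

# node -> set of nodes it depends on (mirrors the canonical pipeline)
DEPENDENCIES: Dict[str, Set[str]] = {
    "refiner": set(),
    "critic": {"refiner"},
    "historian": {"refiner"},
    "synthesis": {"critic", "historian"},
    "format_response": {"synthesis"},
}


async def run_node(name: str, state: Dict[str, Any]) -> str:
    """Placeholder agent: a real node would call an LLM or a data source."""
    await asyncio.sleep(0)
    return name


async def run_dag(deps: Dict[str, Set[str]]) -> List[List[str]]:
    """Run nodes in dependency order; nodes that are ready together run concurrently."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    state: Dict[str, Any] = {}
    waves: List[List[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        finished = await asyncio.gather(*(run_node(n, state) for n in ready))
        for name in finished:
            state[name] = "done"
            sorter.done(name)
        waves.append(ready)
    return waves
```

Each inner list returned by `run_dag` is one wave of concurrently executed nodes, so `critic` and `historian` share a wave while `synthesis` waits for both.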


## ⚙️ MkDocs Configuration

`mkdocs.yml` at the repo root glues everything together. Key bits:

```yaml
site_name: OSSS Developer Documentation
site_url: https://rubelw.github.io/OSSS/
docs_dir: docs
site_dir: documentation

nav:
  - Overview: index.md
  - Frontend (Next.js):
      - Overview: frontend/overview.md
      - API (TypeScript): api/web/modules.md   # <-- match what TypeDoc emits (modules.md or index.md)
  - Backend (Python):
      - Overview: backend/overview.md
      - API (Python): api/python/OSSS.md
      - OpenAPI: backend/openapi.md

plugins:
  - search
  - mkdocstrings:
      handlers:
        python:
          paths: ["src"]           # import OSSS from ./src/OSSS
          options:
            show_source: false
            docstring_style: google
            members_order: source
  - gen-files:
      scripts:
        - tooling/generate_docs.py
        - tooling/export_openapi.py

# Optional: make pages wider site-wide, or include a page-class-based override
extra_css:
  - overrides/wide.css

# Load ReDoc globally so the OpenAPI page can initialize it
extra_javascript:
  - https://cdn.redoc.ly/redoc/latest/bundles/redoc.standalone.js
```

Helper scripts (run during mkdocs serve/build)

  • tooling/generate_docs.py — generates docs/api/python/OSSS.md that contains the ::: OSSS directive; mkdocstrings renders it into API docs.

    # tooling/generate_docs.py
    from pathlib import Path
    import mkdocs_gen_files as gen
    
    with gen.open("api/python/index.md", "w") as f:
        f.write("# Python API\n\n- [OSSS package](./OSSS.md)\n")
    
    with gen.open("api/python/OSSS.md", "w") as f:
        f.write("# `OSSS` package\n\n")
        f.write("::: OSSS\n")
        f.write("    handler: python\n")
        f.write("    options:\n")
        f.write("      show_root_heading: true\n")
        f.write("      show_source: false\n")
        f.write("      docstring_style: google\n")
        f.write("      members_order: source\n")
        f.write("      show_signature: true\n")
  • tooling/export_openapi.py — writes docs/api/openapi/openapi.json from the FastAPI app.

    # tooling/export_openapi.py
    import json
    import mkdocs_gen_files as gen
    from OSSS.main import app              # adjust if your FastAPI app lives elsewhere
    
    with gen.open("api/openapi/openapi.json", "w") as f:
        json.dump(app.openapi(), f, indent=2)

ReDoc page (docs/backend/openapi.md)

```markdown
---
title: OSSS API (OpenAPI)
hide:
  - toc
class: full-width
---

> If the panel below stays blank, verify the JSON exists:
> **[OpenAPI JSON](../../api/openapi/openapi.json)**

<div id="redoc-container"></div>

<script>
(function () {
  function init() {
    var el = document.getElementById('redoc-container');
    if (window.Redoc && el) {
      // NOTE: two ".." segments from /backend/openapi → /api/openapi/openapi.json
      window.Redoc.init('../../api/openapi/openapi.json', {}, el);
    } else {
      setTimeout(init, 50);
    }
  }
  init();
})();
</script>

<noscript>
JavaScript is required to render the ReDoc UI. You can still download the
<a href="../../api/openapi/openapi.json">OpenAPI JSON</a>.
</noscript>
```

Optional: widen pages

docs/overrides/wide.css (site-wide) or docs/overrides/redoc-wide.css (only OpenAPI page):

```css
/* Site-wide wider grid */
.md-grid { max-width: 1440px; }

/* Only pages with class: full-width */
.md-content__inner.full-width { max-width: none; padding-left: 0; padding-right: 0; }
#redoc-container { margin: 0; padding: 0; }
```

Reference in mkdocs.yml via extra_css.


🔐 Environment Notes

  • Python imports for docs: run mkdocs with PYTHONPATH=src so mkdocstrings and the OpenAPI export can import OSSS from src/OSSS.
  • Frontend generator: TypeDoc runs with your Next.js tsconfig. If the app declares "packageManager" in src/osss-web/package.json, use npm (not pnpm) for consistency.

🧪 CI Example (GitHub Actions)

.github/workflows/docs.yml

```yaml
name: Build Docs
on:
  push:
    branches: [ main ]
  workflow_dispatch:

jobs:
  docs:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - uses: actions/setup-node@v4
        with:
          node-version: 20

      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"

      - name: Install deps
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements-docs.txt
          npm ci || npm i

      - name: Generate TypeScript API (TypeDoc → Markdown)
        run: npx typedoc --options typedoc.frontend.json

      - name: Build MkDocs site → ./documentation
        env:
          PYTHONPATH: src
        run: mkdocs build --clean

      - name: Upload artifact
        uses: actions/upload-artifact@v4
        with:
          name: osss-docs
          path: documentation
```

🧪 Creating New AI Agent

  1. Mental model: how agents hook in

At a high level:

  • RouterAgent decides an intent for the turn.
  • It calls AgentDispatcher, which does get_agent(intent_label).
  • get_agent comes from your registry (OSSS.ai.agents.registry).
  • Agents implement the Agent protocol and return AgentResult.

So to add a new agent you basically:

  • Define a new intent_name and agent class.
  • Register it with the registry (@register_agent).
  • Optionally add heuristics / aliases so RouterAgent actually routes to it.
  • Optionally give it a dedicated RAG index, or rely on main.

  2. Pick an intent and file location

Decide:

  • Canonical intent name used inside OSSS, e.g.:

    intent_name = "lunch_menu"

  • Module path for the agent.

For example, for a district-facing agent:

src/OSSS/ai/agents/district/lunch_menu_agent.py

Or if it’s more general:

src/OSSS/ai/agents/lunch_menu_agent.py

You’re already using OSSS/ai/agents/student/ for student stuff, so this keeps things tidy per domain.

  3. Implement the agent class

Use your AgentContext / AgentResult patterns and the registry decorator.

Example: OSSS/ai/agents/district/lunch_menu_agent.py

```python
from __future__ import annotations

import logging
from typing import Any, Dict, List

from OSSS.ai.agents import register_agent
from OSSS.ai.agents.base import AgentContext, AgentResult

logger = logging.getLogger("OSSS.ai.agents.lunch_menu")

@register_agent("lunch_menu")
class LunchMenuAgent:
    """
    Example agent that answers questions about the school lunch menu.

    This is intentionally simple: it reads from a fixed source or RAG context
    and returns a formatted answer + debug metadata.
    """

    intent_name = "lunch_menu"

    async def run(self, ctx: AgentContext) -> AgentResult:
        logger.info(
            "[LunchMenuAgent.run] query=%r session_id=%s",
            ctx.query,
            ctx.session_id,
        )

        # TODO: replace this with your real data source / RAG lookup
        answer_text = (
            "Here’s the lunch menu for today at Dallas Center-Grimes "
            "Community School District:\n\n"
            "- Main: Cheese pizza\n"
            "- Side: Garden salad\n"
            "- Fruit: Apple slices\n"
            "- Milk: 1% or chocolate\n"
        )

        # Minimal debug chunk so the “Sources:” UI has something to show
        debug_neighbors: List[Dict[str, Any]] = [
            {
                "score": 1.0,
                "filename": "lunch_menu_stub",
                "chunk_index": None,
                "text_preview": answer_text[:800],
                "image_paths": None,
                "page_index": None,
                "page_chunk_index": None,
                "source": "lunch_menu_agent",
                "pdf_index_path": None,
            }
        ]

        data: Dict[str, Any] = {
            "agent_debug_information": {
                "phase": "final",
                "query": ctx.query,
                "session_mode": None,
                "registration_session_id": None,
                "extra": {
                    "notes": "This is a stub lunch menu agent.",
                },
            }
        }

        return AgentResult(
            answer_text=answer_text,
            intent=self.intent_name,
            index="main",  # or a dedicated index like "lunch"
            agent_id=ctx.agent_id or "lunch-menu-agent",
            agent_name=ctx.agent_name or "Lunch Menu",
            extra_chunks=debug_neighbors,
            status="ok",
            agent_session_id=ctx.session_id,
            data=data,
        )
```

Key points:

  • @register_agent("lunch_menu") wires it into your registry.
  • It returns an AgentResult with:
    • answer_text for the user
    • extra_chunks for Sources
    • data.agent_debug_information for your debug UI
  • You already have very nice patterns from RegisterNewStudentAgent; you can copy and adapt that structure.

  4. Make sure the agent module is auto-imported

You already have a dynamic loader like:

```python
# src/OSSS/ai/agents/__init__.py (or similar)
import pkgutil
import importlib
import OSSS.ai.agents as agents_pkg

def load_all_agents() -> None:
    package_path = agents_pkg.__path__
    package_name = agents_pkg.__name__ + "."
    for finder, name, ispkg in pkgutil.walk_packages(package_path, package_name):
        # Avoid base/registry so we don’t register those as agents
        if name.endswith(".base") or name.endswith(".registry"):
            continue
        importlib.import_module(name)
```

Make sure this function is called at startup somewhere (for example in your FastAPI app init, or in router_agent module import path).

As long as your new agent file lives under OSSS.ai.agents, that import will run its @register_agent decorator.

  5. Add an intent alias (optional but nice)

In router_agent.py you have:

```python
INTENT_ALIASES: dict[str, str] = {
    "enrollment": "register_new_student",
    "new_student_registration": "register_new_student",
    # ...
}
```

If your classifier returns labels like "lunch" or "cafeteria_menu", map them:

```python
INTENT_ALIASES.update(
    {
        "lunch": "lunch_menu",
        "cafeteria_menu": "lunch_menu",
        "today_lunch": "lunch_menu",
    }
)
```

That way, regardless of the raw label coming back from the classifier, the effective intent will be "lunch_menu" and your agent gets called.
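
As a rough illustration of how an alias table turns a raw label into the effective intent, consider the lookup below. The `canonical_intent` helper, its whitespace/case normalization, and the pass-through for unknown labels are assumptions made for this sketch, not a description of the router's actual code.

```python
from typing import Dict

INTENT_ALIASES: Dict[str, str] = {
    "enrollment": "register_new_student",
    "lunch": "lunch_menu",
    "cafeteria_menu": "lunch_menu",
}


def canonical_intent(raw_label: str) -> str:
    """Map a raw classifier label to its canonical intent; unknown labels pass through."""
    label = raw_label.strip().lower()
    return INTENT_ALIASES.get(label, label)
```

With this shape, `canonical_intent("lunch")` and `canonical_intent("cafeteria_menu")` both resolve to the same agent key, while a label with no alias is used unchanged.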

  6. (Optional) Add routing heuristics in IntentResolver

If you want the router to force your agent when certain patterns appear, add a small heuristic in IntentResolver.resolve (in router_agent.py), just like you did for registration.

Something like:

```python
class IntentResolver:
    async def resolve(...):
        ql = (query or "").lower()
        manual_label = rag.intent

        forced_intent: str | None = None

        # Existing registration heuristics...
        # ...

        # New lunch-menu heuristic
        if any(kw in ql for kw in ["lunch menu", "what's for lunch", "school lunch"]):
            forced_intent = "lunch_menu"
            logger.info(
                "RouterAgent: forcing intent to %s based on lunch keywords; query=%r",
                forced_intent,
                query[:200],
            )

        # ...rest of the resolve flow follows unchanged
```

Now even if the classifier is uncertain, the heuristic can route directly to your new agent.

  7. (Optional) Teach the intent classifier about it

If your classify_intent function is using a model / rule set you control, you may want to:

  • Add "lunch_menu" as a new Intent enum value (if you’re using an enum).
  • Feed it some few-shot examples so it learns to output lunch_menu or lunch (which you then alias).

But the OSSS router already has:

  • manual override (rag.intent),
  • forced intent heuristics,
  • aliasing via INTENT_ALIASES,

so you can ship a new agent even before classifier training is perfect.
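
One way to picture how the three mechanisms combine is a single resolution function. The precedence used here (manual override, then keyword heuristic, then classifier label, with aliasing applied last) is an assumption for illustration; check `IntentResolver.resolve` for the order OSSS actually applies. `resolve_intent` and `LUNCH_KEYWORDS` are hypothetical names.

```python
from typing import Dict, Optional, Tuple

INTENT_ALIASES: Dict[str, str] = {"lunch": "lunch_menu"}
LUNCH_KEYWORDS: Tuple[str, ...] = ("lunch menu", "what's for lunch", "school lunch")


def resolve_intent(query: str, manual_label: Optional[str], classifier_label: str) -> str:
    """Pick an intent: manual override, then keyword heuristic, then classifier label."""
    if manual_label:
        chosen = manual_label
    elif any(kw in query.lower() for kw in LUNCH_KEYWORDS):
        chosen = "lunch_menu"
    else:
        chosen = classifier_label
    # Aliasing runs last so every path ends on a canonical intent name.
    return INTENT_ALIASES.get(chosen, chosen)
```

Under these assumptions a new agent is reachable through either of the first two branches even when the classifier has never seen its label.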

  8. (Optional) Give your agent its own index / data

If the agent needs its own RAG index (like your planned tutor / agent indexes), you can:

  1. Add a new index kind in additional_index.py (INDEX_KINDS / load path).
  2. Set index="tutor" or whatever in your agent’s AgentResult.
  3. Or have the agent call top_k itself based on a dedicated index.

For simple agents (like the registration one), you don’t need an index at all.

  9. Frontend considerations (ChatClient)

If you want to force a specific agent from the UI (e.g., “Mentor mode” button), set intent in the RAGRequest payload from the client:

```typescript
// In ChatClient.tsx when building `payload`
const payload = {
  ...,
  intent: "lunch_menu",  // manual override
};
```

Your IntentResolver already respects manual_label = rag.intent, so that will short-circuit a lot of routing ambiguity.

TL;DR – Add-a-new-agent recipe

  1. Create a module under OSSS/ai/agents/... with a new @register_agent("your_intent") class.
  2. Implement run(self, ctx) returning AgentResult (use RegisterNewStudentAgent as a pattern).
  3. Ensure auto-loading via load_all_agents() at startup.
  4. Add aliases in INTENT_ALIASES for classifier labels → your canonical intent.
  5. (Optional) Add heuristics in IntentResolver and train the intent classifier.
  6. (Optional) Wire a custom RAG index or external data source as needed.

🧪 Adding A New AI Query Agent Handler (API backed data views)

The query_data agent is a meta-agent that fans out to multiple small “dataset handlers” under OSSS.ai.agents.query_data. Each handler knows how to:

  • Call one OSSS API endpoint (students, scorecards, live scoring, materials, …)
  • Format the rows as a markdown table / CSV
  • Register itself in a small registry so QueryDataAgent can route to it

This lets you add hundreds of data-backed queries without creating a brand-new agent each time.

1. Files involved (where to change things)

To add a new dataset handler, you will usually touch:

  1. New handler module:
     src/OSSS/ai/agents/query_data/handlers/<your_handler_name>_handler.py
  2. QueryData registry (mode detection + registration):
     src/OSSS/ai/agents/query_data/query_data_registry.py
  3. QueryData agent (force-import handler once):
     src/OSSS/ai/agents/query_data/query_data_agent.py
  4. (Optional, but recommended) Intent classifier / routing:
     • Classifier “action” / heuristics so the LLM can ask for your handler:
       src/OSSS/ai/intent_classifier.py
     • High-level intent → query_data aliasing (already set up for materials/live scoring/etc.):
       src/OSSS/ai/agent_routing_config.py

Most of the time you’ll only need (1)–(3), plus a tiny tweak in (4) to teach the classifier the new action name.

2. Implement a new handler

Example: materials handler that calls GET /api/materials.

Create:

src/OSSS/ai/agents/query_data/handlers/materials_handler.py

with something like:

```python
from __future__ import annotations

import csv
import io
import logging
from typing import Any, Dict, List

import httpx

from OSSS.ai.agents.base import AgentContext
from OSSS.ai.agents.query_data.query_data_registry import (
    FetchResult,
    QueryHandler,
    register_handler,
)

logger = logging.getLogger("OSSS.ai.agents.query_data.materials")

API_BASE = "http://host.containers.internal:8081"


class MaterialsHandler:
    """
    QueryData handler for the /api/materials endpoint.
    """

    mode = "materials"
    keywords = ["materials", "materials list", "supply list"]
    source_label = "your DCG OSSS materials service"

    async def fetch(self, ctx: AgentContext, skip: int, limit: int) -> FetchResult:
        url = f"{API_BASE}/api/materials"
        params = {"skip": skip, "limit": limit}

        async with httpx.AsyncClient(timeout=10.0) as client:
            resp = await client.get(url, params=params)
            resp.raise_for_status()
            rows: List[Dict[str, Any]] = resp.json()

        return {
            "rows": rows,
            "materials_count": len(rows),
            "materials_url": url,
        }

    def to_markdown(self, rows: List[Dict[str, Any]]) -> str:
        if not rows:
            return "No materials were found in the system."

        header = (
            "| # | Type | Title | URL | Drive File ID | Announcement ID | "
            "Coursework ID | Created At | Updated At |\n"
            "|---|------|-------|-----|--------------|-----------------|"
            "--------------|------------|------------|\n"
        )

        lines: List[str] = []
        for idx, r in enumerate(rows, start=1):
            lines.append(
                f"| {idx} | "
                f"{r.get('type', '')} | "
                f"{r.get('title', '')} | "
                f"{r.get('url', '')} | "
                f"{r.get('drive_file_id', '')} | "
                f"{r.get('announcement_id', '')} | "
                f"{r.get('coursework_id', '')} | "
                f"{r.get('created_at', '')} | "
                f"{r.get('updated_at', '')} |"
            )

        return header + "\n".join(lines)

    def to_csv(self, rows: List[Dict[str, Any]]) -> str:
        if not rows:
            return ""

        output = io.StringIO()
        fieldnames = [
            "type",
            "title",
            "url",
            "drive_file_id",
            "announcement_id",
            "coursework_id",
            "created_at",
            "updated_at",
            "id",
        ]
        # Ignore keys outside fieldnames so unexpected API fields do not raise ValueError
        writer = csv.DictWriter(output, fieldnames=fieldnames, extrasaction="ignore")
        writer.writeheader()
        writer.writerows(rows)
        return output.getvalue()


# Register the handler at import time
register_handler(MaterialsHandler())
```

Key points:

  • Implements the QueryHandler protocol from query_data_registry.py.
  • Sets mode = "materials"; this becomes the key the registry uses.
  • Exposes keywords to help the registry match simple queries via text.
  • fetch returns a FetchResult with at least a rows list.
  • to_markdown and to_csv handle formatting in one place.
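
For readers who have not opened `query_data_registry.py`, the handler contract might look roughly like the following `typing.Protocol`. This is a guess at its shape based only on the methods used in the example above; the member list, the `Any` context type, and `StubHandler` are hypothetical, so check the real definition before relying on it.

```python
import asyncio
from typing import Any, Dict, List, Protocol, runtime_checkable

Row = Dict[str, Any]


@runtime_checkable
class QueryHandler(Protocol):
    """Hypothetical shape of the handler contract (see query_data_registry.py for the real one)."""

    mode: str
    keywords: List[str]

    async def fetch(self, ctx: Any, skip: int, limit: int) -> Dict[str, Any]: ...
    def to_markdown(self, rows: List[Row]) -> str: ...
    def to_csv(self, rows: List[Row]) -> str: ...


class StubHandler:
    """In-memory handler that satisfies the protocol without any HTTP call."""

    mode = "stub"
    keywords = ["stub"]

    async def fetch(self, ctx: Any, skip: int, limit: int) -> Dict[str, Any]:
        rows = [{"title": "Pencils"}, {"title": "Notebooks"}]
        return {"rows": rows[skip : skip + limit]}

    def to_markdown(self, rows: List[Row]) -> str:
        lines = ["| # | Title |", "|---|-------|"]
        lines += [f"| {i} | {r['title']} |" for i, r in enumerate(rows, start=1)]
        return "\n".join(lines)

    def to_csv(self, rows: List[Row]) -> str:
        return "title\n" + "\n".join(r["title"] for r in rows)
```

A stub like this is handy in unit tests: it exercises the formatting and paging logic without a running API.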

3. Make sure the handler is imported once

QueryDataAgent force-imports each handler module so the register_handler(...) call actually runs.

Update:

src/OSSS/ai/agents/query_data/query_data_agent.py

to include your new handler import (one line):

```python
from OSSS.ai.agents.query_data.handlers import live_scorings_handler  # noqa: F401
from OSSS.ai.agents.query_data.handlers import students_handler       # noqa: F401
from OSSS.ai.agents.query_data.handlers import scorecards_handler     # noqa: F401
from OSSS.ai.agents.query_data.handlers import materials_handler      # noqa: F401  # ⬅️ new
```

No other changes are required in QueryDataAgent if you used the registry pattern correctly.

4. Wire intent → mode in the QueryData registry

QueryDataAgent does not see the raw classifier output directly. Instead:

  • The router passes ctx.metadata["intent_raw_model_output"] into detect_mode_from_context.
  • query_data_registry.py parses that blob and chooses a mode.

Open:

src/OSSS/ai/agents/query_data/query_data_registry.py

and make sure _mode_from_intent_raw_model_output knows about your action, for example:

```python
def _mode_from_intent_raw_model_output(raw: str | None) -> str | None:
    # ...existing code...
    llm = obj.get("llm") or {}
    action = (llm.get("action") or "").lower()

    # Map specific actions to modes
    if action == "show_materials_list":
        return "materials"
    # existing mappings...
```

You can also lean on keywords and direct heuristics already present in detect_mode_from_context:

  • materials_handler.keywords is indexed in _KEYWORD_INDEX.
  • detect_mode_from_context checks both classifier metadata and plain-text query before falling back to the default students mode.
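
The detection order described above (classifier metadata, then plain-text keywords, then the default) can be sketched as follows. The JSON shape `{"llm": {"action": ...}}` follows the snippet shown earlier; `ACTION_TO_MODE`, `HANDLER_KEYWORDS`, and both function names are hypothetical stand-ins for what the registry builds internally.

```python
import json
from typing import Dict, List, Optional

ACTION_TO_MODE: Dict[str, str] = {"show_materials_list": "materials"}
HANDLER_KEYWORDS: Dict[str, List[str]] = {
    "materials": ["materials", "materials list", "supply list"],
}
DEFAULT_MODE = "students"


def mode_from_raw_output(raw: Optional[str]) -> Optional[str]:
    """Read llm.action from the classifier JSON blob; tolerate missing or bad input."""
    if not raw:
        return None
    try:
        obj = json.loads(raw)
    except json.JSONDecodeError:
        return None
    llm = obj.get("llm") if isinstance(obj, dict) else None
    if not isinstance(llm, dict):
        return None
    action = (llm.get("action") or "").lower()
    return ACTION_TO_MODE.get(action)


def detect_mode(query: str, raw_output: Optional[str]) -> str:
    """Classifier action first, then keyword match on the query, then the default."""
    mode = mode_from_raw_output(raw_output)
    if mode:
        return mode
    text = query.lower()
    for candidate, keywords in HANDLER_KEYWORDS.items():
        if any(kw in text for kw in keywords):
            return candidate
    return DEFAULT_MODE
```

Treating malformed classifier output as "no signal" rather than an error keeps the keyword and default paths available when the LLM returns something unexpected.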

5. (Optional) Teach the intent classifier / router about your action

If you want the LLM classifier to explicitly emit an action like "show_materials_list", update:

src/OSSS/ai/intent_classifier.py

Ensure "show_materials_list" is a plausible action the prompt talks about (and/or add a heuristic rule so “materials list” → that action).

The classifier’s JSON gets stuffed into intent_raw_model_output, which query_data_registry already parses.

At the router level, all of these map to the same top-level intent "query_data" via aliases in:

src/OSSS/ai/agent_routing_config.py

For example:

```python
INTENT_ALIASES: list[IntentAlias] = [
    # ...
    IntentAlias("show_materials_list", "query_data"),
]
```

After that:

  1. Router sends the turn to QueryDataAgent (intent=query_data).
  2. QueryDataAgent calls detect_mode_from_context(...).
  3. The registry picks mode="materials" and hands off to MaterialsHandler.

TL;DR – Add-a-new QueryData handler

  1. Create src/OSSS/ai/agents/query_data/handlers/<your_handler_name>_handler.py implementing QueryHandler and calling your OSSS API.
  2. Register it with register_handler(...) at module import time.
  3. Add one import line in query_data_agent.py so the handler module is imported on startup.
  4. Map classifier output / action → mode inside query_data_registry.py (and optionally in intent_classifier.py + agent_routing_config.py).

Done: query_data can now answer requests for the new dataset using the new handler, and the debug JSON will include mode="<your_mode>" and the raw rows/csv.


OSSS Query Execution Flow — query consents Walkthrough

This document provides a complete execution breakdown for the OSSS orchestration pipeline when processing the input:

User input: query consents
Workflow ID: 03878494-c75e-4091-b486-2c36de1b5cb7
Correlation ID: 720a15cb-115c-4f9f-9c60-1140b12277fb
Thread ID: osss_20251228_032800_502ad6f1


0. Background Noise: Health Checks

Health checks (GET /healthz) are logged repeatedly by uvicorn.access.
These are liveness probes and not part of the actual query workflow.


1. API Ingress & Thread Setup

  1. User sends POST /api/query with:

    • parallel execution
    • RAG enabled
    • markdown export requested
    • top_k = 6
    • timeout = 180s
  2. The system generates/assigns:

    • thread ID osss_20251228_032800_502ad6f1
    • correlation ID forwarded from request headers
  3. LangGraphOrchestrationAPI instance is retrieved and used for workflow execution.
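
The thread ID in this run has the visible shape `osss_<date>_<time>_<8 hex characters>`. A generator producing that shape could look like the sketch below; the UTC clock, the random suffix source, and the `x-correlation-id` header name are assumptions, since the walkthrough does not say how OSSS derives them.

```python
import uuid
from datetime import datetime, timezone
from typing import Mapping, Optional, Tuple


def new_thread_id(now: Optional[datetime] = None) -> str:
    """Build an ID shaped like osss_YYYYMMDD_HHMMSS_<8 hex chars>."""
    now = now or datetime.now(timezone.utc)
    return f"osss_{now:%Y%m%d_%H%M%S}_{uuid.uuid4().hex[:8]}"


def resolve_ids(headers: Mapping[str, str]) -> Tuple[str, str]:
    """Reuse the caller's correlation ID when present, otherwise mint one."""
    correlation_id = headers.get("x-correlation-id") or str(uuid.uuid4())  # hypothetical header name
    return new_thread_id(), correlation_id
```

Forwarding the caller's correlation ID instead of replacing it is what lets a single request be traced across the UI, the API, and the orchestrator logs.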


2. ClassifierAgent Execution

Classifier is invoked before orchestration begins:

| Field | Value |
|---|---|
| intent | action (0.946) |
| domain | data_systems (0.924) |
| primary_topic | consents (confidence low) |

Classifier results are persisted into the shared execution_state.


3. Orchestrator Initialization

LangGraphOrchestrator:

  • Logs workflow start
  • Loads existing execution state
  • Sets up correlation span
  • Begins pattern/agent resolution

4. DBQueryRouter Selection

The router evaluates multiple heuristics and returns route_to_data_query = true:

| Signal | Evaluation |
|---|---|
| Text prefix matches query | ✔️ |
| consents table detected | ✔️ |
| Action hint present | ✔️ |
| Pattern lock requested | ✔️ |

Result:

route: data_query
route_reason: db_query_heuristic
force_data_query: true
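
A heuristic router of this kind can be approximated with a few boolean signals. The sketch below is illustrative only: `KNOWN_TABLES`, the rule that all signals must hold, and the `default` / `no_match` fallback values are assumptions, and the pattern-lock signal is omitted because the walkthrough does not describe how it is requested.

```python
from typing import Any, Dict, Set

KNOWN_TABLES: Set[str] = {"consents", "students", "materials"}  # hypothetical list


def route_query(text: str, intent: str) -> Dict[str, Any]:
    """Evaluate simple signals and decide whether to route to the data_query agent."""
    words = text.lower().split()
    signals = {
        "prefix_matches_query": bool(words) and words[0] == "query",
        "table_detected": any(w in KNOWN_TABLES for w in words),
        "action_hint": intent == "action",
    }
    if all(signals.values()):
        return {
            "route": "data_query",
            "route_reason": "db_query_heuristic",
            "force_data_query": True,
            "signals": signals,
        }
    return {
        "route": "default",
        "route_reason": "no_match",
        "force_data_query": False,
        "signals": signals,
    }
```

Returning the individual signals alongside the decision makes the routing reason easy to log and debug.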

5. DecisionNode Failure (Validation Issue)

The Pydantic validation in NodeExecutionContext fails due to structural mismatches:

  • missing expected field task_type
  • additional unexpected fields such as intent_confidence, model_version

The orchestrator falls back to existing agents, bypassing dynamic pattern selection.
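
The failure mode here, a strict schema rejecting a payload with a missing field and extra fields, followed by a fallback, can be reproduced with a standard-library stand-in for the Pydantic model. `ALLOWED_FIELDS`, `validate_context`, `select_agents`, and the placeholder pattern are hypothetical; only `task_type`, `intent_confidence`, and `model_version` come from the log above.

```python
from typing import Any, Dict, List, Tuple

REQUIRED_FIELDS = {"task_type"}
ALLOWED_FIELDS = {"task_type", "query", "correlation_id"}  # hypothetical schema


def validate_context(payload: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the payload is valid."""
    present = set(payload)
    problems = [f"missing field: {name}" for name in sorted(REQUIRED_FIELDS - present)]
    problems += [f"unexpected field: {name}" for name in sorted(present - ALLOWED_FIELDS)]
    return problems


def select_agents(payload: Dict[str, Any], existing: List[str]) -> Tuple[List[str], List[str]]:
    """Use dynamic selection when the payload validates, else keep the existing agents."""
    problems = validate_context(payload)
    if problems:
        return existing, problems  # fallback path: skip dynamic pattern selection
    return ["refiner", "final"], []  # placeholder for a dynamically chosen pattern
```

Surfacing the problem list, rather than swallowing the validation error, is what makes a silent fallback like this one diagnosable later.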


6. RAG Prefetch

Even though rag_mode = soft_disable, RAG still:

  1. Embeds "query consents" using nomic-embed-text
  2. Performs vector search in index: main
  3. Stores 6 retrieved chunks (~5KB context) in execution state

7. Graph Construction

GraphFactory normalizes agents and pattern based on router output:

  1. Initial agents normalize → ["refiner","data_query"]
  2. Standard pattern fallback → ["refiner","final"]
  3. Planning bridge re-routes to data_query
  4. Final effective agents → ["refiner","data_query"]

A cached compiled graph is used.

Execution order:

(refiner) → (data_query) → END
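
Reusing a compiled graph for the same agent list is a natural fit for memoization. The following sketch uses `functools.lru_cache` keyed on a normalized agent tuple; `KNOWN_AGENTS`, `normalize_agents`, and the toy `compile_graph` are hypothetical and only stand in for what GraphFactory does.

```python
from functools import lru_cache
from typing import Callable, List, Tuple

KNOWN_AGENTS = ("refiner", "data_query", "final")  # hypothetical allow-list


def normalize_agents(agents: List[str]) -> Tuple[str, ...]:
    """Lowercase, drop unknown names and duplicates, and keep the first-seen order."""
    seen: List[str] = []
    for name in (a.strip().lower() for a in agents):
        if name in KNOWN_AGENTS and name not in seen:
            seen.append(name)
    return tuple(seen)


@lru_cache(maxsize=32)
def compile_graph(agents: Tuple[str, ...]) -> Callable[[str], List[str]]:
    """Build a linear runner once per agent tuple; repeat calls hit the cache."""
    def run(query: str) -> List[str]:
        return [f"{name}({query})" for name in agents] + ["END"]
    return run
```

Normalizing before the lookup matters: without it, `["Refiner", "data_query"]` and `["refiner", "data_query"]` would compile two separate graphs.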

8. refiner Node

  • Uses llama3.3:latest
  • Output remains unchanged: "query consents"
  • Execution time: ~1.3s
  • Refiner confirms no need to modify the structured query form

9. data_query Node

Route Parsing

  • collection: consents
  • path: /api/consents
  • method: GET
  • params: skip=0, limit=100

Query Execution

  • Backend call: GET http://localhost:8000/api/consents
  • Response: 5 rows
  • Enrichment: person name + compact column pruning

Final Columns

| consent_type | granted | effective_date | expires_on | created_at | updated_at | id | person_name |

Context Updates

  • Stores table in execution_state
  • Stores markdown rendering
  • Keys under: data_query:consents
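
Route parsing and column pruning for a query such as `query consents` can be sketched as two small functions. `COLLECTION_PATHS`, `parse_route`, and `prune_columns` are hypothetical names, and the "query <collection>" grammar is inferred from this single run rather than taken from the data_query agent's source.

```python
from typing import Any, Dict, List, Optional

COLLECTION_PATHS: Dict[str, str] = {"consents": "/api/consents"}  # hypothetical map


def parse_route(text: str, skip: int = 0, limit: int = 100) -> Optional[Dict[str, Any]]:
    """Turn 'query <collection>' into an HTTP route description, or None if unknown."""
    words = text.lower().split()
    if len(words) < 2 or words[0] != "query" or words[1] not in COLLECTION_PATHS:
        return None
    return {
        "collection": words[1],
        "path": COLLECTION_PATHS[words[1]],
        "method": "GET",
        "params": {"skip": skip, "limit": limit},
    }


def prune_columns(rows: List[Dict[str, Any]], keep: List[str]) -> List[Dict[str, Any]]:
    """Keep only the listed columns, in order, filling gaps with an empty string."""
    return [{col: row.get(col, "") for col in keep} for row in rows]
```

Pruning to an explicit column list keeps the rendered markdown table stable even if the API later adds fields to the response.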

10. Post-Execution Activities

Markdown Export

  • Performed by markdown_export_service
  • Includes topic analysis via llama3.3
  • Exported file:
    2025-12-28T03-28-09_query-consents_70b442.md
    

Persistence

  • Both workflow and markdown persistence skipped due to DB session disabled

11. Workflow Completion

Two log lines confirm workflow-level completion:

workflow.completed | 03878494 | ~1.5s
workflow.completed | 03878494 | ~1.5s

12. Analyzing audio

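```bash
# Download the best available audio stream for a meeting recording
yt-dlp -f bestaudio -o "dcg_meeting.%(ext)s" "https://youtube.com/watch?v=DjzNdBXqSpU&t=111s"

# Work in a dedicated directory
cd projects
mkdir dcg_sb_audito
cd dcg_sb_audito
yt-dlp -f bestaudio -o "dcg_meeting.%(ext)s" "https://youtube.com/watch?v=3HXHI48vtI0&t=111s"

# Convert to 16 kHz mono WAV (assumes yt-dlp produced a .webm file)
ffmpeg -i dcg_meeting.webm -ar 16000 -ac 1 dcg.wav

# Install Whisper; on macOS with the python.org Python 3.13 build, install certificates
# first in case the model download fails with an SSL error
pip install openai-whisper
/Applications/Python\ 3.13/Install\ Certificates.command

# Transcribe and list the generated output files
whisper dcg.wav --model medium --language English --output_format all
ls -latr
```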

📜 License

This project is licensed under the Apache License 2.0.

Credits & Licenses

This project includes code derived from CogniVault
https://github.com/aucontraire/cognivault

Copyright (c) 2024 aucontraire

Licensed under the MIT License.
See the original license text below and in the LICENSES/cognivault-MIT.txt file.
