feat: data agent connector for lumid.data #19

Merged: timzsu merged 11 commits into main from zsu/agent-connector on May 7, 2026

@timzsu timzsu commented May 5, 2026

Purpose

This PR adds data access through our data plane (lumid.data), which retrieves data from natural-language requests via a data agent. A workflow node describes what it wants in natural language, optionally constrains the search to a schema scope, and gets back the materialized rows alongside the canonical SQL access chain the agent emitted, so a downstream node can bind to either.

Changes

  • src/worker/connectors/agent_connector.py (new) + connectors/__init__.py — thin wrapper around lumid_data.sdk.Client.retrieve_to_file.
  • src/worker/executors/data_retrieval_executor.py — adds the data.type == "agent" branch and emits per-row items[] with table + access chain + run metadata; schema_scope is optional.
  • tests/worker/test_agent_connector.py (new) — connector tests with mocked SDK.
  • pyproject.toml, uv.lock, src/worker/requirements/requirements.txt — add lumid-data-sdk to the analytics extra; mypy follow_untyped_imports for lumid_data.sdk[.*].
  • scripts/dev/sync_requirements.py — accept the name @ git+url form in the package-name regex.
  • .github/workflows/security.yml — strip @ git+ deps before pip-audit --strict; rename the temp file to /tmp/requirements-worker-cpu-audit.txt; ignore two new vllm advisories (GHSA-x368-4g9h-fvv4, GHSA-83vm-p52w-f9pw) blocked by the same transformers 4.57 pin.
  • templates/data_retrieval_agent.yaml (new) — two-stage e2e: agent retrieval → Qwen 1.5B summary.
  • src/shared/utils/manifest.py (new) + deletions of src/{server,worker}/utils/manifest.py — single home for prepare_output_dir / sync_manifest. prepare_output_dir creates each directory at 0o0777, and sync_manifest chmods the manifest to 0o0666 after each write, so single-node deployments where the server (root) and worker (appuser) share the results volume can both overwrite the manifest from either UID.
  • src/worker/executors/utils/graph_templates.py — wrap the to_markdown sites in pd.option_context(max_columns=None, width=None, max_colwidth=None) so nested DataFrame cells render in full instead of pandas' default ...-clipped layout.
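
To illustrate the sync_requirements.py change listed above, here is a minimal sketch of a package-name pattern that accepts both ordinary version specifiers and the PEP 508 `name @ git+url` form. The pattern, the `package_name` helper, and the example URL are assumptions made for illustration; the actual regex in the script may differ.

```python
import re
from typing import Optional

# Hypothetical pattern, not the one in scripts/dev/sync_requirements.py.
_NAME_RE = re.compile(
    r"""
    ^\s*
    (?P<name>[A-Za-z0-9][A-Za-z0-9._-]*)   # distribution name
    (?:\[[^\]]*\])?                        # optional extras
    \s*
    (?:@\s*\S+|[<>=!~].*)?                 # direct reference or version specifier
    \s*(?:;.*)?$                           # optional environment marker
    """,
    re.VERBOSE,
)


def package_name(requirement: str) -> Optional[str]:
    """Return the lowercased distribution name, or None if the line is not a requirement."""
    match = _NAME_RE.match(requirement)
    return match.group("name").lower() if match else None


# The URL below is a placeholder, not the real SDK location.
git_pin = package_name("lumid-data-sdk @ git+https://example.invalid/lumid-data.git@a800051")
plain_pin = package_name("pandas>=2.0")
```

Both calls return only the name part, so a sync script can key on the same identifier regardless of how the dependency is pinned.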

Test Plan

  • Live e2e on a GPU worker pointed at a running lumid.data:
flowmesh stack build
flowmesh stack up
flowmesh stack worker up gpu -t 1
flowmesh workflow submit templates/data_retrieval_agent.yaml
flowmesh result fetch <tsk_id>

Test Result

Output from the first node:

{
  "ok": true,
  "type": "agent",
  "items": [
    {
      "index": 0,
      "description": "Show me NVDA's last 10 quarters of fundamentals and key financial-health ratios.",
      "params": {
        "symbol": "NVDA"
      },
      "table": {
        "df": "***" # Serialized Pandas Dataframe
      },
      "rows": 10,
      "access_chain": [
        {
          "op": "sql",
          "query": "WITH *** SELECT ***", # The SQL query
          "bucket": null,
          "key": null,
          "rows_or_bytes": 10
        }
      ],
      "run_id": "run-***",
      "transcript_url": "http://127.0.0.1:9102/v1/admin/runs/run-***",
      "tokens_in": 20451,
      "tokens_out": 8861,
      "steps_taken": 8,
      "replay_latency_ms": 13,
      "materialized_uri": "s3://lumid-data/***/result.jsonl"
    }
  ],
  "count": 1,
  "_artifacts": {
    "base_dir": "/var/lib/flowmesh-results/tsk-***",
    "base_url": null
  }
}
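
As a rough illustration of how a downstream node might bind to either the table or the access chain in the output above, the sketch below walks a dotted path through a trimmed copy of that payload. The `resolve_path` helper and the convention that numeric segments index into lists are assumptions; the existing resolver in the codebase may use a different syntax.

```python
from typing import Any


def resolve_path(payload: Any, path: str) -> Any:
    """Walk a dotted path; numeric segments index into lists (assumed convention)."""
    current = payload
    for segment in path.split("."):
        if isinstance(current, list):
            current = current[int(segment)]
        else:
            current = current[segment]
    return current


# Trimmed copy of the first node's output shown above.
first_node_output = {
    "ok": True,
    "type": "agent",
    "items": [
        {
            "index": 0,
            "rows": 10,
            "access_chain": [
                {"op": "sql", "query": "WITH *** SELECT ***", "rows_or_bytes": 10}
            ],
        }
    ],
    "count": 1,
}

row_count = resolve_path(first_node_output, "items.0.rows")
first_op = resolve_path(first_node_output, "items.0.access_chain.0.op")
```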

Output of the second node:

{
  "ok": true,
  "model": "Qwen/Qwen2.5-1.5B-Instruct",
  "items": [
    {
      "output": [
        "The headline trend across the rows for the leading metric is a significant increase in revenue and earnings per share (EPS) from Q4 FY2025 to Q1 FY2025, followed by a decline in Q2 FY2025. This dataset enables analysts to assess NVDA's financial health and performance trends over the past 10 quarters, providing insights into its revenue growth, profitability, and market valuation."
      ]
    }
  ],
  "usage": {
    "prompt_tokens": 1518,
    "completion_tokens": 89,
    "total_tokens": 1607,
    "latency_sec": 0.30570387840270996,
    "num_requests": 1
  },
  "_artifacts": {
    "base_dir": "/var/lib/flowmesh-results/tsk-***",
    "base_url": null
  }
}

@timzsu timzsu force-pushed the zsu/agent-connector branch from 489b8ca to 6d8c5ba Compare May 6, 2026 10:06
timzsu and others added 3 commits May 6, 2026 10:21
Wire ``data.type == "agent"`` into ``DataRetrievalExecutor`` so a
worker task can describe what it wants in natural language plus a
schema scope and let lumid.data's ``/retrieve/v1`` plan + replay
the chain server-side. Each item carries the materialized DataFrame
and the typed access chain so a downstream consumer binds to either
via the existing ``path: items.X`` resolver.

Worker delivery wiring: ``analytics`` extra picks up the
``lumid-data-sdk`` git+ pin; ``sync_requirements.py`` regex extended
for the PEP 508 ``name @ git+url`` form; the security workflow
filters ``@ git+`` deps before ``pip-audit --strict`` since PyPI
doesn't carry git-source deps.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
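
The filtering step described in this commit message can be sketched as follows. The workflow presumably does this in shell inside security.yml; the Python version here, including the `strip_direct_references` name and the sample requirement lines, is only an illustrative assumption.

```python
from typing import Iterable, List


def strip_direct_references(lines: Iterable[str]) -> List[str]:
    """Drop requirement lines that use the PEP 508 'name @ git+url' form."""
    kept = []
    for line in lines:
        if "@ git+" in line:
            continue  # pip-audit cannot look these up on PyPI
        kept.append(line)
    return kept


# Sample input; the URL is a placeholder, not the real SDK location.
requirements = [
    "pandas>=2.0",
    "lumid-data-sdk @ git+https://example.invalid/lumid-data.git@a800051",
    "vllm==0.18.0",
]
auditable = strip_direct_references(requirements)
```

The filtered list is what would be written to the audit input file before running `pip-audit --strict`.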
Two-stage workflow: agent retrieval against lumid.data, then a
Qwen 1.5B summary node consuming the materialized table and
access chain. One ``flowmesh workflow submit`` exercises the
connector and downstream consumption end-to-end.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
@timzsu timzsu force-pushed the zsu/agent-connector branch from 6d8c5ba to 31ddd4a Compare May 6, 2026 10:25
timzsu and others added 2 commits May 6, 2026 10:48
Drop ``schema_scope`` from the executor's required-keys validation
and let the connector forward ``None`` to the SDK so a workflow can
omit the field when it wants lumid.data to default to all visible
schemas. The e2e template drops its explicit scope to exercise the
new path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
Track ``a800051`` so the SDK pin reflects PR #8 (optional
``schema_scope``) on lumid.data main. Wire contract is unchanged
from the FlowMesh side — the connector already passes ``None``
through ``model_dump(exclude_none=True)``.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
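
The effect of dropping `None` fields before the request goes on the wire can be sketched without the SDK. The example below uses a plain dataclass as a stand-in for the SDK's request model and a dict comprehension in place of `model_dump(exclude_none=True)`; the class name, the field types, and the `to_wire` helper are assumptions.

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict, List, Optional


@dataclass
class RetrieveRequest:  # hypothetical stand-in for the SDK's request model
    description: str
    schema_scope: Optional[List[str]] = None  # the list type is an assumption


def to_wire(request: RetrieveRequest) -> Dict[str, Any]:
    """Mimic exclude_none=True: omit fields whose value is None."""
    return {key: value for key, value in asdict(request).items() if value is not None}


unscoped = to_wire(RetrieveRequest(description="last 10 quarters of fundamentals"))
scoped = to_wire(
    RetrieveRequest(description="last 10 quarters of fundamentals", schema_scope=["finance"])
)
```

With the scope omitted, the key is absent from the payload entirely, which is what lets the server fall back to its own default.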
@timzsu timzsu requested a review from kaiitunnz May 6, 2026 11:13
@timzsu timzsu marked this pull request as ready for review May 6, 2026 11:13
@timzsu timzsu changed the title from "feat: agent connector for lumid.data NL-driven retrieval" to "feat: data agent connector for lumid.data" May 6, 2026

@kaiitunnz kaiitunnz left a comment

A few minor comments.

Comment thread .github/workflows/security.yml Outdated
Comment thread src/server/utils/manifest.py Outdated
Comment thread src/shared/utils/manifest.py Outdated
Comment thread src/worker/executors/data_retrieval_executor.py Outdated
Comment thread src/worker/connectors/agent_connector.py Outdated
@timzsu timzsu force-pushed the zsu/agent-connector branch 3 times, most recently from 6fd20d9 to 0bc2ca3 Compare May 7, 2026 02:50
timzsu and others added 4 commits May 7, 2026 03:22
… mismatch

Single-node deployments can share the results volume between the server
(root) and supervisor-spawned workers (appuser). Both call sync_manifest,
so the prior direct write_text raced to EACCES on the second writer when
the manifest was already owned by the first writer's UID.

prepare_output_dir now chmods each managed directory to 0o0777
(best-effort, tolerant of cross-UID ownership). sync_manifest writes the
manifest with write_text and then chmods it to 0o0666 so the next
sync_manifest call from a peer UID can overwrite the file directly.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
…prompts

Aggregate templates that bind a per-row pd.DataFrame value into the prompt
rendered the cell via tabulate's to_markdown, which falls back to pandas'
default __str__ on each DataFrame entry. The default 80-col display width
clipped middle columns to '...', so the consumer LLM only saw the first
and last few columns of any wide retrieval result.

Wrap the to_markdown sites in pd.option_context with max_columns/width/
max_colwidth set to None so DataFrames render in full regardless of the
calling environment's pandas display defaults.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
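
A minimal sketch of the fix described in this commit message, assuming pandas is installed. It calls `str()` directly on a wide frame, which is the rendering path the message says tabulate falls back to for nested DataFrame cells. The full `display.*` option names and the `render_full` helper are assumptions about how the options are spelled in graph_templates.py.

```python
import pandas as pd

# A wide frame: 30 columns, more than pandas shows by default in a narrow display.
wide = pd.DataFrame({f"col_{i:02d}": [i, i + 1] for i in range(30)})


def render_full(df: pd.DataFrame) -> str:
    """Render df as text with pandas' column and width caps lifted."""
    with pd.option_context(
        "display.max_columns", None,
        "display.width", None,
        "display.max_colwidth", None,
    ):
        return str(df)


full = render_full(wide)
```

Because `option_context` restores the previous settings on exit, the caller's display defaults are untouched outside the `with` block.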
Four cleanups raised in code review:

- Move the duplicated ``prepare_output_dir`` / ``sync_manifest`` (and
  helpers) from ``src/server/utils/manifest.py`` and
  ``src/worker/utils/manifest.py`` into a single
  ``src/shared/utils/manifest.py`` exposing everything either side uses
  (including the worker-only ``scratch_dir`` / ``SCRATCH_DIR``); rewrite
  every call site to import from ``shared.utils.manifest`` and move the
  helper tests under ``tests/shared/utils/``.
- Drop two unnecessary ``# type: ignore`` comments on
  ``self._normalize_params`` calls in ``data_retrieval_executor`` — mypy
  resolves them cleanly without an escape.
- Replace the ``# type: ignore[import-untyped]`` on
  ``lumid_data.sdk.Client`` with a ``follow_untyped_imports`` override
  in ``pyproject.toml`` so the override applies to the whole SDK and
  goes away as soon as upstream ships type stubs.
- Name the worker-CPU pip-audit input file
  ``/tmp/requirements-worker-cpu-audit.txt`` so its purpose reads at a
  glance.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
GHSA-x368-4g9h-fvv4 (vllm 0.18.0, fix 0.19.1) and GHSA-83vm-p52w-f9pw
(vllm 0.18.0, fix 0.20.0) join the existing list — both are blocked by
the same transformers 4.57 / inference-deps pin that already keeps the
other vllm advisories on the ignore list.

GHSA-j7w6-vpvq-j3gm (diffusers 0.36.0, fix 0.38.0) is added separately:
diffusers 0.38 requires safetensors>=0.8.0rc0, which uv lock refuses to
resolve without an explicit pre-release opt-in. Holding the floor at
0.36 and ignoring until safetensors ships a non-rc 0.8.

Update the upgrade-blocker table in CODE_STYLE.md alongside the
workflow.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
@timzsu timzsu force-pushed the zsu/agent-connector branch from 0bc2ca3 to b3d8fb5 Compare May 7, 2026 03:48
Replaces the tempfile + os.replace approach with a direct write_text +
chmod 0o0666 on the manifest, and a guarded mkdir + chmod 0o0777 on the
output directories. Both rely on the file/dir's owner being the only
caller that needs to run chmod, which holds because sync_manifest is
the sole writer of the manifest and prepare_output_dir's chmod runs
only on creation. The "best-effort" PermissionError swallow on chmod
is gone — a chmod failure now propagates loudly.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
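
The direct-write approach in this commit message can be sketched with the standard library alone. The helper names `prepare_dir` and `write_shared` are hypothetical stand-ins, not the functions in shared.utils.manifest, and the sketch assumes a POSIX filesystem.

```python
import os
import stat
import tempfile
from pathlib import Path


def prepare_dir(path: Path) -> Path:
    """Create path if missing and open it to all UIDs; leave existing dirs alone."""
    if not path.exists():
        path.mkdir(parents=True)
        # Explicit chmod because the mode passed to mkdir is masked by the umask.
        os.chmod(path, 0o777)
    return path


def write_shared(path: Path, text: str) -> None:
    """Write text, then make the file writable by any UID for the next writer."""
    path.write_text(text, encoding="utf-8")
    os.chmod(path, 0o666)  # a failure here propagates instead of being swallowed


with tempfile.TemporaryDirectory() as root:
    out_dir = prepare_dir(Path(root) / "tsk-demo")
    manifest = out_dir / "manifest.json"
    write_shared(manifest, "{}")
    dir_mode = stat.S_IMODE(out_dir.stat().st_mode)
    file_mode = stat.S_IMODE(manifest.stat().st_mode)
```

Only the owner of a file can chmod it, so this scheme depends on the chmod always running in the process that created the file or directory.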
@timzsu timzsu force-pushed the zsu/agent-connector branch from b3d8fb5 to 7bf2e63 Compare May 7, 2026 03:55
@timzsu timzsu requested a review from kaiitunnz May 7, 2026 04:10

@kaiitunnz kaiitunnz left a comment

A few more comments.

Comment thread src/shared/utils/manifest.py
Comment thread src/shared/utils/manifest.py
Comment thread src/worker/executors/utils/graph_templates.py
Comment thread tests/worker/test_task_output.py Outdated
Comment thread src/shared/utils/manifest.py Outdated
…arties

Promote ``shared.utils.manifest`` 's tempfile + os.replace path to a
standalone ``shared.utils.atomic.atomic_write_text`` helper and apply it
to every file the server and the worker can both write: the per-task
``manifest.json`` and ``results.json``. Each write goes through a
tempfile in the same directory, gets chmodded to ``0o0666``, and is
swapped in via ``os.replace`` so a peer-UID writer can replace it
without permission issues and a crash mid-write leaves either the old
file or the new one — never a half-written one.

Drop the manifest-permission/overwrite tests in
``tests/worker/test_task_output.py`` that the shared-utils suite
already covers, and add a one-line comment over the two ``pd.option_context``
sites in graph_templates so the intent of the width-cap toggles is
obvious.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
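
The tempfile plus `os.replace` pattern described in this commit message can be sketched as below. The function shares only its name with the helper in shared.utils.atomic; the signature, the temp-file naming, and the cleanup logic are assumptions.

```python
import os
import tempfile
from pathlib import Path


def atomic_write_text(path: Path, text: str, mode: int = 0o666) -> None:
    """Write text to a temp file beside path, chmod it, then swap it into place."""
    fd, tmp_name = tempfile.mkstemp(
        dir=path.parent, prefix=path.name + ".", suffix=".tmp"
    )
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            handle.write(text)
        os.chmod(tmp_name, mode)
        # Atomic on POSIX when the temp file and the target share a filesystem.
        os.replace(tmp_name, path)
    except BaseException:
        # Remove the temp file if anything failed before the swap.
        try:
            os.unlink(tmp_name)
        except FileNotFoundError:
            pass
        raise


with tempfile.TemporaryDirectory() as root:
    target = Path(root) / "manifest.json"
    atomic_write_text(target, '{"v": 1}')
    atomic_write_text(target, '{"v": 2}')
    final_text = target.read_text(encoding="utf-8")
    leftovers = [p.name for p in Path(root).iterdir() if p.name != "manifest.json"]
```

Replacing a file by rename needs write permission on the containing directory rather than on the old file, which is why this approach pairs naturally with world-writable output directories.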
@timzsu timzsu requested a review from kaiitunnz May 7, 2026 05:01

@kaiitunnz kaiitunnz left a comment

LGTM. Fix the pip-audit issues, and then you can merge.

@timzsu timzsu merged commit f67629a into main May 7, 2026
13 of 14 checks passed
@timzsu timzsu deleted the zsu/agent-connector branch May 7, 2026 05:53

2 participants