feat: Kubernetes runner and S3 storage adapters#93
Conversation
…r backend
- Replace OciProvider with RunnerProvider supporting both OCI and K8s backends
- Add K8sConfig for Kubernetes-specific settings (namespace, PVC, etc.)
- Add RunnerConfig with backend selection ("oci" or "k8s")
- Remove runner field from SourceDefinition (now global config)
- Add deposition_srn to HookInputs and convention_srn to SourceInputs
- Extract shared runner utilities (memory parsing, progress parsing)
- Add K8s Job-based runners with security hardening and orphan handling
- Update storage adapter for cross-device compatibility (S3 CSI)
- Add kubernetes-asyncio as optional dependency with [k8s] extra
- Add proper error classification and health checks for K8s API
refactor: extract utility functions from OciHookRunner to shared module
Move parse_memory, parse_progress_file, and detect_rejection functions
to runner_utils module to improve code reusability and testability.
Update tests to use extracted functions directly and add missing
deposition_srn parameter to HookInputs test instances.
Greptile SummaryThis PR adds Kubernetes Job-based runner infrastructure ( Key changes:
Remaining minor items:
Confidence Score: 5/5
|
| Filename | Overview |
|---|---|
| server/osa/infrastructure/k8s/runner.py | New K8s hook runner: well-structured Job lifecycle with orphan detection, scheduling/completion phases, final-poll timeout safety net, and full security parity (read-only rootfs, dropped caps, non-root UID, no network policy for hooks). Label sanitization via label_value() correctly addresses the previous SRN-colon issue. |
| server/osa/infrastructure/k8s/source_runner.py | New K8s source runner mirrors hook runner structure. The final-poll safety net after the timeout loop is present (addresses previous thread concern). Minor: _cleanup_job warning log omits error details unlike runner.py. osa.io/digest label truncates SHA-256 hashes after 56 hex chars. |
| server/osa/infrastructure/k8s/naming.py | sanitize_label and label_value correctly handle K8s label constraints (alphanumeric/dash/dot/underscore, 63-char max). Full SHA-256 digests are silently truncated to 56 hex chars within the budget; functionally fine in practice but worth noting. |
| server/osa/infrastructure/persistence/adapter/storage.py | save_file S3 CSI fallback now wraps copy2 OSErrors in InfrastructureError (addresses previous thread concern). move_source_files_to_deposition already had consistent wrapping. Both atomic-write paths are covered by the new test suite. |
| server/osa/infrastructure/runner_utils.py | Extracted shared utilities (progress parsing, rejection detection, memory conversion, PVC-relative path) used by both OCI and K8s runners. Deferred import logfire inside parse_records_file/parse_session_file remains (previously flagged). |
| server/osa/config.py | Adds K8sConfig and RunnerConfig with a @model_validator that rejects backend=="k8s" when data_pvc_name is empty — addressing the previous thread's missing cross-field validation concern. Tests in test_config.py cover all four branches. |
| server/osa/infrastructure/k8s/di.py | Dishka conditional activation (Marker/when=) cleanly routes HookRunner and SourceRunner resolution to either OCI or K8s factories based on runner.backend. Startup health check is performed during APP-scope K8s client initialization. |
| server/osa/infrastructure/k8s/health.py | Startup health check validates namespace access (RBAC), namespace existence, and PVC availability with actionable error messages for each failure mode. |
| server/tests/unit/infrastructure/k8s/test_k8s_hook_runner.py | Comprehensive unit tests covering Job spec generation, security context, scheduling/execution watch, orphan handling, cleanup, and identity threading. Label assertions now verify sanitized SRN values. |
| server/tests/unit/infrastructure/test_file_storage_move.py | New test file covering both move_source_files_to_deposition and save_file fallback paths: rename works locally, copy+delete on OSError, idempotent retry, and InfrastructureError wrapping for copy failures. |
Sequence Diagram
sequenceDiagram
participant Svc as Service Layer
participant DI as Dishka DI
participant Runner as K8sHookRunner / K8sSourceRunner
participant K8s as Kubernetes API
participant PVC as PVC (S3 CSI / local)
Svc->>DI: resolve HookRunner (backend="k8s")
DI->>Runner: inject ApiClient + K8sConfig
Svc->>Runner: run(hook, inputs, work_dir)
Runner->>PVC: write input files (record.json, config.json)
Runner->>K8s: list_namespaced_job (orphan check, label selector)
K8s-->>Runner: [] (no existing job)
Runner->>K8s: create_namespaced_job (V1Job spec)
Note over Runner,K8s: labels: osa.io/role, osa.io/hook, osa.io/deposition (sanitized)
loop Phase 1 — Scheduling (120s timeout)
Runner->>K8s: list_namespaced_pod (job-name selector)
K8s-->>Runner: pod phase
end
loop Phase 2 — Completion (timeout_seconds + 30)
Runner->>K8s: read_namespaced_job
K8s-->>Runner: status.succeeded / conditions
end
alt succeeded
Runner->>PVC: parse progress.jsonl → HookResult
else failed
Runner->>K8s: list_namespaced_pod (diagnose OOM / exit code)
Runner-->>Svc: HookResult(FAILED)
end
Runner->>K8s: delete_namespaced_job (cleanup, propagation=Background)
Comments Outside Diff (2)
-
server/osa/infrastructure/k8s/source_runner.py, line 438-448 (link)Missing error details in cleanup warning log
_cleanup_jobinsource_runner.pylogs the warning without including the exception, making it harder to debug cleanup failures in production. The equivalent method inrunner.py(line 503) correctly includes"error": str(exc)in the extra fields. -
server/osa/infrastructure/k8s/naming.py, line 9-17 (link)SHA-256 digest labels silently truncate the hash prefix
sanitize_labeltruncates to 63 characters. A full digest likesha256:abc123...(71 chars after colon→dot substitution:sha256.+ 64 hex chars = 71 chars) is truncated tosha256.+ 56 hex chars. Only the first 56 of 64 hex characters are stored inosa.io/digest.While a SHA-256 collision on the truncated prefix is astronomically unlikely in practice, the label silently no longer uniquely identifies the full digest. Consider stripping the
sha256:prefix before sanitizing to preserve more entropy within the 63-char budget:def label_value_for_digest(digest: str) -> str: # "sha256:abc..." → "abc..." (algorithm prefix is implicit) raw = digest.split(":", 1)[-1] if ":" in digest else digest return sanitize_label(raw)
This is already the pattern used by
job_name(), which extracts only the ID fragment from SRN strings.
Reviews (4): Last reviewed commit: "feat: add k8s memory quantity conversion..." | Re-trigger Greptile
| labels = { | ||
| "osa.io/role": "hook", | ||
| "osa.io/hook": hook.name, | ||
| "osa.io/deposition": deposition_srn, | ||
| } |
There was a problem hiding this comment.
Invalid Kubernetes label values — SRN strings contain colons
The deposition_srn field (e.g., "urn:osa:localhost:dep:abc123") is used directly as a Kubernetes label value. Kubernetes label values may only contain alphanumerics, -, _, and . — colons (:) are not permitted. When create_namespaced_job is called against a real cluster the API server will reject the request with a validation error like:
Invalid value: "urn:osa:localhost:dep:abc123": a valid label must be an empty string or
consist of alphanumeric characters, '-', '_' or '.', and must start and end with an
alphanumeric character (regex: (([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?)
The unit tests don't catch this because they use in-process Python objects, which don't apply K8s API validation.
The same problem affects the label selector in _check_existing_job:
label_selector = f"osa.io/hook={hook_name},osa.io/deposition={deposition_srn}"Fix: extract only the ID fragment from the SRN (the same way job_name() already does with srn_parts[-1]), or sanitize the string before embedding it in labels:
# In _build_job_spec (and _check_existing_job):
dep_id = deposition_srn.split(":")[-1] if deposition_srn else ""
labels = {
"osa.io/role": "hook",
"osa.io/hook": hook.name,
"osa.io/deposition": dep_id,
}The identical issue exists in source_runner.py for the osa.io/convention label.
| label_parts = ["osa.io/role=source"] | ||
| if convention_srn: | ||
| label_parts.append(f"osa.io/convention={convention_srn}") | ||
| label_selector = ",".join(label_parts) |
There was a problem hiding this comment.
Invalid Kubernetes label values — SRN contains colons (same issue as
runner.py)
convention_srn (e.g., "urn:osa:localhost:conv:xyz") is used directly as a label value and as part of a label selector. Both will be rejected by the K8s API server at runtime.
label_parts.append(f"osa.io/convention={convention_srn}")and
labels["osa.io/convention"] = convention_srnFix: use only the trailing ID fragment, consistent with the job_name() helper:
conv_id = convention_srn.split(":")[-1] if convention_srn else ""
labels["osa.io/convention"] = conv_id
...
label_parts.append(f"osa.io/convention={conv_id}")| async def _check_existing_job( | ||
| self, batch_api: BatchV1Api, namespace: str, convention_srn: str | ||
| ) -> str | None: | ||
| label_parts = ["osa.io/role=source"] | ||
| if convention_srn: | ||
| label_parts.append(f"osa.io/convention={convention_srn}") | ||
| label_selector = ",".join(label_parts) | ||
|
|
||
| try: | ||
| job_list = await batch_api.list_namespaced_job(namespace, label_selector=label_selector) | ||
| except Exception as exc: | ||
| raise classify_api_error(exc) from exc | ||
|
|
||
| for job in job_list.items: | ||
| if job.status.succeeded: | ||
| return "succeeded" | ||
| if job.status.active: | ||
| return f"active:{job.metadata.name}" | ||
| return None |
There was a problem hiding this comment.
Orphan detection missing source-level discriminator
The label selector used to detect an already-running or completed source job filters only on osa.io/role=source (and optionally osa.io/convention). The _build_job_spec method also does not emit a label for the source identity (image or any other identifier).
If more than one distinct source image is ever run for the same convention — either currently or in a retry scenario — _check_existing_job will find the first match (any source job for that convention) and either short-circuit to its output or attach to a running job that belongs to a completely different source container. Contrast this with the hook runner, which includes osa.io/hook={hook.name} as an additional discriminator.
Even if the current domain model enforces one source per convention, making the labelling explicit prevents accidental cross-matching if that constraint is relaxed later, and makes debugging in kubectl much easier.
Consider adding the image reference (or a sanitised form of it) as an additional label:
# sanitise colon / slash / @ so the value is k8s-label-safe
image_label = re.sub(r"[^a-zA-Z0-9._-]", "-", source.image)[:63].strip("-")
labels["osa.io/source-image"] = image_labeland including it in the selector in _check_existing_job.
|
|
||
| return "failed:WatchTimeout" |
There was a problem hiding this comment.
Missing final-poll safety net on timeout
K8sHookRunner._wait_for_completion (in runner.py) does one extra read after the deadline loop to guard against a race where the job completes in the last millisecond:
# Timed out — poll once more
try:
job = await batch_api.read_namespaced_job(job_name, namespace)
if job.status.succeeded:
return "succeeded"
except Exception:
passThis source-runner equivalent simply returns "failed:WatchTimeout" without the final poll, making it slightly more likely to report a false timeout under load. The two methods should be consistent.
| return "failed:WatchTimeout" | |
| # Timed out — poll once more to guard against a last-second completion | |
| try: | |
| job = await batch_api.read_namespaced_job(job_name, namespace) | |
| if job.status.succeeded: | |
| return "succeeded" | |
| except Exception: | |
| pass | |
| return "failed:WatchTimeout" |
| labels = spec.spec.template.metadata.labels | ||
| assert labels["osa.io/role"] == "hook" | ||
| assert labels["osa.io/hook"] == "validate_dna" | ||
| assert labels["osa.io/deposition"] == "urn:osa:localhost:dep:abc123" |
There was a problem hiding this comment.
Unit test doesn't catch invalid label value
The test at line 183 asserts that labels["osa.io/deposition"] == "urn:osa:localhost:dep:abc123", but this confirms the bug rather than catching it — Kubernetes Python client objects accept any string, so no validation error fires in the unit test. Once the label sanitization is fixed in the runner, this test assertion should be updated to verify the sanitized value (e.g., "abc123") is stored instead.
Install k8s extra dependencies across all CI jobs to ensure Kubernetes-related functionality is available during testing, type checking, and deployment processes.
Replace string-based SRN handling with proper SRN type objects in source and validation runners to improve type safety and enable better K8s label generation. Add K8s label utilities for converting SRNs to DNS-compliant label values within 63-character limit.
|
@greptile |
|
| try: | ||
| Path(tmp_path).rename(target) | ||
| except OSError: | ||
| shutil.copy2(tmp_path, target) | ||
| Path(tmp_path).unlink(missing_ok=True) | ||
| except Exception: | ||
| Path(tmp_path).unlink(missing_ok=True) | ||
| raise |
There was a problem hiding this comment.
save_file copy failure not wrapped in InfrastructureError
When rename() fails on an S3 CSI mount and the fallback shutil.copy2 itself raises, the raw OSError propagates uncaught out of save_file. The outer except Exception: block just cleans up the temp file and re-raises. This is inconsistent with move_source_files_to_deposition, which wraps the same failure in InfrastructureError.
Any caller that catches only InfrastructureError (the domain error type) for storage failures will silently miss this path, turning an expected infrastructure error into an unhandled exception. There is also no unit test for this scenario in TestSaveFileFallback, meaning the gap is invisible.
try:
Path(tmp_path).rename(target)
except OSError:
try:
shutil.copy2(tmp_path, target)
Path(tmp_path).unlink(missing_ok=True)
except OSError as e:
raise InfrastructureError(f"Failed to write file {filename}: {e}") from e| class K8sConfig(BaseModel): | ||
| """Kubernetes-specific runner settings, required when runner.backend == "k8s".""" | ||
|
|
||
| namespace: str = "osa" | ||
| service_account: str | None = None | ||
| data_pvc_name: str = "" | ||
| data_mount_path: str = "/data" | ||
| image_pull_secrets: list[str] = [] | ||
| job_ttl_seconds: int = 300 | ||
|
|
||
|
|
||
| class RunnerConfig(BaseModel): | ||
| """Runner backend selection and Kubernetes configuration.""" | ||
|
|
||
| backend: Literal["oci", "k8s"] = "oci" | ||
| k8s: K8sConfig = K8sConfig() |
There was a problem hiding this comment.
Missing cross-field validation:
data_pvc_name required when backend == "k8s"
K8sConfig.data_pvc_name defaults to "". If an operator enables the K8s backend but forgets to set this field, the PVC claim name embedded in every Job spec will be an empty string. The error will only surface at startup via the health-check call to read_namespaced_persistent_volume_claim("", namespace), which returns a cryptic K8s 404. A @model_validator on RunnerConfig would surface the problem at config-parse time with a clear message:
class RunnerConfig(BaseModel):
backend: Literal["oci", "k8s"] = "oci"
k8s: K8sConfig = K8sConfig()
@model_validator(mode="after")
def validate_k8s_required_fields(self) -> "RunnerConfig":
if self.backend == "k8s" and not self.k8s.data_pvc_name:
raise ValueError(
"runner.k8s.data_pvc_name is required when runner.backend == 'k8s'. "
"Set OSA_RUNNER__K8S__DATA_PVC_NAME."
)
return self| def parse_records_file(output_dir: Path) -> list[dict[str, Any]]: | ||
| """Parse records.jsonl from source output directory.""" | ||
| import logfire | ||
|
|
||
| records: list[dict[str, Any]] = [] | ||
| records_file = output_dir / "records.jsonl" | ||
| if not records_file.exists(): | ||
| return records | ||
|
|
||
| for line in records_file.read_text().strip().split("\n"): | ||
| if not line.strip(): | ||
| continue | ||
| try: | ||
| records.append(json.loads(line)) | ||
| except json.JSONDecodeError: | ||
| logfire.warn("Skipping invalid JSON line in records.jsonl") | ||
| continue | ||
| return records |
There was a problem hiding this comment.
Deferred
import logfire inside utility functions
logfire is imported lazily inside parse_records_file and parse_session_file. Since logfire is a declared runtime dependency used at module level throughout the rest of the codebase, there is no reason to defer the import here. Keeping the import at the top of the module makes the dependency explicit, avoids a small per-call overhead, and prevents surprising ImportErrors surfacing at parse time rather than module load time.
import logfire
import json
...
def parse_records_file(output_dir: Path) -> list[dict[str, Any]]:
records: list[dict[str, Any]] = []
...| def _relative_path(self, path: Path) -> str: | ||
| """Strip the data mount prefix to get a PVC-relative subpath.""" | ||
| mount = self._config.data_mount_path.rstrip("/") | ||
| path_str = str(path) | ||
| if not path_str.startswith(mount): | ||
| raise ValueError(f"Path {path} is outside the data mount prefix {mount}") | ||
| return path_str[len(mount) :].lstrip("/") |
There was a problem hiding this comment.
_relative_path duplicated verbatim in source_runner.py
K8sHookRunner._relative_path (lines 329–335) and K8sSourceRunner._relative_path (lines 313–318 of source_runner.py) are character-for-character identical. Extracting this to runner_utils.py or a shared base class would eliminate the duplication — a future change to the mount-path stripping logic would otherwise need to be applied in two places.
…method Remove unused CoreV1Api parameter from _wait_for_completion method in both K8sHookRunner and K8sSourceRunner classes to clean up the method signature and improve code maintainability.
- Add model validator to ensure data_pvc_name is set when using K8s backend - Extract relative_path utility function to reduce code duplication - Improve file storage error handling with proper exception wrapping - Add comprehensive tests for K8s config validation and file operations
|
@greptile |
Convert Docker-style memory strings to K8s resource quantities to ensure proper memory limit specification in Kubernetes jobs fix: improve temp file cleanup in storage adapter Move temp file cleanup outside exception handler and add warning logging when cleanup fails to prevent masking original errors
|
@greptile |
Summary
infrastructure/k8s/) with full security parity to Docker runnersrunner.backend: "oci" | "k8s"using Dishka conditional activation (Marker/when=)FilesystemStorageAdapterfor S3 CSI mounts (copy+delete fallback whenrename()fails)runner_utils.py) used by both OCI and K8s runnersHookInputs.deposition_srn,SourceInputs.convention_srnkubernetes-asyncioas optional[k8s]dependencyTest plan
just test— 901 unit tests passjust lint— 0 errors, 0 warnings (fully clean)Closes #92