From 1ed5f1eb3ed167ae0f559fd585e6ad6cf2699fa3 Mon Sep 17 00:00:00 2001 From: Klaus Ma Date: Tue, 26 May 2026 11:32:22 +0800 Subject: [PATCH 1/6] add flmexec runner numpy e2e --- e2e/tests/test_flmexec.py | 84 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 e2e/tests/test_flmexec.py diff --git a/e2e/tests/test_flmexec.py b/e2e/tests/test_flmexec.py new file mode 100644 index 00000000..6ea823b2 --- /dev/null +++ b/e2e/tests/test_flmexec.py @@ -0,0 +1,84 @@ +""" +Copyright 2025 The Flame Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import json +import textwrap + +import flamepy +import pytest + +RESULT_PREFIX = "FLMEXEC_RUNNER_NUMPY_RESULT=" + + +@pytest.fixture(scope="module") +def check_flmexec_runner_environment(): + context = flamepy.FlameContext() + package_config = getattr(context, "package", None) + cache_config = getattr(context, "cache", None) + has_package_storage = package_config is not None and getattr(package_config, "storage", None) is not None + has_cache_endpoint = cache_config is not None + if not has_package_storage and not has_cache_endpoint: + pytest.skip("Runner package storage is not configured") + + try: + if flamepy.get_application("flmexec") is None: + pytest.skip("flmexec application is not registered") + if flamepy.get_application("flmrun") is None: + pytest.skip("flmrun application is not registered") + except Exception as exc: + pytest.skip(f"Flame cluster is not available: {exc}") + + +@pytest.mark.timeout(600) +def test_flmexec_python_script_starts_runner_with_numpy_dependency(check_flmexec_runner_environment): + script = textwrap.dedent( + f""" + import json + import uuid + + from flamepy.runner import Runner + + def numpy_summary(limit): + import numpy as np + + values = np.arange(1, limit + 1, dtype=np.int64) + return {{ + "dtype": str(values.dtype), + "shape": list(values.shape), + "sum": int(values.sum()), + }} + + app_name = f"test-flmexec-runner-numpy-{{uuid.uuid4().hex[:8]}}" + + with Runner(app_name, dependencies=["numpy"]) as rr: + service = rr.service(numpy_summary) + result = service(5).get() + + print("{RESULT_PREFIX}" + json.dumps(result, sort_keys=True)) + """ + ) + + session = flamepy.create_session("flmexec") + try: + request = {"language": "python", "code": script, "input": None} + raw_response = session.invoke(json.dumps(request).encode("utf-8")) + finally: + session.close() + + response = json.loads(raw_response.decode("utf-8")) + output = bytes(response["data"]).decode("utf-8") + result_line = next((line for line in output.splitlines() if line.startswith(RESULT_PREFIX)), None) + assert result_line is not None, f"missing result line in flmexec output:\n{output}" + + result = json.loads(result_line.removeprefix(RESULT_PREFIX)) + assert result == {"dtype": "int64", "shape": [5], "sum": 15} From 256eec75aac52dc7e1358f4e1b371ea31ed57601 Mon Sep 17 00:00:00 2001 From: Klaus Ma Date: Tue, 26 May 2026 12:23:43 +0800 Subject: [PATCH 2/6] propagate flame env to flmexec python scripts --- flmexec/src/script/lang/python.rs | 52 +++++++++++++++++++++---------- 1 file changed, 35 insertions(+), 17 deletions(-) diff --git a/flmexec/src/script/lang/python.rs b/flmexec/src/script/lang/python.rs index ad733746..606799d0 100644 --- a/flmexec/src/script/lang/python.rs +++ b/flmexec/src/script/lang/python.rs @@ -29,6 +29,29 @@ use crate::api::Script; use crate::script::{ScriptEngine, ScriptRuntime}; const DEFAULT_ENTRYPOINT: &str = "main.py"; +const FLAME_ENDPOINT_ENV: &str = "FLAME_ENDPOINT"; +const FLAME_CACHE_ENDPOINT_ENV: &str = "FLAME_CACHE_ENDPOINT"; +const FLAME_CA_FILE_ENV: &str = "FLAME_CA_FILE"; +const PROPAGATED_ENV_VARS: &[&str] = &[ + // Python/Flame + "PYTHONPATH", + "FLAME_HOME", + FLAME_PYTHON_VERSION_ENV, + FLAME_ENDPOINT_ENV, + FLAME_CACHE_ENDPOINT_ENV, + FLAME_CA_FILE_ENV, + // uv cache and config + "UV_CACHE_DIR", + "UV_PYTHON_INSTALL_DIR", + "XDG_CACHE_HOME", + // System essentials + "PATH", + "HOME", + "USER", + "TMPDIR", + "TMP", + "TEMP", +]; /// Get the uv command path from FLAME_HOME or fallback to system uv fn get_uv_cmd() -> String { @@ -69,23 +92,6 @@ impl PythonScript { // Propagate essential environment variables from parent process let mut env = HashMap::new(); - const PROPAGATED_ENV_VARS: &[&str] = &[ - // Python/Flame - "PYTHONPATH", - "FLAME_HOME", - FLAME_PYTHON_VERSION_ENV, - // uv cache and config - "UV_CACHE_DIR", - "UV_PYTHON_INSTALL_DIR", - "XDG_CACHE_HOME", - // System essentials - "PATH", - "HOME", - "USER", - "TMPDIR", - "TMP", - "TEMP", - ]; for key in PROPAGATED_ENV_VARS { if let Ok(value) = std::env::var(key) { env.insert((*key).to_string(), value); @@ -178,3 +184,15 @@ impl Drop for PythonScript { fs::remove_dir_all(Path::new(&self.runtime.work_dir)).unwrap(); } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn propagates_nested_flame_client_env_vars() { + assert!(PROPAGATED_ENV_VARS.contains(&FLAME_ENDPOINT_ENV)); + assert!(PROPAGATED_ENV_VARS.contains(&FLAME_CACHE_ENDPOINT_ENV)); + assert!(PROPAGATED_ENV_VARS.contains(&FLAME_CA_FILE_ENV)); + } +} From 273dff40b39b2dfef21ad4fea226d40b893f59b4 Mon Sep 17 00:00:00 2001 From: Klaus Ma Date: Tue, 26 May 2026 16:46:34 +0800 Subject: [PATCH 3/6] Fix flmexec Python runner environment --- Cargo.lock | 1 + e2e/tests/test_flmexec.py | 35 ++++--- flmexec/Cargo.toml | 3 + flmexec/src/script/lang/python.rs | 165 +++++++++++++++++++++++++++++- 4 files changed, 186 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c500be4c..6d2b53c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2207,6 +2207,7 @@ dependencies = [ "serde", "serde_derive", "stdng", + "tempfile", "tokio", "tracing", ] diff --git a/e2e/tests/test_flmexec.py b/e2e/tests/test_flmexec.py index 6ea823b2..d0a03d8f 100644 --- a/e2e/tests/test_flmexec.py +++ b/e2e/tests/test_flmexec.py @@ -44,27 +44,34 @@ def test_flmexec_python_script_starts_runner_with_numpy_dependency(check_flmexec script = textwrap.dedent( f""" import json + import sys + import traceback import uuid - from flamepy.runner import Runner + try: + from flamepy.runner import Runner - def numpy_summary(limit): - import numpy as np + def numpy_summary(limit): + import numpy as np - values = np.arange(1, limit + 1, dtype=np.int64) - return {{ - "dtype": str(values.dtype), - "shape": list(values.shape), - "sum": int(values.sum()), - }} + values = np.arange(1, limit + 1, dtype=np.int64) + return {{ + "dtype": str(values.dtype), + "shape": list(values.shape), + "sum": int(values.sum()), + }} - app_name = f"test-flmexec-runner-numpy-{{uuid.uuid4().hex[:8]}}" + app_name = f"test-flmexec-runner-numpy-{{uuid.uuid4().hex[:8]}}" - with Runner(app_name, dependencies=["numpy"]) as rr: - service = rr.service(numpy_summary) - result = service(5).get() + with Runner(app_name, dependencies=["numpy"]) as rr: + service = rr.service(numpy_summary) + result = service(5).get() - print("{RESULT_PREFIX}" + json.dumps(result, sort_keys=True)) + print("{RESULT_PREFIX}" + json.dumps(result, sort_keys=True)) + except BaseException: + traceback.print_exc(file=sys.stdout) + sys.stdout.flush() + raise """ ) diff --git a/flmexec/Cargo.toml b/flmexec/Cargo.toml index 2fa4f2a1..36ac2f37 100644 --- a/flmexec/Cargo.toml +++ b/flmexec/Cargo.toml @@ -18,6 +18,9 @@ clap = { workspace = true } indicatif = {version = "*", features = ["rayon"]} rand = { workspace = true } +[dev-dependencies] +tempfile = { workspace = true } + [[bin]] name = "flmexec" path = "src/client.rs" diff --git a/flmexec/src/script/lang/python.rs b/flmexec/src/script/lang/python.rs index 606799d0..f9b99ee4 100644 --- a/flmexec/src/script/lang/python.rs +++ b/flmexec/src/script/lang/python.rs @@ -13,9 +13,10 @@ limitations under the License. use std::{ collections::HashMap, + env, fs::{self, File}, io::{Read, Write}, - path::Path, + path::{Path, PathBuf}, process::{Command, Stdio}, thread, }; @@ -29,13 +30,16 @@ use crate::api::Script; use crate::script::{ScriptEngine, ScriptRuntime}; const DEFAULT_ENTRYPOINT: &str = "main.py"; +const DEFAULT_FLAME_HOME: &str = "/usr/local/flame"; +const PYTHONPATH_ENV: &str = "PYTHONPATH"; +const FLAME_HOME_ENV: &str = "FLAME_HOME"; const FLAME_ENDPOINT_ENV: &str = "FLAME_ENDPOINT"; const FLAME_CACHE_ENDPOINT_ENV: &str = "FLAME_CACHE_ENDPOINT"; const FLAME_CA_FILE_ENV: &str = "FLAME_CA_FILE"; const PROPAGATED_ENV_VARS: &[&str] = &[ // Python/Flame - "PYTHONPATH", - "FLAME_HOME", + PYTHONPATH_ENV, + FLAME_HOME_ENV, FLAME_PYTHON_VERSION_ENV, FLAME_ENDPOINT_ENV, FLAME_CACHE_ENDPOINT_ENV, @@ -55,7 +59,7 @@ const PROPAGATED_ENV_VARS: &[&str] = &[ /// Get the uv command path from FLAME_HOME or fallback to system uv fn get_uv_cmd() -> String { - let flame_home = std::env::var("FLAME_HOME").unwrap_or_else(|_| "/usr/local/flame".to_string()); + let flame_home = env::var(FLAME_HOME_ENV).unwrap_or_else(|_| DEFAULT_FLAME_HOME.to_string()); let uv_path = format!("{}/bin/uv", flame_home); // Check if uv exists in FLAME_HOME, otherwise fallback to system uv @@ -66,6 +70,104 @@ fn get_uv_cmd() -> String { } } +fn configure_python_env(envs: &mut HashMap) -> String { + let flame_home = flame_home(envs); + envs.entry(FLAME_HOME_ENV.to_string()) + .or_insert_with(|| flame_home.to_string_lossy().to_string()); + + let python_version = python_version(envs, &flame_home); + envs.insert(FLAME_PYTHON_VERSION_ENV.to_string(), python_version.clone()); + + let site_packages = site_packages_path(&flame_home, &python_version); + if site_packages.is_dir() { + prepend_path_env(envs, PYTHONPATH_ENV, &site_packages); + } else { + tracing::debug!( + "Flame Python site-packages not found: {}", + site_packages.display() + ); + } + + python_version +} + +fn flame_home(envs: &HashMap) -> PathBuf { + envs.get(FLAME_HOME_ENV) + .filter(|path| !path.is_empty()) + .map(PathBuf::from) + .unwrap_or_else(|| PathBuf::from(DEFAULT_FLAME_HOME)) +} + +fn python_version(envs: &HashMap, flame_home: &Path) -> String { + envs.get(FLAME_PYTHON_VERSION_ENV) + .map(|version| version_number(version)) + .filter(|version| !version.is_empty()) + .map(ToOwned::to_owned) + .or_else(|| latest_installed_python_version(flame_home)) + .unwrap_or_else(|| DEFAULT_PYTHON_VERSION.to_string()) +} + +fn version_number(version: &str) -> &str { + version.strip_prefix("python").unwrap_or(version) +} + +fn latest_installed_python_version(flame_home: &Path) -> Option { + let lib_path = flame_home.join("lib"); + let mut versions = fs::read_dir(lib_path) + .ok()? + .flatten() + .filter_map(|entry| { + let path = entry.path(); + if !path.is_dir() || !path.join("site-packages").is_dir() { + return None; + } + + let name = entry.file_name(); + let name = name.to_string_lossy(); + name.strip_prefix("python") + .filter(|version| !version.is_empty()) + .map(|version| version.to_string()) + }) + .collect::>(); + + versions.sort_by_key(|version| minor_version(version)); + versions.pop() +} + +fn minor_version(version: &str) -> Vec { + version + .split('.') + .map(|part| part.parse::().unwrap_or(0)) + .collect() +} + +fn site_packages_path(flame_home: &Path, version: &str) -> PathBuf { + flame_home + .join("lib") + .join(format!("python{}", version_number(version))) + .join("site-packages") +} + +fn prepend_path_env(envs: &mut HashMap, key: &str, path: &Path) { + let mut paths = vec![path.to_path_buf()]; + if let Some(existing) = envs.get(key) { + paths.extend( + env::split_paths(existing) + .filter(|existing_path| !existing_path.as_os_str().is_empty()) + .filter(|existing_path| existing_path != path), + ); + } + + match env::join_paths(paths) { + Ok(joined) => { + envs.insert(key.to_string(), joined.to_string_lossy().to_string()); + } + Err(e) => { + tracing::warn!("Failed to build {key} with Flame site-packages: {e}"); + } + } +} + pub struct PythonScript { runtime: ScriptRuntime, } @@ -97,6 +199,7 @@ impl PythonScript { env.insert((*key).to_string(), value); } } + configure_python_env(&mut env); let runtime = ScriptRuntime { entrypoint: full_path.to_string_lossy().to_string(), @@ -195,4 +298,58 @@ mod tests { assert!(PROPAGATED_ENV_VARS.contains(&FLAME_CACHE_ENDPOINT_ENV)); assert!(PROPAGATED_ENV_VARS.contains(&FLAME_CA_FILE_ENV)); } + + #[test] + fn configures_python_env_from_installed_flame_site_packages() { + let temp = tempfile::tempdir().unwrap(); + let site_packages = temp.path().join("lib/python3.12/site-packages"); + fs::create_dir_all(&site_packages).unwrap(); + + let existing_path = temp.path().join("existing-pythonpath"); + let mut envs = HashMap::from([ + ( + FLAME_HOME_ENV.to_string(), + temp.path().to_string_lossy().to_string(), + ), + ( + PYTHONPATH_ENV.to_string(), + existing_path.to_string_lossy().to_string(), + ), + ]); + + let python_version = configure_python_env(&mut envs); + + assert_eq!(python_version, "3.12"); + assert_eq!(envs.get(FLAME_PYTHON_VERSION_ENV).unwrap(), "3.12"); + + let python_paths = env::split_paths(envs.get(PYTHONPATH_ENV).unwrap()).collect::>(); + assert_eq!(python_paths, vec![site_packages, existing_path]); + } + + #[test] + fn configures_requested_python_version() { + let temp = tempfile::tempdir().unwrap(); + let site_packages = temp.path().join("lib/python3.11/site-packages"); + fs::create_dir_all(&site_packages).unwrap(); + + let mut envs = HashMap::from([ + ( + FLAME_HOME_ENV.to_string(), + temp.path().to_string_lossy().to_string(), + ), + ( + FLAME_PYTHON_VERSION_ENV.to_string(), + "python3.11".to_string(), + ), + ]); + + let python_version = configure_python_env(&mut envs); + + assert_eq!(python_version, "3.11"); + assert_eq!(envs.get(FLAME_PYTHON_VERSION_ENV).unwrap(), "3.11"); + assert_eq!( + env::split_paths(envs.get(PYTHONPATH_ENV).unwrap()).collect::>(), + vec![site_packages] + ); + } } From a2fadf857d43594b96010817430aa9de27b3abcf Mon Sep 17 00:00:00 2001 From: Klaus Ma Date: Tue, 26 May 2026 17:15:41 +0800 Subject: [PATCH 4/6] Avoid compose work volume copy race --- compose.yaml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/compose.yaml b/compose.yaml index eeea72e5..ac20d3ab 100644 --- a/compose.yaml +++ b/compose.yaml @@ -58,7 +58,11 @@ services: - ./ci/certs:/etc/flame/certs:ro - ./examples:/opt/examples - ./e2e:/opt/e2e - - flame-work:/usr/local/flame/work + - type: volume + source: flame-work + target: /usr/local/flame/work + volume: + nocopy: true - ./logs:/usr/local/flame/logs - ./work/cache:/usr/local/flame/data/cache networks: From 485bb3cf3813ab02f582cd90cdf1c566151a5971 Mon Sep 17 00:00:00 2001 From: Klaus Ma Date: Tue, 26 May 2026 17:33:07 +0800 Subject: [PATCH 5/6] Run flmexec e2e in bare-metal CI --- .github/workflows/e2e-bm.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/e2e-bm.yaml b/.github/workflows/e2e-bm.yaml index 0301fae6..d4f5e498 100644 --- a/.github/workflows/e2e-bm.yaml +++ b/.github/workflows/e2e-bm.yaml @@ -118,13 +118,13 @@ jobs: echo "=== Verify object cache is accessible ===" curl -s http://127.0.0.1:9090/ || echo "Note: Object cache gRPC endpoint (expected no HTTP response)" - - name: Run test_runner + - name: Run Python E2E tests timeout-minutes: 20 run: | export PYTHONPATH="$(pwd)/e2e/src:$PYTHONPATH" export FLAME_LOG=DEBUG cd e2e - pytest -vv --durations=0 tests/test_runner.py + pytest -vv --durations=0 tests/test_runner.py tests/test_flmexec.py - name: Show service logs on failure if: failure() || cancelled() From 880a14c715d41ca0d8d8fde28943c7340480a373 Mon Sep 17 00:00:00 2001 From: Klaus Ma Date: Tue, 26 May 2026 17:34:29 +0800 Subject: [PATCH 6/6] Simplify code verify workflow --- .github/workflows/code-verify.yaml | 29 +++++------------------------ 1 file changed, 5 insertions(+), 24 deletions(-) diff --git a/.github/workflows/code-verify.yaml b/.github/workflows/code-verify.yaml index 56930b8c..c0f6f448 100644 --- a/.github/workflows/code-verify.yaml +++ b/.github/workflows/code-verify.yaml @@ -5,21 +5,14 @@ env: jobs: ci: name: Code Verify - runs-on: ${{ matrix.os }} - strategy: - fail-fast: false - matrix: - rust: [stable] - os: [ubuntu-latest] + runs-on: ubuntu-latest steps: - name: Checkout - uses: actions/checkout@v2 - - name: Install rust - uses: actions-rs/toolchain@v1 + uses: actions/checkout@v4 + - name: Install Rust toolchain + uses: actions-rust-lang/setup-rust-toolchain@v1 with: - toolchain: ${{ matrix.rust }} - profile: minimal - override: true + toolchain: stable components: rustfmt, clippy - name: Install gRPC run: | @@ -28,17 +21,5 @@ jobs: run: cargo fmt --all -- --check - name: Cargo clippy run: cargo clippy --all-targets --all-features -- -D warnings - - name: Cargo build - run: cargo build --release - name: Cargo test run: cargo test --workspace --exclude flame-rs --exclude cri-rs - - name: Install cargo-tarpaulin - run: cargo install cargo-tarpaulin - - name: Generate coverage report - run: cargo tarpaulin --workspace --exclude flame-rs --exclude cri-rs --exclude candle-based-example --out xml --output-dir coverage - - name: Upload coverage to Codecov - uses: codecov/codecov-action@v4 - with: - files: coverage/cobertura.xml - fail_ci_if_error: false - token: ${{ secrets.CODECOV_TOKEN }}