Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions cli/src/flowmesh_cli/core/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ def asset_path(package: str, *parts: str) -> Path:
resource = resources.files(package)
for part in parts:
resource /= part
if not resources.is_resource(package, Path(*parts).as_posix()):
# is_resource only checks files, so we allow directories to pass through.
pass
try:
with resources.as_file(resource) as path:
return Path(path)
Expand Down
199 changes: 159 additions & 40 deletions cli/stack/src/flowmesh_cli_stack/bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,54 @@
import sys
import tarfile
import tempfile
from importlib.metadata import PackageNotFoundError, version
from pathlib import Path

import typer
from flowmesh.models.nodes import NodeRole
from flowmesh_cli.core import logging
from flowmesh_cli.core.typer import get_typer
from packaging.version import InvalidVersion, Version

from . import stack as stack_module
from .utils import DEFAULT_ENV_FILE, parse_node_role, resolve_package_version

app = get_typer(
help="Package FlowMesh deployments into portable bundles for distribution."
)

_TLS_SERVER_SUBDIR = "secrets/tls/server"
_TLS_REDIS_SUBDIR = "secrets/tls/redis"
_WORKER_CONFIG_FILE = "configs/worker_config.yaml"

_SERVER_TLS_SOURCES = (
Path(_TLS_SERVER_SUBDIR) / "server-ca.pem",
Path(_TLS_SERVER_SUBDIR) / "server.key",
Path(_TLS_SERVER_SUBDIR) / "server.pem",
)
_REDIS_TLS_CA_SOURCE = Path(_TLS_REDIS_SUBDIR) / "redis-ca.pem"
_REDIS_TLS_CERT_SOURCES = (
Path(_TLS_REDIS_SUBDIR) / "redis-server.pem",
Path(_TLS_REDIS_SUBDIR) / "redis-server.key",
)
_WORKER_CONFIG_SOURCE = Path(_WORKER_CONFIG_FILE)


def _copy_redis_tls_assets(dest: Path, include_tls: bool, *, ca_only: bool) -> None:
if not include_tls:
return
tls_dir = dest / "tls" / "redis"
ca_src = Path("secrets/tls/redis/redis-ca.pem")
cert_src = Path("secrets/tls/redis/redis-server.pem")
key_src = Path("secrets/tls/redis/redis-server.key")
tls_dir = dest / _TLS_REDIS_SUBDIR
sources: tuple[Path, ...] = (_REDIS_TLS_CA_SOURCE,)
if not ca_only:
sources = sources + _REDIS_TLS_CERT_SOURCES
copied = False
missing: list[str] = []
if ca_src.exists():
tls_dir.mkdir(parents=True, exist_ok=True)
shutil.copy2(ca_src, tls_dir / ca_src.name)
copied = True
else:
missing.append(ca_src.name)
if not ca_only:
for src in (cert_src, key_src):
if src.exists():
tls_dir.mkdir(parents=True, exist_ok=True)
shutil.copy2(src, tls_dir / src.name)
copied = True
else:
missing.append(src.name)
for src in sources:
if src.exists():
tls_dir.mkdir(parents=True, exist_ok=True)
shutil.copy2(src, tls_dir / src.name)
copied = True
else:
missing.append(src.name)
if not copied:
logging.warning(
"Warning: Redis TLS assets not found; bundle created without TLS."
Expand All @@ -50,15 +62,14 @@ def _copy_redis_tls_assets(dest: Path, include_tls: bool, *, ca_only: bool) -> N
logging.warning(f"Warning: Redis TLS assets missing: {missing_str}")


def _copy_server_assets(dest: Path, include_tls: bool) -> None:
def _copy_server_assets(
dest: Path, include_tls: bool, role: NodeRole = NodeRole.ROOT
) -> None:
if include_tls:
tls_dir = dest / "tls" / "server"
ca_src = Path("secrets/tls/server/server-ca.pem")
key_src = Path("secrets/tls/server/server.key")
pem_src = Path("secrets/tls/server/server.pem")
tls_dir = dest / _TLS_SERVER_SUBDIR
copied = False
missing: list[str] = []
for src in (ca_src, key_src, pem_src):
for src in _SERVER_TLS_SOURCES:
if src.exists():
tls_dir.mkdir(parents=True, exist_ok=True)
shutil.copy2(src, tls_dir / src.name)
Expand All @@ -72,13 +83,36 @@ def _copy_server_assets(dest: Path, include_tls: bool) -> None:
elif missing:
missing_str = ", ".join(missing)
logging.warning(f"Warning: TLS assets missing: {missing_str}")
_copy_redis_tls_assets(dest, include_tls, ca_only=True)
# Worker nodes don't host Redis (compose root profile gates it), so they
# only need the CA to verify the root's TLS.
_copy_redis_tls_assets(dest, include_tls, ca_only=role == NodeRole.WORKER)

default_worker_config = Path("configs/worker_config.yaml")
if default_worker_config.exists():
shutil.copy2(default_worker_config, dest / "worker_config.yaml")
if _WORKER_CONFIG_SOURCE.exists():
worker_config_dest = dest / _WORKER_CONFIG_FILE
worker_config_dest.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(_WORKER_CONFIG_SOURCE, worker_config_dest)
else:
logging.warning(f"Warning: worker config not found: {default_worker_config}")
logging.warning(f"Warning: worker config not found: {_WORKER_CONFIG_SOURCE}")


def _scaffold_server_assets(dest: Path, include_tls: bool) -> None:
"""Scaffold the bundle directory layout in-place at ``dest``."""
dest.mkdir(parents=True, exist_ok=True)

if include_tls:
for subdir in (_TLS_SERVER_SUBDIR, _TLS_REDIS_SUBDIR):
target = dest / subdir
existed = target.is_dir()
target.mkdir(parents=True, exist_ok=True)
logging.log(f"{'kept' if existed else 'created'} {subdir}/")

worker_config = dest / _WORKER_CONFIG_FILE
if worker_config.exists():
logging.log(f"kept {_WORKER_CONFIG_FILE}")
else:
worker_config.parent.mkdir(parents=True, exist_ok=True)
worker_config.touch()
logging.log(f"created {_WORKER_CONFIG_FILE}")


def _build_cli_wheels(wheel_dir: Path) -> None:
Expand Down Expand Up @@ -115,9 +149,8 @@ def _build_cli_wheels(wheel_dir: Path) -> None:

def _published_cli_spec() -> str:
"""Return the published FlowMesh CLI package spec for this release."""
try:
package_version = version("flowmesh-cli-stack")
except PackageNotFoundError:
package_version = resolve_package_version()
if package_version is None:
logging.error("Unable to resolve installed flowmesh-cli-stack version.")
raise typer.Exit(code=1) from None
try:
Expand All @@ -143,10 +176,16 @@ def _published_cli_spec() -> str:


def _write_install_script(
dest: Path, *, package_spec: str | None = None, include_wheels: bool = False
dest: Path,
*,
package_spec: str | None = None,
include_wheels: bool = False,
role: NodeRole = NodeRole.ROOT,
) -> None:
"""Write an install.sh script to set up a venv and install FlowMesh CLI."""
script_path = dest / "install.sh"
role_arg = "" if role == NodeRole.ROOT else f" --role {role.value}"
deploy_arg = " --deploy"
if include_wheels:
install_block = '"$UV_BIN" pip install ./wheels/*.whl'
else:
Expand All @@ -169,6 +208,10 @@ def _write_install_script(
script = f"""#!/usr/bin/env bash
set -euo pipefail

# Anchor all relative paths (./wheels, .venv, .env, secrets/, configs/) to
# the bundle directory so the operator can run this from anywhere.
cd "$(dirname "$0")"

VENV_DIR="${{VENV_DIR:-.venv}}"
UV_BIN="${{UV_BIN:-uv}}"
PYTHON_REQ="${{FLOWMESH_PYTHON:-3.12}}"
Expand Down Expand Up @@ -197,7 +240,7 @@ def _write_install_script(
echo "Installed flowmesh CLI into $VENV_DIR."
echo "Activate it with 'source $VENV_DIR/bin/activate'."
if [ ! -f "$ENV_FILE" ]; then
flowmesh stack init --env-file "$ENV_FILE"
flowmesh stack init --env-file "$ENV_FILE"{role_arg}{deploy_arg}
fi
echo "Configure $ENV_FILE before executing FlowMesh."
echo "Then run:"
Expand All @@ -211,24 +254,29 @@ def _write_install_script(


def _create_bundle_tarball(
tar_path: Path, include_tls: bool, *, include_wheels: bool
tar_path: Path,
include_tls: bool,
*,
include_wheels: bool,
role: NodeRole = NodeRole.ROOT,
) -> None:
"""Create a deployable bundle as a tar.gz with a top-level prefix."""
tar_path.parent.mkdir(parents=True, exist_ok=True)
prefix = "flowmesh_server_bundle"
with tempfile.TemporaryDirectory(prefix="flowmesh-bundle-") as tmp:
staging_root = Path(tmp) / prefix
staging_root.mkdir(parents=True, exist_ok=True)
_copy_server_assets(staging_root, include_tls=include_tls)
_copy_server_assets(staging_root, include_tls=include_tls, role=role)
if include_wheels:
wheel_dir = staging_root / "wheels"
_build_cli_wheels(wheel_dir)
_write_install_script(staging_root, include_wheels=True)
_write_install_script(staging_root, include_wheels=True, role=role)
else:
_write_install_script(
staging_root,
package_spec=_published_cli_spec(),
include_wheels=False,
role=role,
)
with tarfile.open(tar_path, mode="w:gz") as tf:
# Ensure we archive the top-level prefix directory.
Expand All @@ -237,6 +285,10 @@ def _create_bundle_tarball(

@app.command("export")
def bundle_export(
role: str = typer.Argument(
NodeRole.ROOT.value,
help="Target NODE_ROLE for the bundle (root|worker).",
),
output: Path = typer.Option(
None,
"--output",
Expand All @@ -254,12 +306,79 @@ def bundle_export(
),
) -> None:
"""Create a deployment bundle for the server."""
logging.info("Creating server bundle...")
node_role = parse_node_role(role)
logging.info(f"Creating server bundle for role={node_role.value}...")
tar_path = output
if tar_path is None:
tar_path = Path("./dist/flowmesh_server_bundle.tar.gz")
tar_path.parent.mkdir(parents=True, exist_ok=True)
_create_bundle_tarball(
tar_path, include_tls=not no_tls, include_wheels=include_wheels
tar_path,
include_tls=not no_tls,
include_wheels=include_wheels,
role=node_role,
)
logging.success(f"Bundle created at {tar_path}")


@app.command("init")
def bundle_init(
dest: Path = typer.Option(
Path("."),
"--dest",
help="Directory to scaffold the bundle layout in (default: current dir).",
),
no_tls: bool = typer.Option(
False, "--no-tls", help="Skip TLS placeholder directories."
),
env_file: Path = typer.Option(
DEFAULT_ENV_FILE,
"--env-file",
help="Env file to write under --dest (or absolute path).",
),
force: bool = typer.Option(
False,
"--force",
"-f",
help="Overwrite an existing env file without prompting.",
),
role: str = typer.Option(
NodeRole.ROOT.value,
"--role",
help="Target NODE_ROLE for the scaffolded .env (root|worker).",
),
) -> None:
"""Scaffold an empty bundle layout in --dest."""
node_role = parse_node_role(role)
logging.info(f"Initializing server bundle layout in '{dest}'...")
_scaffold_server_assets(dest, include_tls=not no_tls)
resolved_env = env_file if env_file.is_absolute() else dest / env_file
resolved_env.parent.mkdir(parents=True, exist_ok=True)
stack_module.init(
env_file=resolved_env, force=force, role=node_role.value, deploy=True
)
# Paths in the next-steps block are intentionally relative to dest so
# they remain accurate after the user runs the cd line.
env_hint = env_file if not env_file.is_absolute() else resolved_env
cd_hint = "" if dest == Path(".") else f" cd {dest}\n"
env_arg = "" if env_file == DEFAULT_ENV_FILE else f" --env-file {env_hint}"
if no_tls:
tls_hint = ""
elif node_role == NodeRole.WORKER:
# Worker nodes don't host Redis, so only the CA is needed there.
tls_hint = (
f" drop server TLS certs into {_TLS_SERVER_SUBDIR}/ "
f"and the Redis CA into {_TLS_REDIS_SUBDIR}/redis-ca.pem\n"
)
else:
tls_hint = (
f" drop TLS certs into {_TLS_SERVER_SUBDIR}/ and {_TLS_REDIS_SUBDIR}/\n"
)
logging.success(f"Bundle layout ready at '{dest}'.")
logging.log(
f"Next steps:\n{cd_hint}"
f" edit {env_hint} and {_WORKER_CONFIG_FILE}\n"
f"{tls_hint}"
f" flowmesh stack pull{env_arg}\n"
f" flowmesh stack up{env_arg}"
)
26 changes: 24 additions & 2 deletions cli/stack/src/flowmesh_cli_stack/env_schema.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Stack env schema."""

from flowmesh.models.nodes import NodeRole
from flowmesh_stack.env_schema import (
EnvSchema,
EnvSection,
Expand Down Expand Up @@ -46,9 +47,9 @@
),
EnvVar(
"NODE_ROLE",
"root",
NodeRole.ROOT.value,
var_type=EnvVarType.ENUM,
choices={"root", "worker"},
choices=NodeRole,
),
EnvVar("NODE_NAMESPACE", "flowmesh"),
EnvVar("NODE_CLUSTER", "dev"),
Expand Down Expand Up @@ -622,3 +623,24 @@
),
],
)


# Schema-default overrides applied when rendering a worker-role .env.
# Unused vars are blanked out to avoid confusion and misconfiguration.
WORKER_ROLE_OVERRIDES = {
"NODE_ROLE": NodeRole.WORKER.value,
"REDIS_TLS_CERT_FILE": "",
"REDIS_TLS_KEY_FILE": "",
}


def role_overrides(role: NodeRole) -> dict[str, str]:
"""Return the schema-default overrides for a given role's rendered .env."""
return WORKER_ROLE_OVERRIDES.copy() if role == NodeRole.WORKER else {}


def deploy_overrides(deploy: bool, version: str | None = None) -> dict[str, str]:
"""Return the schema-default overrides for a deploy-shaped rendered .env."""
if not (deploy and version):
return {}
return {"FLOWMESH_VERSION": version}
Loading