-
Notifications
You must be signed in to change notification settings - Fork 6
feat: process injection via LiveServerlessMixin #260
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,54 @@ | ||||||
| """Process injection utilities for flash-worker tarball delivery.""" | ||||||
|
|
||||||
| from .constants import FLASH_WORKER_TARBALL_URL_TEMPLATE, FLASH_WORKER_VERSION | ||||||
|
|
||||||
|
|
||||||
| def build_injection_cmd( | ||||||
| worker_version: str = FLASH_WORKER_VERSION, | ||||||
| tarball_url: str | None = None, | ||||||
| ) -> str: | ||||||
| """Build the dockerArgs command that downloads, extracts, and runs flash-worker. | ||||||
|
|
||||||
| Supports remote URLs (curl/wget) and local file paths (file://) for testing. | ||||||
| Includes version-based caching to skip re-extraction on warm workers. | ||||||
| Network volume caching stores extracted tarball at /runpod-volume/.flash-worker/v{version}. | ||||||
| """ | ||||||
| if tarball_url is None: | ||||||
| tarball_url = FLASH_WORKER_TARBALL_URL_TEMPLATE.format(version=worker_version) | ||||||
|
|
||||||
| if tarball_url.startswith("file://"): | ||||||
| local_path = tarball_url[7:] | ||||||
| return ( | ||||||
| "bash -c '" | ||||||
| "set -e; FW_DIR=/opt/flash-worker; " | ||||||
| "mkdir -p $FW_DIR; " | ||||||
| f"tar xzf {local_path} -C $FW_DIR --strip-components=1; " | ||||||
| "exec $FW_DIR/bootstrap.sh'" | ||||||
| ) | ||||||
|
|
||||||
| return ( | ||||||
| "bash -c '" | ||||||
| f"set -e; FW_DIR=/opt/flash-worker; FW_VER={worker_version}; " | ||||||
| # Network volume cache check | ||||||
| 'NV_CACHE="/runpod-volume/.flash-worker/v$FW_VER"; ' | ||||||
| 'if [ -d "$NV_CACHE" ] && [ -f "$NV_CACHE/.version" ]; then ' | ||||||
| 'cp -r "$NV_CACHE" "$FW_DIR"; ' | ||||||
| # Local cache check (container disk persistence between restarts) | ||||||
| 'elif [ -f "$FW_DIR/.version" ] && [ "$(cat $FW_DIR/.version)" = "$FW_VER" ]; then ' | ||||||
| "true; " | ||||||
| "else " | ||||||
| "mkdir -p $FW_DIR; " | ||||||
| f'DL_URL="{tarball_url}"; ' | ||||||
| "dl() { " | ||||||
| '(command -v curl >/dev/null 2>&1 && curl -sSL "$1" || ' | ||||||
| 'command -v wget >/dev/null 2>&1 && wget -qO- "$1" || ' | ||||||
| 'python3 -c "import urllib.request,sys;sys.stdout.buffer.write(urllib.request.urlopen(sys.argv[1]).read())" "$1"); ' | ||||||
| "}; " | ||||||
| 'dl "$DL_URL" ' | ||||||
| "| tar xz -C $FW_DIR --strip-components=1; " | ||||||
| # Cache to network volume if available | ||||||
| "if [ -d /runpod-volume ]; then " | ||||||
| 'mkdir -p "$NV_CACHE" && cp -r "$FW_DIR"/* "$NV_CACHE/" 2>/dev/null || true; fi; ' | ||||||
|
||||||
| 'mkdir -p "$NV_CACHE" && cp -r "$FW_DIR"/* "$NV_CACHE/" 2>/dev/null || true; fi; ' | |
| 'mkdir -p "$NV_CACHE" && cp -r "$FW_DIR"/. "$NV_CACHE"/ 2>/dev/null || true; fi; ' |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,66 +1,92 @@ | ||
| # Ship serverless code as you write it. No builds, no deploys — just run. | ||
| # Ship serverless code as you write it. No builds, no deploys -- just run. | ||
| from pydantic import model_validator | ||
|
|
||
| from .constants import ( | ||
| FLASH_CPU_BASE_IMAGE, | ||
| FLASH_CPU_IMAGE, | ||
| FLASH_CPU_LB_IMAGE, | ||
| FLASH_GPU_BASE_IMAGE, | ||
| FLASH_GPU_IMAGE, | ||
| FLASH_LB_IMAGE, | ||
| ) | ||
| from .injection import build_injection_cmd | ||
| from .load_balancer_sls_resource import ( | ||
| CpuLoadBalancerSlsResource, | ||
| LoadBalancerSlsResource, | ||
| ) | ||
| from .serverless import ServerlessEndpoint | ||
| from .serverless_cpu import CpuServerlessEndpoint | ||
| from .template import PodTemplate | ||
|
|
||
|
|
||
| class LiveServerlessMixin: | ||
| """Common mixin for live serverless endpoints that locks the image.""" | ||
| """Configures process injection via dockerArgs for any base image. | ||
|
|
||
| Sets a default base image (user can override via imageName) and generates | ||
| dockerArgs to download, extract, and run the flash-worker tarball at container | ||
| start time. QB vs LB mode is determined by FLASH_ENDPOINT_TYPE env var at | ||
| runtime, not by the Docker image. | ||
| """ | ||
|
|
||
| @property | ||
| def _live_image(self) -> str: | ||
| """Override in subclasses to specify the locked image.""" | ||
| raise NotImplementedError("Subclasses must define _live_image") | ||
| def _default_base_image(self) -> str: | ||
| raise NotImplementedError("Subclasses must define _default_base_image") | ||
|
|
||
| @property | ||
| def imageName(self): | ||
| # Lock imageName to specific image | ||
| return self._live_image | ||
| def _legacy_image(self) -> str: | ||
| """Legacy Docker Hub image for preview mode.""" | ||
| raise NotImplementedError("Subclasses must define _legacy_image") | ||
|
|
||
| def _create_new_template(self) -> PodTemplate: | ||
| """Create template with dockerArgs for process injection.""" | ||
| template = super()._create_new_template() # type: ignore[misc] | ||
| template.dockerArgs = build_injection_cmd() | ||
| return template | ||
|
|
||
| @imageName.setter | ||
| def imageName(self, value): | ||
| # Prevent manual setting of imageName | ||
| pass | ||
| def _configure_existing_template(self) -> None: | ||
| """Configure existing template, adding dockerArgs for injection if not user-set.""" | ||
| super()._configure_existing_template() # type: ignore[misc] | ||
| if self.template is not None and not self.template.dockerArgs: # type: ignore[attr-defined] | ||
| self.template.dockerArgs = build_injection_cmd() # type: ignore[attr-defined] | ||
|
Comment on lines
+23
to
+50
|
||
|
|
||
|
|
||
| class LiveServerless(LiveServerlessMixin, ServerlessEndpoint): | ||
| """GPU-only live serverless endpoint.""" | ||
|
|
||
| @property | ||
| def _live_image(self) -> str: | ||
| def _default_base_image(self) -> str: | ||
| return FLASH_GPU_BASE_IMAGE | ||
|
|
||
| @property | ||
| def _legacy_image(self) -> str: | ||
| return FLASH_GPU_IMAGE | ||
|
|
||
| @model_validator(mode="before") | ||
| @classmethod | ||
| def set_live_serverless_template(cls, data: dict): | ||
| """Set default GPU image for Live Serverless.""" | ||
| data["imageName"] = FLASH_GPU_IMAGE | ||
| """Set default GPU base image for Live Serverless.""" | ||
| if not data.get("imageName"): | ||
| data["imageName"] = FLASH_GPU_BASE_IMAGE | ||
| return data | ||
|
|
||
|
|
||
| class CpuLiveServerless(LiveServerlessMixin, CpuServerlessEndpoint): | ||
| """CPU-only live serverless endpoint with automatic disk sizing.""" | ||
|
|
||
| @property | ||
| def _live_image(self) -> str: | ||
| def _default_base_image(self) -> str: | ||
| return FLASH_CPU_BASE_IMAGE | ||
|
|
||
| @property | ||
| def _legacy_image(self) -> str: | ||
| return FLASH_CPU_IMAGE | ||
|
Comment on lines
31
to
82
|
||
|
|
||
| @model_validator(mode="before") | ||
| @classmethod | ||
| def set_live_serverless_template(cls, data: dict): | ||
| """Set default CPU image for Live Serverless.""" | ||
| data["imageName"] = FLASH_CPU_IMAGE | ||
| """Set default CPU base image for Live Serverless.""" | ||
| if not data.get("imageName"): | ||
| data["imageName"] = FLASH_CPU_BASE_IMAGE | ||
| return data | ||
|
|
||
|
|
||
|
|
@@ -71,12 +97,6 @@ class LiveLoadBalancer(LiveServerlessMixin, LoadBalancerSlsResource): | |
| Enables local testing of @remote decorated functions with LB endpoints | ||
| before deploying to production. | ||
|
|
||
| Features: | ||
| - Locks to Flash LB image (flash-lb) | ||
| - Direct HTTP execution (not queue-based) | ||
| - Local development with flash run | ||
| - Same @remote decorator pattern as LoadBalancerSlsResource | ||
|
|
||
| Usage: | ||
| from runpod_flash import LiveLoadBalancer, remote | ||
|
|
||
|
|
@@ -85,32 +105,22 @@ class LiveLoadBalancer(LiveServerlessMixin, LoadBalancerSlsResource): | |
| @remote(api, method="POST", path="/api/process") | ||
| async def process_data(x: int, y: int): | ||
| return {"result": x + y} | ||
|
|
||
| # Test locally | ||
| result = await process_data(5, 3) | ||
|
|
||
| Local Development Flow: | ||
| 1. Create LiveLoadBalancer with routing | ||
| 2. Decorate functions with @remote(lb_resource, method=..., path=...) | ||
| 3. Run with `flash run` to start local endpoint | ||
| 4. Call functions directly in tests or scripts | ||
| 5. Deploy to production with `flash build` and `flash deploy` | ||
|
|
||
| Note: | ||
| The endpoint_url is configured by the Flash runtime when the | ||
| endpoint is deployed locally. For true local testing without | ||
| deployment, use the functions directly or mock the HTTP layer. | ||
| """ | ||
|
|
||
| @property | ||
| def _live_image(self) -> str: | ||
| def _default_base_image(self) -> str: | ||
| return FLASH_GPU_BASE_IMAGE | ||
|
|
||
| @property | ||
| def _legacy_image(self) -> str: | ||
| return FLASH_LB_IMAGE | ||
|
|
||
| @model_validator(mode="before") | ||
| @classmethod | ||
| def set_live_lb_template(cls, data: dict): | ||
| """Set default image for Live Load-Balanced endpoint.""" | ||
| data["imageName"] = FLASH_LB_IMAGE | ||
| if not data.get("imageName"): | ||
| data["imageName"] = FLASH_GPU_BASE_IMAGE | ||
| return data | ||
|
|
||
|
|
||
|
|
@@ -120,13 +130,6 @@ class CpuLiveLoadBalancer(LiveServerlessMixin, CpuLoadBalancerSlsResource): | |
| Similar to LiveLoadBalancer but configured for CPU instances with | ||
| automatic disk sizing and validation. | ||
|
|
||
| Features: | ||
| - Locks to CPU Flash LB image (flash-lb-cpu) | ||
| - CPU instance support with automatic disk sizing | ||
| - Direct HTTP execution (not queue-based) | ||
| - Local development with flash run | ||
| - Same @remote decorator pattern as CpuLoadBalancerSlsResource | ||
|
|
||
| Usage: | ||
| from runpod_flash import CpuLiveLoadBalancer, remote | ||
|
|
||
|
|
@@ -135,25 +138,20 @@ class CpuLiveLoadBalancer(LiveServerlessMixin, CpuLoadBalancerSlsResource): | |
| @remote(api, method="POST", path="/api/process") | ||
| async def process_data(x: int, y: int): | ||
| return {"result": x + y} | ||
|
|
||
| # Test locally | ||
| result = await process_data(5, 3) | ||
|
|
||
| Local Development Flow: | ||
| 1. Create CpuLiveLoadBalancer with routing | ||
| 2. Decorate functions with @remote(lb_resource, method=..., path=...) | ||
| 3. Run with `flash run` to start local endpoint | ||
| 4. Call functions directly in tests or scripts | ||
| 5. Deploy to production with `flash build` and `flash deploy` | ||
| """ | ||
|
|
||
| @property | ||
| def _live_image(self) -> str: | ||
| def _default_base_image(self) -> str: | ||
| return FLASH_CPU_BASE_IMAGE | ||
|
|
||
| @property | ||
| def _legacy_image(self) -> str: | ||
| return FLASH_CPU_LB_IMAGE | ||
|
|
||
| @model_validator(mode="before") | ||
| @classmethod | ||
| def set_live_cpu_lb_template(cls, data: dict): | ||
| """Set default CPU image for Live Load-Balanced endpoint.""" | ||
| data["imageName"] = FLASH_CPU_LB_IMAGE | ||
| if not data.get("imageName"): | ||
| data["imageName"] = FLASH_CPU_BASE_IMAGE | ||
| return data | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -125,11 +125,11 @@ def test_live_serverless_cpu_integration(self): | |
| ) | ||
|
|
||
| # Verify integration: | ||
| # 1. Uses CPU image (locked) | ||
| # 1. Uses CPU base image (default) | ||
| # 2. CPU utilities calculate minimum disk size | ||
| # 3. Template creation with auto-sizing | ||
| # 4. Validation passes | ||
| assert "flash-cpu:" in live_serverless.imageName | ||
| assert live_serverless.imageName == "python:3.11-slim" | ||
|
Comment on lines
+128
to
+132
|
||
| assert live_serverless.instanceIds == [ | ||
| CpuInstanceType.CPU5C_1_2, | ||
| CpuInstanceType.CPU5C_2_4, | ||
|
|
@@ -244,28 +244,24 @@ def test_mixed_cpu_generations_integration(self): | |
| assert "cpu5c-1-2: max 15GB" in error_msg | ||
|
|
||
|
|
||
| class TestLiveServerlessImageLockingIntegration: | ||
| """Test image locking integration in live serverless variants.""" | ||
| class TestLiveServerlessImageDefaultsIntegration: | ||
| """Test image defaults in live serverless variants.""" | ||
|
|
||
| def test_live_serverless_image_consistency(self): | ||
| """Test that LiveServerless variants maintain image consistency.""" | ||
| def test_live_serverless_image_defaults(self): | ||
| """Test that LiveServerless variants use correct base images.""" | ||
| gpu_live = LiveServerless(name="gpu-live") | ||
| cpu_live = CpuLiveServerless(name="cpu-live") | ||
|
|
||
| # Verify different images are used | ||
| # Verify different base images are used | ||
| assert gpu_live.imageName != cpu_live.imageName | ||
| assert "flash:" in gpu_live.imageName | ||
| assert "flash-cpu:" in cpu_live.imageName | ||
| assert "pytorch" in gpu_live.imageName | ||
| assert "python" in cpu_live.imageName | ||
|
Comment on lines
+255
to
+258
|
||
|
|
||
| # Verify images remain locked despite attempts to change | ||
| original_gpu_image = gpu_live.imageName | ||
| original_cpu_image = cpu_live.imageName | ||
|
|
||
| gpu_live.imageName = "custom/image:latest" | ||
| cpu_live.imageName = "custom/image:latest" | ||
|
|
||
| assert gpu_live.imageName == original_gpu_image | ||
| assert cpu_live.imageName == original_cpu_image | ||
| # Verify images can be overridden (BYOI) | ||
| custom_gpu = LiveServerless( | ||
| name="custom-gpu", imageName="nvidia/cuda:12.8.0-runtime" | ||
| ) | ||
| assert custom_gpu.imageName == "nvidia/cuda:12.8.0-runtime" | ||
|
|
||
| def test_live_serverless_template_integration(self): | ||
| """Test live serverless template integration with disk sizing.""" | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -114,22 +114,18 @@ async def echo(message: str): | |
| # Verify resource is correctly configured | ||
| # Note: name may have "-fb" appended by flash boot validator | ||
| assert "test-live-api" in lb.name | ||
| assert "flash-lb" in lb.imageName | ||
| assert "pytorch" in lb.imageName # GPU base image | ||
| assert echo.__remote_config__["method"] == "POST" | ||
|
|
||
| def test_live_load_balancer_image_locked(self): | ||
| """Test that LiveLoadBalancer locks the image to Flash LB image.""" | ||
| def test_live_load_balancer_default_image(self): | ||
| """Test that LiveLoadBalancer uses GPU base image by default.""" | ||
| lb = LiveLoadBalancer(name="test-api") | ||
| assert "pytorch" in lb.imageName | ||
|
Comment on lines
+117
to
+123
|
||
|
|
||
| # Verify image is locked and cannot be overridden | ||
| original_image = lb.imageName | ||
| assert "flash-lb" in original_image | ||
|
|
||
| # Try to set a different image (should be ignored due to property) | ||
| lb.imageName = "custom-image:latest" | ||
|
|
||
| # Image should still be locked to Flash | ||
| assert lb.imageName == original_image | ||
| def test_live_load_balancer_allows_custom_image(self): | ||
| """Test that LiveLoadBalancer allows user to set custom image (BYOI).""" | ||
| lb = LiveLoadBalancer(name="test-api", imageName="custom-image:latest") | ||
| assert lb.imageName == "custom-image:latest" | ||
|
|
||
| def test_load_balancer_vs_queue_based_endpoints(self): | ||
| """Test that LB and QB endpoints have different characteristics.""" | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The network-volume cache restore path looks incorrect:
cp -r "$NV_CACHE" "$FW_DIR"will copy the directory into$FW_DIR(e.g.,$FW_DIR/v1.1.1/...) rather than populating$FW_DIRitself, soexec $FW_DIR/bootstrap.shmay fail even when the cache is present. Consider copying the contents into$FW_DIR(preserving permissions) instead of copying the directory as a nested subdir.