Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/runpod_flash/cli/commands/_run_server_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,13 @@ async def lb_execute(resource_config, func, body: dict):
if routing and routing.get("method")
else func.__name__
)
log.info(f"{resource_config} | {route_label}")
log.info(f"[REMOTE] {resource_config} | {route_label}")

try:
result = await stub(
func, dependencies, system_dependencies, accelerate_downloads, **kwargs
)
log.info(f"{resource_config} | Execution complete")
log.info(f"[REMOTE] {resource_config} | Execution complete")
return result
except TimeoutError as e:
raise HTTPException(status_code=504, detail=str(e))
Expand Down
4 changes: 2 additions & 2 deletions src/runpod_flash/core/resources/serverless.py
Original file line number Diff line number Diff line change
Expand Up @@ -1264,7 +1264,7 @@ async def runsync(self, payload: Dict[str, Any]) -> "JobOutput":
)

def _fetch_job():
log.info(f"{self} | API /runsync")
log.info(f"[REMOTE] {self} | API /runsync")
return self.endpoint.rp_client.post(
f"{self.id}/runsync", payload, timeout=timeout_s
)
Expand Down Expand Up @@ -1294,7 +1294,7 @@ async def run(self, payload: Dict[str, Any]) -> "JobOutput":

try:
# Create a job using the endpoint
log.info(f"{self} | API /run")
log.info(f"[REMOTE] {self} | API /run")
job = await asyncio.to_thread(self.endpoint.run, request_input=payload)
Comment thread
deanq marked this conversation as resolved.

log_subgroup = f"Job:{job.job_id}"
Expand Down
4 changes: 2 additions & 2 deletions src/runpod_flash/stubs/load_balancer_sls.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ async def _execute_via_user_route(

# Construct full URL
url = f"{self.server.endpoint_url}{path}"
log.info(f"{self.server} | {method} {path}")
log.info(f"[REMOTE] {self.server} | {method} {path}")

try:
async with get_authenticated_httpx_client(
Expand All @@ -330,7 +330,7 @@ async def _execute_via_user_route(
response = await client.request(method, url, json=body)
response.raise_for_status()
result = response.json()
log.info(f"{self.server} | Execution complete")
log.info(f"[REMOTE] {self.server} | Execution complete")
log.debug(
f"User route execution successful (type={type(result).__name__})"
)
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/cli/commands/test_run_server_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,8 @@ async def fake_func(x: int):

assert result == 42
info_messages = [r.message for r in caplog.records if r.levelno == logging.INFO]
assert any("GET /images/{filename}" in m for m in info_messages)
assert any("Execution complete" in m for m in info_messages)
assert any("[REMOTE]" in m and "GET /images/{filename}" in m for m in info_messages)
assert any("[REMOTE]" in m and "Execution complete" in m for m in info_messages)


@pytest.mark.asyncio
Expand Down
81 changes: 81 additions & 0 deletions tests/unit/resources/test_serverless.py
Original file line number Diff line number Diff line change
Expand Up @@ -1213,6 +1213,87 @@ async def test_run_async_success(self):
assert result.id == "job-123"
assert result.status == "COMPLETED"

@pytest.mark.asyncio
async def test_run_logs_remote_prefix_on_dispatch(self):
"""run() must emit [REMOTE] prefix so users know execution is on Runpod cloud."""
Comment thread
deanq marked this conversation as resolved.
serverless = ServerlessResource(name="test")
serverless.id = "endpoint-123"

mock_job = MagicMock()
mock_job.job_id = "job-123"
mock_job.status.return_value = "COMPLETED"
mock_job._fetch_job.return_value = {
"id": "job-123",
"workerId": "worker-456",
"status": "COMPLETED",
"delayTime": 1000,
"executionTime": 2000,
"output": {"result": "success"},
}

mock_endpoint = MagicMock()
mock_endpoint.run.return_value = mock_job

with patch.object(
type(serverless),
"endpoint",
new_callable=lambda: property(lambda self: mock_endpoint),
):
with patch("asyncio.sleep"):
with patch("runpod_flash.core.resources.serverless.log") as mock_log:
await serverless.run({"input": "test data"})

dispatch_calls = [
call for call in mock_log.info.call_args_list if "API /run" in str(call)
]
assert len(dispatch_calls) == 1, "Expected exactly one 'API /run' log call"
log_message = dispatch_calls[0].args[0]
assert "[REMOTE]" in log_message, (
f"Expected [REMOTE] in log message, got: {log_message!r}"
)
assert "API /run" in log_message, (
f"Expected 'API /run' in log message, got: {log_message!r}"
)

@pytest.mark.asyncio
async def test_runsync_logs_remote_prefix_on_dispatch(self):
"""runsync() must emit [REMOTE] prefix so users know execution is on Runpod cloud."""
serverless = ServerlessResource(name="test")
serverless.id = "endpoint-123"

mock_rp_client = MagicMock()
mock_rp_client.post.return_value = {
"id": "job-123",
"workerId": "worker-456",
"status": "COMPLETED",
"delayTime": 1000,
"executionTime": 2000,
"output": {"result": "success"},
}

mock_endpoint = MagicMock()
mock_endpoint.rp_client = mock_rp_client

with patch.object(
type(serverless),
"endpoint",
new_callable=lambda: property(lambda self: mock_endpoint),
):
with patch("runpod_flash.core.resources.serverless.log") as mock_log:
await serverless.runsync({"input": "test data"})

dispatch_calls = [
call for call in mock_log.info.call_args_list if "API /runsync" in str(call)
]
assert len(dispatch_calls) == 1, "Expected exactly one 'API /runsync' log call"
log_message = dispatch_calls[0].args[0]
assert "[REMOTE]" in log_message, (
f"Expected [REMOTE] in log message, got: {log_message!r}"
)
assert "API /runsync" in log_message, (
f"Expected 'API /runsync' in log message, got: {log_message!r}"
)

@pytest.mark.asyncio
async def test_run_async_dedupes_stdout_against_streamed_pod_logs(self):
serverless = ServerlessResource(name="test")
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/test_load_balancer_sls_stub.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,5 +482,5 @@ def add(x, y):
await stub._execute_via_user_route(add, "POST", "/api/add", 5, 3)

info_messages = [r.message for r in caplog.records if r.levelno == logging.INFO]
assert any("POST /api/add" in m for m in info_messages)
assert any("Execution complete" in m for m in info_messages)
assert any("[REMOTE]" in m and "POST /api/add" in m for m in info_messages)
assert any("[REMOTE]" in m and "Execution complete" in m for m in info_messages)
Loading