Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,27 @@ class MyPayload:
| Iterable (w/ streaming) | Array | stream of JSON |


### Advanced Usage 3 - Disabling Reporting restart:


If your runtime doesn't have the restart-notify endpoint defined, you can set report_restart to False in the `start_compute_module` call and it will not call the endpoint when the module is restarted. The default is True. If set to True it will automatically clear all jobs that the forwarder thinks are still running in the user container.

Reporting the restart is important, because if the container restarts, the forwarder container will wait up to 22 hours for the job to complete. As a result, your replicas could become blocked and make no progress on new jobs. Therefore, it is recommended that you always set report_restart to True.

```python
# app.py
from compute_modules.annotations import function
from compute_modules import start_compute_module


@function
def add(context, event) -> int:
return event["x"] + event["y"]

if __name__ == "__main__":
start_compute_module(report_restart=False)
```

### `QueryContext` typing

You can annotate the `context` param in any function with the `QueryContext` type to make it statically typed:
Expand Down
7 changes: 7 additions & 0 deletions changelog/@unreleased/pr-40.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
type: fix
fix:
description: "Inform the
forwarder whenever a node starts up so it will remove all existing jobs related
to it that it thinks are still running."
links:
- https://github.com/palantir/python-compute-module/pull/40
33 changes: 33 additions & 0 deletions compute_modules/client/internal_query_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
POST_RESULT_MAX_ATTEMPTS = 5
POST_ERROR_MAX_ATTEMPTS = 3
POST_SCHEMAS_MAX_ATTEMPTS = 5
POST_RESTART_MAX_ATTEMPTS = 5


def _extract_path_from_url(url: str) -> str:
Expand All @@ -61,6 +62,7 @@ def __init__(
self.get_job_path = _extract_path_from_url(os.environ["GET_JOB_URI"])
self.post_result_path = _extract_path_from_url(os.environ["POST_RESULT_URI"])
self.post_schema_path = _extract_path_from_url(os.environ["POST_SCHEMA_URI"])
self.post_restart_path = _extract_path_from_url(os.environ["RESTART_NOTIFICATION_URI"])
self._initialize_auth_token()
self._initialize_headers()
self.certPath = os.environ["CONNECTIONS_TO_OTHER_PODS_CA_PATH"]
Expand Down Expand Up @@ -97,6 +99,7 @@ def _initialize_headers(self) -> None:
"Module-Auth-Token": self.moduleAuthToken,
}
self.post_schema_headers = {"Content-Type": "application/json", "Module-Auth-Token": self.moduleAuthToken}
self.post_restart_headers = {"Module-Auth-Token": self.moduleAuthToken}

def _iterable_to_json_generator(self, iterable: Iterable[Any]) -> Iterable[bytes]:
self.logger.debug("iterating over result")
Expand Down Expand Up @@ -282,3 +285,33 @@ def get_result(
@staticmethod
def get_failed_query(exception: Exception) -> Dict[str, str]:
return {"exception": f"{str(exception)}: {traceback.format_exc()}"}

def report_restart(self) -> None:
post_restart_url = self.build_url(self.post_restart_path)
self.logger.debug(f"Reporting restart to {post_restart_url}")

for _ in range(POST_RESTART_MAX_ATTEMPTS):
try:
with self.session.request(
method="POST",
url=post_restart_url,
headers=self.post_restart_headers,
verify=self.certPath,
) as response:
self.logger.debug(
f"Reporting restart response status: {response.status_code} reason: {response.reason}"
)
if response.status_code == 200:
removed_jobs = ", ".join(response.json())
self.logger.warning(
f"Successfully reported restart. The following jobs got removed from the queue: {removed_jobs}"
)
return
else:
self.logger.error(
f"Unsuccessful in reporting restart: {response.status_code} {response.reason} {response.text}"
Comment thread
PaulJKathmann marked this conversation as resolved.
)
except Exception as e:
self.logger.error(f"Failed to report restart: {str(e)}")

raise RuntimeError(f"Unable to report restart after {POST_RESTART_MAX_ATTEMPTS} attempts")
3 changes: 3 additions & 0 deletions compute_modules/startup.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ def _worker_thread_init() -> None:

def start_compute_module(
concurrency_type: ConcurrencyType = ConcurrencyType.PROCESS_POOL,
report_restart: bool = True,
) -> None:
"""Starts a Compute Module that will Poll for jobs indefinitely"""
if DISABLE_STARTUP:
Expand All @@ -103,6 +104,8 @@ def start_compute_module(
streaming=STREAMING,
)
QUERY_CLIENT.post_query_schemas()
if report_restart:
QUERY_CLIENT.report_restart()
QUERY_CLIENT.logger.info(f"Starting to poll for jobs with concurrency {QUERY_CLIENT.concurrency}")
if concurrency_type == ConcurrencyType.PROCESS_POOL:
with Pool(QUERY_CLIENT.concurrency, initializer=_worker_process_init) as pool:
Expand Down