Skip to content

Commit

Permalink
ENH: Allow asking of asynchronous questions via Child.ask
Browse files Browse the repository at this point in the history
  • Loading branch information
cortadocodes committed Mar 5, 2024
1 parent 2cc8bbf commit fe2dafb
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 7 deletions.
25 changes: 18 additions & 7 deletions octue/resources/child.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,26 +63,32 @@ def ask(
record_messages=True,
save_diagnostics="SAVE_DIAGNOSTICS_ON_CRASH", # This is repeated as a string here to avoid a circular import.
question_uuid=None,
push_endpoint=None,
bigquery_table_id=None,
timeout=86400,
maximum_heartbeat_interval=300,
):
"""Ask the child a question and wait for its answer - i.e. send it input values and/or an input manifest and
wait for it to analyse them and return output values and/or an output manifest. The input values and manifest
must conform to the schema in the child's twine.
:param any|None input_values: any input values for the question
:param octue.resources.manifest.Manifest|None input_manifest: an input manifest of any datasets needed for the question
"""Ask the child either:
- A synchronous (ask-and-wait) question and wait for it to return an output. Questions are synchronous if
neither the `push_endpoint` nor the `bigquery_table_id` argument is provided.
- An asynchronous (fire-and-forget) question and return immediately. To make a question asynchronous, provide
either the `push_endpoint` or `bigquery_table_id` argument.
:param any|None input_values: any input values for the question, conforming with the schema in the child's twine
:param octue.resources.manifest.Manifest|None input_manifest: an input manifest of any datasets needed for the question, conforming with the schema in the child's twine
:param list(dict)|None children: a list of children for the child to use instead of its default children (if it uses children). These should be in the same format as in an app's app configuration file and have the same keys.
:param bool subscribe_to_logs: if `True`, subscribe to logs from the child and handle them with the local log handlers
:param bool allow_local_files: if `True`, allow the input manifest to contain references to local files - this should only be set to `True` if the child will have access to these local files
:param callable|None handle_monitor_message: a function to handle monitor messages (e.g. send them to an endpoint for plotting or displaying) - this function should take a single JSON-compatible python primitive as an argument (note that this could be an array or object)
:param bool record_messages: if `True`, record messages received from the child in the `received_messages` property
:param str save_diagnostics: must be one of {"SAVE_DIAGNOSTICS_OFF", "SAVE_DIAGNOSTICS_ON_CRASH", "SAVE_DIAGNOSTICS_ON"}; if turned on, allow the input values and manifest (and its datasets) to be saved by the child either all the time or just if it fails while processing them
:param str|None question_uuid: the UUID to use for the question if a specific one is needed; a UUID is generated if not
:param str|None push_endpoint: if answers to the question should be pushed to an endpoint, provide its URL here (the returned subscription will be a push subscription); if not, leave this as `None`
:param str|None bigquery_table_id: if answers to the question should be written to BigQuery, provide the ID of the table here (e.g. "your-project.your-dataset.your-table") (the returned subscription will be a BigQuery subscription); if not, leave this as `None`
:param float timeout: time in seconds to wait for an answer before raising a timeout error
:param float|int maximum_heartbeat_interval: the maximum amount of time (in seconds) allowed between child heartbeats before an error is raised
:raise TimeoutError: if the timeout is exceeded while waiting for an answer
:return dict: a dictionary containing the keys "output_values" and "output_manifest"
:return dict|None: for a synchronous question, a dictionary containing the keys "output_values" and "output_manifest"; for an asynchronous question, `None`
"""
subscription, _ = self._service.ask(
service_id=self.id,
Expand All @@ -93,9 +99,14 @@ def ask(
allow_local_files=allow_local_files,
save_diagnostics=save_diagnostics,
question_uuid=question_uuid,
push_endpoint=push_endpoint,
bigquery_table_id=bigquery_table_id,
timeout=timeout,
)

if push_endpoint or bigquery_table_id:
return None

return self._service.wait_for_answer(
subscription=subscription,
handle_monitor_message=handle_monitor_message,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,12 @@ def test_cloud_run_deployment(self):
# Check that the output dataset and its files can be accessed.
with answer["output_manifest"].datasets["example_dataset"].files.one() as (datafile, f):
self.assertEqual(f.read(), "This is some example service output.")

def test_cloud_run_deployment_asynchronously(self):
    """Check that asking the child an asynchronous (BigQuery) question returns `None`."""
    # Providing a BigQuery table ID is what makes the question asynchronous.
    question = {
        "input_values": {"n_iterations": 3},
        "bigquery_table_id": "octue-sdk-python.octue_sdk_python_test_dataset.question-events",
    }

    self.assertIsNone(self.child.ask(**question))

0 comments on commit fe2dafb

Please sign in to comment.