Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use updated twined #650

Merged
merged 18 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ repos:
- id: isort

- repo: https://github.com/psf/black
rev: 22.6.0
rev: 24.4.2
hooks:
- id: black
args: ["--line-length", "120"]
Expand Down
150 changes: 77 additions & 73 deletions docs/source/inter_service_compatibility.rst

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions octue/cloud/deployment/google/answer_pub_sub_question.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging

from octue.cloud.events.counter import EventCounter
from octue.cloud.pub_sub.service import Service
from octue.cloud.service_id import create_sruid, get_sruid_parts
from octue.configuration import load_service_and_app_configuration
Expand Down Expand Up @@ -32,6 +33,7 @@ def answer_question(question, project_name):

service = Service(service_id=service_sruid, backend=GCPPubSubBackend(project_name=project_name))
question_uuid = get_nested_attribute(question, "attributes.question_uuid")
order = EventCounter()

try:
runner = Runner.from_configuration(
Expand All @@ -42,9 +44,9 @@ def answer_question(question, project_name):
)

service.run_function = runner.run
service.answer(question)
service.answer(question, order)
logger.info("Analysis successfully run and response sent for question %r.", question_uuid)

except BaseException as error: # noqa
service.send_exception(question_uuid=question_uuid, originator="UNKNOWN")
service.send_exception(question_uuid=question_uuid, originator="UNKNOWN", order=order)
logger.exception(error)
4 changes: 2 additions & 2 deletions octue/cloud/deployment/google/cloud_run/flask_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ def index():
return _log_bad_request_and_return_400_response("No Pub/Sub message received.")

if not isinstance(envelope, dict) or "message" not in envelope:
return _log_bad_request_and_return_400_response("Invalid Pub/Sub message format.")
return _log_bad_request_and_return_400_response(f"Invalid Pub/Sub message format - received {envelope!r}.")

question = envelope["message"]

if "data" not in question or "attributes" not in question or "question_uuid" not in question["attributes"]:
return _log_bad_request_and_return_400_response("Invalid Pub/Sub message format.")
return _log_bad_request_and_return_400_response(f"Invalid Pub/Sub message format - received {envelope!r}.")

question_uuid = question["attributes"]["question_uuid"]

Expand Down
9 changes: 0 additions & 9 deletions octue/cloud/pub_sub/bigquery.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import json

from google.cloud.bigquery import Client, QueryJobConfig, ScalarQueryParameter

from octue.cloud.events.validation import VALID_EVENT_KINDS
Expand Down Expand Up @@ -76,14 +74,7 @@ def get_events(table_id, sender, question_uuid, kind=None, include_backend_metad
)

df = result.to_dataframe()

# Convert JSON strings to python primitives.
df["event"] = df["event"].map(json.loads)
df["event"].apply(_deserialise_manifest_if_present)
df["other_attributes"] = df["other_attributes"].map(json.loads)

if "backend_metadata" in df:
df["backend_metadata"] = df["backend_metadata"].map(json.loads)

events = df.to_dict(orient="records")
return _unflatten_events(events)
Expand Down
6 changes: 4 additions & 2 deletions octue/cloud/pub_sub/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,16 +203,19 @@ def serve(self, timeout=None, delete_topic_and_subscription_on_exit=False, allow

return future, subscriber

def answer(self, question, heartbeat_interval=120, timeout=30):
def answer(self, question, order=None, heartbeat_interval=120, timeout=30):
"""Answer a question from a parent - i.e. run the child's app on the given data and return the output values.
Answers conform to the output values and output manifest schemas specified in the child's Twine file.

:param dict|google.cloud.pubsub_v1.subscriber.message.Message question:
:param octue.cloud.events.counter.EventCounter|None order: an event counter keeping track of the order of emitted events
:param int|float heartbeat_interval: the time interval, in seconds, at which to send heartbeats
:param float|None timeout: time in seconds to keep retrying sending of the answer once it has been calculated
:raise Exception: if any exception arises during running analysis and sending its results
:return None:
"""
order = order or EventCounter()

try:
(
question,
Expand All @@ -226,7 +229,6 @@ def answer(self, question, heartbeat_interval=120, timeout=30):
return

heartbeater = None
order = EventCounter()

try:
self._send_delivery_acknowledgment(question_uuid, originator, order)
Expand Down
2 changes: 2 additions & 0 deletions octue/metadata/recorded_questions.jsonl
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,7 @@
{"parent_sdk_version": "0.51.0", "question": {"data": "{\"kind\": \"question\", \"input_values\": {\"height\": 4, \"width\": 72}, \"input_manifest\": {\"id\": \"3bf3b178-3cf1-49aa-a1d9-89cd8ec05b1a\", \"name\": null, \"datasets\": {\"my_dataset\": \"/var/folders/sk/hf5fbp616c77tsys9lz55qn40000gp/T/tmppu85nw5c\"}}}", "attributes": {"question_uuid": "6f7f6e1c-7632-4b4d-b91d-6f58dcb43c40", "sender_type": "PARENT", "forward_logs": "1", "save_diagnostics": "SAVE_DIAGNOSTICS_ON_CRASH", "version": "0.51.0", "message_number": "0"}}}
{"parent_sdk_version": "0.52.0", "question": {"data": "{\"kind\": \"question\", \"input_values\": {\"height\": 4, \"width\": 72}, \"input_manifest\": {\"id\": \"6be875b3-33d8-4b00-b7ea-553f8f69bce5\", \"name\": null, \"datasets\": {\"my_dataset\": \"/var/folders/sk/hf5fbp616c77tsys9lz55qn40000gp/T/tmphypwu9uh\"}}}", "attributes": {"question_uuid": "cd0a78be-fda2-4730-bdba-ba6b04e4787f", "sender_type": "PARENT", "forward_logs": "1", "save_diagnostics": "SAVE_DIAGNOSTICS_ON_CRASH", "version": "0.52.0", "message_number": "0"}}}
{"parent_sdk_version": "0.52.1", "question": {"data": "{\"kind\": \"question\", \"input_values\": {\"height\": 4, \"width\": 72}, \"input_manifest\": {\"id\": \"6be875b3-33d8-4b00-b7ea-553f8f69bce5\", \"name\": null, \"datasets\": {\"my_dataset\": \"/var/folders/sk/hf5fbp616c77tsys9lz55qn40000gp/T/tmphypwu9uh\"}}}", "attributes": {"question_uuid": "cd0a78be-fda2-4730-bdba-ba6b04e4787f", "sender_type": "PARENT", "forward_logs": "1", "save_diagnostics": "SAVE_DIAGNOSTICS_ON_CRASH", "version": "0.52.1", "message_number": "0"}}}
{"parent_sdk_version": "0.52.2", "question": {"data": "{\"kind\": \"question\", \"input_values\": {\"height\": 4, \"width\": 72}, \"input_manifest\": {\"id\": \"6be875b3-33d8-4b00-b7ea-553f8f69bce5\", \"name\": null, \"datasets\": {\"my_dataset\": \"/var/folders/sk/hf5fbp616c77tsys9lz55qn40000gp/T/tmphypwu9uh\"}}}", "attributes": {"question_uuid": "cd0a78be-fda2-4730-bdba-ba6b04e4787f", "sender_type": "PARENT", "forward_logs": "1", "save_diagnostics": "SAVE_DIAGNOSTICS_ON_CRASH", "version": "0.52.2", "message_number": "0"}}}
{"parent_sdk_version": "0.53.0", "question": {"data": "{\"kind\": \"question\", \"input_values\": {\"height\": 4, \"width\": 72}, \"input_manifest\": {\"id\": \"6be875b3-33d8-4b00-b7ea-553f8f69bce5\", \"name\": null, \"datasets\": {\"my_dataset\": \"/var/folders/sk/hf5fbp616c77tsys9lz55qn40000gp/T/tmphypwu9uh\"}}}", "attributes": {"datetime": "2024-04-11T10:46:48.236064", "uuid": "a9de11b1-e88f-43fa-b3a4-40a590c3443f", "question_uuid": "cd0a78be-fda2-4730-bdba-ba6b04e4787f", "forward_logs": "1", "save_diagnostics": "SAVE_DIAGNOSTICS_ON_CRASH", "originator": "octue/test-service:0.1.0", "sender": "octue/test-service:0.1.0", "sender_type": "PARENT", "sender_sdk_version": "0.53.0", "recipient": "octue/another-service:1.0.12", "order": "0"}}}
{"parent_sdk_version": "0.54.0", "question": {"data": "{\"kind\": \"question\", \"input_values\": {\"height\": 4, \"width\": 72}, \"input_manifest\": {\"id\": \"6be875b3-33d8-4b00-b7ea-553f8f69bce5\", \"name\": null, \"datasets\": {\"my_dataset\": \"/var/folders/sk/hf5fbp616c77tsys9lz55qn40000gp/T/tmphypwu9uh\"}}}", "attributes": {"datetime": "2024-04-11T10:46:48.236064", "uuid": "a9de11b1-e88f-43fa-b3a4-40a590c3443f", "question_uuid": "cd0a78be-fda2-4730-bdba-ba6b04e4787f", "forward_logs": "1", "save_diagnostics": "SAVE_DIAGNOSTICS_ON_CRASH", "originator": "octue/test-service:0.1.0", "sender": "octue/test-service:0.1.0", "sender_type": "PARENT", "sender_sdk_version": "0.54.0", "recipient": "octue/another-service:1.0.12", "order": "0"}}}
{"parent_sdk_version": "0.55.0", "question": {"data": "{\"kind\": \"question\", \"input_values\": {\"height\": 4, \"width\": 72}, \"input_manifest\": {\"id\": \"6be875b3-33d8-4b00-b7ea-553f8f69bce5\", \"name\": null, \"datasets\": {\"my_dataset\": \"/var/folders/sk/hf5fbp616c77tsys9lz55qn40000gp/T/tmphypwu9uh\"}}}", "attributes": {"datetime": "2024-04-11T10:46:48.236064", "uuid": "a9de11b1-e88f-43fa-b3a4-40a590c3443f", "question_uuid": "cd0a78be-fda2-4730-bdba-ba6b04e4787f", "forward_logs": "1", "save_diagnostics": "SAVE_DIAGNOSTICS_ON_CRASH", "originator": "octue/test-service:0.1.0", "sender": "octue/test-service:0.1.0", "sender_type": "PARENT", "sender_sdk_version": "0.55.0", "recipient": "octue/another-service:1.0.12", "order": "0"}}}