Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 55 additions & 79 deletions servc/svc/com/worker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from servc.svc.com.bus import BusComponent, OnConsuming
from servc.svc.com.cache import CacheComponent
from servc.svc.com.worker.hooks import evaluate_post_hooks, evaluate_pre_hooks
from servc.svc.com.worker.methods import evaluate_exit, get_artifact
from servc.svc.com.worker.types import RESOLVER, RESOLVER_CONTEXT, RESOLVER_MAPPING
from servc.svc.config import Config
from servc.svc.io.input import InputType
Expand Down Expand Up @@ -105,7 +106,7 @@ def connect(self):

def run_resolver(
self, method: RESOLVER, context: RESOLVER_CONTEXT, args: Tuple[str, Any]
) -> Tuple[StatusCode, ResponseArtifact | None]:
) -> Tuple[StatusCode, ResponseArtifact | None, Any | None]:
id, payload = args
statuscode: StatusCode = StatusCode.OK
response: ResponseArtifact | None = None
Expand All @@ -132,20 +133,10 @@ def run_resolver(
statuscode = StatusCode.SERVER_ERROR
response = getErrorArtifact(id, str(e), StatusCode.SERVER_ERROR)

if self._config.get(f"conf.{self.name}.exiton5xx") and statuscode.value >= 500:
print("Exiting due to 5xx error", error, flush=True)
exit(1)
if (
self._config.get(f"conf.{self.name}.exiton4xx")
and statuscode.value >= 400
and statuscode.value < 500
):
print("Exiting due to 4xx error", error, flush=True)
exit(1)

return statuscode, response
return statuscode, response, error

def inputProcessor(self, message: Any) -> StatusCode:
workerConfig = self._config.get(f"conf.{self.name}")
bus = self._busClass(
self._config.get(f"conf.{self._bus.name}"),
)
Expand All @@ -157,6 +148,10 @@ def inputProcessor(self, message: Any) -> StatusCode:
"config": self._config,
}

status_code: StatusCode = StatusCode.OK
response: ResponseArtifact | None = None
error: Any | None = None

if "type" not in message or "route" not in message:
return StatusCode.INVALID_INPUTS

Expand All @@ -166,85 +161,66 @@ def inputProcessor(self, message: Any) -> StatusCode:
or "details" not in message
or "instanceId" not in message
):
return StatusCode.INVALID_INPUTS
status_code = StatusCode.INVALID_INPUTS
response = getErrorArtifact(
message["id"] if "id" in message else "",
"Invalid input type for event. event, details or instanceId not specified",
StatusCode.INVALID_INPUTS,
)
if message["event"] not in self._eventResolvers:
return StatusCode.METHOD_NOT_FOUND

status_code, response = self.run_resolver(
status_code, response, error = self.run_resolver(
self._eventResolvers[message["event"]],
context,
("", {**message}),
)

return status_code

if message["type"] in [InputType.INPUT.value, InputType.INPUT]:
if "id" not in message:
return StatusCode.INVALID_INPUTS
if "argumentId" not in message:
cache.setKey(
message["id"],
getErrorArtifact(
message["id"],
"Invalid input type. Id and argumentId not specified",
StatusCode.INVALID_INPUTS,
),
elif message["type"] in [InputType.INPUT.value, InputType.INPUT]:
if "id" not in message or "argumentId" not in message:
status_code = StatusCode.INVALID_INPUTS
response = getErrorArtifact(
message["id"] if "id" in message else "",
"Invalid input type. Id and argumentId not specified",
StatusCode.INVALID_INPUTS,
)
return StatusCode.INVALID_INPUTS
status_code = StatusCode.INVALID_INPUTS
if "instanceId" in message and message["instanceId"] != bus.instanceId:
return StatusCode.NO_PROCESSING

if message["argumentId"] in ["raw", "plain"] and message["inputs"]:
artifact = message["argument"]
# get the artifact from the message
artifact = get_artifact(message, cache)
if isinstance(artifact, tuple):
status_code, response = artifact
else:
artifact = cache.getKey(message["argumentId"])
if artifact is None or "method" not in artifact or "inputs" not in artifact:
cache.setKey(
message["id"],
getErrorArtifact(
message["id"],
"Invalid argument. Need to specify method and inputs in payload",
StatusCode.USER_ERROR,
),
)
return StatusCode.USER_ERROR
if artifact["method"] not in self._resolvers:
cache.setKey(
message["id"],
getErrorArtifact(
if artifact["method"] not in self._resolvers:
status_code = StatusCode.METHOD_NOT_FOUND
response = getErrorArtifact(
message["id"], "Method not found", StatusCode.METHOD_NOT_FOUND
),
)
if self._config.get(f"conf.{self.name}.exiton4xx"):
print("Exiting due to 4xx error:", "Method not found", flush=True)
exit(1)
return StatusCode.METHOD_NOT_FOUND

continueExecution = evaluate_pre_hooks(
self._resolvers,
message,
artifact,
context,
)
if not continueExecution:
return StatusCode.OK

statusCode, response = self.run_resolver(
self._resolvers[artifact["method"]],
context,
(message["id"], artifact["inputs"]),
)
if statusCode == StatusCode.NO_PROCESSING:
return StatusCode.NO_PROCESSING
)
else:
continueExecution = evaluate_pre_hooks(
self._resolvers,
message,
artifact,
context,
)
if not continueExecution:
return StatusCode.OK

status_code, response, error = self.run_resolver(
self._resolvers[artifact["method"]],
context,
(message["id"], artifact["inputs"]),
)
if status_code == StatusCode.NO_PROCESSING:
return StatusCode.NO_PROCESSING

evaluate_exit(
message, response, cache, status_code, workerConfig, error
)
evaluate_post_hooks(bus, cache, message, artifact)

evaluate_exit(message, response, cache, status_code, workerConfig, error)

cache.setKey(message["id"], response)
evaluate_post_hooks(bus, cache, message, artifact)
return statusCode

cache.setKey(
message["id"],
getErrorArtifact(
message["id"], "Invalid input type", StatusCode.INVALID_INPUTS
),
)
return StatusCode.INVALID_INPUTS
54 changes: 54 additions & 0 deletions servc/svc/com/worker/methods.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from typing import Any, Tuple

from servc.svc.com.cache import CacheComponent
from servc.svc.config import Config
from servc.svc.io.input import ArgumentArtifact, InputPayload
from servc.svc.io.output import ResponseArtifact, StatusCode
from servc.svc.io.response import getErrorArtifact


def evaluate_exit(
message: InputPayload,
response: ResponseArtifact | None,
cache: CacheComponent,
statusCode: StatusCode,
config: Config,
error: Any | None,
):
if config.get("exiton5xx") and statusCode.value >= 500:
print("Exiting due to 5xx error: ", error, flush=True)
exit(1)
if config.get("exiton4xx") and statusCode.value >= 400 and statusCode.value < 500:
print("Exiting due to 4xx error: ", error, flush=True)
exit(1)

# allow specific exit to an error code
error_str: str = str(statusCode.value)
if config.get(f"exiton{error_str}"):
print(f"Exiting due to {error_str} error: ", error, flush=True)
exit(1)

if response is not None and "id" in message and message["id"]:
cache.setKey(message["id"], response)


def get_artifact(
message: InputPayload, cache: CacheComponent
) -> ArgumentArtifact | Tuple[StatusCode, ResponseArtifact]:
artifact = (
cache.getKey(message["argumentId"])
if message["argumentId"] not in ["raw", "plain"]
else message["argument"]
)

if artifact is None or "method" not in artifact or "inputs" not in artifact:
return (
StatusCode.USER_ERROR,
getErrorArtifact(
message["id"],
"Invalid argument. Need to specify method and inputs in payload",
StatusCode.USER_ERROR,
),
)

return artifact
12 changes: 11 additions & 1 deletion servc/svc/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,17 @@

BOOLEAN_CONFIGS = os.getenv(
"SERVC_BOOLEAN_CONFIGS",
"conf.worker.exiton4xx,conf.worker.exiton5xx,conf.worker.bindtoeventexchange",
",".join(
[
"conf.worker.exiton400",
"conf.worker.exiton404",
"conf.worker.exiton401",
"conf.worker.exiton422",
"conf.worker.exiton4xx",
"conf.worker.exiton5xx",
"conf.worker.bindtoeventexchange",
]
),
).split(",")
DOT_MARKER = os.getenv("SERVC_DOT_MARKER", "_DOT_")
DASH_MARKER = os.getenv("SERVC_DASH_MARKER", "_DASH_")
Expand Down