Skip to content

Commit

Permalink
Separate execution layer (#116)
Browse files Browse the repository at this point in the history
  • Loading branch information
goodoldneon committed Jun 14, 2024
1 parent c0dd996 commit e4b0d00
Show file tree
Hide file tree
Showing 19 changed files with 825 additions and 428 deletions.
3 changes: 2 additions & 1 deletion inngest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
from ._internal.client_lib import Inngest, SendEventsResult
from ._internal.errors import NonRetriableError, RetryAfterError, StepError
from ._internal.event_lib import Event
from ._internal.function import Context, Function
from ._internal.execution import Context
from ._internal.function import Function
from ._internal.function_config import (
Batch,
Cancel,
Expand Down
8 changes: 4 additions & 4 deletions inngest/_internal/client_lib/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
env_lib,
errors,
event_lib,
execution,
function,
function_config,
middleware_lib,
Expand Down Expand Up @@ -205,7 +206,7 @@ def create_function(
] = None,
name: typing.Optional[str] = None,
on_failure: typing.Union[
function.FunctionHandlerAsync, function.FunctionHandlerSync, None
execution.FunctionHandlerAsync, execution.FunctionHandlerSync, None
] = None,
priority: typing.Optional[function_config.Priority] = None,
rate_limit: typing.Optional[function_config.RateLimit] = None,
Expand All @@ -223,7 +224,7 @@ def create_function(
) -> typing.Callable[
[
typing.Union[
function.FunctionHandlerAsync, function.FunctionHandlerSync
execution.FunctionHandlerAsync, execution.FunctionHandlerSync
]
],
function.Function,
Expand Down Expand Up @@ -252,11 +253,10 @@ def create_function(

def decorator(
func: typing.Union[
function.FunctionHandlerAsync, function.FunctionHandlerSync
execution.FunctionHandlerAsync, execution.FunctionHandlerSync
],
) -> function.Function:
triggers = trigger if isinstance(trigger, list) else [trigger]

return function.Function(
function.FunctionOpts(
batch_events=batch_events,
Expand Down
5 changes: 3 additions & 2 deletions inngest/_internal/comm.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,9 @@ async def call_function(
return await self._respond(Exception("events not in request"))

call_res = await fn.call(
call.ctx,
self._client,
function.Context(
execution.Context(
attempt=call.ctx.attempt,
event=call.event,
events=events,
Expand Down Expand Up @@ -423,7 +424,7 @@ def call_function_sync(

call_res = fn.call_sync(
self._client,
function.Context(
execution.Context(
attempt=call.ctx.attempt,
event=call.event,
events=events,
Expand Down
25 changes: 25 additions & 0 deletions inngest/_internal/execution/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from .consts import UNSPECIFIED_STEP_ID
from .models import (
Call,
CallContext,
CallResult,
Context,
FunctionHandlerAsync,
FunctionHandlerSync,
Output,
ReportedStep,
UserError,
)

__all__ = [
"Call",
"CallContext",
"CallResult",
"Context",
"FunctionHandlerAsync",
"FunctionHandlerSync",
"Output",
"ReportedStep",
"UNSPECIFIED_STEP_ID",
"UserError",
]
2 changes: 2 additions & 0 deletions inngest/_internal/execution/consts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# If the Executor sends this step ID then it isn't targeting a specific step.
UNSPECIFIED_STEP_ID = "step"
182 changes: 182 additions & 0 deletions inngest/_internal/execution/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
from __future__ import annotations

import asyncio
import dataclasses
import typing

import pydantic

from inngest._internal import errors, event_lib, transforms, types

if typing.TYPE_CHECKING:
from inngest._internal import step_lib


class Call(types.BaseModel):
ctx: CallContext
event: event_lib.Event
events: typing.Optional[list[event_lib.Event]] = None
steps: dict[str, object]
use_api: bool


class CallContext(types.BaseModel):
attempt: int
run_id: str
stack: CallStack


class CallStack(types.BaseModel):
stack: list[str]


@dataclasses.dataclass
class CallResult:
error: typing.Optional[Exception] = None

# Multiple results from a single call (only used for steps). This will only
# be longer than 1 for parallel steps. Otherwise, it will be 1 long for
# sequential steps
multi: typing.Optional[list[CallResult]] = None

# Need a sentinel value to differentiate between None and unset
output: object = types.empty_sentinel

# Step metadata (e.g. user-specified ID)
step: typing.Optional[step_lib.StepInfo] = None

@property
def is_empty(self) -> bool:
return all(
[
self.error is None,
self.multi is None,
self.output is types.empty_sentinel,
self.step is None,
]
)

@classmethod
def from_responses(
cls,
responses: list[step_lib.StepResponse],
) -> CallResult:
multi = []

for response in responses:
error = None
if isinstance(response.original_error, Exception):
error = response.original_error

multi.append(
cls(
error=error,
output=response.output,
step=response.step,
)
)

return cls(multi=multi)


@dataclasses.dataclass
class Context:
attempt: int
event: event_lib.Event
events: list[event_lib.Event]
logger: types.Logger
run_id: str


@typing.runtime_checkable
class FunctionHandlerAsync(typing.Protocol):
def __call__(
self,
ctx: Context,
step: step_lib.Step,
) -> typing.Awaitable[types.JSON]:
...


@typing.runtime_checkable
class FunctionHandlerSync(typing.Protocol):
def __call__(
self,
ctx: Context,
step: step_lib.StepSync,
) -> types.JSON:
...


class Output(types.BaseModel):
# Fail validation if any extra fields exist, because this will prevent
# accidentally assuming user data is nested data
model_config = pydantic.ConfigDict(extra="forbid")

data: object = None
error: typing.Optional[MemoizedError] = None


class MemoizedError(types.BaseModel):
message: str
name: str
stack: typing.Optional[str] = None

@classmethod
def from_error(cls, err: Exception) -> MemoizedError:
return cls(
message=str(err),
name=type(err).__name__,
stack=transforms.get_traceback(err),
)


class ReportedStep:
def __init__(
self,
step_signal: asyncio.Future[ReportedStep],
step_info: step_lib.StepInfo,
) -> None:
self.error: typing.Optional[errors.StepError] = None
self.info = step_info
self.output: object = types.empty_sentinel
self.skip = False
self._release_signal = step_signal
self._done_signal = asyncio.Future[None]()

async def __aenter__(self) -> ReportedStep:
return self

async def __aexit__(self, *args: object) -> None:
self._done_signal.set_result(None)

async def release(self) -> None:
if self._release_signal.done():
return

self._release_signal.set_result(self)
await self._release_signal

async def release_and_skip(self) -> None:
self.skip = True
await self.release()

def on_done(
self,
callback: typing.Callable[[], typing.Coroutine[None, None, None]],
) -> None:
self._done_signal.add_done_callback(
lambda _: asyncio.create_task(callback())
)

async def wait(self) -> None:
await self._done_signal


class UserError(Exception):
"""
Wrap an error that occurred in user code.
"""

def __init__(self, err: Exception) -> None:
self.err = err
Loading

0 comments on commit e4b0d00

Please sign in to comment.