Skip to content

Commit

Permalink
Add missing with block
Browse files Browse the repository at this point in the history
  • Loading branch information
goodoldneon committed Jun 22, 2024
1 parent 5302a27 commit 7bdc09d
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 54 deletions.
4 changes: 0 additions & 4 deletions inngest/_internal/step_lib/step_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,6 @@ async def run(
elif not isinstance(step.output, types.EmptySentinel):
return step.output # type: ignore

err = await self._middleware.before_execution()
if isinstance(err, Exception):
raise err

try:
output = await transforms.maybe_await(handler(*handler_args))

Expand Down
81 changes: 31 additions & 50 deletions inngest/_internal/step_lib/step_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,66 +214,47 @@ def run(

parsed_step_id = self._parse_step_id(step_id)

memo = self._get_memo_sync(parsed_step_id.hashed)
if not isinstance(memo, types.EmptySentinel):
return memo.data # type: ignore

self._handle_skip(parsed_step_id)

is_targeting_enabled = self._target_hashed_id is not None
if self._inside_parallel and not is_targeting_enabled:
# Plan this step because we're in parallel mode.
raise base.ResponseInterrupt(
base.StepResponse(
step=base.StepInfo(
display_name=parsed_step_id.user_facing,
id=parsed_step_id.hashed,
name=parsed_step_id.user_facing,
op=server_lib.Opcode.PLANNED,
)
)
)

err = self._middleware.before_execution_sync()
if isinstance(err, Exception):
raise err

step_info = base.StepInfo(
display_name=parsed_step_id.user_facing,
id=parsed_step_id.hashed,
name=parsed_step_id.user_facing,
op=server_lib.Opcode.STEP_RUN,
)

if (
self._request.ctx.disable_immediate_execution is True
and not is_targeting_enabled
):
step_info.op = server_lib.Opcode.PLANNED
raise base.ResponseInterrupt(base.StepResponse(step=step_info))
with self._execution.report_step(
step_info,
self._inside_parallel,
) as step:
if step.skip:
raise base.SkipInterrupt(parsed_step_id.user_facing)
if step.error is not None:
raise step.error
elif not isinstance(step.output, types.EmptySentinel):
return step.output # type: ignore

try:
output = handler(*handler_args)
try:
output = handler(*handler_args)

raise base.ResponseInterrupt(
base.StepResponse(output=output, step=step_info)
)
except (errors.NonRetriableError, errors.RetryAfterError) as err:
# Bubble up these error types to the function level
raise err
except Exception as err:
transforms.remove_first_traceback_frame(err)

raise base.ResponseInterrupt(
base.StepResponse(
original_error=err,
step=base.StepInfo(
display_name=parsed_step_id.user_facing,
id=parsed_step_id.hashed,
op=server_lib.Opcode.STEP_ERROR,
),
raise base.ResponseInterrupt(
base.StepResponse(
output=output,
step=step_info,
)
)
except (errors.NonRetriableError, errors.RetryAfterError) as err:
# Bubble up these error types to the function level
raise err
except Exception as err:
transforms.remove_first_traceback_frame(err)

step_info.op = server_lib.Opcode.STEP_ERROR

raise base.ResponseInterrupt(
base.StepResponse(
original_error=err,
step=step_info,
)
)
)

def send_event(
self,
Expand Down

0 comments on commit 7bdc09d

Please sign in to comment.