Skip to content

Commit

Permalink
Upstream expression and logging changes (#1309)
Browse files Browse the repository at this point in the history
* Upstream expression and logging changes

* Lints
  • Loading branch information
tonyhb committed Apr 29, 2024
1 parent d1de08e commit 2283874
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 32 deletions.
33 changes: 11 additions & 22 deletions pkg/execution/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,10 +409,7 @@ func (e *executor) Schedule(ctx context.Context, req execution.ScheduleRequest)
}

// Evaluate the run priority based off of the input event data.
factor, err := req.Function.RunPriorityFactor(ctx, mapped[0])
if err != nil && e.log != nil {
e.log.Warn().Err(err).Msg("run priority errored")
}
factor, _ := req.Function.RunPriorityFactor(ctx, mapped[0])
if factor != 0 {
id.PriorityFactor = &factor
}
Expand Down Expand Up @@ -1130,7 +1127,7 @@ func (e *executor) HandlePauses(ctx context.Context, iter state.PauseIterator, e
// Use the aggregator for all funciton finished events, if there are more than
// 50 waiting. It only takes a few milliseconds to iterate and handle less
// than 50; anything more runs the risk of running slow.
if evt.GetEvent().IsFinishedEvent() && iter.Count() > 50 {
if iter.Count() > 10 {
aggRes, err := e.handleAggregatePauses(ctx, evt)
if err != nil {
log.From(ctx).Error().Err(err).Msg("error handling aggregate pauses")
Expand Down Expand Up @@ -1331,7 +1328,7 @@ func (e *executor) handleAggregatePauses(ctx context.Context, evt event.TrackedE
return execution.HandlePauseResult{}, fmt.Errorf("no expression evaluator found")
}

base := logger.From(ctx).With().Str("event_id", evt.GetInternalID().String()).Logger()
log := logger.StdlibLogger(ctx).With("event_id", evt.GetInternalID().String())
evtID := evt.GetInternalID()
evtIDStr := evtID.String()

Expand All @@ -1345,11 +1342,6 @@ func (e *executor) handleAggregatePauses(ctx context.Context, evt event.TrackedE
wg sync.WaitGroup
)

base.Debug().
Int("pause_len", len(evals)).
Int32("matched_len", count).
Msg("matched pauses via aggregator")

for _, i := range evals {
found, ok := i.(*state.Pause)
if !ok || found == nil {
Expand All @@ -1364,19 +1356,19 @@ func (e *executor) handleAggregatePauses(ctx context.Context, evt event.TrackedE

defer wg.Done()

l := base.With().
Str("pause_id", pause.ID.String()).
Str("run_id", pause.Identifier.RunID.String()).
Str("workflow_id", pause.Identifier.WorkflowID.String()).
Str("expires", pause.Expires.String()).
Logger()
l := log.With(
"pause_id", pause.ID.String(),
"run_id", pause.Identifier.RunID.String(),
"workflow_id", pause.Identifier.WorkflowID.String(),
"expires", pause.Expires.String(),
)

// NOTE: Some pauses may be nil or expired, as the iterator may take
// time to process. We handle that here and assume that the event
// did not occur in time.
if pause.Expires.Time().Before(time.Now()) {
// Consume this pause to remove it entirely
l.Debug().Msg("deleting expired pause")
l.Debug("deleting expired pause")
_ = e.sm.DeletePause(context.Background(), pause)
_ = e.exprAggregator.RemovePause(ctx, pause)
return
Expand Down Expand Up @@ -1456,7 +1448,7 @@ func (e *executor) handleAggregatePauses(ctx context.Context, evt event.TrackedE
// Add to the counter.
atomic.AddInt32(&res[1], 1)
if err := e.exprAggregator.RemovePause(ctx, pause); err != nil {
l.Error().Err(err).Msg("error removing pause from aggregator")
l.Error("error removing pause from aggregator")
}
}()
}
Expand Down Expand Up @@ -2020,7 +2012,6 @@ func (e *executor) handleGeneratorSleep(ctx context.Context, gen state.Generator

func (e *executor) handleGeneratorInvokeFunction(ctx context.Context, gen state.GeneratorOpcode, item queue.Item, edge queue.PayloadEdge) error {
span := trace.SpanFromContext(ctx)
logger.From(ctx).Info().Msg("handling invoke function")
if e.handleSendingEvent == nil {
return fmt.Errorf("no handleSendingEvent function specified")
}
Expand All @@ -2042,8 +2033,6 @@ func (e *executor) handleGeneratorInvokeFunction(ctx context.Context, gen state.
return execError{err: fmt.Errorf("failed to create expression to wait for invoked function completion: %w", err)}
}

logger.From(ctx).Info().Interface("opts", opts).Time("expires", expires).Str("event", eventName).Str("expr", strExpr).Msg("parsed invoke function opts")

pauseID := uuid.NewSHA1(
uuid.NameSpaceOID,
[]byte(item.Identifier.RunID.String()+gen.ID),
Expand Down
3 changes: 0 additions & 3 deletions pkg/execution/history/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/inngest/inngest/pkg/execution/state/redis_state"
"github.com/inngest/inngest/pkg/inngest"
"github.com/inngest/inngest/pkg/inngest/log"
"github.com/inngest/inngest/pkg/logger"
"github.com/inngest/inngest/pkg/telemetry"
"github.com/oklog/ulid/v2"
"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -575,8 +574,6 @@ func (l lifecycle) OnInvokeFunction(
eventID ulid.ULID,
corrID string,
) {
logger.From(ctx).Debug().Interface("id", id).Msg("OnInvokeFunction")

groupID, err := toUUID(item.GroupID)
if err != nil {
l.log.Error(
Expand Down
5 changes: 0 additions & 5 deletions pkg/execution/state/redis_state/queue_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1069,10 +1069,6 @@ func (q *queue) process(ctx context.Context, p QueuePartition, qi QueueItem, s *

qi.Data.Attempt += 1
qi.AtMS = at.UnixMilli()
q.logger.Warn().Err(err).
Str("queue", qi.Queue()).
Int64("at_ms", at.UnixMilli()).
Msg("requeuing job")
if err := q.Requeue(context.WithoutCancel(ctx), p, qi, at); err != nil {
q.logger.Error().Err(err).Interface("item", qi).Msg("error requeuing job")
return err
Expand All @@ -1086,7 +1082,6 @@ func (q *queue) process(ctx context.Context, p QueuePartition, qi QueueItem, s *

// Dequeue this entirely, as this permanently failed.
// XXX: Increase permanently failed counter here.
q.logger.Info().Interface("item", qi).Msg("dequeueing failed job")
if err := q.Dequeue(context.WithoutCancel(ctx), p, qi); err != nil {
return err
}
Expand Down
18 changes: 18 additions & 0 deletions pkg/execution/state/redis_state/redis_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -991,6 +991,12 @@ func (m mgr) EvaluablesByID(ctx context.Context, ids ...uuid.UUID) ([]expr.Evalu

func (m mgr) LoadEvaluablesSince(ctx context.Context, workspaceID uuid.UUID, eventName string, since time.Time, do func(context.Context, expr.Evaluable) error) error {

// Keep a list of pauses that should be deleted because they've expired.
//
// Note that we don't do this in the iteration loop, as redis can use either HSCAN or
// MGET; deleting during iteration may lead to skipped items.
expired := []*state.Pause{}

it, err := m.PausesByEventSince(ctx, workspaceID, eventName, since)
if err != nil {
return err
Expand All @@ -1000,14 +1006,26 @@ func (m mgr) LoadEvaluablesSince(ctx context.Context, workspaceID uuid.UUID, eve
if pause == nil {
continue
}

if pause.Expires.Time().Before(time.Now()) {
expired = append(expired, pause)
continue
}

if err := do(ctx, pause); err != nil {
return err
}
}

// GC pauses on fetch.
for _, pause := range expired {
_ = m.DeletePause(ctx, *pause)
}

if it.Error() != context.Canceled && it.Error() != scanDoneErr {
return it.Error()
}

return nil
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/expressions/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,10 @@ func (a *aggregator) EvaluateAsyncEvent(ctx context.Context, event event.Tracked
"evaluated aggregate expressions",
"workspace_id", event.GetWorkspaceID(),
"event", name,
"event_data", event.GetEvent(),
"eval_count", evalCount,
"matched_count", len(found),
"total_count", eval.Len(),
"found", found,
"found_count", len(found),
)

return found, evalCount, err
Expand Down

0 comments on commit 2283874

Please sign in to comment.