Skip to content

Commit

Permalink
devserver: fix custom event IDs are not idempotent (#1202)
Browse files Browse the repository at this point in the history
  • Loading branch information
goodoldneon committed Feb 29, 2024
1 parent 7cec706 commit dfc3e81
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 2 deletions.
9 changes: 7 additions & 2 deletions pkg/execution/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,11 +622,16 @@ func Initialize(ctx context.Context, fn inngest.Function, tracked event.TrackedE
fn.ID = inngest.DeterministicUUID(fn)
}

// Use the custom event ID (a.k.a. event idempotency key) if it exists, else
// use the internal event ID
idempotencyKey := tracked.GetEvent().ID

// If this is a debounced function, run this through a debouncer.

return e.Schedule(ctx, execution.ScheduleRequest{
Function: fn,
Events: []event.TrackedEvent{tracked},
Function: fn,
Events: []event.TrackedEvent{tracked},
IdempotencyKey: &idempotencyKey,
})
}

Expand Down
64 changes: 64 additions & 0 deletions tests/golang/event_idempotency_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package golang

import (
"context"
"sync/atomic"
"testing"
"time"

"github.com/inngest/inngestgo"
"github.com/stretchr/testify/require"
)

func TestEventIdempotency(t *testing.T) {
ctx := context.Background()
r := require.New(t)
h, server, registerFuncs := NewSDKHandler(t, "test")
defer server.Close()

var counter int32
h.Register(inngestgo.CreateFunction(
inngestgo.FunctionOpts{Name: "test"},
inngestgo.EventTrigger("test", nil),
func(ctx context.Context, input inngestgo.Input[any]) (any, error) {
atomic.AddInt32(&counter, 1)
return nil, nil
},
))
registerFuncs()

sendEvent := func(id string) {
_, err := inngestgo.Send(ctx, inngestgo.GenericEvent[any, any]{
ID: &id,
Name: "test",
})
r.NoError(err)
}

t.Run("same ID", func(t *testing.T) {
// Only 1 run if multiple events have the same ID

sendEvent("abc")
sendEvent("abc")

r.Eventually(func() bool {
return atomic.LoadInt32(&counter) == 1
}, 2*time.Second, time.Second)

// Wait a little longer to make sure no more runs happen
<-time.After(100 * time.Millisecond)

r.Equal(int32(1), atomic.LoadInt32(&counter))
})

t.Run("different IDs", func(t *testing.T) {
// Multiple runs if each event has a different ID

sendEvent("abc")
sendEvent("def")

r.Eventually(func() bool {
return atomic.LoadInt32(&counter) == 2
}, 2*time.Second, time.Second)
})
}

0 comments on commit dfc3e81

Please sign in to comment.