Skip to content

Commit

Permalink
Update updateRegistry store on mutable state reload
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Jun 23, 2023
1 parent b8feb95 commit 5d5237f
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 19 deletions.
2 changes: 1 addition & 1 deletion service/history/workflow/context.go
Expand Up @@ -814,7 +814,7 @@ func (c *ContextImpl) UpdateRegistry(ctx context.Context) update.Registry {
if c.updateRegistry == nil {
nsIDStr := c.MutableState.GetNamespaceEntry().ID().String()
c.updateRegistry = update.NewRegistry(
c.MutableState,
func() update.UpdateStore { return c.MutableState },
update.WithLogger(c.logger),
update.WithMetrics(c.metricsHandler),
update.WithTracerProvider(trace.SpanFromContext(ctx).TracerProvider()),
Expand Down
13 changes: 8 additions & 5 deletions service/history/workflow/update/registry.go
Expand Up @@ -82,7 +82,7 @@ type (
RegistryImpl struct {
mu sync.RWMutex
updates map[string]*Update
store UpdateStore
getStoreFn func() UpdateStore
instrumentation instrumentation
maxInFlight func() int
maxTotal func() int
Expand Down Expand Up @@ -133,10 +133,13 @@ func WithTracerProvider(t trace.TracerProvider) regOpt {

var _ Registry = (*RegistryImpl)(nil)

func NewRegistry(store UpdateStore, opts ...regOpt) *RegistryImpl {
func NewRegistry(
getStoreFn func() UpdateStore,
opts ...regOpt,
) *RegistryImpl {
r := &RegistryImpl{
updates: make(map[string]*Update),
store: store,
getStoreFn: getStoreFn,
instrumentation: noopInstrumentation,
maxInFlight: func() int { return math.MaxInt },
maxTotal: func() int { return math.MaxInt },
Expand All @@ -145,7 +148,7 @@ func NewRegistry(store UpdateStore, opts ...regOpt) *RegistryImpl {
opt(r)
}

store.VisitUpdates(func(updID string, updInfo *updatespb.UpdateInfo) {
getStoreFn().VisitUpdates(func(updID string, updInfo *updatespb.UpdateInfo) {
// need to eager load here so that Len and admit are correct.
if acc := updInfo.GetAcceptance(); acc != nil {
r.updates[updID] = newAccepted(
Expand Down Expand Up @@ -263,7 +266,7 @@ func (r *RegistryImpl) findLocked(ctx context.Context, id string) (*Update, bool

// update not found in ephemeral state, but could have already completed so
// check in registry storage
updOutcome, err := r.store.GetUpdateOutcome(ctx, id)
updOutcome, err := r.getStoreFn().GetUpdateOutcome(ctx, id)

// Swallow NotFound error because it means that update doesn't exist.
var notFound *serviceerror.NotFound
Expand Down
30 changes: 18 additions & 12 deletions service/history/workflow/update/registry_test.go
Expand Up @@ -80,7 +80,7 @@ func TestFind(t *testing.T) {
return nil, serviceerror.NewNotFound("not found")
},
}
reg = update.NewRegistry(store)
reg = update.NewRegistry(func() update.UpdateStore { return store })
)
_, ok := reg.Find(ctx, updateID)
require.False(t, ok)
Expand All @@ -105,7 +105,7 @@ func TestHasOutgoing(t *testing.T) {
return nil, serviceerror.NewNotFound("not found")
},
}
reg = update.NewRegistry(store)
reg = update.NewRegistry(func() update.UpdateStore { return store })
)

upd, _, err := reg.FindOrCreate(ctx, updateID)
Expand Down Expand Up @@ -162,7 +162,7 @@ func TestFindOrCreate(t *testing.T) {
return nil, serviceerror.NewNotFound("not found")
},
}
reg = update.NewRegistry(store)
reg = update.NewRegistry(func() update.UpdateStore { return store })
)

t.Run("new update", func(t *testing.T) {
Expand Down Expand Up @@ -215,7 +215,7 @@ func TestUpdateRemovalFromRegistry(t *testing.T) {
visitor(storedAcceptedUpdateID, storedAcceptedUpdateInfo)
},
}
reg = update.NewRegistry(regStore)
reg = update.NewRegistry(func() update.UpdateStore { return regStore })
)

upd, found, err := reg.FindOrCreate(ctx, storedAcceptedUpdateID)
Expand Down Expand Up @@ -243,7 +243,7 @@ func TestMessageGathering(t *testing.T) {
t.Parallel()
var (
ctx = context.Background()
reg = update.NewRegistry(emptyUpdateStore)
reg = update.NewRegistry(func() update.UpdateStore { return emptyUpdateStore })
)
updateID1, updateID2 := t.Name()+"-update-id-1", t.Name()+"-update-id-2"
upd1, _, err := reg.FindOrCreate(ctx, updateID1)
Expand Down Expand Up @@ -285,9 +285,12 @@ func TestInFlightLimit(t *testing.T) {
var (
ctx = context.Background()
limit = 1
reg = update.NewRegistry(emptyUpdateStore, update.WithInFlightLimit(
func() int { return limit },
))
reg = update.NewRegistry(
func() update.UpdateStore { return emptyUpdateStore },
update.WithInFlightLimit(
func() int { return limit },
),
)
)
upd1, existed, err := reg.FindOrCreate(ctx, "update1")
require.NoError(t, err)
Expand Down Expand Up @@ -351,9 +354,12 @@ func TestTotalLimit(t *testing.T) {
var (
ctx = context.Background()
limit = 1
reg = update.NewRegistry(emptyUpdateStore, update.WithTotalLimit(
func() int { return limit },
))
reg = update.NewRegistry(
func() update.UpdateStore { return emptyUpdateStore },
update.WithTotalLimit(
func() int { return limit },
),
)
)
upd1, existed, err := reg.FindOrCreate(ctx, "update1")
require.NoError(t, err)
Expand Down Expand Up @@ -430,7 +436,7 @@ func TestStorageErrorWhenLookingUpCompletedOutcome(t *testing.T) {
return nil, expectError
},
}
reg := update.NewRegistry(regStore)
reg := update.NewRegistry(func() update.UpdateStore { return regStore })
upd, found := reg.Find(context.TODO(), completedUpdateID)
require.True(t, found)
_, err := upd.WaitOutcome(context.TODO())
Expand Down
2 changes: 1 addition & 1 deletion service/history/workflowTaskHandler_test.go
Expand Up @@ -83,7 +83,7 @@ func TestCommandProtocolMessage(t *testing.T) {
out.conf = map[dynamicconfig.Key]any{}
out.ms = workflow.NewMockMutableState(gomock.NewController(t))
out.ms.EXPECT().VisitUpdates(gomock.Any()).Times(1)
out.updates = update.NewRegistry(out.ms)
out.updates = update.NewRegistry(func() update.UpdateStore { return out.ms })
var effects effect.Buffer
config := configs.NewConfig(
dynamicconfig.NewCollection(
Expand Down

0 comments on commit 5d5237f

Please sign in to comment.