Skip to content

Commit

Permalink
Introduce update.Storage (#4277)
Browse files Browse the repository at this point in the history
A slice of the MutableState API that the update.Registry can use for
writing events and loading in-flight update information into memory.
  • Loading branch information
Matt McShane committed May 4, 2023
1 parent ea4946c commit bb7d4af
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 3 deletions.
6 changes: 6 additions & 0 deletions service/history/api/workflow_context.go
Expand Up @@ -29,6 +29,7 @@ import (
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/service/history/workflow"
wcache "go.temporal.io/server/service/history/workflow/cache"
"go.temporal.io/server/service/history/workflow/update"
)

type WorkflowContext interface {
Expand All @@ -38,6 +39,7 @@ type WorkflowContext interface {

GetNamespaceEntry() *namespace.Namespace
GetWorkflowKey() definition.WorkflowKey
GetUpdateRegistry() update.Registry
}

type WorkflowContextImpl struct {
Expand Down Expand Up @@ -96,3 +98,7 @@ func (w *WorkflowContextImpl) GetNamespaceEntry() *namespace.Namespace {
func (w *WorkflowContextImpl) GetWorkflowKey() definition.WorkflowKey {
return w.context.GetWorkflowKey()
}

func (w *WorkflowContextImpl) GetUpdateRegistry() update.Registry {
return w.context.UpdateRegistry()
}
4 changes: 3 additions & 1 deletion service/history/workflow/context.go
Expand Up @@ -176,7 +176,6 @@ func NewContext(
timeSource: shard.GetTimeSource(),
config: shard.GetConfig(),
mutex: locks.NewPriorityMutex(),
updateRegistry: update.NewRegistry(),
transaction: NewTransaction(shard),
stats: &persistencespb.ExecutionStats{
HistorySize: 0,
Expand Down Expand Up @@ -850,6 +849,9 @@ func (c *ContextImpl) ReapplyEvents(
}

func (c *ContextImpl) UpdateRegistry() update.Registry {
if c.updateRegistry == nil {
c.updateRegistry = update.NewRegistry(c.MutableState)
}
return c.updateRegistry
}

Expand Down
22 changes: 21 additions & 1 deletion service/history/workflow/update/registry.go
Expand Up @@ -25,10 +25,12 @@
package update

import (
"context"
"fmt"
"sync"

"github.com/gogo/protobuf/types"
historypb "go.temporal.io/api/history/v1"
protocolpb "go.temporal.io/api/protocol/v1"
"go.temporal.io/api/serviceerror"
updatepb "go.temporal.io/api/update/v1"
Expand All @@ -49,17 +51,35 @@ type (

RemoveFunc func()

// Storage represents the update package's requirements for writing
// events and restoring ephemeral state from an event index.
Storage interface {
// AddWorkflowExecutionUpdateAcceptedEvent writes an update accepted
// event.
AddWorkflowExecutionUpdateAcceptedEvent(updateID string, accpt *updatepb.Acceptance) (*historypb.HistoryEvent, error)

// AddWorkflowExecutionUpdateCompletedEvent writes an update completed
// event.
AddWorkflowExecutionUpdateCompletedEvent(resp *updatepb.Response) (*historypb.HistoryEvent, error)

// GetAcceptedWorkflowExecutionUpdateIDs reads from durable state the
// set of update IDs that are known to be in the accepted state.
GetAcceptedWorkflowExecutionUpdateIDs(ctx context.Context) ([]string, error)
}

RegistryImpl struct {
sync.RWMutex
updates map[string]*Update
store Storage
}
)

var _ Registry = (*RegistryImpl)(nil)

func NewRegistry() *RegistryImpl {
func NewRegistry(store Storage) *RegistryImpl {
return &RegistryImpl{
updates: make(map[string]*Update),
store: store,
}
}

Expand Down
30 changes: 29 additions & 1 deletion service/history/workflow/update/registry_test.go
Expand Up @@ -25,13 +25,15 @@
package update

import (
"context"
"testing"

"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
"github.com/pborman/uuid"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
historypb "go.temporal.io/api/history/v1"
protocolpb "go.temporal.io/api/protocol/v1"
updatepb "go.temporal.io/api/update/v1"
)
Expand All @@ -43,8 +45,34 @@ type (
*require.Assertions
suite.Suite
}

mockStore struct {
Storage
AddWorkflowExecutionUpdateAcceptedEventFunc func(string, *updatepb.Acceptance) (*historypb.HistoryEvent, error)
AddWorkflowExecutionUpdateCompletedEventFunc func(*updatepb.Response) (*historypb.HistoryEvent, error)
GetAcceptedWorkflowExecutionUpdateIDsFunc func(context.Context) ([]string, error)
}
)

func (m mockStore) AddWorkflowExecutionUpdateAcceptedEvent(
updateID string,
accpt *updatepb.Acceptance,
) (*historypb.HistoryEvent, error) {
return m.AddWorkflowExecutionUpdateAcceptedEventFunc(updateID, accpt)
}

func (m mockStore) AddWorkflowExecutionUpdateCompletedEvent(
resp *updatepb.Response,
) (*historypb.HistoryEvent, error) {
return m.AddWorkflowExecutionUpdateCompletedEventFunc(resp)
}

func (m mockStore) GetAcceptedWorkflowExecutionUpdateIDs(
ctx context.Context,
) ([]string, error) {
return m.GetAcceptedWorkflowExecutionUpdateIDsFunc(ctx)
}

func (s *updateSuite) SetupSuite() {
s.Assertions = require.New(s.T())
}
Expand All @@ -55,7 +83,7 @@ func TestUpdateSuite(t *testing.T) {

func (s *updateSuite) TestValidateMessages() {

reg := NewRegistry()
reg := NewRegistry(mockStore{})
upd1, _, _ := reg.Add(&updatepb.Request{Meta: &updatepb.Meta{UpdateId: "update-1"}})

testCases := []struct {
Expand Down

0 comments on commit bb7d4af

Please sign in to comment.