Skip to content

Commit

Permalink
Event buffer size limit (#4296)
Browse files Browse the repository at this point in the history
* Event buffer size limit

* Make size limit inclusive

* Make default size limit more readable

* Add history builder test for proto size

* Split db and memory batch for loop

* Use values instead of pointers to avoid parallel test glitch

* Make a copy of the test case for parallel test to work
  • Loading branch information
MichaelSnowden committed May 8, 2023
1 parent 8fa4d3f commit 8e6203a
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 40 deletions.
5 changes: 4 additions & 1 deletion common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,8 +573,11 @@ const (
ReplicatorProcessorUpdateAckIntervalJitterCoefficient = "history.replicatorProcessorUpdateAckIntervalJitterCoefficient"
// ReplicatorProcessorEnablePriorityTaskProcessor indicates whether priority task processor should be used for ReplicatorProcessor
ReplicatorProcessorEnablePriorityTaskProcessor = "history.replicatorProcessorEnablePriorityTaskProcessor"
// MaximumBufferedEventsBatch is max number of buffer event in mutable state
// MaximumBufferedEventsBatch is the maximum permissible number of buffered events for any given mutable state.
MaximumBufferedEventsBatch = "history.maximumBufferedEventsBatch"
// MaximumBufferedEventsSizeInBytes is the maximum permissible size of all buffered events for any given mutable
// state. The total size is determined by the sum of the size, in bytes, of each HistoryEvent proto.
MaximumBufferedEventsSizeInBytes = "history.maximumBufferedEventsSizeInBytes"
// MaximumSignalsPerExecution is max number of signals supported by single execution
MaximumSignalsPerExecution = "history.maximumSignalsPerExecution"
// ShardUpdateMinInterval is the minimal time interval which the shard info can be updated
Expand Down
2 changes: 1 addition & 1 deletion common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ const (
FrontendClientListOpenWorkflowExecutionsScope = "FrontendClientListOpenWorkflowExecutions"
// FrontendClientPollActivityTaskQueueScope tracks RPC calls to frontend service
FrontendClientPollActivityTaskQueueScope = "FrontendClientPollActivityTaskQueue"
//FrontendClientPollWorkflowExecutionUpdateScope tracks RPC calls to frontend service
// FrontendClientPollWorkflowExecutionUpdateScope tracks RPC calls to frontend service
FrontendClientPollWorkflowExecutionUpdateScope = "FrontendClientPollWorkflowExecutionUpdate"
// FrontendClientPollWorkflowTaskQueueScope tracks RPC calls to frontend service
FrontendClientPollWorkflowTaskQueueScope = "FrontendClientPollWorkflowTaskQueue"
Expand Down
16 changes: 9 additions & 7 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,9 @@ type Config struct {
ReplicatorProcessorMaxSkipTaskCount dynamicconfig.IntPropertyFn

// System Limits
MaximumBufferedEventsBatch dynamicconfig.IntPropertyFn
MaximumSignalsPerExecution dynamicconfig.IntPropertyFnWithNamespaceFilter
MaximumBufferedEventsBatch dynamicconfig.IntPropertyFn
MaximumBufferedEventsSizeInBytes dynamicconfig.IntPropertyFn
MaximumSignalsPerExecution dynamicconfig.IntPropertyFnWithNamespaceFilter

// ShardUpdateMinInterval the minimal time interval which the shard info can be updated
ShardUpdateMinInterval dynamicconfig.DurationPropertyFn
Expand Down Expand Up @@ -408,11 +409,12 @@ func NewConfig(
ReplicationProcessorSchedulerQueueSize: dc.GetIntProperty(dynamicconfig.ReplicationProcessorSchedulerQueueSize, 128),
ReplicationProcessorSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.ReplicationProcessorSchedulerWorkerCount, 512),

MaximumBufferedEventsBatch: dc.GetIntProperty(dynamicconfig.MaximumBufferedEventsBatch, 100),
MaximumSignalsPerExecution: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.MaximumSignalsPerExecution, 0),
ShardUpdateMinInterval: dc.GetDurationProperty(dynamicconfig.ShardUpdateMinInterval, 5*time.Minute),
ShardSyncMinInterval: dc.GetDurationProperty(dynamicconfig.ShardSyncMinInterval, 5*time.Minute),
ShardSyncTimerJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TransferProcessorMaxPollIntervalJitterCoefficient, 0.15),
MaximumBufferedEventsBatch: dc.GetIntProperty(dynamicconfig.MaximumBufferedEventsBatch, 100),
MaximumBufferedEventsSizeInBytes: dc.GetIntProperty(dynamicconfig.MaximumBufferedEventsSizeInBytes, 2*1024*1024),
MaximumSignalsPerExecution: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.MaximumSignalsPerExecution, 0),
ShardUpdateMinInterval: dc.GetDurationProperty(dynamicconfig.ShardUpdateMinInterval, 5*time.Minute),
ShardSyncMinInterval: dc.GetDurationProperty(dynamicconfig.ShardSyncMinInterval, 5*time.Minute),
ShardSyncTimerJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TransferProcessorMaxPollIntervalJitterCoefficient, 0.15),

// history client: client/history/client.go set the client timeout 30s
// TODO: Return this value to the client: go.temporal.io/server/issues/294
Expand Down
13 changes: 12 additions & 1 deletion service/history/workflow/history_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1119,10 +1119,21 @@ func (b *HistoryBuilder) HasAnyBufferedEvent(filter BufferedEventFilter) bool {
return false
}

func (b *HistoryBuilder) BufferEventSize() int {
func (b *HistoryBuilder) NumBufferedEvents() int {
return len(b.dbBufferBatch) + len(b.memBufferBatch)
}

func (b *HistoryBuilder) SizeInBytesOfBufferedEvents() int {
size := 0
for _, ev := range b.dbBufferBatch {
size += ev.Size()
}
for _, ev := range b.memBufferBatch {
size += ev.Size()
}
return size
}

func (b *HistoryBuilder) NextEventID() int64 {
return b.nextEventID
}
Expand Down
37 changes: 36 additions & 1 deletion service/history/workflow/history_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
taskqueuepb "go.temporal.io/api/taskqueue/v1"
workflowpb "go.temporal.io/api/workflow/v1"
"go.temporal.io/api/workflowservice/v1"

"go.temporal.io/server/api/historyservice/v1"
workflowspb "go.temporal.io/server/api/workflow/v1"
"go.temporal.io/server/common"
Expand Down Expand Up @@ -2316,6 +2315,42 @@ func (s *historyBuilderSuite) TestReorder() {
)
}

func (s *historyBuilderSuite) TestBufferSize_Memory() {
s.Assert().Zero(s.historyBuilder.NumBufferedEvents())
s.Assert().Zero(s.historyBuilder.SizeInBytesOfBufferedEvents())
s.historyBuilder.AddWorkflowExecutionSignaledEvent(
"signal-name",
&commonpb.Payloads{},
"identity",
&commonpb.Header{},
true,
)
s.Assert().Equal(1, s.historyBuilder.NumBufferedEvents())
// the size of the proto is non-deterministic, so just assert that it's non-zero, and it isn't really high
s.Assert().Greater(s.historyBuilder.SizeInBytesOfBufferedEvents(), 0)
s.Assert().Less(s.historyBuilder.SizeInBytesOfBufferedEvents(), 100)
s.flush()
s.Assert().Zero(s.historyBuilder.NumBufferedEvents())
s.Assert().Zero(s.historyBuilder.SizeInBytesOfBufferedEvents())
}

func (s *historyBuilderSuite) TestBufferSize_DB() {
s.Assert().Zero(s.historyBuilder.NumBufferedEvents())
s.Assert().Zero(s.historyBuilder.SizeInBytesOfBufferedEvents())
s.historyBuilder.dbBufferBatch = []*historypb.HistoryEvent{{
EventType: enumspb.EVENT_TYPE_TIMER_FIRED,
EventId: common.BufferedEventID,
TaskId: common.EmptyEventTaskID,
}}
s.Assert().Equal(1, s.historyBuilder.NumBufferedEvents())
// the size of the proto is non-deterministic, so just assert that it's non-zero, and it isn't really high
s.Assert().Greater(s.historyBuilder.SizeInBytesOfBufferedEvents(), 0)
s.Assert().Less(s.historyBuilder.SizeInBytesOfBufferedEvents(), 100)
s.flush()
s.Assert().Zero(s.historyBuilder.NumBufferedEvents())
s.Assert().Zero(s.historyBuilder.SizeInBytesOfBufferedEvents())
}

func (s *historyBuilderSuite) assertEventIDTaskID(
historyMutation *HistoryMutation,
) {
Expand Down
13 changes: 12 additions & 1 deletion service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -4601,6 +4601,17 @@ func (ms *MutableStateImpl) closeTransactionWithPolicyCheck(
}
}

func (ms *MutableStateImpl) BufferSizeAcceptable() bool {
if ms.hBuilder.NumBufferedEvents() > ms.config.MaximumBufferedEventsBatch() {
return false
}

if ms.hBuilder.SizeInBytesOfBufferedEvents() > ms.config.MaximumBufferedEventsSizeInBytes() {
return false
}
return true
}

func (ms *MutableStateImpl) closeTransactionHandleBufferedEventsLimit(
transactionPolicy TransactionPolicy,
) error {
Expand All @@ -4610,7 +4621,7 @@ func (ms *MutableStateImpl) closeTransactionHandleBufferedEventsLimit(
return nil
}

if ms.hBuilder.BufferEventSize() < ms.config.MaximumBufferedEventsBatch() {
if ms.BufferSizeAcceptable() {
return nil
}

Expand Down
10 changes: 5 additions & 5 deletions service/history/workflow/mutable_state_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (s *mutableStateSuite) TestTransientWorkflowTaskCompletionFirstBatchReplica
}))
err := s.mutableState.ReplicateWorkflowTaskCompletedEvent(newWorkflowTaskCompletedEvent)
s.NoError(err)
s.Equal(0, s.mutableState.hBuilder.BufferEventSize())
s.Equal(0, s.mutableState.hBuilder.NumBufferedEvents())
}

func (s *mutableStateSuite) TestTransientWorkflowTaskCompletionFirstBatchReplicated_FailoverWorkflowTaskTimeout() {
Expand All @@ -175,7 +175,7 @@ func (s *mutableStateSuite) TestTransientWorkflowTaskCompletionFirstBatchReplica
newWorkflowTask,
)
s.NoError(err)
s.Equal(0, s.mutableState.hBuilder.BufferEventSize())
s.Equal(0, s.mutableState.hBuilder.NumBufferedEvents())
}

func (s *mutableStateSuite) TestTransientWorkflowTaskCompletionFirstBatchReplicated_FailoverWorkflowTaskFailed() {
Expand All @@ -202,7 +202,7 @@ func (s *mutableStateSuite) TestTransientWorkflowTaskCompletionFirstBatchReplica
"", "", "", 0,
)
s.NoError(err)
s.Equal(0, s.mutableState.hBuilder.BufferEventSize())
s.Equal(0, s.mutableState.hBuilder.NumBufferedEvents())
}

func (s *mutableStateSuite) TestChecksum() {
Expand Down Expand Up @@ -421,7 +421,7 @@ func (s *mutableStateSuite) TestTransientWorkflowTaskSchedule_CurrentVersionChan
s.NotNil(wt)

s.Equal(int32(1), s.mutableState.GetExecutionInfo().WorkflowTaskAttempt)
s.Equal(0, s.mutableState.hBuilder.BufferEventSize())
s.Equal(0, s.mutableState.hBuilder.NumBufferedEvents())
}

func (s *mutableStateSuite) TestTransientWorkflowTaskStart_CurrentVersionChanged() {
Expand Down Expand Up @@ -461,7 +461,7 @@ func (s *mutableStateSuite) TestTransientWorkflowTaskStart_CurrentVersionChanged
"random identity",
)
s.NoError(err)
s.Equal(0, s.mutableState.hBuilder.BufferEventSize())
s.Equal(0, s.mutableState.hBuilder.NumBufferedEvents())
}

func (s *mutableStateSuite) TestSanitizedMutableState() {
Expand Down
67 changes: 44 additions & 23 deletions service/history/workflow/workflow_test/mutable_state_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ package workflow_test

import (
"fmt"
"math"
"testing"
"time"

Expand Down Expand Up @@ -59,27 +60,39 @@ func TestMutableStateImpl_ForceFlushBufferedEvents(t *testing.T) {

for _, tc := range []mutationTestCase{
{
name: "signals<maxNumEvents",
signals: 1,
maxNumEvents: 2,
name: "Number of events ok",
transactionPolicy: workflow.TransactionPolicyActive,
signals: 2,
maxEvents: 2,
maxSizeInBytes: math.MaxInt,
expectFlush: false,
},
{
name: "signals=maxNumEvents",
signals: 2,
maxNumEvents: 2,
name: "Max number of events exceeded",
transactionPolicy: workflow.TransactionPolicyActive,
signals: 3,
maxEvents: 2,
maxSizeInBytes: math.MaxInt,
expectFlush: true,
},
{
name: "signals>maxNumEvents",
signals: 3,
maxNumEvents: 2,
name: "Number of events ok but byte size limit exceeded",
transactionPolicy: workflow.TransactionPolicyActive,
signals: 2,
maxEvents: 2,
maxSizeInBytes: 25,
expectFlush: true,
},
{
name: "Max number of events and size of events both exceeded",
transactionPolicy: workflow.TransactionPolicyActive,
signals: 3,
maxEvents: 2,
maxSizeInBytes: 25,
expectFlush: true,
},
} {
tc := tc
t.Run(tc.name, tc.Run)
}
}
Expand All @@ -88,11 +101,12 @@ type mutationTestCase struct {
name string
transactionPolicy workflow.TransactionPolicy
signals int
maxNumEvents int
maxEvents int
expectFlush bool
maxSizeInBytes int
}

func (c mutationTestCase) Run(t *testing.T) {
func (c *mutationTestCase) Run(t *testing.T) {
t.Parallel()

nsEntry := tests.LocalNamespaceEntry
Expand All @@ -118,7 +132,7 @@ func (c mutationTestCase) Run(t *testing.T) {
}
}

func (c mutationTestCase) startWFT(
func (c *mutationTestCase) startWFT(
t *testing.T,
ms *workflow.MutableStateImpl,
) *workflow.WorkflowTaskInfo {
Expand All @@ -137,7 +151,7 @@ func (c mutationTestCase) startWFT(
return wft
}

func (c mutationTestCase) startWorkflowExecution(
func (c *mutationTestCase) startWorkflowExecution(
t *testing.T,
ms *workflow.MutableStateImpl,
nsEntry *namespace.Namespace,
Expand Down Expand Up @@ -165,7 +179,7 @@ func (c mutationTestCase) startWorkflowExecution(
}
}

func (c mutationTestCase) addWorkflowExecutionSignaled(t *testing.T, i int, ms *workflow.MutableStateImpl) {
func (c *mutationTestCase) addWorkflowExecutionSignaled(t *testing.T, i int, ms *workflow.MutableStateImpl) {
t.Helper()

payload := &commonpb.Payloads{}
Expand All @@ -184,7 +198,7 @@ func (c mutationTestCase) addWorkflowExecutionSignaled(t *testing.T, i int, ms *
}
}

func (c mutationTestCase) createMutableState(t *testing.T, nsEntry *namespace.Namespace) *workflow.MutableStateImpl {
func (c *mutationTestCase) createMutableState(t *testing.T, nsEntry *namespace.Namespace) *workflow.MutableStateImpl {
t.Helper()

ctrl := gomock.NewController(t)
Expand Down Expand Up @@ -223,16 +237,23 @@ func (c mutationTestCase) createMutableState(t *testing.T, nsEntry *namespace.Na
return ms
}

func (c mutationTestCase) createConfig() *configs.Config {
func (c *mutationTestCase) createConfig() *configs.Config {
cfg := tests.NewDynamicConfig()
cfg.MaximumBufferedEventsBatch = func() int {
return c.maxNumEvents
}
cfg.MaximumBufferedEventsBatch = c.getMaxEvents
cfg.MaximumBufferedEventsSizeInBytes = c.getMaxSizeInBytes

return cfg
}

func (c mutationTestCase) testWFTFailedEvent(
func (c *mutationTestCase) getMaxEvents() int {
return c.maxEvents
}

func (c *mutationTestCase) getMaxSizeInBytes() int {
return c.maxSizeInBytes
}

func (c *mutationTestCase) testWFTFailedEvent(
t *testing.T,
wft *workflow.WorkflowTaskInfo,
event *history.HistoryEvent,
Expand All @@ -256,7 +277,7 @@ func (c mutationTestCase) testWFTFailedEvent(
}
}

func (c mutationTestCase) findWFTEvent(eventType enumspb.EventType, workflowEvents []*persistence.WorkflowEvents) (
func (c *mutationTestCase) findWFTEvent(eventType enumspb.EventType, workflowEvents []*persistence.WorkflowEvents) (
*history.HistoryEvent,
bool,
) {
Expand All @@ -271,7 +292,7 @@ func (c mutationTestCase) findWFTEvent(eventType enumspb.EventType, workflowEven
return nil, false
}

func (c mutationTestCase) testFailure(
func (c *mutationTestCase) testFailure(
t *testing.T,
ms *workflow.MutableStateImpl,
wft *workflow.WorkflowTaskInfo,
Expand Down Expand Up @@ -310,7 +331,7 @@ func (c mutationTestCase) testFailure(
c.testWFTFailedEvent(t, wft, event)
}

func (c mutationTestCase) testSuccess(
func (c *mutationTestCase) testSuccess(
t *testing.T,
ms *workflow.MutableStateImpl,
workflowEvents []*persistence.WorkflowEvents,
Expand Down

0 comments on commit 8e6203a

Please sign in to comment.