From 24895ea5db0357104e60fc50ffb57641255f0557 Mon Sep 17 00:00:00 2001 From: Michael Snowden Date: Mon, 7 Nov 2022 19:22:41 -0800 Subject: [PATCH] Add the ability to skip queue factories --- service/history/queueFactoryBase.go | 17 +++- service/history/queueFactoryBase_mock.go | 113 +++++++++++++++++++++++ service/history/queueFactoryBase_test.go | 67 ++++++++++++++ 3 files changed, 195 insertions(+), 2 deletions(-) create mode 100644 service/history/queueFactoryBase_mock.go create mode 100644 service/history/queueFactoryBase_test.go diff --git a/service/history/queueFactoryBase.go b/service/history/queueFactoryBase.go index 66c9b3d0839..8260cbb9891 100644 --- a/service/history/queueFactoryBase.go +++ b/service/history/queueFactoryBase.go @@ -22,6 +22,8 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination queueFactoryBase_mock.go QueueFactory + package history import ( @@ -63,6 +65,7 @@ type ( // 2. Move this interface to queues package after 1 is done so that there's no cycle dependency // between workflow and queues package. CreateQueue(shard shard.Context, cache workflow.Cache) queues.Queue + Enabled() bool } QueueFactoryBaseParams struct { @@ -127,16 +130,22 @@ func QueueSchedulerRateLimiterProvider( func QueueFactoryLifetimeHooks( params QueueFactoriesLifetimeHookParams, ) { + var factories []QueueFactory + for _, f := range params.Factories { + if f.Enabled() { + factories = append(factories, f) + } + } params.Lifecycle.Append( fx.Hook{ OnStart: func(context.Context) error { - for _, factory := range params.Factories { + for _, factory := range factories { factory.Start() } return nil }, OnStop: func(context.Context) error { - for _, factory := range params.Factories { + for _, factory := range factories { factory.Stop() } return nil @@ -145,6 +154,10 @@ func QueueFactoryLifetimeHooks( ) } +func (f *QueueFactoryBase) Enabled() bool { + return true +} + func (f *QueueFactoryBase) Start() { if f.HostScheduler != nil { f.HostScheduler.Start() diff --git a/service/history/queueFactoryBase_mock.go b/service/history/queueFactoryBase_mock.go new file mode 100644 index 00000000000..6ba9366ac19 --- /dev/null +++ b/service/history/queueFactoryBase_mock.go @@ -0,0 +1,113 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Code generated by MockGen. DO NOT EDIT. +// Source: queueFactoryBase.go + +// Package history is a generated GoMock package. +package history + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + queues "go.temporal.io/server/service/history/queues" + shard "go.temporal.io/server/service/history/shard" + workflow "go.temporal.io/server/service/history/workflow" +) + +// MockQueueFactory is a mock of QueueFactory interface. +type MockQueueFactory struct { + ctrl *gomock.Controller + recorder *MockQueueFactoryMockRecorder +} + +// MockQueueFactoryMockRecorder is the mock recorder for MockQueueFactory. +type MockQueueFactoryMockRecorder struct { + mock *MockQueueFactory +} + +// NewMockQueueFactory creates a new mock instance. +func NewMockQueueFactory(ctrl *gomock.Controller) *MockQueueFactory { + mock := &MockQueueFactory{ctrl: ctrl} + mock.recorder = &MockQueueFactoryMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockQueueFactory) EXPECT() *MockQueueFactoryMockRecorder { + return m.recorder +} + +// CreateQueue mocks base method. +func (m *MockQueueFactory) CreateQueue(shard shard.Context, cache workflow.Cache) queues.Queue { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateQueue", shard, cache) + ret0, _ := ret[0].(queues.Queue) + return ret0 +} + +// CreateQueue indicates an expected call of CreateQueue. +func (mr *MockQueueFactoryMockRecorder) CreateQueue(shard, cache interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateQueue", reflect.TypeOf((*MockQueueFactory)(nil).CreateQueue), shard, cache) +} + +// Enabled mocks base method. +func (m *MockQueueFactory) Enabled() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Enabled") + ret0, _ := ret[0].(bool) + return ret0 +} + +// Enabled indicates an expected call of Enabled. +func (mr *MockQueueFactoryMockRecorder) Enabled() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Enabled", reflect.TypeOf((*MockQueueFactory)(nil).Enabled)) +} + +// Start mocks base method. +func (m *MockQueueFactory) Start() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Start") +} + +// Start indicates an expected call of Start. +func (mr *MockQueueFactoryMockRecorder) Start() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockQueueFactory)(nil).Start)) +} + +// Stop mocks base method. +func (m *MockQueueFactory) Stop() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Stop") +} + +// Stop indicates an expected call of Stop. +func (mr *MockQueueFactoryMockRecorder) Stop() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockQueueFactory)(nil).Stop)) +} diff --git a/service/history/queueFactoryBase_test.go b/service/history/queueFactoryBase_test.go new file mode 100644 index 00000000000..f8713d27a92 --- /dev/null +++ b/service/history/queueFactoryBase_test.go @@ -0,0 +1,67 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package history + +import ( + "context" + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + "go.uber.org/fx" +) + +func TestQueueFactoryLifetimeHooks(t *testing.T) { + var ( + ctrl = gomock.NewController(t) + disabledQueue = NewMockQueueFactory(ctrl) + enabledQueue = NewMockQueueFactory(ctrl) + ) + + disabledQueue.EXPECT().Enabled().Return(false) + enabledQueue.EXPECT().Enabled().Return(true) + + app := fx.New( + fx.Provide(fx.Annotated{ + Group: QueueFactoryFxGroup, + Target: func() QueueFactory { + return disabledQueue + }, + }, fx.Annotated{ + Group: QueueFactoryFxGroup, + Target: func() QueueFactory { + return enabledQueue + }, + }), + fx.Invoke(QueueFactoryLifetimeHooks), + ) + require.NoError(t, app.Err()) + + enabledQueue.EXPECT().Start() + require.NoError(t, app.Start(context.Background())) + + enabledQueue.EXPECT().Stop() + require.NoError(t, app.Stop(context.Background())) +}