Skip to content

Commit

Permalink
Turns on the history scavenger for SQL backends (#3462)
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden authored and dnr committed Oct 17, 2022
1 parent 8cd481a commit 45bd55d
Show file tree
Hide file tree
Showing 3 changed files with 244 additions and 15 deletions.
28 changes: 18 additions & 10 deletions service/worker/scanner/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package scanner

import (
"context"
"sync"
"time"

"go.temporal.io/api/serviceerror"
Expand All @@ -40,17 +41,13 @@ import (
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/sdk"

"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log/tag"
)

const (
// scannerStartUpDelay is to let services warm up
scannerStartUpDelay = time.Second * 4
)

type (
// Config defines the configuration for scanner
Config struct {
Expand Down Expand Up @@ -84,13 +81,15 @@ type (
executionManager persistence.ExecutionManager
taskManager persistence.TaskManager
historyClient historyservice.HistoryServiceClient
workerFactory sdk.WorkerFactory
}

// Scanner is the background sub-system that does full scans
// of database tables to cleanup resources, monitor anamolies
// and emit stats for analytics
Scanner struct {
context scannerContext
wg sync.WaitGroup
}
)

Expand All @@ -107,6 +106,7 @@ func New(
executionManager persistence.ExecutionManager,
taskManager persistence.TaskManager,
historyClient historyservice.HistoryServiceClient,
workerFactory sdk.WorkerFactory,
) *Scanner {
return &Scanner{
context: scannerContext{
Expand All @@ -117,6 +117,7 @@ func New(
executionManager: executionManager,
taskManager: taskManager,
historyClient: historyClient,
workerFactory: workerFactory,
},
}
}
Expand All @@ -137,20 +138,25 @@ func (s *Scanner) Start() error {

var workerTaskQueueNames []string
if s.context.cfg.ExecutionsScannerEnabled() {
s.wg.Add(1)
go s.startWorkflowWithRetry(executionsScannerWFStartOptions, executionsScannerWFTypeName)
workerTaskQueueNames = append(workerTaskQueueNames, executionsScannerTaskQueueName)
}

if s.context.cfg.Persistence.DefaultStoreType() == config.StoreTypeSQL && s.context.cfg.TaskQueueScannerEnabled() {
s.wg.Add(1)
go s.startWorkflowWithRetry(tlScannerWFStartOptions, tqScannerWFTypeName)
workerTaskQueueNames = append(workerTaskQueueNames, tqScannerTaskQueueName)
} else if s.context.cfg.Persistence.DefaultStoreType() == config.StoreTypeNoSQL && s.context.cfg.HistoryScannerEnabled() {
}

if s.context.cfg.HistoryScannerEnabled() {
s.wg.Add(1)
go s.startWorkflowWithRetry(historyScannerWFStartOptions, historyScannerWFTypeName)
workerTaskQueueNames = append(workerTaskQueueNames, historyScannerTaskQueueName)
}

for _, tl := range workerTaskQueueNames {
work := worker.New(s.context.sdkSystemClient, tl, workerOpts)
work := s.context.workerFactory.New(s.context.sdkSystemClient, tl, workerOpts)

work.RegisterWorkflowWithOptions(TaskQueueScannerWorkflow, workflow.RegisterOptions{Name: tqScannerWFTypeName})
work.RegisterWorkflowWithOptions(HistoryScannerWorkflow, workflow.RegisterOptions{Name: historyScannerWFTypeName})
Expand All @@ -167,14 +173,16 @@ func (s *Scanner) Start() error {
return nil
}

func (s *Scanner) Stop() {
s.wg.Wait()
}

func (s *Scanner) startWorkflowWithRetry(
options sdkclient.StartWorkflowOptions,
workflowType string,
workflowArgs ...interface{},
) {

// let history / matching service warm up
time.Sleep(scannerStartUpDelay)
defer s.wg.Done()

policy := backoff.NewExponentialRetryPolicy(time.Second).
WithMaximumInterval(time.Minute).
Expand Down
210 changes: 210 additions & 0 deletions service/worker/scanner/scanner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package scanner

import (
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/suite"

"go.temporal.io/server/api/historyservicemock/v1"
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
p "go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/sdk"
"go.temporal.io/server/common/testing/mocksdk"
)

type scannerTestSuite struct {
suite.Suite
}

func TestScanner(t *testing.T) {
suite.Run(t, new(scannerTestSuite))
}

func (s *scannerTestSuite) TestScannerEnabled() {
type expectedScanner struct {
WFTypeName string
TaskQueueName string
}
executionScanner := expectedScanner{
WFTypeName: executionsScannerWFTypeName,
TaskQueueName: executionsScannerTaskQueueName,
}
_ = executionScanner
taskQueueScanner := expectedScanner{
WFTypeName: tqScannerWFTypeName,
TaskQueueName: tqScannerTaskQueueName,
}
historyScanner := expectedScanner{
WFTypeName: historyScannerWFTypeName,
TaskQueueName: historyScannerTaskQueueName,
}

type testCase struct {
Name string
ExecutionsScannerEnabled bool
TaskQueueScannerEnabled bool
HistoryScannerEnabled bool
DefaultStore string
ExpectedScanners []expectedScanner
}

for _, c := range []testCase{
{
Name: "NothingEnabledNoSQL",
ExecutionsScannerEnabled: false,
TaskQueueScannerEnabled: false,
HistoryScannerEnabled: false,
DefaultStore: config.StoreTypeNoSQL,
ExpectedScanners: []expectedScanner{},
},
{
Name: "NothingEnabledSQL",
ExecutionsScannerEnabled: false,
TaskQueueScannerEnabled: false,
HistoryScannerEnabled: false,
DefaultStore: config.StoreTypeSQL,
ExpectedScanners: []expectedScanner{},
},
{
Name: "HistoryScannerNoSQL",
ExecutionsScannerEnabled: false,
TaskQueueScannerEnabled: false,
HistoryScannerEnabled: true,
DefaultStore: config.StoreTypeNoSQL,
ExpectedScanners: []expectedScanner{historyScanner},
},
{
Name: "HistoryScannerSQL",
ExecutionsScannerEnabled: false,
TaskQueueScannerEnabled: false,
HistoryScannerEnabled: true,
DefaultStore: config.StoreTypeSQL,
ExpectedScanners: []expectedScanner{historyScanner},
},
{
Name: "TaskQueueScannerNoSQL",
ExecutionsScannerEnabled: false,
TaskQueueScannerEnabled: true,
HistoryScannerEnabled: false,
DefaultStore: config.StoreTypeNoSQL,
ExpectedScanners: []expectedScanner{}, // TODO: enable task queue scanner for NoSQL?
},
{
Name: "TaskQueueScannerSQL",
ExecutionsScannerEnabled: false,
TaskQueueScannerEnabled: true,
HistoryScannerEnabled: false,
DefaultStore: config.StoreTypeSQL,
ExpectedScanners: []expectedScanner{taskQueueScanner},
},
{
Name: "ExecutionsScannerNoSQL",
ExecutionsScannerEnabled: true,
TaskQueueScannerEnabled: false,
HistoryScannerEnabled: false,
DefaultStore: config.StoreTypeNoSQL,
ExpectedScanners: []expectedScanner{executionScanner},
},
{
Name: "ExecutionsScannerSQL",
ExecutionsScannerEnabled: true,
TaskQueueScannerEnabled: false,
HistoryScannerEnabled: false,
DefaultStore: config.StoreTypeSQL,
ExpectedScanners: []expectedScanner{executionScanner},
},
{
Name: "AllScannersSQL",
ExecutionsScannerEnabled: true,
TaskQueueScannerEnabled: true,
HistoryScannerEnabled: true,
DefaultStore: config.StoreTypeSQL,
ExpectedScanners: []expectedScanner{historyScanner, taskQueueScanner, executionScanner},
},
} {
s.Run(c.Name, func() {
ctrl := gomock.NewController(s.T())
mockWorkerFactory := sdk.NewMockWorkerFactory(ctrl)
mockSdkClient := mocksdk.NewMockClient(ctrl)
scanner := New(
log.NewNoopLogger(),
&Config{
MaxConcurrentActivityExecutionSize: func() int {
return 1
},
MaxConcurrentWorkflowTaskExecutionSize: func() int {
return 1
},
MaxConcurrentActivityTaskPollers: func() int {
return 1
},
MaxConcurrentWorkflowTaskPollers: func() int {
return 1
},
ExecutionsScannerEnabled: func() bool {
return c.ExecutionsScannerEnabled
},
HistoryScannerEnabled: func() bool {
return c.HistoryScannerEnabled
},
TaskQueueScannerEnabled: func() bool {
return c.TaskQueueScannerEnabled
},
Persistence: &config.Persistence{
DefaultStore: c.DefaultStore,
DataStores: map[string]config.DataStore{
config.StoreTypeNoSQL: {},
config.StoreTypeSQL: {
SQL: &config.SQL{},
},
},
},
},
mockSdkClient,
metrics.NoopClient,
p.NewMockExecutionManager(ctrl),
p.NewMockTaskManager(ctrl),
historyservicemock.NewMockHistoryServiceClient(ctrl),
mockWorkerFactory,
)
for _, sc := range c.ExpectedScanners {
worker := mocksdk.NewMockWorker(ctrl)
worker.EXPECT().RegisterActivityWithOptions(gomock.Any(), gomock.Any()).AnyTimes()
worker.EXPECT().RegisterWorkflowWithOptions(gomock.Any(), gomock.Any()).AnyTimes()
worker.EXPECT().Start()
mockWorkerFactory.EXPECT().New(gomock.Any(), sc.TaskQueueName, gomock.Any()).Return(worker)
mockSdkClient.EXPECT().ExecuteWorkflow(gomock.Any(), gomock.Any(), sc.WFTypeName, gomock.Any())
}
err := scanner.Start()
s.NoError(err)
scanner.Stop()
})
}
}
21 changes: 16 additions & 5 deletions service/worker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ type (

workerManager *workerManager
perNamespaceWorkerManager *perNamespaceWorkerManager
scanner *scanner.Scanner
workerFactory sdk.WorkerFactory
}

// Config contains all the service config for worker
Expand Down Expand Up @@ -148,13 +150,14 @@ func NewService(
workerManager *workerManager,
perNamespaceWorkerManager *perNamespaceWorkerManager,
visibilityManager manager.VisibilityManager,
workerFactory sdk.WorkerFactory,
) (*Service, error) {
workerServiceResolver, err := membershipMonitor.GetResolver(common.WorkerServiceName)
if err != nil {
return nil, err
}

return &Service{
s := &Service{
status: common.DaemonStatusInitialized,
config: serviceConfig,
sdkClientFactory: sdkClientFactory,
Expand All @@ -181,7 +184,10 @@ func NewService(

workerManager: workerManager,
perNamespaceWorkerManager: perNamespaceWorkerManager,
}, nil
workerFactory: workerFactory,
}
s.initScanner()
return s, nil
}

// NewConfig builds the new Config for worker service
Expand Down Expand Up @@ -401,6 +407,7 @@ func (s *Service) Stop() {

close(s.stopC)

s.scanner.Stop()
s.perNamespaceWorkerManager.Stop()
s.workerManager.Stop()
s.namespaceRegistry.Stop()
Expand Down Expand Up @@ -449,17 +456,21 @@ func (s *Service) startBatcher() {
}
}

func (s *Service) startScanner() {
sc := scanner.New(
func (s *Service) initScanner() {
s.scanner = scanner.New(
s.logger,
s.config.ScannerCfg,
s.sdkClientFactory.GetSystemClient(),
s.metricsClient,
s.executionManager,
s.taskManager,
s.historyClient,
s.workerFactory,
)
if err := sc.Start(); err != nil {
}

func (s *Service) startScanner() {
if err := s.scanner.Start(); err != nil {
s.logger.Fatal(
"error starting scanner",
tag.Error(err),
Expand Down

0 comments on commit 45bd55d

Please sign in to comment.