Skip to content

Commit

Permalink
Add a pluggable ExecutableWrapper to our queue factories
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Sep 14, 2023
1 parent 00ffdbf commit 5e9ba7d
Show file tree
Hide file tree
Showing 15 changed files with 365 additions and 262 deletions.
4 changes: 2 additions & 2 deletions service/history/archival_queue_factory.go
Expand Up @@ -160,13 +160,12 @@ func (f *archivalQueueFactory) newScheduledQueue(shard shard.Context, executor q
metricsHandler,
)

factory := f.NewExecutableFactory(shard, executor, rescheduler, logger, metricsHandler, f.ExecutableWrapper)
return queues.NewScheduledQueue(
shard,
tasks.CategoryArchival,
f.HostScheduler,
rescheduler,
f.HostPriorityAssigner,
executor,
&queues.Options{
ReaderOptions: queues.ReaderOptions{
BatchSize: f.Config.ArchivalTaskBatchSize,
Expand All @@ -188,5 +187,6 @@ func (f *archivalQueueFactory) newScheduledQueue(shard shard.Context, executor q
f.HostReaderRateLimiter,
logger,
metricsHandler,
factory,
)
}
27 changes: 26 additions & 1 deletion service/history/queue_factory_base.go
Expand Up @@ -73,7 +73,8 @@ type (
Logger log.SnTaggedLogger
SchedulerRateLimiter queues.SchedulerRateLimiter

ExecutorWrapper queues.ExecutorWrapper `optional:"true"`
ExecutorWrapper queues.ExecutorWrapper `optional:"true"`
ExecutableWrapper queues.ExecutableWrapper `optional:"true"`
}

QueueFactoryBase struct {
Expand Down Expand Up @@ -219,6 +220,30 @@ func (f *QueueFactoryBase) Stop() {
}
}

func (f *QueueFactoryBase) NewExecutableFactory(
shardContext shard.Context,
executor queues.Executor,
rescheduler queues.Rescheduler,
logger log.Logger,
metricsHandler metrics.Handler,
executableWrapper queues.ExecutableWrapper,
) queues.ExecutableFactory {
factory := queues.NewExecutableFactory(executor,
f.HostScheduler,
rescheduler,
f.HostPriorityAssigner,
shardContext.GetTimeSource(),
shardContext.GetNamespaceRegistry(),
shardContext.GetClusterMetadata(),
logger,
metricsHandler,
)
if executableWrapper == nil {
return factory
}
return queues.NewExecutableFactoryWrapper(factory, executableWrapper)
}

func NewQueueHostRateLimiter(
hostRPS dynamicconfig.IntPropertyFn,
persistenceMaxRPS dynamicconfig.IntPropertyFn,
Expand Down
117 changes: 117 additions & 0 deletions service/history/queues/executable_factory.go
@@ -0,0 +1,117 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package queues

import (
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/service/history/tasks"
)

type (
// ExecutableFactory and the related interfaces here are needed to make it possible to extend the functionality of
// task processing without changing its core logic. This is similar to ExecutorWrapper.
ExecutableFactory interface {
NewExecutable(task tasks.Task, readerID int64) Executable
}
// ExecutableFactoryFn is a convenience type to avoid having to create a struct that implements ExecutableFactory.
ExecutableFactoryFn func(readerID int64, t tasks.Task) Executable
ExecutableWrapper interface {
Wrap(e Executable) Executable
}

executableFactoryImpl struct {
executor Executor
scheduler Scheduler
rescheduler Rescheduler
priorityAssigner PriorityAssigner
timeSource clock.TimeSource
namespaceRegistry namespace.Registry
clusterMetadata cluster.Metadata
logger log.Logger
metricsHandler metrics.Handler
}
executableFactoryWrapper struct {
factory ExecutableFactory
wrapper ExecutableWrapper
}
)

func (f ExecutableFactoryFn) NewExecutable(task tasks.Task, readerID int64) Executable {
return f(readerID, task)
}

func NewExecutableFactory(
executor Executor,
scheduler Scheduler,
rescheduler Rescheduler,
priorityAssigner PriorityAssigner,
timeSource clock.TimeSource,
namespaceRegistry namespace.Registry,
clusterMetadata cluster.Metadata,
logger log.Logger,
metricsHandler metrics.Handler,
) *executableFactoryImpl {
return &executableFactoryImpl{
executor: executor,
scheduler: scheduler,
rescheduler: rescheduler,
priorityAssigner: priorityAssigner,
timeSource: timeSource,
namespaceRegistry: namespaceRegistry,
clusterMetadata: clusterMetadata,
logger: logger,
metricsHandler: metricsHandler,
}
}

// NewExecutableFactoryWrapper returns a new ExecutableFactory that wraps the executable created by the factory with the
// given wrapper.
func NewExecutableFactoryWrapper(factory ExecutableFactory, wrapper ExecutableWrapper) executableFactoryWrapper {
return executableFactoryWrapper{factory: factory, wrapper: wrapper}
}

func (f *executableFactoryImpl) NewExecutable(task tasks.Task, readerID int64) Executable {
return NewExecutable(
readerID,
task,
f.executor,
f.scheduler,
f.rescheduler,
f.priorityAssigner,
f.timeSource,
f.namespaceRegistry,
f.clusterMetadata,
f.logger,
f.metricsHandler,
)
}

func (f executableFactoryWrapper) NewExecutable(task tasks.Task, readerID int64) Executable {
return f.wrapper.Wrap(f.factory.NewExecutable(task, readerID))
}
67 changes: 67 additions & 0 deletions service/history/queues/executable_factory_test.go
@@ -0,0 +1,67 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package queues_test

import (
"testing"

"github.com/stretchr/testify/assert"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/log"
"go.temporal.io/server/service/history/queues"
"go.temporal.io/server/service/history/tasks"
)

type (
testWrapper struct{}
wrappedExecutable struct {
queues.Executable
}
)

func TestNewExecutableFactoryWrapper(t *testing.T) {
t.Parallel()

wrapper := testWrapper{}
factory := queues.NewExecutableFactory(
nil,
nil,
nil,
queues.NewNoopPriorityAssigner(),
clock.NewEventTimeSource(),
nil,
nil,
log.NewNoopLogger(),
nil,
)
wrappedFactory := queues.NewExecutableFactoryWrapper(factory, wrapper)
executable := wrappedFactory.NewExecutable(&tasks.WorkflowTask{}, 0)
_, ok := executable.(wrappedExecutable)
assert.True(t, ok, "expected executable to be wrapped")
}

func (t testWrapper) Wrap(e queues.Executable) queues.Executable {
return wrappedExecutable{e}
}
46 changes: 8 additions & 38 deletions service/history/queues/queue_base.go
Expand Up @@ -94,8 +94,8 @@ type (
logger log.Logger
metricsHandler metrics.Handler

paginationFnProvider PaginationFnProvider
executableInitializer ExecutableInitializer
paginationFnProvider PaginationFnProvider
executableFactory ExecutableFactory

lastRangeID int64
exclusiveDeletionHighWatermark tasks.Key
Expand Down Expand Up @@ -123,20 +123,7 @@ type (
}
)

func newQueueBase(
shard hshard.Context,
category tasks.Category,
paginationFnProvider PaginationFnProvider,
scheduler Scheduler,
rescheduler Rescheduler,
priorityAssigner PriorityAssigner,
executor Executor,
options *Options,
hostReaderRateLimiter quotas.RequestRateLimiter,
completionFn ReaderCompletionFn,
logger log.Logger,
metricsHandler metrics.Handler,
) *queueBase {
func newQueueBase(shard hshard.Context, category tasks.Category, paginationFnProvider PaginationFnProvider, scheduler Scheduler, rescheduler Rescheduler, options *Options, hostReaderRateLimiter quotas.RequestRateLimiter, completionFn ReaderCompletionFn, logger log.Logger, metricsHandler metrics.Handler, executableInitializer ExecutableFactory) *queueBase {
var readerScopes map[int64][]Scope
var exclusiveReaderHighWatermark tasks.Key
if persistenceState, ok := shard.GetQueueState(category); ok {
Expand All @@ -154,24 +141,7 @@ func newQueueBase(
exclusiveReaderHighWatermark = ackLevel
}

timeSource := shard.GetTimeSource()
executableInitializer := func(readerID int64, t tasks.Task) Executable {
return NewExecutable(
readerID,
t,
executor,
scheduler,
rescheduler,
priorityAssigner,
timeSource,
shard.GetNamespaceRegistry(),
shard.GetClusterMetadata(),
logger,
metricsHandler,
)
}

monitor := newMonitor(category.Type(), timeSource, &options.MonitorOptions)
monitor := newMonitor(category.Type(), shard.GetTimeSource(), &options.MonitorOptions)
readerRateLimiter := newShardReaderRateLimiter(
options.MaxPollRPS,
hostReaderRateLimiter,
Expand All @@ -194,7 +164,7 @@ func newQueueBase(
&readerOptions,
scheduler,
rescheduler,
timeSource,
shard.GetTimeSource(),
readerRateLimiter,
monitor,
completionFn,
Expand Down Expand Up @@ -256,8 +226,8 @@ func newQueueBase(
logger: logger,
metricsHandler: metricsHandler,

paginationFnProvider: paginationFnProvider,
executableInitializer: executableInitializer,
paginationFnProvider: paginationFnProvider,
executableFactory: executableInitializer,

lastRangeID: -1, // start from an invalid rangeID
exclusiveDeletionHighWatermark: exclusiveDeletionHighWatermark,
Expand Down Expand Up @@ -332,7 +302,7 @@ func (p *queueBase) processNewRange() {
newReadScope, p.nonReadableScope = p.nonReadableScope.SplitByRange(newMaxKey)
slices = append(slices, NewSlice(
p.paginationFnProvider,
p.executableInitializer,
p.executableFactory,
p.monitor,
newReadScope,
))
Expand Down

0 comments on commit 5e9ba7d

Please sign in to comment.