Skip to content

Commit

Permalink
Move poller map to matchingEngine (#4471)
Browse files Browse the repository at this point in the history
  • Loading branch information
dnr committed Jun 10, 2023
1 parent 65300d5 commit cee5ba3
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 82 deletions.
91 changes: 60 additions & 31 deletions service/matching/matchingEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@ const (
recordTaskStartedSyncMatchTimeout = 1 * time.Second
)

// Implements matching.Engine
// TODO: Switch implementation from lock/channel based to a partitioned agent
// to simplify code and reduce possibility of synchronization errors.
type (
pollerIDCtxKey string
identityCtxKey string
Expand All @@ -89,6 +86,11 @@ type (
queryTaskMap map[string]chan *queryResult
}

// lockablePollMap is a mutex-guarded map of outstanding polls. All access to
// polls goes through the add, remove and cancel methods, which take the
// embedded mutex first.
lockablePollMap struct {
sync.Mutex // guards polls
polls map[string]context.CancelFunc // poller ID -> function that cancels that poll's context
}

taskQueueCounterKey struct {
namespaceID namespace.ID
taskType enumspb.TaskQueueType
Expand All @@ -105,24 +107,31 @@ type (
replicationLock sync.Mutex
}

// Implements matching.Engine
matchingEngineImpl struct {
status int32
taskManager persistence.TaskManager
historyClient historyservice.HistoryServiceClient
matchingClient matchingservice.MatchingServiceClient
tokenSerializer common.TaskTokenSerializer
logger log.Logger
namespaceRegistry namespace.Registry
keyResolver membership.ServiceResolver
clusterMeta cluster.Metadata
timeSource clock.TimeSource
visibilityManager manager.VisibilityManager
metricsHandler metrics.Handler
taskQueuesLock sync.RWMutex // locks mutation of taskQueues
taskQueues map[taskQueueID]taskQueueManager
taskQueueCount map[taskQueueCounterKey]int // per-namespace task queue counter
config *Config
lockableQueryTaskMap lockableQueryTaskMap
namespaceRegistry namespace.Registry
keyResolver membership.ServiceResolver
clusterMeta cluster.Metadata
timeSource clock.TimeSource
visibilityManager manager.VisibilityManager
// pollMap keeps track of all outstanding pollers handled by this engine, across
// all task queues. The PollerID generated by the frontend is used as the key and the
// poll's CancelFunc is the value. It is used to cancel the context and unblock an
// outstanding poller when the frontend detects that the client connection is closed,
// which prevents tasks from being dispatched to zombie pollers.
pollMap lockablePollMap
// Only set if global namespaces are enabled on the cluster.
namespaceReplicationQueue persistence.NamespaceReplicationQueue
// Disables concurrent task queue user data updates and replication requests (due to a cassandra limitation)
Expand Down Expand Up @@ -167,19 +176,20 @@ func NewEngine(
status: common.DaemonStatusInitialized,
taskManager: taskManager,
historyClient: historyClient,
matchingClient: matchingClient,
tokenSerializer: common.NewProtoTaskTokenSerializer(),
taskQueues: make(map[taskQueueID]taskQueueManager),
taskQueueCount: make(map[taskQueueCounterKey]int),
logger: log.With(logger, tag.ComponentMatchingEngine),
metricsHandler: metricsHandler.WithTags(metrics.OperationTag(metrics.MatchingEngineScope)),
matchingClient: matchingClient,
config: config,
lockableQueryTaskMap: lockableQueryTaskMap{queryTaskMap: make(map[string]chan *queryResult)},
namespaceRegistry: namespaceRegistry,
keyResolver: resolver,
clusterMeta: clusterMeta,
timeSource: clock.NewRealTimeSource(), // No need to mock this at the moment
visibilityManager: visibilityManager,
metricsHandler: metricsHandler.WithTags(metrics.OperationTag(metrics.MatchingEngineScope)),
taskQueues: make(map[taskQueueID]taskQueueManager),
taskQueueCount: make(map[taskQueueCounterKey]int),
config: config,
lockableQueryTaskMap: lockableQueryTaskMap{queryTaskMap: make(map[string]chan *queryResult)},
pollMap: lockablePollMap{polls: make(map[string]context.CancelFunc)},
namespaceReplicationQueue: namespaceReplicationQueue,
namespaceUpdateLockMap: make(map[string]*namespaceUpdateLocks),
}
Expand Down Expand Up @@ -735,22 +745,7 @@ func (e *matchingEngineImpl) CancelOutstandingPoll(
ctx context.Context,
request *matchingservice.CancelOutstandingPollRequest,
) error {
namespaceID := namespace.ID(request.GetNamespaceId())
taskQueueType := request.GetTaskQueueType()
taskQueueName := request.TaskQueue.GetName()
stickyInfo := stickyInfoFromTaskQueue(request.TaskQueue)
pollerID := request.GetPollerId()

taskQueue, err := newTaskQueueID(namespaceID, taskQueueName, taskQueueType)
if err != nil {
return err
}
tlMgr, err := e.getTaskQueueManager(ctx, taskQueue, stickyInfo, false)
if err != nil || tlMgr == nil {
return err
}

tlMgr.CancelPoller(pollerID)
e.pollMap.cancel(request.PollerId)
return nil
}

Expand Down Expand Up @@ -1195,6 +1190,19 @@ func (e *matchingEngineImpl) getTask(
if err != nil {
return nil, err
}

// We need to set a shorter timeout than the original ctx; otherwise, by the time ctx deadline is
// reached, instead of emptyTask, context timeout error is returned to the frontend by the rpc stack,
// which counts against our SLO. By shortening the timeout by a very small amount, the emptyTask can be
// returned to the handler before a context timeout error is generated.
ctx, cancel := newChildContext(ctx, tlMgr.LongPollExpirationInterval(), returnEmptyTaskTimeBudget)
defer cancel()

if pollerID, ok := ctx.Value(pollerIDKey).(string); ok && pollerID != "" {
e.pollMap.add(pollerID, cancel)
defer e.pollMap.remove(pollerID)
}

return tlMgr.GetTask(ctx, pollMetadata)
}

Expand Down Expand Up @@ -1488,10 +1496,31 @@ func (m *lockableQueryTaskMap) delete(key string) {
delete(m.queryTaskMap, key)
}

// add registers cancel under cancelId, replacing any function previously
// stored for the same id. The map is mutated while holding the embedded mutex.
func (m *lockablePollMap) add(cancelId string, cancel context.CancelFunc) {
	m.Mutex.Lock()
	defer m.Mutex.Unlock()

	m.polls[cancelId] = cancel
}

// remove drops the entry stored under cancelId without invoking its cancel
// function. Removing an id that is not present is a no-op.
func (m *lockablePollMap) remove(cancelId string) {
	m.Mutex.Lock()
	defer m.Mutex.Unlock()

	delete(m.polls, cancelId)
}

// cancel invokes the cancel function stored under cancelId, if there is one,
// and then removes the entry. Both steps run while the embedded mutex is held.
// An unknown id is ignored.
func (m *lockablePollMap) cancel(cancelId string) {
	m.Mutex.Lock()
	defer m.Mutex.Unlock()

	cancelFn, found := m.polls[cancelId]
	if !found {
		return
	}
	cancelFn()
	delete(m.polls, cancelId)
}

// newRecordTaskStartedContext creates a context for recording
// activity or workflow task started. The parentCtx from
// pollActivity/WorkflowTaskQueue endpoint (which is a long poll
// API) has long timeout and unsutiable for recording task started,
// API) has long timeout and unsuitable for recording task started,
// especially if the task is doing sync match and has caller
// (history transfer queue) waiting for response.
func newRecordTaskStartedContext(
Expand Down
66 changes: 15 additions & 51 deletions service/matching/taskQueueManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -139,14 +138,14 @@ type (
// UpdateUserData updates user data for this task queue and replicates across clusters if necessary.
// Extra care should be taken to avoid mutating the existing data in the update function.
UpdateUserData(ctx context.Context, options UserDataUpdateOptions, updateFn UserDataUpdateFunc) error
CancelPoller(pollerID string)
GetAllPollerInfo() []*taskqueuepb.PollerInfo
HasPollerAfter(accessTime time.Time) bool
// DescribeTaskQueue returns information about the target task queue
DescribeTaskQueue(includeTaskQueueStatus bool) *matchingservice.DescribeTaskQueueResponse
String() string
QueueID() *taskQueueID
TaskQueueKind() enumspb.TaskQueueKind
LongPollExpirationInterval() time.Duration
}

// Single task queue in memory state
Expand All @@ -170,17 +169,11 @@ type (
namespace namespace.Name
taggedMetricsHandler metrics.Handler // namespace/taskqueue tagged metric scope
// pollerHistory stores poller which poll from this taskqueue in last few minutes
pollerHistory *pollerHistory
// outstandingPollsMap is needed to keep track of all outstanding pollers for a
// particular taskqueue. PollerID generated by frontend is used as the key and
// CancelFunc is the value. This is used to cancel the context to unblock any
// outstanding poller when the frontend detects client connection is closed to
// prevent tasks being dispatched to zombie pollers.
outstandingPollsLock sync.Mutex
outstandingPollsMap map[string]context.CancelFunc
clusterMeta cluster.Metadata
goroGroup goro.Group
initializedError *future.FutureImpl[struct{}]
pollerHistory *pollerHistory
currentPolls atomic.Int64
clusterMeta cluster.Metadata
goroGroup goro.Group
initializedError *future.FutureImpl[struct{}]
// userDataInitialFetch is fulfilled once versioning data is fetched from the root partition. If this TQ is
// the root partition, it is fulfilled as soon as it is fetched from db.
userDataInitialFetch *future.FutureImpl[struct{}]
Expand Down Expand Up @@ -250,7 +243,6 @@ func newTaskQueueManager(
taskGC: newTaskGC(db, taskQueueConfig),
config: taskQueueConfig,
pollerHistory: newPollerHistory(),
outstandingPollsMap: make(map[string]context.CancelFunc),
clusterMeta: clusterMeta,
namespace: nsName,
taggedMetricsHandler: taggedMetricsHandler,
Expand Down Expand Up @@ -433,26 +425,8 @@ func (c *taskQueueManagerImpl) GetTask(
) (*internalTask, error) {
c.liveness.markAlive()

// We need to set a shorter timeout than the original ctx; otherwise, by the time ctx deadline is
// reached, instead of emptyTask, context timeout error is returned to the frontend by the rpc stack,
// which counts against our SLO. By shortening the timeout by a very small amount, the emptyTask can be
// returned to the handler before a context timeout error is generated.
childCtx, cancel := newChildContext(ctx, c.config.LongPollExpirationInterval(), returnEmptyTaskTimeBudget)
defer cancel()

pollerID, ok := ctx.Value(pollerIDKey).(string)
if ok && pollerID != "" {
// Found pollerID on context, add it to the map to allow it to be canceled in
// response to CancelPoller call
c.outstandingPollsLock.Lock()
c.outstandingPollsMap[pollerID] = cancel
c.outstandingPollsLock.Unlock()
defer func() {
c.outstandingPollsLock.Lock()
delete(c.outstandingPollsMap, pollerID)
c.outstandingPollsLock.Unlock()
}()
}
c.currentPolls.Add(1)
defer c.currentPolls.Add(-1)

identity, ok := ctx.Value(identityKey).(string)
if ok && identity != "" {
Expand All @@ -476,10 +450,10 @@ func (c *taskQueueManagerImpl) GetTask(
c.matcher.UpdateRatelimit(pollMetadata.ratePerSecond)

if !namespaceEntry.ActiveInCluster(c.clusterMeta.GetCurrentClusterName()) {
return c.matcher.PollForQuery(childCtx, pollMetadata)
return c.matcher.PollForQuery(ctx, pollMetadata)
}

task, err := c.matcher.Poll(childCtx, pollMetadata)
task, err := c.matcher.Poll(ctx, pollMetadata)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -558,27 +532,13 @@ func (c *taskQueueManagerImpl) GetAllPollerInfo() []*taskqueuepb.PollerInfo {
}

func (c *taskQueueManagerImpl) HasPollerAfter(accessTime time.Time) bool {
inflightPollerCount := 0
c.outstandingPollsLock.Lock()
inflightPollerCount = len(c.outstandingPollsMap)
c.outstandingPollsLock.Unlock()
if inflightPollerCount > 0 {
if c.currentPolls.Load() > 0 {
return true
}
recentPollers := c.pollerHistory.getPollerInfo(accessTime)
return len(recentPollers) > 0
}

// CancelPoller looks up the cancel function registered for pollerID in
// outstandingPollsMap and, if a non-nil one is found, calls it. The lookup is
// done under outstandingPollsLock; the cancel function itself is invoked after
// the lock has been released. The map entry is left in place.
func (c *taskQueueManagerImpl) CancelPoller(pollerID string) {
	c.outstandingPollsLock.Lock()
	cancelFn := c.outstandingPollsMap[pollerID]
	c.outstandingPollsLock.Unlock()

	// A missing key yields the nil zero value, so one nil check covers both
	// "not registered" and "registered with a nil func".
	if cancelFn == nil {
		return
	}
	cancelFn()
}

// DescribeTaskQueue returns information about the target taskqueue, right now this API returns the
// pollers which polled this taskqueue in last few minutes and status of taskqueue's ackManager
// (readLevel, ackLevel, backlogCountHint and taskIDBlock).
Expand Down Expand Up @@ -739,6 +699,10 @@ func (c *taskQueueManagerImpl) TaskQueueKind() enumspb.TaskQueueKind {
return c.kind
}

// LongPollExpirationInterval returns the long-poll expiration interval
// reported by this task queue manager's config.
func (c *taskQueueManagerImpl) LongPollExpirationInterval() time.Duration {
	interval := c.config.LongPollExpirationInterval()
	return interval
}

func (c *taskQueueManagerImpl) callerInfoContext(ctx context.Context) context.Context {
namespace, _ := c.namespaceRegistry.GetNamespaceName(c.taskQueueID.namespaceID)
return headers.SetCallerInfo(ctx, headers.NewBackgroundCallerInfo(namespace.String()))
Expand Down

0 comments on commit cee5ba3

Please sign in to comment.