
Commit

Merge branch 'master' into unused-metric
ketsiambaku committed May 19, 2023
2 parents 527c831 + 5f39543 · commit 84a279e
Showing 13 changed files with 138 additions and 35 deletions.
8 changes: 4 additions & 4 deletions common/cluster/metadata.go
@@ -154,16 +154,16 @@ func (m Metadata) GetRemoteClusterInfo() map[string]config.ClusterInformation {
}

// ClusterNameForFailoverVersion return the corresponding cluster name for a given failover version
func (m Metadata) ClusterNameForFailoverVersion(failoverVersion int64) string {
func (m Metadata) ClusterNameForFailoverVersion(failoverVersion int64) (string, error) {
if failoverVersion == common.EmptyVersion {
return m.currentClusterName
return m.currentClusterName, nil
}
server, err := m.resolveServerName(failoverVersion)
if err != nil {
m.metrics.IncCounter(metrics.ClusterMetadataResolvingFailoverVersionCounter)
panic(fmt.Sprintf("failed to resolve failover version: %v", err))
return "", fmt.Errorf("failed to resolve failover version: %v", err)
}
return server
return server, nil
}

// gets the initial failover version for a cluster / domain
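With this change, ClusterNameForFailoverVersion returns an error instead of panicking when a failover version cannot be resolved, so callers have to handle the failure themselves. A minimal sketch of an adapted caller, assuming only the new method signature from this diff (the notifyFailover helper is hypothetical):

package example

import (
	"fmt"
	"log"

	"github.com/uber/cadence/common/cluster"
)

// notifyFailover is a hypothetical caller showing how code that previously
// relied on ClusterNameForFailoverVersion panicking now handles the error.
func notifyFailover(m cluster.Metadata, failoverVersion int64) error {
	clusterName, err := m.ClusterNameForFailoverVersion(failoverVersion)
	if err != nil {
		// The caller decides how to surface the failure instead of crashing the process.
		return fmt.Errorf("cannot map failover version %d to a cluster: %w", failoverVersion, err)
	}
	log.Printf("failover version %d belongs to cluster %s", failoverVersion, clusterName)
	return nil
}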
8 changes: 6 additions & 2 deletions common/cluster/metadata_test.go
@@ -28,6 +28,7 @@ import (
"testing/quick"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log/loggerimpl"
@@ -822,7 +823,8 @@ func TestServerResolution(t *testing.T) {
nextFailoverVersion = fo
}
// do a round-trip
clusterNameResolved := impl.ClusterNameForFailoverVersion(nextFailoverVersion)
clusterNameResolved, err := impl.ClusterNameForFailoverVersion(nextFailoverVersion)
require.NoError(t, err)
return clusterName1 == clusterNameResolved
}, &quick.Config{})
assert.NoError(t, err)
@@ -937,7 +939,9 @@ func TestFailoverVersionResolution(t *testing.T) {

for name, td := range tests {
t.Run(name, func(t *testing.T) {
assert.Equal(t, td.expectedOut, sut.ClusterNameForFailoverVersion(td.in))
out, err := sut.ClusterNameForFailoverVersion(td.in)
assert.NoError(t, err)
assert.Equal(t, td.expectedOut, out)
})
}
}
12 changes: 12 additions & 0 deletions common/log/tag/tags.go
@@ -915,3 +915,15 @@ func VisibilityQuery(query string) Tag {
func Dynamic(key string, v interface{}) Tag {
return newPredefinedDynamicTag(key, v)
}

func IsolationGroup(group string) Tag {
return newStringTag("isolation-group", group)
}

func PartitionConfig(p map[string]string) Tag {
return newObjectTag("partition-config", p)
}

func PollerGroups(pollers []string) Tag {
return newObjectTag("poller-isolation-groups", pollers)
}
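The three new tag constructors are meant to be combined on a single structured log line, as the debug log added in taskListManager.go below shows. A brief usage sketch, assuming the log.Logger interface from common/log; logIsolationDecision is a hypothetical helper:

package example

import (
	"github.com/uber/cadence/common/log"
	"github.com/uber/cadence/common/log/tag"
)

// logIsolationDecision is a hypothetical helper that emits the new tags
// together, mirroring the debug log added in getIsolationGroupForTask.
func logIsolationDecision(logger log.Logger, group string, pollers []string, partitionConfig map[string]string) {
	logger.Debug("isolation group selected",
		tag.IsolationGroup(group),
		tag.PollerGroups(pollers),
		tag.PartitionConfig(partitionConfig),
	)
}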
11 changes: 11 additions & 0 deletions common/task/parallelTaskProcessor.go
@@ -23,10 +23,13 @@ package task
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/uber/cadence/common/log/tag"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/dynamicconfig"
@@ -152,6 +155,14 @@ func (p *parallelTaskProcessorImpl) executeTask(task Task, shutdownCh chan struc
sw := p.metricsScope.StartTimer(metrics.ParallelTaskTaskProcessingLatency)
defer sw.Stop()

defer func() {
if r := recover(); r != nil {
p.logger.Error("recovered panic in task execution", tag.Dynamic("recovered-panic", r))
task.HandleErr(fmt.Errorf("recovered panic: %v", r))
task.Nack()
}
}()

op := func() error {
if err := task.Execute(); err != nil {
return task.HandleErr(err)
18 changes: 18 additions & 0 deletions common/task/parallelTaskProcessor_test.go
@@ -307,3 +307,21 @@ func (s *parallelTaskProcessorSuite) TestProcessorContract() {
s.NotEqual(TaskStatePending, status)
}
}

func (s *parallelTaskProcessorSuite) TestExecuteTask_PanicHandling() {
mockTask := NewMockTask(s.controller)
mockTask.EXPECT().Execute().Do(func() {
panic("A panic occurred")
})
mockTask.EXPECT().HandleErr(gomock.Any()).Return(errRetryable).AnyTimes()
mockTask.EXPECT().Nack().Times(1)
done := make(chan struct{})
workerShutdownCh := make(chan struct{})
go func() {
s.processor.executeTask(mockTask, workerShutdownCh)
close(done)
}()
time.Sleep(100 * time.Millisecond)
close(workerShutdownCh)
<-done
}
25 changes: 20 additions & 5 deletions service/history/execution/mutable_state_builder.go
@@ -4347,7 +4347,10 @@ func (e *mutableStateBuilder) eventsToReplicationTask(
lastEvent := events[len(events)-1]
version := firstEvent.Version

sourceCluster := e.clusterMetadata.ClusterNameForFailoverVersion(version)
sourceCluster, err := e.clusterMetadata.ClusterNameForFailoverVersion(version)
if err != nil {
return nil, err
}
currentCluster := e.clusterMetadata.GetCurrentClusterName()

if currentCluster != sourceCluster {
@@ -4508,8 +4511,14 @@ func (e *mutableStateBuilder) startTransactionHandleDecisionFailover(
)}
}

lastWriteSourceCluster := e.clusterMetadata.ClusterNameForFailoverVersion(lastWriteVersion)
currentVersionCluster := e.clusterMetadata.ClusterNameForFailoverVersion(currentVersion)
lastWriteSourceCluster, err := e.clusterMetadata.ClusterNameForFailoverVersion(lastWriteVersion)
if err != nil {
return false, err
}
currentVersionCluster, err := e.clusterMetadata.ClusterNameForFailoverVersion(currentVersion)
if err != nil {
return false, err
}
currentCluster := e.clusterMetadata.GetCurrentClusterName()

// there are 4 cases for version changes (based on version from domain cache)
@@ -4524,7 +4533,10 @@ func (e *mutableStateBuilder) startTransactionHandleDecisionFailover(
// is missing and the missing history replicate back from remote cluster via resending approach => nothing to do

// handle case 5
incomingTaskSourceCluster := e.clusterMetadata.ClusterNameForFailoverVersion(incomingTaskVersion)
incomingTaskSourceCluster, err := e.clusterMetadata.ClusterNameForFailoverVersion(incomingTaskVersion)
if err != nil {
return false, err
}
if incomingTaskVersion != common.EmptyVersion &&
currentVersionCluster != currentCluster &&
incomingTaskSourceCluster == currentCluster {
@@ -4588,7 +4600,10 @@ func (e *mutableStateBuilder) closeTransactionWithPolicyCheck(
return nil
}

activeCluster := e.clusterMetadata.ClusterNameForFailoverVersion(e.GetCurrentVersion())
activeCluster, err := e.clusterMetadata.ClusterNameForFailoverVersion(e.GetCurrentVersion())
if err != nil {
return err
}
currentCluster := e.clusterMetadata.GetCurrentClusterName()

if activeCluster != currentCluster {
10 changes: 8 additions & 2 deletions service/history/execution/workflow.go
@@ -182,7 +182,10 @@ func (r *workflowImpl) SuppressBy(
return TransactionPolicyPassive, nil
}

lastWriteCluster := r.clusterMetadata.ClusterNameForFailoverVersion(lastWriteVersion)
lastWriteCluster, err := r.clusterMetadata.ClusterNameForFailoverVersion(lastWriteVersion)
if err != nil {
return TransactionPolicyActive, err
}
currentCluster := r.clusterMetadata.GetCurrentClusterName()

if currentCluster == lastWriteCluster {
@@ -206,7 +209,10 @@ func (r *workflowImpl) FlushBufferedEvents() error {
return err
}

lastWriteCluster := r.clusterMetadata.ClusterNameForFailoverVersion(lastWriteVersion)
lastWriteCluster, err := r.clusterMetadata.ClusterNameForFailoverVersion(lastWriteVersion)
if err != nil {
return err
}
currentCluster := r.clusterMetadata.GetCurrentClusterName()

if lastWriteCluster != currentCluster {
25 changes: 19 additions & 6 deletions service/history/historyEngine.go
@@ -498,12 +498,17 @@ func (e *historyEngineImpl) registerDomainFailoverCallback() {
domainFailoverNotificationVersion := nextDomain.GetFailoverNotificationVersion()
domainActiveCluster := nextDomain.GetReplicationConfig().ActiveClusterName
previousFailoverVersion := nextDomain.GetPreviousFailoverVersion()
previousClusterName, err := e.clusterMetadata.ClusterNameForFailoverVersion(previousFailoverVersion)
if err != nil {
e.logger.Error("Failed to handle graceful failover", tag.WorkflowDomainID(nextDomain.GetInfo().ID))
continue
}

if nextDomain.IsGlobalDomain() &&
domainFailoverNotificationVersion >= shardNotificationVersion &&
domainActiveCluster != e.currentClusterName &&
previousFailoverVersion != common.InitialPreviousFailoverVersion &&
e.clusterMetadata.ClusterNameForFailoverVersion(previousFailoverVersion) == e.currentClusterName {
previousClusterName == e.currentClusterName {
// the visibility timestamp will be set in shard context
failoverMarkerTasks = append(failoverMarkerTasks, &persistence.FailoverMarkerTask{
Version: nextDomain.GetFailoverVersion(),
@@ -599,10 +604,14 @@ func (e *historyEngineImpl) newDomainNotActiveError(
failoverVersion int64,
) error {
clusterMetadata := e.shard.GetService().GetClusterMetadata()
clusterName, err := clusterMetadata.ClusterNameForFailoverVersion(failoverVersion)
if err != nil {
clusterName = "_unknown_"
}
return ce.NewDomainNotActiveError(
domainName,
clusterMetadata.GetCurrentClusterName(),
clusterMetadata.ClusterNameForFailoverVersion(failoverVersion),
clusterName,
)
}

@@ -2995,8 +3004,10 @@ func (e *historyEngineImpl) NotifyNewTransferTasks(

if len(info.Tasks) > 0 {
task := info.Tasks[0]
clusterName := e.clusterMetadata.ClusterNameForFailoverVersion(task.GetVersion())
e.txProcessor.NotifyNewTask(clusterName, info)
clusterName, err := e.clusterMetadata.ClusterNameForFailoverVersion(task.GetVersion())
if err == nil {
e.txProcessor.NotifyNewTask(clusterName, info)
}
}
}

@@ -3006,8 +3017,10 @@ func (e *historyEngineImpl) NotifyNewTimerTasks(

if len(info.Tasks) > 0 {
task := info.Tasks[0]
clusterName := e.clusterMetadata.ClusterNameForFailoverVersion(task.GetVersion())
e.timerProcessor.NotifyNewTask(clusterName, info)
clusterName, err := e.clusterMetadata.ClusterNameForFailoverVersion(task.GetVersion())
if err == nil {
e.timerProcessor.NotifyNewTask(clusterName, info)
}
}
}

5 changes: 4 additions & 1 deletion service/history/ndc/replication_task.go
@@ -117,7 +117,10 @@ func newReplicationTask(
lastEvent := events[len(events)-1]
version := firstEvent.Version

sourceCluster := clusterMetadata.ClusterNameForFailoverVersion(version)
sourceCluster, err := clusterMetadata.ClusterNameForFailoverVersion(version)
if err != nil {
return nil, err
}

eventTime := int64(0)
for _, event := range events {
5 changes: 4 additions & 1 deletion service/history/ndc/transaction_manager.go
@@ -275,9 +275,12 @@ func (r *transactionManagerImpl) backfillWorkflowEventsReapply(
return 0, execution.TransactionPolicyActive, err
}
isWorkflowRunning := targetWorkflow.GetMutableState().IsWorkflowExecutionRunning()
targetWorkflowActiveCluster := r.clusterMetadata.ClusterNameForFailoverVersion(
targetWorkflowActiveCluster, err := r.clusterMetadata.ClusterNameForFailoverVersion(
targetWorkflow.GetMutableState().GetDomainEntry().GetFailoverVersion(),
)
if err != nil {
return 0, execution.TransactionPolicyActive, err
}
currentCluster := r.clusterMetadata.GetCurrentClusterName()
isActiveCluster := targetWorkflowActiveCluster == currentCluster

5 changes: 4 additions & 1 deletion service/history/replication/task_processor.go
@@ -582,7 +582,10 @@ func (p *taskProcessorImpl) triggerDataInconsistencyScan(replicationTask *types.
default:
return nil
}
clusterName := p.shard.GetClusterMetadata().ClusterNameForFailoverVersion(failoverVersion)
clusterName, err := p.shard.GetClusterMetadata().ClusterNameForFailoverVersion(failoverVersion)
if err != nil {
return err
}
client := p.shard.GetService().GetClientBean().GetRemoteFrontendClient(clusterName)
fixExecution := entity.Execution{
DomainID: domainID,
7 changes: 4 additions & 3 deletions service/matching/taskListManager.go
@@ -581,9 +581,9 @@ func (c *taskListManagerImpl) getIsolationGroupForTask(ctx context.Context, task
// Not all poller information are available at the time of task list manager creation,
// because we don't persist poller information in database, so in the first minute, we always assume
// pollers are available in all isolation groups to avoid the risk of leaking a task to another isolation group.
// Besides, for sticky tasklists, not all poller information are available, we also use all isolation group.
if time.Now().Sub(c.createTime) > time.Minute && c.taskListKind != types.TaskListKindSticky {
pollerIsolationGroups = c.pollerHistory.getPollerIsolationGroups(time.Now().Add(-time.Minute))
// Besides, for sticky and scalable tasklists, not all poller information are available, we also use all isolation group.
if time.Now().Sub(c.createTime) > time.Minute && c.taskListKind != types.TaskListKindSticky && c.taskListID.IsRoot() {
pollerIsolationGroups = c.pollerHistory.getPollerIsolationGroups(time.Time{}) // the lookback window must be larger than the timeout of poller requests (2 mins), otherwise we don't get all pollers
if len(pollerIsolationGroups) == 0 {
// we don't have any pollers, use all isolation groups and wait for pollers' arriving
pollerIsolationGroups = c.config.AllIsolationGroups
@@ -599,6 +599,7 @@ func (c *taskListManagerImpl) getIsolationGroupForTask(ctx context.Context, task
c.logger.Error("Failed to get isolation group from partition library", tag.WorkflowID(taskInfo.WorkflowID), tag.WorkflowRunID(taskInfo.RunID), tag.TaskID(taskInfo.TaskID), tag.Error(err))
return "", nil
}
c.logger.Debug("get isolation group", tag.PollerGroups(pollerIsolationGroups), tag.IsolationGroup(group), tag.PartitionConfig(partitionConfig))
// For a sticky tasklist, it is possible that when an isolation group is undrained, the tasks from one workflow is reassigned
// to the isolation group undrained. If there is no poller from the isolation group, we should return StickyUnavailableError
// to let the task to be re-enqueued to the non-sticky tasklist. If there is poller, just return an empty isolation group, because
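A condensed sketch of the poller-group selection rule described in the comments above; the function and parameter names are hypothetical, and the real logic lives in taskListManagerImpl.getIsolationGroupForTask:

package example

import "time"

// eligibleIsolationGroups distills the rule above: fall back to all isolation
// groups while poller information is incomplete (first minute of the tasklist,
// sticky tasklists, non-root partitions, or no observed pollers); otherwise
// use the recently observed poller isolation groups.
func eligibleIsolationGroups(createTime time.Time, isSticky, isRoot bool, recentPollerGroups, allGroups []string) []string {
	if time.Since(createTime) <= time.Minute || isSticky || !isRoot {
		return allGroups
	}
	if len(recentPollerGroups) == 0 {
		return allGroups
	}
	return recentPollerGroups
}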
34 changes: 24 additions & 10 deletions service/matching/taskReader.go
@@ -29,6 +29,8 @@ import (
"time"

"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/messaging"
@@ -41,15 +43,17 @@ var epochStartTime = time.Unix(0, 0)

type (
taskReader struct {
taskBuffer chan *persistence.TaskInfo // tasks loaded from persistence
notifyC chan struct{} // Used as signal to notify pump of new tasks
tlMgr *taskListManagerImpl
taskListID *taskListID
config *taskListConfig
db *taskListDB
taskWriter *taskWriter
taskGC *taskGC
taskAckManager messaging.AckManager
taskBuffer chan *persistence.TaskInfo // tasks loaded from persistence
notifyC chan struct{} // Used as signal to notify pump of new tasks
tlMgr *taskListManagerImpl
taskListID *taskListID
config *taskListConfig
db *taskListDB
taskWriter *taskWriter
taskGC *taskGC
taskAckManager messaging.AckManager
domainCache cache.DomainCache
clusterMetadata cluster.Metadata
// The cancel objects are to cancel the ratelimiter Wait in dispatchBufferedTasks. The ideal
// approach is to use request-scoped contexts and use a unique one for each call to Wait. However
// in order to cancel it on shutdown, we need a new goroutine for each call that would wait on
@@ -84,6 +88,8 @@ func newTaskReader(tlMgr *taskListManagerImpl) *taskReader {
// we always dequeue the head of the buffer and try to dispatch it to a poller
// so allocate one less than desired target buffer size
taskBuffer: make(chan *persistence.TaskInfo, tlMgr.config.GetTasksBatchSize()-1),
domainCache: tlMgr.domainCache,
clusterMetadata: tlMgr.clusterMetadata,
logger: tlMgr.logger,
scope: tlMgr.scope,
handleErr: tlMgr.handleErr,
@@ -361,7 +367,15 @@ func (tr *taskReader) completeTask(task *persistence.TaskInfo, err error) {

func (tr *taskReader) newDispatchContext(isolationGroup string) (context.Context, context.CancelFunc) {
if isolationGroup != "" {
return context.WithTimeout(tr.cancelCtx, tr.config.AsyncTaskDispatchTimeout())
domainEntry, err := tr.domainCache.GetDomainByID(tr.taskListID.domainID)
if err != nil {
// we don't know if the domain is active in the current cluster, assume it is active and set the timeout
return context.WithTimeout(tr.cancelCtx, tr.config.AsyncTaskDispatchTimeout())
}
if _, err := domainEntry.IsActiveIn(tr.clusterMetadata.GetCurrentClusterName()); err == nil {
// if the domain is active in the current cluster, set the timeout
return context.WithTimeout(tr.cancelCtx, tr.config.AsyncTaskDispatchTimeout())
}
}
return tr.cancelCtx, func() {}
}
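The net effect of the newDispatchContext change above: an isolation-group task gets a bounded async dispatch context only when the domain is active in the current cluster or its status cannot be determined; for passive domains the buffered task waits on the base context. A minimal sketch of that decision with hypothetical names:

package example

import (
	"context"
	"time"
)

// dispatchContext is a hypothetical distillation of newDispatchContext:
// bound the wait with a timeout only when the domain is active in (or its
// status is unknown for) the current cluster.
func dispatchContext(base context.Context, activeOrUnknown bool, timeout time.Duration) (context.Context, context.CancelFunc) {
	if activeOrUnknown {
		return context.WithTimeout(base, timeout)
	}
	return base, func() {}
}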
