Skip to content

Commit

Permalink
Delete global namespace (#2867)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed May 31, 2022
1 parent 00394e9 commit 7af6cda
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 18 deletions.
5 changes: 5 additions & 0 deletions common/namespace/transmissionTaskHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package namespace
import (
"context"

enumspb "go.temporal.io/api/enums/v1"
namespacepb "go.temporal.io/api/namespace/v1"
replicationpb "go.temporal.io/api/replication/v1"

Expand Down Expand Up @@ -89,6 +90,10 @@ func (namespaceReplicator *namespaceReplicatorImpl) HandleTransmissionTask(
if len(replicationConfig.Clusters) <= 1 {
return nil
}
if info.State == enumspb.NAMESPACE_STATE_DELETED {
// Don't replicate deleted namespace changes.
return nil
}

taskType := enumsspb.REPLICATION_TASK_TYPE_NAMESPACE_TASK
task := &replicationspb.ReplicationTask_NamespaceTaskAttributes{
Expand Down
5 changes: 2 additions & 3 deletions common/xdc/nDCHistoryResender.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,18 @@ import (
"context"
"time"

"go.temporal.io/server/client"
"go.temporal.io/server/common/persistence/serialization"

commonpb "go.temporal.io/api/common/v1"

"go.temporal.io/server/api/adminservice/v1"
historyspb "go.temporal.io/server/api/history/v1"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/client"
"go.temporal.io/server/common/collection"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/common/rpc"
)

Expand Down
4 changes: 2 additions & 2 deletions service/history/nDCHistoryReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func (r *nDCHistoryReplicatorImpl) applyEvents(
return r.applyNonStartEventsResetWorkflow(ctx, context, mutableState, task)

default:
// unable to get mutable state, return err so we can retry the task later
// unable to get mutable state, return err, so we can retry the task later
return err
}
}
Expand All @@ -392,7 +392,7 @@ func (r *nDCHistoryReplicatorImpl) applyStartEvents(
context workflow.Context,
releaseFn workflow.ReleaseCacheFunc,
task nDCReplicationTask,
) (retError error) {
) error {

namespaceEntry, err := r.namespaceRegistry.GetNamespaceByID(task.getNamespaceID())
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion service/history/replication/ack_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ func (p *ackMgrImpl) processReplication(
return nil, nil
}
return action(msBuilder)
case *serviceerror.NotFound:
case *serviceerror.NotFound, *serviceerror.NamespaceNotFound:
return nil, nil
default:
return nil, err
Expand Down
21 changes: 16 additions & 5 deletions service/history/timerQueueStandbyTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,9 @@ func (t *timerQueueStandbyTaskExecutor) fetchHistoryFromRemote(
if err != nil {
return err
}
if resendInfo.lastEventID != common.EmptyEventID && resendInfo.lastEventVersion != common.EmptyVersion {
if resendInfo.lastEventID == common.EmptyEventID || resendInfo.lastEventVersion == common.EmptyVersion {
err = serviceerror.NewInternal("timerQueueStandbyProcessor encountered empty historyResendInfo")
} else {
ns, err := t.registry.GetNamespaceByID(namespace.ID(taskInfo.GetNamespaceID()))
if err != nil {
return err
Expand All @@ -541,12 +543,18 @@ func (t *timerQueueStandbyTaskExecutor) fetchHistoryFromRemote(
taskInfo.GetWorkflowID(),
taskInfo.GetRunID(),
); err != nil {
if _, isNotFound := err.(*serviceerror.NamespaceNotFound); isNotFound {
// Don't log NamespaceNotFound error because it is valid case, and return error to stop retry.
return err
}

t.logger.Error("Error refresh tasks from remote.",
tag.ShardID(t.shard.GetShardID()),
tag.WorkflowNamespaceID(taskInfo.GetNamespaceID()),
tag.WorkflowID(taskInfo.GetWorkflowID()),
tag.WorkflowRunID(taskInfo.GetRunID()),
tag.ClusterName(remoteClusterName))
tag.ClusterName(remoteClusterName),
tag.Error(err))
}

// NOTE: history resend may take long time and its timeout is currently
Expand All @@ -561,17 +569,20 @@ func (t *timerQueueStandbyTaskExecutor) fetchHistoryFromRemote(
common.EmptyEventID,
common.EmptyVersion,
)
} else {
err = serviceerror.NewInternal("timerQueueStandbyProcessor encountered empty historyResendInfo")
}

if err != nil {
if _, isNotFound := err.(*serviceerror.NamespaceNotFound); isNotFound {
// Don't log NamespaceNotFound error because it is valid case, and return error to stop retry.
return err
}
t.logger.Error("Error re-replicating history from remote.",
tag.ShardID(t.shard.GetShardID()),
tag.WorkflowNamespaceID(taskInfo.GetNamespaceID()),
tag.WorkflowID(taskInfo.GetWorkflowID()),
tag.WorkflowRunID(taskInfo.GetRunID()),
tag.ClusterName(remoteClusterName))
tag.ClusterName(remoteClusterName),
tag.Error(err))
}

// return error so task processing logic will retry
Expand Down
22 changes: 15 additions & 7 deletions service/history/transferQueueStandbyTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,9 @@ func (t *transferQueueStandbyTaskExecutor) fetchHistoryFromRemote(
if err != nil {
return err
}
if resendInfo.lastEventID != common.EmptyEventID && resendInfo.lastEventVersion != common.EmptyVersion {
if resendInfo.lastEventID == common.EmptyEventID || resendInfo.lastEventVersion == common.EmptyVersion {
err = serviceerror.NewInternal("transferQueueStandbyProcessor encountered empty historyResendInfo")
} else {
ns, err := t.registry.GetNamespaceByID(namespace.ID(taskInfo.GetNamespaceID()))
if err != nil {
return err
Expand All @@ -617,12 +619,17 @@ func (t *transferQueueStandbyTaskExecutor) fetchHistoryFromRemote(
taskInfo.GetWorkflowID(),
taskInfo.GetRunID(),
); err != nil {
if _, isNotFound := err.(*serviceerror.NamespaceNotFound); isNotFound {
// Don't log NamespaceNotFound error because it is valid case, and return error to stop retry.
return err
}
t.logger.Error("Error refresh tasks from remote.",
tag.ShardID(t.shard.GetShardID()),
tag.WorkflowNamespaceID(taskInfo.GetNamespaceID()),
tag.WorkflowID(taskInfo.GetWorkflowID()),
tag.WorkflowRunID(taskInfo.GetRunID()),
tag.ClusterName(remoteClusterName))
tag.ClusterName(remoteClusterName),
tag.Error(err))
}

// NOTE: history resend may take long time and its timeout is currently
Expand All @@ -637,19 +644,20 @@ func (t *transferQueueStandbyTaskExecutor) fetchHistoryFromRemote(
0,
0,
)
} else {
err = serviceerror.NewInternal(
"transferQueueStandbyProcessor encountered empty historyResendInfo",
)
}

if err != nil {
if _, isNotFound := err.(*serviceerror.NamespaceNotFound); isNotFound {
// Don't log NamespaceNotFound error because it is valid case, and return error to stop retry.
return err
}
t.logger.Error("Error re-replicating history from remote.",
tag.ShardID(t.shard.GetShardID()),
tag.WorkflowNamespaceID(taskInfo.GetNamespaceID()),
tag.WorkflowID(taskInfo.GetWorkflowID()),
tag.WorkflowRunID(taskInfo.GetRunID()),
tag.SourceCluster(remoteClusterName))
tag.SourceCluster(remoteClusterName),
tag.Error(err))
}

// return error so task processing logic will retry
Expand Down

0 comments on commit 7af6cda

Please sign in to comment.