Skip to content

Commit

Permalink
Fix caller info for migration activities (#4340)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored and rodrigozhou committed May 15, 2023
1 parent a97cdef commit 6eba60d
Showing 1 changed file with 44 additions and 1 deletion.
45 changes: 44 additions & 1 deletion service/worker/migration/activities.go
Expand Up @@ -38,14 +38,22 @@ import (
"go.temporal.io/sdk/activity"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/quotas"
)

// TODO: CallerTypePreemptablee should be set in activity background context for all migration activities.
// However, activity background context is per-worker, which means once set, all activities processed by the
// worker will use CallerTypePreemptable, including those not related to migration. This is not ideal.
// Using a different task queue and a dedicated worker for migration can solve the issue but requires
// changing all existing tooling around namespace migration to start workflows & activities on the new task queue.
// Another approach is to use separate workers for workflow tasks and activities and keep existing tooling unchanged.

// GetMetadata returns history shard count and namespaceID for requested namespace.
func (a *activities) GetMetadata(ctx context.Context, request metadataRequest) (*metadataResponse, error) {
func (a *activities) GetMetadata(_ context.Context, request metadataRequest) (*metadataResponse, error) {
nsEntry, err := a.namespaceRegistry.GetNamespace(namespace.Name(request.Namespace))
if err != nil {
return nil, err
Expand All @@ -59,6 +67,8 @@ func (a *activities) GetMetadata(ctx context.Context, request metadataRequest) (

// GetMaxReplicationTaskIDs returns max replication task id per shard
func (a *activities) GetMaxReplicationTaskIDs(ctx context.Context) (*replicationStatus, error) {
ctx = headers.SetCallerInfo(ctx, headers.SystemPreemptableCallerInfo)

resp, err := a.historyClient.GetReplicationStatus(ctx, &historyservice.GetReplicationStatusRequest{})
if err != nil {
return nil, err
Expand All @@ -71,6 +81,8 @@ func (a *activities) GetMaxReplicationTaskIDs(ctx context.Context) (*replication
}

func (a *activities) WaitReplication(ctx context.Context, waitRequest waitReplicationRequest) error {
ctx = headers.SetCallerInfo(ctx, headers.SystemPreemptableCallerInfo)

for {
done, err := a.checkReplicationOnce(ctx, waitRequest)
if err != nil {
Expand Down Expand Up @@ -158,6 +170,10 @@ func (a *activities) checkReplicationOnce(ctx context.Context, waitRequest waitR
}

func (a *activities) WaitHandover(ctx context.Context, waitRequest waitHandoverRequest) error {
// Use the highest priority caller type for checking handover state
// since during handover state namespace has no availability
ctx = headers.SetCallerInfo(ctx, headers.NewCallerInfo(waitRequest.Namespace, headers.CallerTypeAPI, ""))

for {
done, err := a.checkHandoverOnce(ctx, waitRequest)
if err != nil {
Expand Down Expand Up @@ -257,6 +273,10 @@ func (a *activities) generateWorkflowReplicationTask(ctx context.Context, wKey d
}

func (a *activities) UpdateNamespaceState(ctx context.Context, req updateStateRequest) error {
// Use the highest priority caller type for updating namespace config
// since during handover state, namespace has no availability
ctx = headers.SetCallerInfo(ctx, headers.NewCallerInfo(req.Namespace, headers.CallerTypeAPI, ""))

descResp, err := a.frontendClient.DescribeNamespace(ctx, &workflowservice.DescribeNamespaceRequest{
Namespace: req.Namespace,
})
Expand All @@ -278,6 +298,10 @@ func (a *activities) UpdateNamespaceState(ctx context.Context, req updateStateRe
}

func (a *activities) UpdateActiveCluster(ctx context.Context, req updateActiveClusterRequest) error {
// Use the highest priority caller type for updating namespace config
// since when both clusters think namespace are standby, namespace has no availability
ctx = headers.SetCallerInfo(ctx, headers.NewCallerInfo(req.Namespace, headers.CallerTypeAPI, ""))

descResp, err := a.frontendClient.DescribeNamespace(ctx, &workflowservice.DescribeNamespaceRequest{
Namespace: req.Namespace,
})
Expand All @@ -299,6 +323,8 @@ func (a *activities) UpdateActiveCluster(ctx context.Context, req updateActiveCl
}

func (a *activities) ListWorkflows(ctx context.Context, request *workflowservice.ListWorkflowExecutionsRequest) (*listWorkflowsResponse, error) {
ctx = headers.SetCallerInfo(ctx, headers.NewCallerInfo(request.Namespace, headers.CallerTypePreemptable, ""))

resp, err := a.frontendClient.ListWorkflowExecutions(ctx, request)
if err != nil {
return nil, err
Expand All @@ -321,6 +347,7 @@ func (a *activities) ListWorkflows(ctx context.Context, request *workflowservice
}

func (a *activities) GenerateReplicationTasks(ctx context.Context, request *generateReplicationTasksRequest) error {
ctx = a.setCallerInfoForGenReplicationTask(ctx, namespace.ID(request.NamespaceID))
rateLimiter := quotas.NewRateLimiter(request.RPS, int(math.Ceil(request.RPS)))

startIndex := 0
Expand All @@ -346,3 +373,19 @@ func (a *activities) GenerateReplicationTasks(ctx context.Context, request *gene

return nil
}

func (a *activities) setCallerInfoForGenReplicationTask(
ctx context.Context,
namespaceID namespace.ID,
) context.Context {

nsName, err := a.namespaceRegistry.GetNamespaceName(namespaceID)
if err != nil {
a.logger.Error("Failed to get namespace name when generating replication task",
tag.WorkflowNamespaceID(namespaceID.String()),
tag.Error(err),
)
nsName = namespace.EmptyName
}
return headers.SetCallerInfo(ctx, headers.NewPreemptableCallerInfo(nsName.String()))
}

0 comments on commit 6eba60d

Please sign in to comment.