Skip to content

Commit

Permalink
Persistence Context Part 3: Namespace Replication Queue (#2638)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Mar 23, 2022
1 parent a0c9001 commit e60d6bd
Show file tree
Hide file tree
Showing 14 changed files with 227 additions and 157 deletions.
13 changes: 9 additions & 4 deletions common/namespace/dlqMessageHandler.go
Expand Up @@ -73,12 +73,13 @@ func (d *dlqMessageHandlerImpl) Read(
pageToken []byte,
) ([]*replicationspb.ReplicationTask, []byte, error) {

ackLevel, err := d.namespaceReplicationQueue.GetDLQAckLevel()
ackLevel, err := d.namespaceReplicationQueue.GetDLQAckLevel(ctx)
if err != nil {
return nil, nil, err
}

return d.namespaceReplicationQueue.GetMessagesFromDLQ(
ctx,
ackLevel,
lastMessageID,
pageSize,
Expand All @@ -92,19 +93,21 @@ func (d *dlqMessageHandlerImpl) Purge(
lastMessageID int64,
) error {

ackLevel, err := d.namespaceReplicationQueue.GetDLQAckLevel()
ackLevel, err := d.namespaceReplicationQueue.GetDLQAckLevel(ctx)
if err != nil {
return err
}

if err := d.namespaceReplicationQueue.RangeDeleteMessagesFromDLQ(
ctx,
ackLevel,
lastMessageID,
); err != nil {
return err
}

if err := d.namespaceReplicationQueue.UpdateDLQAckLevel(
ctx,
lastMessageID,
); err != nil {
d.logger.Error("Failed to update DLQ ack level after purging messages", tag.Error(err))
Expand All @@ -121,12 +124,13 @@ func (d *dlqMessageHandlerImpl) Merge(
pageToken []byte,
) ([]byte, error) {

ackLevel, err := d.namespaceReplicationQueue.GetDLQAckLevel()
ackLevel, err := d.namespaceReplicationQueue.GetDLQAckLevel(ctx)
if err != nil {
return nil, err
}

messages, token, err := d.namespaceReplicationQueue.GetMessagesFromDLQ(
ctx,
ackLevel,
lastMessageID,
pageSize,
Expand All @@ -153,13 +157,14 @@ func (d *dlqMessageHandlerImpl) Merge(
}

if err := d.namespaceReplicationQueue.RangeDeleteMessagesFromDLQ(
ctx,
ackLevel,
ackedMessageID,
); err != nil {
d.logger.Error("failed to delete merged tasks on merging namespace DLQ message", tag.Error(err))
return nil, err
}
if err := d.namespaceReplicationQueue.UpdateDLQAckLevel(ackedMessageID); err != nil {
if err := d.namespaceReplicationQueue.UpdateDLQAckLevel(ctx, ackedMessageID); err != nil {
d.logger.Error("failed to update ack level on merging namespace DLQ message", tag.Error(err))
}

Expand Down
72 changes: 36 additions & 36 deletions common/namespace/dlqMessageHandler_test.go
Expand Up @@ -95,8 +95,8 @@ func (s *dlqMessageHandlerSuite) TestReadMessages() {
SourceTaskId: 1,
},
}
s.mockReplicationQueue.EXPECT().GetDLQAckLevel().Return(ackLevel, nil)
s.mockReplicationQueue.EXPECT().GetMessagesFromDLQ(ackLevel, lastMessageID, pageSize, pageToken).
s.mockReplicationQueue.EXPECT().GetDLQAckLevel(gomock.Any()).Return(ackLevel, nil)
s.mockReplicationQueue.EXPECT().GetMessagesFromDLQ(gomock.Any(), ackLevel, lastMessageID, pageSize, pageToken).
Return(tasks, nil, nil)

resp, token, err := s.dlqMessageHandler.Read(context.Background(), lastMessageID, pageSize, pageToken)
Expand All @@ -118,8 +118,8 @@ func (s *dlqMessageHandlerSuite) TestReadMessages_ThrowErrorOnGetDLQAckLevel() {
},
}
testError := fmt.Errorf("test")
s.mockReplicationQueue.EXPECT().GetDLQAckLevel().Return(int64(-1), testError)
s.mockReplicationQueue.EXPECT().GetMessagesFromDLQ(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
s.mockReplicationQueue.EXPECT().GetDLQAckLevel(gomock.Any()).Return(int64(-1), testError)
s.mockReplicationQueue.EXPECT().GetMessagesFromDLQ(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(tasks, nil, nil).Times(0)

_, _, err := s.dlqMessageHandler.Read(context.Background(), lastMessageID, pageSize, pageToken)
Expand All @@ -134,8 +134,8 @@ func (s *dlqMessageHandlerSuite) TestReadMessages_ThrowErrorOnReadMessages() {
pageToken := []byte{}

testError := fmt.Errorf("test")
s.mockReplicationQueue.EXPECT().GetDLQAckLevel().Return(ackLevel, nil)
s.mockReplicationQueue.EXPECT().GetMessagesFromDLQ(ackLevel, lastMessageID, pageSize, pageToken).
s.mockReplicationQueue.EXPECT().GetDLQAckLevel(gomock.Any()).Return(ackLevel, nil)
s.mockReplicationQueue.EXPECT().GetMessagesFromDLQ(gomock.Any(), ackLevel, lastMessageID, pageSize, pageToken).
Return(nil, nil, testError)

_, _, err := s.dlqMessageHandler.Read(context.Background(), lastMessageID, pageSize, pageToken)
Expand All @@ -147,9 +147,9 @@ func (s *dlqMessageHandlerSuite) TestPurgeMessages() {
ackLevel := int64(10)
lastMessageID := int64(20)

s.mockReplicationQueue.EXPECT().GetDLQAckLevel().Return(ackLevel, nil)
s.mockReplicationQueue.EXPECT().RangeDeleteMessagesFromDLQ(ackLevel, lastMessageID).Return(nil)
s.mockReplicationQueue.EXPECT().UpdateDLQAckLevel(lastMessageID).Return(nil)
s.mockReplicationQueue.EXPECT().GetDLQAckLevel(gomock.Any()).Return(ackLevel, nil)
s.mockReplicationQueue.EXPECT().RangeDeleteMessagesFromDLQ(gomock.Any(), ackLevel, lastMessageID).Return(nil)
s.mockReplicationQueue.EXPECT().UpdateDLQAckLevel(gomock.Any(), lastMessageID).Return(nil)
err := s.dlqMessageHandler.Purge(context.Background(), lastMessageID)

s.NoError(err)
Expand All @@ -159,9 +159,9 @@ func (s *dlqMessageHandlerSuite) TestPurgeMessages_ThrowErrorOnGetDLQAckLevel()
lastMessageID := int64(20)
testError := fmt.Errorf("test")

s.mockReplicationQueue.EXPECT().GetDLQAckLevel().Return(int64(-1), testError)
s.mockReplicationQueue.EXPECT().RangeDeleteMessagesFromDLQ(gomock.Any(), gomock.Any()).Return(nil).Times(0)
s.mockReplicationQueue.EXPECT().UpdateDLQAckLevel(gomock.Any()).Times(0)
s.mockReplicationQueue.EXPECT().GetDLQAckLevel(gomock.Any()).Return(int64(-1), testError)
s.mockReplicationQueue.EXPECT().RangeDeleteMessagesFromDLQ(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(0)
s.mockReplicationQueue.EXPECT().UpdateDLQAckLevel(gomock.Any(), gomock.Any()).Times(0)
err := s.dlqMessageHandler.Purge(context.Background(), lastMessageID)

s.Equal(testError, err)
Expand All @@ -172,9 +172,9 @@ func (s *dlqMessageHandlerSuite) TestPurgeMessages_ThrowErrorOnPurgeMessages() {
lastMessageID := int64(20)
testError := fmt.Errorf("test")

s.mockReplicationQueue.EXPECT().GetDLQAckLevel().Return(ackLevel, nil)
s.mockReplicationQueue.EXPECT().RangeDeleteMessagesFromDLQ(ackLevel, lastMessageID).Return(testError)
s.mockReplicationQueue.EXPECT().UpdateDLQAckLevel(gomock.Any()).Times(0)
s.mockReplicationQueue.EXPECT().GetDLQAckLevel(gomock.Any()).Return(ackLevel, nil)
s.mockReplicationQueue.EXPECT().RangeDeleteMessagesFromDLQ(gomock.Any(), ackLevel, lastMessageID).Return(testError)
s.mockReplicationQueue.EXPECT().UpdateDLQAckLevel(gomock.Any(), gomock.Any()).Times(0)
err := s.dlqMessageHandler.Purge(context.Background(), lastMessageID)

s.Equal(testError, err)
Expand All @@ -200,12 +200,12 @@ func (s *dlqMessageHandlerSuite) TestMergeMessages() {
},
},
}
s.mockReplicationQueue.EXPECT().GetDLQAckLevel().Return(ackLevel, nil)
s.mockReplicationQueue.EXPECT().GetMessagesFromDLQ(ackLevel, lastMessageID, pageSize, pageToken).
s.mockReplicationQueue.EXPECT().GetDLQAckLevel(gomock.Any()).Return(ackLevel, nil)
s.mockReplicationQueue.EXPECT().GetMessagesFromDLQ(gomock.Any(), ackLevel, lastMessageID, pageSize, pageToken).
Return(tasks, nil, nil)
s.mockReplicationTaskExecutor.EXPECT().Execute(gomock.Any(), namespaceAttribute).Return(nil)
s.mockReplicationQueue.EXPECT().UpdateDLQAckLevel(messageID).Return(nil)
s.mockReplicationQueue.EXPECT().RangeDeleteMessagesFromDLQ(ackLevel, messageID).Return(nil)
s.mockReplicationQueue.EXPECT().UpdateDLQAckLevel(gomock.Any(), messageID).Return(nil)
s.mockReplicationQueue.EXPECT().RangeDeleteMessagesFromDLQ(gomock.Any(), ackLevel, messageID).Return(nil)

token, err := s.dlqMessageHandler.Merge(context.Background(), lastMessageID, pageSize, pageToken)
s.NoError(err)
Expand All @@ -231,12 +231,12 @@ func (s *dlqMessageHandlerSuite) TestMergeMessages_ThrowErrorOnGetDLQAckLevel()
},
},
}
s.mockReplicationQueue.EXPECT().GetDLQAckLevel().Return(int64(-1), testError)
s.mockReplicationQueue.EXPECT().GetMessagesFromDLQ(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
s.mockReplicationQueue.EXPECT().GetDLQAckLevel(gomock.Any()).Return(int64(-1), testError)
s.mockReplicationQueue.EXPECT().GetMessagesFromDLQ(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(tasks, nil, nil).Times(0)
s.mockReplicationTaskExecutor.EXPECT().Execute(gomock.Any(), gomock.Any()).Times(0)
s.mockReplicationQueue.EXPECT().DeleteMessageFromDLQ(gomock.Any()).Times(0)
s.mockReplicationQueue.EXPECT().UpdateDLQAckLevel(gomock.Any()).Times(0)
s.mockReplicationQueue.EXPECT().DeleteMessageFromDLQ(gomock.Any(), gomock.Any()).Times(0)
s.mockReplicationQueue.EXPECT().UpdateDLQAckLevel(gomock.Any(), gomock.Any()).Times(0)

token, err := s.dlqMessageHandler.Merge(context.Background(), lastMessageID, pageSize, pageToken)
s.Equal(testError, err)
Expand All @@ -250,12 +250,12 @@ func (s *dlqMessageHandlerSuite) TestMergeMessages_ThrowErrorOnGetDLQMessages()
pageToken := []byte{}
testError := fmt.Errorf("test")

s.mockReplicationQueue.EXPECT().GetDLQAckLevel().Return(ackLevel, nil)
s.mockReplicationQueue.EXPECT().GetMessagesFromDLQ(ackLevel, lastMessageID, pageSize, pageToken).
s.mockReplicationQueue.EXPECT().GetDLQAckLevel(gomock.Any()).Return(ackLevel, nil)
s.mockReplicationQueue.EXPECT().GetMessagesFromDLQ(gomock.Any(), ackLevel, lastMessageID, pageSize, pageToken).
Return(nil, nil, testError)
s.mockReplicationTaskExecutor.EXPECT().Execute(gomock.Any(), gomock.Any()).Times(0)
s.mockReplicationQueue.EXPECT().DeleteMessageFromDLQ(gomock.Any()).Times(0)
s.mockReplicationQueue.EXPECT().UpdateDLQAckLevel(gomock.Any()).Times(0)
s.mockReplicationQueue.EXPECT().DeleteMessageFromDLQ(gomock.Any(), gomock.Any()).Times(0)
s.mockReplicationQueue.EXPECT().UpdateDLQAckLevel(gomock.Any(), gomock.Any()).Times(0)

token, err := s.dlqMessageHandler.Merge(context.Background(), lastMessageID, pageSize, pageToken)
s.Equal(testError, err)
Expand Down Expand Up @@ -292,8 +292,8 @@ func (s *dlqMessageHandlerSuite) TestMergeMessages_ThrowErrorOnHandleReceivingTa
},
},
}
s.mockReplicationQueue.EXPECT().GetDLQAckLevel().Return(ackLevel, nil)
s.mockReplicationQueue.EXPECT().GetMessagesFromDLQ(ackLevel, lastMessageID, pageSize, pageToken).
s.mockReplicationQueue.EXPECT().GetDLQAckLevel(gomock.Any()).Return(ackLevel, nil)
s.mockReplicationQueue.EXPECT().GetMessagesFromDLQ(gomock.Any(), ackLevel, lastMessageID, pageSize, pageToken).
Return(tasks, nil, nil)
s.mockReplicationTaskExecutor.EXPECT().Execute(gomock.Any(), namespaceAttribute1).Return(nil)
s.mockReplicationTaskExecutor.EXPECT().Execute(gomock.Any(), namespaceAttribute2).Return(testError)
Expand Down Expand Up @@ -333,12 +333,12 @@ func (s *dlqMessageHandlerSuite) TestMergeMessages_ThrowErrorOnDeleteMessages()
},
},
}
s.mockReplicationQueue.EXPECT().GetDLQAckLevel().Return(ackLevel, nil)
s.mockReplicationQueue.EXPECT().GetMessagesFromDLQ(ackLevel, lastMessageID, pageSize, pageToken).
s.mockReplicationQueue.EXPECT().GetDLQAckLevel(gomock.Any()).Return(ackLevel, nil)
s.mockReplicationQueue.EXPECT().GetMessagesFromDLQ(gomock.Any(), ackLevel, lastMessageID, pageSize, pageToken).
Return(tasks, nil, nil)
s.mockReplicationTaskExecutor.EXPECT().Execute(gomock.Any(), namespaceAttribute1).Return(nil)
s.mockReplicationTaskExecutor.EXPECT().Execute(gomock.Any(), namespaceAttribute2).Return(nil)
s.mockReplicationQueue.EXPECT().RangeDeleteMessagesFromDLQ(ackLevel, messageID2).Return(testError)
s.mockReplicationQueue.EXPECT().RangeDeleteMessagesFromDLQ(gomock.Any(), ackLevel, messageID2).Return(testError)

token, err := s.dlqMessageHandler.Merge(context.Background(), lastMessageID, pageSize, pageToken)
s.Error(err)
Expand All @@ -365,12 +365,12 @@ func (s *dlqMessageHandlerSuite) TestMergeMessages_IgnoreErrorOnUpdateDLQAckLeve
},
},
}
s.mockReplicationQueue.EXPECT().GetDLQAckLevel().Return(ackLevel, nil)
s.mockReplicationQueue.EXPECT().GetMessagesFromDLQ(ackLevel, lastMessageID, pageSize, pageToken).
s.mockReplicationQueue.EXPECT().GetDLQAckLevel(gomock.Any()).Return(ackLevel, nil)
s.mockReplicationQueue.EXPECT().GetMessagesFromDLQ(gomock.Any(), ackLevel, lastMessageID, pageSize, pageToken).
Return(tasks, nil, nil)
s.mockReplicationTaskExecutor.EXPECT().Execute(gomock.Any(), namespaceAttribute).Return(nil)
s.mockReplicationQueue.EXPECT().RangeDeleteMessagesFromDLQ(ackLevel, messageID).Return(nil)
s.mockReplicationQueue.EXPECT().UpdateDLQAckLevel(messageID).Return(testError)
s.mockReplicationQueue.EXPECT().RangeDeleteMessagesFromDLQ(gomock.Any(), ackLevel, messageID).Return(nil)
s.mockReplicationQueue.EXPECT().UpdateDLQAckLevel(gomock.Any(), messageID).Return(testError)

token, err := s.dlqMessageHandler.Merge(context.Background(), lastMessageID, pageSize, pageToken)
s.NoError(err)
Expand Down
13 changes: 11 additions & 2 deletions common/namespace/handler.go
Expand Up @@ -268,6 +268,7 @@ func (d *HandlerImpl) RegisterNamespace(
}

err = d.namespaceReplicator.HandleTransmissionTask(
ctx,
enumsspb.NAMESPACE_OPERATION_CREATE,
namespaceRequest.Namespace.Info,
namespaceRequest.Namespace.Config,
Expand Down Expand Up @@ -570,8 +571,16 @@ func (d *HandlerImpl) UpdateNamespace(
}
}

err = d.namespaceReplicator.HandleTransmissionTask(enumsspb.NAMESPACE_OPERATION_UPDATE,
info, config, replicationConfig, configVersion, failoverVersion, isGlobalNamespace)
err = d.namespaceReplicator.HandleTransmissionTask(
ctx,
enumsspb.NAMESPACE_OPERATION_UPDATE,
info,
config,
replicationConfig,
configVersion,
failoverVersion,
isGlobalNamespace,
)
if err != nil {
return nil, err
}
Expand Down
Expand Up @@ -441,7 +441,7 @@ func (s *namespaceHandlerGlobalNamespaceEnabledMasterClusterSuite) TestRegisterG
})
}

s.mockProducer.EXPECT().Publish(gomock.Any()).Return(nil).Times(0)
s.mockProducer.EXPECT().Publish(gomock.Any(), gomock.Any()).Return(nil).Times(0)

retention := 1 * time.Hour * 24
registerResp, err := s.handler.RegisterNamespace(context.Background(), &workflowservice.RegisterNamespaceRequest{
Expand Down Expand Up @@ -503,7 +503,7 @@ func (s *namespaceHandlerGlobalNamespaceEnabledMasterClusterSuite) TestRegisterG
data := map[string]string{"some random key": "some random value"}
isGlobalNamespace := true

s.mockProducer.EXPECT().Publish(gomock.Any()).Return(nil)
s.mockProducer.EXPECT().Publish(gomock.Any(), gomock.Any()).Return(nil)

registerResp, err := s.handler.RegisterNamespace(context.Background(), &workflowservice.RegisterNamespaceRequest{
Namespace: namespace,
Expand Down Expand Up @@ -569,7 +569,7 @@ func (s *namespaceHandlerGlobalNamespaceEnabledMasterClusterSuite) TestUpdateGet
s.True(len(clusters) > 1)
isGlobalNamespace := true

s.mockProducer.EXPECT().Publish(gomock.Any()).Return(nil).Times(2)
s.mockProducer.EXPECT().Publish(gomock.Any(), gomock.Any()).Return(nil).Times(2)

registerResp, err := s.handler.RegisterNamespace(context.Background(), &workflowservice.RegisterNamespaceRequest{
Namespace: namespace,
Expand Down Expand Up @@ -653,7 +653,7 @@ func (s *namespaceHandlerGlobalNamespaceEnabledMasterClusterSuite) TestUpdateGet
s.True(len(clusters) > 1)
isGlobalNamespace := true

s.mockProducer.EXPECT().Publish(gomock.Any()).Return(nil).Times(2)
s.mockProducer.EXPECT().Publish(gomock.Any(), gomock.Any()).Return(nil).Times(2)

registerResp, err := s.handler.RegisterNamespace(context.Background(), &workflowservice.RegisterNamespaceRequest{
Namespace: namespace,
Expand Down Expand Up @@ -761,7 +761,7 @@ func (s *namespaceHandlerGlobalNamespaceEnabledMasterClusterSuite) TestUpdateGet
s.True(len(clusters) > 1)
isGlobalNamespace := true

s.mockProducer.EXPECT().Publish(gomock.Any()).Return(nil).Times(2)
s.mockProducer.EXPECT().Publish(gomock.Any(), gomock.Any()).Return(nil).Times(2)

registerResp, err := s.handler.RegisterNamespace(context.Background(), &workflowservice.RegisterNamespaceRequest{
Namespace: namespace,
Expand Down
Expand Up @@ -519,7 +519,7 @@ func (s *namespaceHandlerGlobalNamespaceEnabledNotMasterClusterSuite) TestUpdate
})
s.NoError(err)

s.mockProducer.EXPECT().Publish(gomock.Any()).Return(nil)
s.mockProducer.EXPECT().Publish(gomock.Any(), gomock.Any()).Return(nil)
resp, err := s.handler.UpdateNamespace(context.Background(), &workflowservice.UpdateNamespaceRequest{
Namespace: namespace,
})
Expand Down Expand Up @@ -578,7 +578,7 @@ func (s *namespaceHandlerGlobalNamespaceEnabledNotMasterClusterSuite) TestUpdate
})
s.NoError(err)

s.mockProducer.EXPECT().Publish(gomock.Any()).Return(nil)
s.mockProducer.EXPECT().Publish(gomock.Any(), gomock.Any()).Return(nil)
updateResp, err := s.handler.UpdateNamespace(context.Background(), &workflowservice.UpdateNamespaceRequest{
Namespace: namespace,
UpdateInfo: &namespacepb.UpdateNamespaceInfo{
Expand Down Expand Up @@ -686,7 +686,7 @@ func (s *namespaceHandlerGlobalNamespaceEnabledNotMasterClusterSuite) TestUpdate
s.Equal(isGlobalNamespace, isGlobalNamespace)
}

s.mockProducer.EXPECT().Publish(gomock.Any()).Return(nil)
s.mockProducer.EXPECT().Publish(gomock.Any(), gomock.Any()).Return(nil)

updateResp, err := s.handler.UpdateNamespace(context.Background(), &workflowservice.UpdateNamespaceRequest{
Namespace: namespace,
Expand Down
2 changes: 1 addition & 1 deletion common/namespace/handler_test.go
Expand Up @@ -295,7 +295,7 @@ func (s *namespaceHandlerCommonSuite) TestListNamespace() {
ClusterName: clusterName,
})
}
s.mockProducer.EXPECT().Publish(gomock.Any()).Return(nil)
s.mockProducer.EXPECT().Publish(gomock.Any(), gomock.Any()).Return(nil)
registerResp, err = s.handler.RegisterNamespace(context.Background(), &workflowservice.RegisterNamespaceRequest{
Namespace: namespace2,
Description: description2,
Expand Down
5 changes: 5 additions & 0 deletions common/namespace/transmissionTaskHandler.go
Expand Up @@ -25,6 +25,8 @@
package namespace

import (
"context"

namespacepb "go.temporal.io/api/namespace/v1"
replicationpb "go.temporal.io/api/replication/v1"

Expand All @@ -41,6 +43,7 @@ type (
// Replicator is the interface which can replicate the namespace
Replicator interface {
HandleTransmissionTask(
ctx context.Context,
namespaceOperation enumsspb.NamespaceOperation,
info *persistencespb.NamespaceInfo,
config *persistencespb.NamespaceConfig,
Expand Down Expand Up @@ -70,6 +73,7 @@ func NewNamespaceReplicator(

// HandleTransmissionTask handle transmission of the namespace replication task
func (namespaceReplicator *namespaceReplicatorImpl) HandleTransmissionTask(
ctx context.Context,
namespaceOperation enumsspb.NamespaceOperation,
info *persistencespb.NamespaceInfo,
config *persistencespb.NamespaceConfig,
Expand Down Expand Up @@ -116,6 +120,7 @@ func (namespaceReplicator *namespaceReplicatorImpl) HandleTransmissionTask(
}

return namespaceReplicator.namespaceReplicationQueue.Publish(
ctx,
&replicationspb.ReplicationTask{
TaskType: taskType,
Attributes: task,
Expand Down

0 comments on commit e60d6bd

Please sign in to comment.