Skip to content

Commit

Permalink
Implement HistoryTaskQueueManager.DeleteTasks
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Sep 29, 2023
1 parent fb01307 commit b68ec1f
Show file tree
Hide file tree
Showing 7 changed files with 227 additions and 86 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
Expand Up @@ -105,7 +105,7 @@ issues:
text: "(cyclomatic|cognitive)"
linters:
- revive
- path: _test\.go
- path: _test\.go|^common/persistence\/tests\/.+\.go
linters:
- goerr113
- path: ^tools\/.+\.go
Expand Down
10 changes: 10 additions & 0 deletions common/persistence/data_interfaces.go
Expand Up @@ -1219,6 +1219,7 @@ type (
ReadTasks(ctx context.Context, request *ReadTasksRequest) (*ReadTasksResponse, error)
// CreateQueue must return an ErrQueueAlreadyExists if the queue already exists.
CreateQueue(ctx context.Context, request *CreateQueueRequest) (*CreateQueueResponse, error)
DeleteTasks(ctx context.Context, request *DeleteTasksRequest) (*DeleteTasksResponse, error)
}

HistoryTaskQueueManagerImpl struct {
Expand Down Expand Up @@ -1283,6 +1284,15 @@ type (

CreateQueueResponse struct {
}

DeleteTasksRequest struct {
QueueKey QueueKey
InclusiveMaxMessageMetadata MessageMetadata
}

DeleteTasksResponse struct {
// empty
}
)

func (e *InvalidPersistenceRequestError) Error() string {
Expand Down
15 changes: 15 additions & 0 deletions common/persistence/data_interfaces_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 18 additions & 1 deletion common/persistence/history_task_queue_manager.go
Expand Up @@ -188,7 +188,10 @@ func (m *HistoryTaskQueueManagerImpl) ReadTasks(ctx context.Context, request *Re
}, nil
}

func (m *HistoryTaskQueueManagerImpl) CreateQueue(ctx context.Context, request *CreateQueueRequest) (*CreateQueueResponse, error) {
func (m *HistoryTaskQueueManagerImpl) CreateQueue(
ctx context.Context,
request *CreateQueueRequest,
) (*CreateQueueResponse, error) {
_, err := m.queue.CreateQueue(ctx, &InternalCreateQueueRequest{
QueueType: request.QueueKey.QueueType,
QueueName: request.QueueKey.GetQueueName(),
Expand All @@ -199,6 +202,20 @@ func (m *HistoryTaskQueueManagerImpl) CreateQueue(ctx context.Context, request *
return &CreateQueueResponse{}, nil
}

func (m *HistoryTaskQueueManagerImpl) DeleteTasks(
ctx context.Context,
request *DeleteTasksRequest,
) (*DeleteTasksResponse, error) {
_, err := m.queue.RangeDeleteMessages(ctx, &InternalRangeDeleteMessagesRequest{
QueueType: request.QueueKey.QueueType,
QueueName: request.QueueKey.GetQueueName(),
})
if err != nil {
return nil, err
}
return &DeleteTasksResponse{}, nil
}

// combineUnique combines the given strings into a single string by hashing the length of each string and the string
// itself. This is used to generate a unique suffix for the queue name.
func combineUnique(strs ...string) string {
Expand Down
62 changes: 0 additions & 62 deletions common/persistence/history_task_queue_manager_test.go
Expand Up @@ -167,65 +167,3 @@ func TestHistoryTaskQueueManager_ReadTasks_NonPositivePageSize(t *testing.T) {
"ErrReadTasksNonPositivePageSize when the request's page size is: "+strconv.Itoa(pageSize))
}
}

// failingQueue is a QueueV2 implementation that always returns an error.
type failingQueue struct{}

func (q failingQueue) EnqueueMessage(
context.Context,
*persistence.InternalEnqueueMessageRequest,
) (*persistence.InternalEnqueueMessageResponse, error) {
return nil, assert.AnError
}

func (q failingQueue) ReadMessages(
context.Context,
*persistence.InternalReadMessagesRequest,
) (*persistence.InternalReadMessagesResponse, error) {
return nil, assert.AnError
}

func (q failingQueue) CreateQueue(
context.Context,
*persistence.InternalCreateQueueRequest,
) (*persistence.InternalCreateQueueResponse, error) {
return nil, assert.AnError
}

func (q failingQueue) RangeDeleteMessages(
context.Context,
*persistence.InternalRangeDeleteMessagesRequest,
) (*persistence.InternalRangeDeleteMessagesResponse, error) {
return nil, assert.AnError
}

func TestHistoryTaskQueueManager_ReadTasks_ErrReadQueueMessages(t *testing.T) {
t.Parallel()

m := persistence.NewHistoryTaskQueueManager(failingQueue{}, 1)
_, err := m.ReadTasks(context.Background(), &persistence.ReadTasksRequest{
QueueKey: persistence.QueueKey{
Category: tasks.CategoryTransfer,
},
PageSize: 1,
})
assert.ErrorIs(t, err, assert.AnError, "ReadTasks should propagate errors from ReadMessages")
}

func TestHistoryTaskQueueManager_ReadTasks_ErrEnqueueMessage(t *testing.T) {
t.Parallel()

m := persistence.NewHistoryTaskQueueManager(failingQueue{}, 1)
_, err := m.EnqueueTask(context.Background(), &persistence.EnqueueTaskRequest{
Task: &tasks.WorkflowTask{},
})
assert.ErrorIs(t, err, assert.AnError, "EnqueueTask should propagate errors from EnqueueMessage")
}

func TestHistoryTaskQueueManager_CreateQueue(t *testing.T) {
t.Parallel()

m := persistence.NewHistoryTaskQueueManager(failingQueue{}, 1)
_, err := m.CreateQueue(context.Background(), &persistence.CreateQueueRequest{})
assert.ErrorIs(t, err, assert.AnError, "CreateQueue should propagate errors from the persistence layer")
}

0 comments on commit b68ec1f

Please sign in to comment.