Skip to content

Commit

Permalink
Add history service API for reading DLQ tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Sep 7, 2023
1 parent 741335b commit 781c261
Show file tree
Hide file tree
Showing 41 changed files with 3,411 additions and 572 deletions.
2,542 changes: 2,129 additions & 413 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

219 changes: 128 additions & 91 deletions api/historyservice/v1/service.pb.go

Large diffs are not rendered by default.

35 changes: 35 additions & 0 deletions api/historyservicemock/v1/service.pb.mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions client/history/client.go
Expand Up @@ -252,6 +252,21 @@ func (c *clientImpl) StreamWorkflowReplicationMessages(
)
}

// GetDLQTasks doesn't need redirects or routing because DLQ tasks are not sharded, so it just picks any available host
// in the connection pool (or creates one) and forwards the request to it.
func (c *clientImpl) GetDLQTasks(
ctx context.Context,
in *historyservice.GetDLQTasksRequest,
opts ...grpc.CallOption,
) (*historyservice.GetDLQTasksResponse, error) {
conn, _, err := c.connections.getAnyClientConn()
if err != nil {
msg := fmt.Sprintf("Failed to get a history host to send GetDLQTasks to. Error: %v", err)
return nil, serviceerror.NewUnavailable(msg)
}
return conn.historyClient.GetDLQTasks(ctx, in, opts...)
}

func (c *clientImpl) createContext(parent context.Context) (context.Context, context.CancelFunc) {
return context.WithTimeout(parent, c.timeout)
}
Expand Down
62 changes: 62 additions & 0 deletions client/history/client_test.go
@@ -0,0 +1,62 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package history_test

import (
"context"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.temporal.io/api/serviceerror"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/client/history"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/membership"
)

func TestGetDLQTasks_ErrNoHosts(t *testing.T) {
t.Parallel()

ctrl := gomock.NewController(t)
serviceResolver := membership.NewMockServiceResolver(ctrl)
serviceResolver.EXPECT().Members().Return([]membership.HostInfo{})
client := history.NewClient(
dynamicconfig.NewNoopCollection(),
serviceResolver,
log.NewTestLogger(),
1,
nil,
time.Duration(0),
)
_, err := client.GetDLQTasks(context.Background(), &historyservice.GetDLQTasksRequest{})
var unavailableErr *serviceerror.Unavailable
require.ErrorAs(t, err, &unavailableErr, "Should return an 'Unavailable' error when there are no"+
" history hosts available to serve the request")
assert.ErrorContains(t, err, history.ErrNoHosts.Error())
}
26 changes: 26 additions & 0 deletions client/history/connections.go
Expand Up @@ -27,6 +27,7 @@
package history

import (
"errors"
"sync"

"google.golang.org/grpc"
Expand Down Expand Up @@ -57,10 +58,18 @@ type (
connectionPool interface {
getOrCreateClientConn(addr rpcAddress) clientConnection
getAllClientConns() []clientConnection
// getAnyClientConn returns a random connection from the pool. If the pool is empty, it creates a connection to
// the first host in the membership ring. If the membership ring is empty, it returns ErrNoHosts. The second
// return value indicates whether the connection is newly created.
getAnyClientConn() (clientConnection, bool, error)
resetConnectBackoff(clientConnection)
}
)

var (
ErrNoHosts = errors.New("no history hosts available to serve request")
)

func newConnectionPool(
historyServiceResolver membership.ServiceResolver,
rpcFactory common.RPCFactory,
Expand Down Expand Up @@ -110,6 +119,23 @@ func (c *connectionPoolImpl) getAllClientConns() []clientConnection {
return clientConns
}

func (c *connectionPoolImpl) getAnyClientConn() (clientConnection, bool, error) {
c.mu.RLock()

for _, conn := range c.mu.conns {
c.mu.RUnlock()
return conn, false, nil
}
c.mu.RUnlock()

members := c.historyServiceResolver.Members()
if len(members) == 0 {
return clientConnection{}, false, ErrNoHosts
}
conn := c.getOrCreateClientConn(rpcAddress(members[0].GetAddress()))
return conn, true, nil
}

func (c *connectionPoolImpl) resetConnectBackoff(cc clientConnection) {
cc.grpcConn.ResetConnectBackoff()
}
16 changes: 16 additions & 0 deletions client/history/connections_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

166 changes: 166 additions & 0 deletions client/history/historytest/clienttest.go
@@ -0,0 +1,166 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package historytest

import (
"context"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace"
"go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/client/history"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/membership"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/internal/nettest"
historyserver "go.temporal.io/server/service/history"
"go.temporal.io/server/service/history/tasks"
"google.golang.org/grpc"
)

// fakeTracerProvider is needed to construct a [historyserver.Handler] object.
type fakeTracerProvider struct{}

func (f fakeTracerProvider) Tracer(string, ...trace.TracerOption) trace.Tracer {
return nil
}

// TestClientGetDLQTasks is a library test function which tests [history.NewClient] given a history task queue manager.
// This is not a test function itself because we construct database clients in another package, which will in turn call
// this function, but we don't want to put the testing logic there because it's not specific to any database, but it is
// specific to the [history] package.
//
// This test works by doing the following:
// 1. Enqueue some tasks
// 2. Start a server which serves the GetDLQTasks endpoint
// 3. Create a client which connects to the server
// 4. Use the client to read the tasks
func TestClientGetDLQTasks(t *testing.T, historyTaskQueueManager *persistence.HistoryTaskQueueManager) {
ctrl := gomock.NewController(t)

// Note that it is important to include the test name in the cluster name to ensure that the generated queue name is
// unique across tests. That way, we can run many queue tests without any risk of queue name collisions.
sourceCluster := "source-cluster-" + t.Name()
targetCluster := "target-cluster-" + t.Name()
numTasks := 2

enqueueTasks(t, historyTaskQueueManager, numTasks, sourceCluster, targetCluster)

listener := nettest.NewListener(nettest.NewPipe())

serveErrs := make(chan error, 1)
grpcServer := createServer(historyTaskQueueManager)
go func() {
serveErrs <- grpcServer.Serve(listener)
}()

client := createClient(ctrl, listener)

readTasks(t, numTasks, client, sourceCluster, targetCluster)

grpcServer.GracefulStop()
assert.NoError(t, <-serveErrs)
}

func readTasks(t *testing.T, numTasks int, client historyservice.HistoryServiceClient, sourceCluster string, targetCluster string) {
t.Helper()

var nextPageToken []byte

// We want to run a test where the client makes multiple requests to the server because the client is stateful. In
// particular, the first request here should establish a connection, and the next one should reuse that connection.
for i := 0; i < numTasks; i++ {
res, err := client.GetDLQTasks(context.Background(), &historyservice.GetDLQTasksRequest{
DlqKey: &historyservice.HistoryDLQKey{
Category: enums.TASK_CATEGORY_TRANSFER,
SourceCluster: sourceCluster,
TargetCluster: targetCluster,
},
PageSize: 1,
NextPageToken: nextPageToken,
})
require.NoError(t, err)
assert.Equal(t, 1, len(res.DlqTasks))
assert.Equal(t, int64(persistence.FirstQueueMessageID+i), res.DlqTasks[0].Metadata.MessageId)
nextPageToken = res.NextPageToken
}
}

func createServer(historyTaskQueueManager *persistence.HistoryTaskQueueManager) *grpc.Server {
// TODO: find a better way to create a history handler
historyHandler := historyserver.HandlerProvider(historyserver.NewHandlerArgs{
TaskQueueManager: historyTaskQueueManager,
TracerProvider: fakeTracerProvider{},
})
grpcServer := grpc.NewServer()
historyservice.RegisterHistoryServiceServer(grpcServer, historyHandler)
return grpcServer
}

func createClient(ctrl *gomock.Controller, listener *nettest.PipeListener) historyservice.HistoryServiceClient {
serviceResolver := membership.NewMockServiceResolver(ctrl)
serviceResolver.EXPECT().Members().Return([]membership.HostInfo{
membership.NewHostInfoFromAddress("127.0.0.1:7104"),
})
rpcFactory := nettest.NewRPCFactory(listener)
client := history.NewClient(
dynamicconfig.NewNoopCollection(),
serviceResolver,
log.NewTestLogger(),
1,
rpcFactory,
time.Duration(0),
)
return client
}

func enqueueTasks(
t *testing.T,
historyTaskQueueManager *persistence.HistoryTaskQueueManager,
numTasks int,
sourceCluster string,
targetCluster string,
) {
t.Helper()

task := &tasks.WorkflowTask{
TaskID: 42,
}
for i := 0; i < numTasks; i++ {
_, err := historyTaskQueueManager.EnqueueTask(context.Background(), &persistence.EnqueueTaskRequest{
QueueType: persistence.QueueTypeHistoryDLQ,
SourceCluster: sourceCluster,
TargetCluster: targetCluster,
Task: task,
})
require.NoError(t, err)
}
}
14 changes: 14 additions & 0 deletions client/history/metric_client_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 781c261

Please sign in to comment.