Skip to content

Commit

Permalink
Create admin API to get task queue tasks (#2221)
Browse files Browse the repository at this point in the history
  • Loading branch information
feedmeapples committed Nov 24, 2021
1 parent 78acd4d commit 6c86649
Show file tree
Hide file tree
Showing 10 changed files with 1,144 additions and 280 deletions.
1,124 changes: 901 additions & 223 deletions api/adminservice/v1/request_response.pb.go

Large diffs are not rendered by default.

153 changes: 96 additions & 57 deletions api/adminservice/v1/service.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 35 additions & 0 deletions api/adminservicemock/v1/service.pb.mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions client/admin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,20 @@ func (c *clientImpl) ResendReplicationTasks(
return client.ResendReplicationTasks(ctx, request, opts...)
}

func (c *clientImpl) GetTaskQueueTasks(
ctx context.Context,
request *adminservice.GetTaskQueueTasksRequest,
opts ...grpc.CallOption,
) (*adminservice.GetTaskQueueTasksResponse, error) {
client, err := c.getRandomClient()
if err != nil {
return nil, err
}
ctx, cancel := c.createContext(ctx)
defer cancel()
return client.GetTaskQueueTasks(ctx, request, opts...)
}

func (c *clientImpl) createContext(parent context.Context) (context.Context, context.CancelFunc) {
return context.WithTimeout(parent, c.timeout)
}
Expand Down
17 changes: 17 additions & 0 deletions client/admin/metricClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,3 +560,20 @@ func (c *metricClient) ResendReplicationTasks(
}
return resp, err
}

func (c *metricClient) GetTaskQueueTasks(
ctx context.Context,
request *adminservice.GetTaskQueueTasksRequest,
opts ...grpc.CallOption,
) (*adminservice.GetTaskQueueTasksResponse, error) {

c.metricsClient.IncCounter(metrics.AdminClientResendReplicationTasksScope, metrics.ClientRequests)
sw := c.metricsClient.StartTimer(metrics.AdminClientResendReplicationTasksScope, metrics.ClientLatency)
resp, err := c.client.GetTaskQueueTasks(ctx, request, opts...)
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.AdminClientResendReplicationTasksScope, metrics.ClientFailures)
}
return resp, err
}
16 changes: 16 additions & 0 deletions client/admin/retryableClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,3 +507,19 @@ func (c *retryableClient) ResendReplicationTasks(
err := backoff.Retry(op, c.policy, c.isRetryable)
return resp, err
}

func (c *retryableClient) GetTaskQueueTasks(
ctx context.Context,
request *adminservice.GetTaskQueueTasksRequest,
opts ...grpc.CallOption,
) (*adminservice.GetTaskQueueTasksResponse, error) {

var resp *adminservice.GetTaskQueueTasksResponse
op := func() error {
var err error
resp, err = c.client.GetTaskQueueTasks(ctx, request, opts...)
return err
}
err := backoff.Retry(op, c.policy, c.isRetryable)
return resp, err
}
Loading

0 comments on commit 6c86649

Please sign in to comment.