Skip to content

Commit

Permalink
Add skeleton for Schedule rpcs (#2846)
Browse files Browse the repository at this point in the history
  • Loading branch information
dnr committed May 16, 2022
1 parent 4a47b81 commit ec73b48
Show file tree
Hide file tree
Showing 12 changed files with 765 additions and 10 deletions.
105 changes: 105 additions & 0 deletions client/frontend/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,111 @@ func (c *clientImpl) ListTaskQueuePartitions(
return client.ListTaskQueuePartitions(ctx, request, opts...)
}

func (c *clientImpl) CreateSchedule(
ctx context.Context,
request *workflowservice.CreateScheduleRequest,
opts ...grpc.CallOption,
) (*workflowservice.CreateScheduleResponse, error) {
client, err := c.getRandomClient()
if err != nil {
return nil, err
}
ctx, cancel := c.createContext(ctx)
defer cancel()

return client.CreateSchedule(ctx, request, opts...)
}

func (c *clientImpl) DescribeSchedule(
ctx context.Context,
request *workflowservice.DescribeScheduleRequest,
opts ...grpc.CallOption,
) (*workflowservice.DescribeScheduleResponse, error) {
client, err := c.getRandomClient()
if err != nil {
return nil, err
}
ctx, cancel := c.createContext(ctx)
defer cancel()

return client.DescribeSchedule(ctx, request, opts...)
}

func (c *clientImpl) UpdateSchedule(
ctx context.Context,
request *workflowservice.UpdateScheduleRequest,
opts ...grpc.CallOption,
) (*workflowservice.UpdateScheduleResponse, error) {
client, err := c.getRandomClient()
if err != nil {
return nil, err
}
ctx, cancel := c.createContext(ctx)
defer cancel()

return client.UpdateSchedule(ctx, request, opts...)
}

func (c *clientImpl) PatchSchedule(
ctx context.Context,
request *workflowservice.PatchScheduleRequest,
opts ...grpc.CallOption,
) (*workflowservice.PatchScheduleResponse, error) {
client, err := c.getRandomClient()
if err != nil {
return nil, err
}
ctx, cancel := c.createContext(ctx)
defer cancel()

return client.PatchSchedule(ctx, request, opts...)
}

func (c *clientImpl) ListScheduleMatchingTimes(
ctx context.Context,
request *workflowservice.ListScheduleMatchingTimesRequest,
opts ...grpc.CallOption,
) (*workflowservice.ListScheduleMatchingTimesResponse, error) {
client, err := c.getRandomClient()
if err != nil {
return nil, err
}
ctx, cancel := c.createContext(ctx)
defer cancel()

return client.ListScheduleMatchingTimes(ctx, request, opts...)
}

func (c *clientImpl) DeleteSchedule(
ctx context.Context,
request *workflowservice.DeleteScheduleRequest,
opts ...grpc.CallOption,
) (*workflowservice.DeleteScheduleResponse, error) {
client, err := c.getRandomClient()
if err != nil {
return nil, err
}
ctx, cancel := c.createContext(ctx)
defer cancel()

return client.DeleteSchedule(ctx, request, opts...)
}

func (c *clientImpl) ListSchedules(
ctx context.Context,
request *workflowservice.ListSchedulesRequest,
opts ...grpc.CallOption,
) (*workflowservice.ListSchedulesResponse, error) {
client, err := c.getRandomClient()
if err != nil {
return nil, err
}
ctx, cancel := c.createContext(ctx)
defer cancel()

return client.ListSchedules(ctx, request, opts...)
}

func (c *clientImpl) createContext(parent context.Context) (context.Context, context.CancelFunc) {
return context.WithTimeout(parent, c.timeout)
}
Expand Down
126 changes: 126 additions & 0 deletions client/frontend/metricClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -764,3 +764,129 @@ func (c *metricClient) ListTaskQueuePartitions(
}
return resp, err
}

func (c *metricClient) CreateSchedule(
ctx context.Context,
request *workflowservice.CreateScheduleRequest,
opts ...grpc.CallOption,
) (*workflowservice.CreateScheduleResponse, error) {

c.metricsClient.IncCounter(metrics.FrontendClientCreateScheduleScope, metrics.ClientRequests)

sw := c.metricsClient.StartTimer(metrics.FrontendClientCreateScheduleScope, metrics.ClientLatency)
resp, err := c.client.CreateSchedule(ctx, request, opts...)
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.FrontendClientCreateScheduleScope, metrics.ClientFailures)
}
return resp, err
}

func (c *metricClient) DescribeSchedule(
ctx context.Context,
request *workflowservice.DescribeScheduleRequest,
opts ...grpc.CallOption,
) (*workflowservice.DescribeScheduleResponse, error) {

c.metricsClient.IncCounter(metrics.FrontendClientDescribeScheduleScope, metrics.ClientRequests)

sw := c.metricsClient.StartTimer(metrics.FrontendClientDescribeScheduleScope, metrics.ClientLatency)
resp, err := c.client.DescribeSchedule(ctx, request, opts...)
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.FrontendClientDescribeScheduleScope, metrics.ClientFailures)
}
return resp, err
}

func (c *metricClient) UpdateSchedule(
ctx context.Context,
request *workflowservice.UpdateScheduleRequest,
opts ...grpc.CallOption,
) (*workflowservice.UpdateScheduleResponse, error) {

c.metricsClient.IncCounter(metrics.FrontendClientUpdateScheduleScope, metrics.ClientRequests)

sw := c.metricsClient.StartTimer(metrics.FrontendClientUpdateScheduleScope, metrics.ClientLatency)
resp, err := c.client.UpdateSchedule(ctx, request, opts...)
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.FrontendClientUpdateScheduleScope, metrics.ClientFailures)
}
return resp, err
}

func (c *metricClient) PatchSchedule(
ctx context.Context,
request *workflowservice.PatchScheduleRequest,
opts ...grpc.CallOption,
) (*workflowservice.PatchScheduleResponse, error) {

c.metricsClient.IncCounter(metrics.FrontendClientPatchScheduleScope, metrics.ClientRequests)

sw := c.metricsClient.StartTimer(metrics.FrontendClientPatchScheduleScope, metrics.ClientLatency)
resp, err := c.client.PatchSchedule(ctx, request, opts...)
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.FrontendClientPatchScheduleScope, metrics.ClientFailures)
}
return resp, err
}

func (c *metricClient) ListScheduleMatchingTimes(
ctx context.Context,
request *workflowservice.ListScheduleMatchingTimesRequest,
opts ...grpc.CallOption,
) (*workflowservice.ListScheduleMatchingTimesResponse, error) {

c.metricsClient.IncCounter(metrics.FrontendClientListScheduleMatchingTimesScope, metrics.ClientRequests)

sw := c.metricsClient.StartTimer(metrics.FrontendClientListScheduleMatchingTimesScope, metrics.ClientLatency)
resp, err := c.client.ListScheduleMatchingTimes(ctx, request, opts...)
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.FrontendClientListScheduleMatchingTimesScope, metrics.ClientFailures)
}
return resp, err
}

func (c *metricClient) DeleteSchedule(
ctx context.Context,
request *workflowservice.DeleteScheduleRequest,
opts ...grpc.CallOption,
) (*workflowservice.DeleteScheduleResponse, error) {

c.metricsClient.IncCounter(metrics.FrontendClientDeleteScheduleScope, metrics.ClientRequests)

sw := c.metricsClient.StartTimer(metrics.FrontendClientDeleteScheduleScope, metrics.ClientLatency)
resp, err := c.client.DeleteSchedule(ctx, request, opts...)
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.FrontendClientDeleteScheduleScope, metrics.ClientFailures)
}
return resp, err
}

func (c *metricClient) ListSchedules(
ctx context.Context,
request *workflowservice.ListSchedulesRequest,
opts ...grpc.CallOption,
) (*workflowservice.ListSchedulesResponse, error) {

c.metricsClient.IncCounter(metrics.FrontendClientListSchedulesScope, metrics.ClientRequests)

sw := c.metricsClient.StartTimer(metrics.FrontendClientListSchedulesScope, metrics.ClientLatency)
resp, err := c.client.ListSchedules(ctx, request, opts...)
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.FrontendClientListSchedulesScope, metrics.ClientFailures)
}
return resp, err
}
105 changes: 105 additions & 0 deletions client/frontend/retryableClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,3 +649,108 @@ func (c *retryableClient) ListTaskQueuePartitions(
err := backoff.Retry(op, c.policy, c.isRetryable)
return resp, err
}

func (c *retryableClient) CreateSchedule(
ctx context.Context,
request *workflowservice.CreateScheduleRequest,
opts ...grpc.CallOption,
) (*workflowservice.CreateScheduleResponse, error) {
var resp *workflowservice.CreateScheduleResponse
op := func() error {
var err error
resp, err = c.client.CreateSchedule(ctx, request, opts...)
return err
}
err := backoff.Retry(op, c.policy, c.isRetryable)
return resp, err
}

func (c *retryableClient) DescribeSchedule(
ctx context.Context,
request *workflowservice.DescribeScheduleRequest,
opts ...grpc.CallOption,
) (*workflowservice.DescribeScheduleResponse, error) {
var resp *workflowservice.DescribeScheduleResponse
op := func() error {
var err error
resp, err = c.client.DescribeSchedule(ctx, request, opts...)
return err
}
err := backoff.Retry(op, c.policy, c.isRetryable)
return resp, err
}

func (c *retryableClient) UpdateSchedule(
ctx context.Context,
request *workflowservice.UpdateScheduleRequest,
opts ...grpc.CallOption,
) (*workflowservice.UpdateScheduleResponse, error) {
var resp *workflowservice.UpdateScheduleResponse
op := func() error {
var err error
resp, err = c.client.UpdateSchedule(ctx, request, opts...)
return err
}
err := backoff.Retry(op, c.policy, c.isRetryable)
return resp, err
}

func (c *retryableClient) PatchSchedule(
ctx context.Context,
request *workflowservice.PatchScheduleRequest,
opts ...grpc.CallOption,
) (*workflowservice.PatchScheduleResponse, error) {
var resp *workflowservice.PatchScheduleResponse
op := func() error {
var err error
resp, err = c.client.PatchSchedule(ctx, request, opts...)
return err
}
err := backoff.Retry(op, c.policy, c.isRetryable)
return resp, err
}

func (c *retryableClient) ListScheduleMatchingTimes(
ctx context.Context,
request *workflowservice.ListScheduleMatchingTimesRequest,
opts ...grpc.CallOption,
) (*workflowservice.ListScheduleMatchingTimesResponse, error) {
var resp *workflowservice.ListScheduleMatchingTimesResponse
op := func() error {
var err error
resp, err = c.client.ListScheduleMatchingTimes(ctx, request, opts...)
return err
}
err := backoff.Retry(op, c.policy, c.isRetryable)
return resp, err
}

func (c *retryableClient) DeleteSchedule(
ctx context.Context,
request *workflowservice.DeleteScheduleRequest,
opts ...grpc.CallOption,
) (*workflowservice.DeleteScheduleResponse, error) {
var resp *workflowservice.DeleteScheduleResponse
op := func() error {
var err error
resp, err = c.client.DeleteSchedule(ctx, request, opts...)
return err
}
err := backoff.Retry(op, c.policy, c.isRetryable)
return resp, err
}

func (c *retryableClient) ListSchedules(
ctx context.Context,
request *workflowservice.ListSchedulesRequest,
opts ...grpc.CallOption,
) (*workflowservice.ListSchedulesResponse, error) {
var resp *workflowservice.ListSchedulesResponse
op := func() error {
var err error
resp, err = c.client.ListSchedules(ctx, request, opts...)
return err
}
err := backoff.Retry(op, c.policy, c.isRetryable)
return resp, err
}

0 comments on commit ec73b48

Please sign in to comment.