Skip to content

Commit

Permalink
Fill in fields in ListSchedulesResponse (#2946)
Browse files Browse the repository at this point in the history
  • Loading branch information
dnr committed Jun 6, 2022
1 parent ba8ce65 commit 14556dd
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 30 deletions.
5 changes: 5 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,8 @@ const (
// e.g. "TemporalTimeout:StartToClose" or "TemporalTimeout:Heartbeat"
TimeoutFailureTypePrefix = "TemporalTimeout:"
)

const (
// Limit for schedule notes field
ScheduleNotesSizeLimit = 1000
)
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ require (
go.opentelemetry.io/otel/metric v0.30.0
go.opentelemetry.io/otel/sdk v1.7.0
go.opentelemetry.io/otel/sdk/metric v0.30.0
go.temporal.io/api v1.8.0
go.temporal.io/api v1.8.1-0.20220603192404-e65836719706
go.temporal.io/sdk v1.14.1-0.20220525140819-54f4148173a9
go.temporal.io/version v0.3.0
go.uber.org/atomic v1.9.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -482,8 +482,8 @@ go.opentelemetry.io/otel/trace v1.7.0 h1:O37Iogk1lEkMRXewVtZ1BBTVn5JEp8GrJvP92bJ
go.opentelemetry.io/otel/trace v1.7.0/go.mod h1:fzLSB9nqR2eXzxPXb2JW9IKE+ScyXA48yyE4TNvoHqU=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.temporal.io/api v1.7.1-0.20220510183009-449d18444c9a/go.mod h1:YU5EQaONkIr0ZRju0NqdqYNH/hCkBuwqRMDA0iaj7JM=
go.temporal.io/api v1.8.0 h1:FzAMmBeLs6BEMFyHeJ9M9GAv6McFuH/GjnliBCdQ/Zw=
go.temporal.io/api v1.8.0/go.mod h1:7m1ZOVUFi/54a5IMzMeELnvDy5sJwRfz11zi3Jrww8w=
go.temporal.io/api v1.8.1-0.20220603192404-e65836719706 h1:9zrW4CMQUgBMx9IUZ0qE/HhRxZEugmgvFTXBZhIdlsw=
go.temporal.io/api v1.8.1-0.20220603192404-e65836719706/go.mod h1:7m1ZOVUFi/54a5IMzMeELnvDy5sJwRfz11zi3Jrww8w=
go.temporal.io/sdk v1.14.1-0.20220525140819-54f4148173a9 h1:6XjIRR49O5FAXF6f64EP1N5VfCs8/IPIqXfXWQcRCp8=
go.temporal.io/sdk v1.14.1-0.20220525140819-54f4148173a9/go.mod h1:d1S1ETFShrybdBYxhqizQASREk0G9oi4RE2VKsYGHAk=
go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig=
Expand Down
1 change: 1 addition & 0 deletions service/frontend/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ var (
errTaskQueueTooLong = serviceerror.NewInvalidArgument("TaskQueue length exceeds limit.")
errRequestIDTooLong = serviceerror.NewInvalidArgument("RequestId length exceeds limit.")
errIdentityTooLong = serviceerror.NewInvalidArgument("Identity length exceeds limit.")
errNotesTooLong = serviceerror.NewInvalidArgument("Schedule notes exceeds limit.")
errEarliestTimeIsGreaterThanLatestTime = serviceerror.NewInvalidArgument("EarliestTime in StartTimeFilter should not be larger than LatestTime.")
errClusterIsNotConfiguredForVisibilityArchival = serviceerror.NewInvalidArgument("Cluster is not configured for visibility archival.")
errClusterIsNotConfiguredForReadingArchivalVisibility = serviceerror.NewInvalidArgument("Cluster is not configured for reading archived visibility records.")
Expand Down
71 changes: 67 additions & 4 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"time"
"unicode/utf8"

"github.com/gogo/protobuf/jsonpb"
"github.com/pborman/uuid"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
Expand Down Expand Up @@ -63,6 +64,7 @@ import (
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/payload"
"go.temporal.io/server/common/payloads"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/serialization"
Expand Down Expand Up @@ -3266,11 +3268,18 @@ func (wh *WorkflowHandler) DescribeSchedule(ctx context.Context, request *workfl
if !needRefresh {
token := make([]byte, 8)
binary.BigEndian.PutUint64(token, uint64(response.ConflictToken))

searchAttributes := describeResponse.GetWorkflowExecutionInfo().GetSearchAttributes()
searchAttributes = wh.cleanScheduleSearchAttributes(searchAttributes)

memo := describeResponse.GetWorkflowExecutionInfo().GetMemo()
memo = wh.cleanScheduleMemo(memo)

describeScheduleResponse = &workflowservice.DescribeScheduleResponse{
Schedule: response.Schedule,
Info: response.Info,
Memo: describeResponse.GetWorkflowExecutionInfo().Memo,
SearchAttributes: describeResponse.GetWorkflowExecutionInfo().SearchAttributes,
Memo: memo,
SearchAttributes: searchAttributes,
ConflictToken: token,
}
return nil
Expand Down Expand Up @@ -3404,6 +3413,11 @@ func (wh *WorkflowHandler) PatchSchedule(ctx context.Context, request *workflows
return nil, err
}

if len(request.Patch.Pause) > common.ScheduleNotesSizeLimit ||
len(request.Patch.Unpause) > common.ScheduleNotesSizeLimit {
return nil, errNotesTooLong
}

inputPayloads, err := payloads.Encode(request.Patch)

sizeLimitError := wh.config.BlobSizeLimitError(request.GetNamespace())
Expand Down Expand Up @@ -3606,10 +3620,15 @@ func (wh *WorkflowHandler) ListSchedules(ctx context.Context, request *workflows

schedules := make([]*schedpb.ScheduleListEntry, len(persistenceResp.Executions))
for i, ex := range persistenceResp.Executions {
searchAttributes := ex.GetSearchAttributes()
info := wh.decodeScheduleListInfo(searchAttributes)
searchAttributes = wh.cleanScheduleSearchAttributes(searchAttributes)
memo := wh.cleanScheduleMemo(ex.GetMemo())
schedules[i] = &schedpb.ScheduleListEntry{
ScheduleId: ex.GetExecution().GetWorkflowId(),
Memo: ex.GetMemo(),
SearchAttributes: ex.GetSearchAttributes(),
Memo: memo,
SearchAttributes: searchAttributes,
Info: info,
}
}

Expand Down Expand Up @@ -4339,3 +4358,47 @@ func (wh *WorkflowHandler) trimHistoryNode(
)
}
}

func (wh *WorkflowHandler) decodeScheduleListInfo(searchAttributes *commonpb.SearchAttributes) *schedpb.ScheduleListInfo {
var listInfoStr string
var listInfoPb schedpb.ScheduleListInfo
if listInfoPayload := searchAttributes.GetIndexedFields()[searchattribute.TemporalScheduleInfoJSON]; listInfoPayload == nil {
return nil
} else if err := payload.Decode(listInfoPayload, &listInfoStr); err != nil {
wh.logger.Error("decoding schedule list info from payload", tag.Error(err))
return nil
} else if err = jsonpb.UnmarshalString(listInfoStr, &listInfoPb); err != nil {
wh.logger.Error("decoding schedule list info from json", tag.Error(err))
return nil
}
return &listInfoPb
}

// This mutates searchAttributes
func (wh *WorkflowHandler) cleanScheduleSearchAttributes(searchAttributes *commonpb.SearchAttributes) *commonpb.SearchAttributes {
fields := searchAttributes.GetIndexedFields()
if len(fields) == 0 {
return nil
}

delete(fields, searchattribute.TemporalSchedulePaused)
delete(fields, searchattribute.TemporalScheduleInfoJSON)
// this isn't schedule-related but isn't relevant to the user for
// scheduler workflows since it's the server worker
delete(fields, searchattribute.BinaryChecksums)

if len(fields) == 0 {
return nil
}
return searchAttributes
}

// This mutates memo
func (wh *WorkflowHandler) cleanScheduleMemo(memo *commonpb.Memo) *commonpb.Memo {
fields := memo.GetFields()
if len(fields) == 0 {
return nil
}
// we don't define any fields here but might in the future
return memo
}
135 changes: 112 additions & 23 deletions service/worker/scheduler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"fmt"
"time"

"github.com/gogo/protobuf/jsonpb"
"github.com/google/uuid"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
Expand All @@ -38,6 +39,7 @@ import (
schedpb "go.temporal.io/api/schedule/v1"
workflowpb "go.temporal.io/api/workflow/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/converter"
sdklog "go.temporal.io/sdk/log"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
Expand Down Expand Up @@ -94,6 +96,9 @@ type (
AlwaysAppendTimestamp bool // Whether to append timestamp for non-overlapping workflows too
FutureActionCount int // The number of future action times to include in Describe.
RecentActionCount int // The number of recent actual action results to include in Describe.
FutureActionCountForList int // The number of future action times to include in List (search attr).
RecentActionCountForList int // The number of recent actual action results to include in List (search attr).
MaxSearchAttrLen int // Search attr length limit (should be <= server's limit).
IterationsBeforeContinueAsNew int
}
)
Expand Down Expand Up @@ -121,6 +126,9 @@ var (
AlwaysAppendTimestamp: true,
FutureActionCount: 10,
RecentActionCount: 10,
FutureActionCountForList: 5,
RecentActionCountForList: 5,
MaxSearchAttrLen: 2000, // server default is 2048 but leave a little room
IterationsBeforeContinueAsNew: 500,
}

Expand Down Expand Up @@ -181,6 +189,7 @@ func (s *scheduler) run() error {
s.State.LastProcessedTime = timestamp.TimePtr(t2)
for s.processBuffer() {
}
s.updateSearchAttributes()
// sleep returns on any of:
// 1. requested time elapsed
// 2. we got a signal (update, request, refresh)
Expand Down Expand Up @@ -471,22 +480,25 @@ func (s *scheduler) handleRefreshSignal(ch workflow.ReceiveChannel, _ bool) {
s.needRefresh = true
}

func (s *scheduler) handleDescribeQuery() (*schedspb.DescribeResponse, error) {
// update future actions
if s.cspec != nil {
s.Info.FutureActionTimes = make([]*time.Time, 0, s.tweakables.FutureActionCount)
t1 := timestamp.TimeValue(s.State.LastProcessedTime)
for len(s.Info.FutureActionTimes) < s.tweakables.FutureActionCount {
var has bool
_, t1, has = s.cspec.getNextTime(t1)
if !has {
break
}
s.Info.FutureActionTimes = append(s.Info.FutureActionTimes, timestamp.TimePtr(t1))
func (s *scheduler) getFutureActionTimes(n int) []*time.Time {
if s.cspec == nil {
return nil
}
out := make([]*time.Time, 0, n)
t1 := timestamp.TimeValue(s.State.LastProcessedTime)
for len(out) < n {
var has bool
_, t1, has = s.cspec.getNextTime(t1)
if !has {
break
}
} else {
s.Info.FutureActionTimes = nil
out = append(out, timestamp.TimePtr(t1))
}
return out
}

func (s *scheduler) handleDescribeQuery() (*schedspb.DescribeResponse, error) {
s.Info.FutureActionTimes = s.getFutureActionTimes(s.tweakables.FutureActionCount)

return &schedspb.DescribeResponse{
Schedule: s.Schedule,
Expand Down Expand Up @@ -519,6 +531,80 @@ func (s *scheduler) incSeqNo() {
s.State.ConflictToken++
}

func (s *scheduler) getListInfo(shrink int) *schedpb.ScheduleListInfo {
specCopy := *s.Schedule.Spec
spec := &specCopy
// always clear some fields that are too large/not useful for the list view
spec.ExcludeCalendar = nil
spec.Jitter = nil
spec.TimezoneData = nil

recentActionCount := s.tweakables.RecentActionCountForList
futureActionCount := s.tweakables.FutureActionCountForList
notes := s.Schedule.State.Notes

// if we need to shrink it, clear/shrink some more
if shrink > 0 {
recentActionCount = 1
futureActionCount = 1
notes = ""
}
if shrink > 1 {
spec = nil
}

return &schedpb.ScheduleListInfo{
Spec: spec,
WorkflowType: s.Schedule.Action.GetStartWorkflow().GetWorkflowType(),
Notes: notes,
Paused: s.Schedule.State.Paused,
RecentActions: sliceTail(s.Info.RecentActions, recentActionCount),
FutureActionTimes: s.getFutureActionTimes(futureActionCount),
}
}

func (s *scheduler) updateSearchAttributes() {
dc := converter.GetDefaultDataConverter()

var currentInfo, newInfo string
if payload := workflow.GetInfo(s.ctx).SearchAttributes.GetIndexedFields()[searchattribute.TemporalScheduleInfoJSON]; payload != nil {
if err := dc.FromPayload(payload, &currentInfo); err != nil {
s.logger.Error("error decoding current info search attr", "error", err)
return
}
}

for shrink := 0; shrink <= 2; shrink++ {
var err error
m := &jsonpb.Marshaler{}
if newInfo, err = m.MarshalToString(s.getListInfo(shrink)); err != nil {
s.logger.Error("error encoding ScheduleListInfo", "error", err)
return
}
// encode to check size. note that the server uses len(Data) for per-attr size checks
if newInfoPayload, err := dc.ToPayload(newInfo); err != nil {
s.logger.Error("error encoding ScheduleListInfo into payload", "error", err)
return
} else if len(newInfoPayload.Data) <= s.tweakables.MaxSearchAttrLen {
break
}
newInfo = "{}" // fallback that can't possibly exceed the limit
}

// note that newInfo contains paused, so if paused changed, then newInfo will too
if newInfo == currentInfo {
return
}

err := workflow.UpsertSearchAttributes(s.ctx, map[string]interface{}{
searchattribute.TemporalSchedulePaused: s.Schedule.State.Paused,
searchattribute.TemporalScheduleInfoJSON: newInfo,
})
if err != nil {
s.logger.Error("error updating search attributes", "error", err)
}
}

func (s *scheduler) checkConflict(token int64) error {
if token == 0 || token == s.State.ConflictToken {
return nil
Expand Down Expand Up @@ -647,11 +733,7 @@ func (s *scheduler) processBuffer() bool {

func (s *scheduler) recordAction(result *schedpb.ScheduleActionResult) {
s.Info.ActionCount++
s.Info.RecentActions = append(s.Info.RecentActions, result)
extra := len(s.Info.RecentActions) - s.tweakables.RecentActionCount
if extra > 0 {
s.Info.RecentActions = s.Info.RecentActions[extra:]
}
s.Info.RecentActions = sliceTail(append(s.Info.RecentActions, result), s.tweakables.RecentActionCount)
if result.StartWorkflowResult != nil {
s.Info.RunningWorkflows = append(s.Info.RunningWorkflows, result.StartWorkflowResult)
}
Expand Down Expand Up @@ -709,7 +791,7 @@ func (s *scheduler) startWorkflow(
WorkflowIdReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
RetryPolicy: newWorkflow.RetryPolicy,
Memo: newWorkflow.Memo,
SearchAttributes: s.addSearchAttrs(newWorkflow.SearchAttributes, nominalTimeSec),
SearchAttributes: s.addSearchAttributes(newWorkflow.SearchAttributes, nominalTimeSec),
Header: newWorkflow.Header,
},
StartTime: startTime,
Expand All @@ -736,11 +818,11 @@ func (s *scheduler) identity() string {
return fmt.Sprintf("temporal-scheduler-%s-%s", s.State.Namespace, s.State.ScheduleId)
}

func (s *scheduler) addSearchAttrs(
attrs *commonpb.SearchAttributes,
func (s *scheduler) addSearchAttributes(
attributes *commonpb.SearchAttributes,
nominal time.Time,
) *commonpb.SearchAttributes {
fields := maps.Clone(attrs.GetIndexedFields())
fields := maps.Clone(attributes.GetIndexedFields())
if p, err := payload.Encode(nominal); err == nil {
fields[searchattribute.TemporalScheduledStartTime] = p
}
Expand Down Expand Up @@ -838,3 +920,10 @@ func (s *scheduler) newUUIDString() string {
}).Get(&str)
return str
}

func sliceTail[S ~[]E, E any](s S, n int) S {
if extra := len(s) - n; extra > 0 {
return s[extra:]
}
return s
}

0 comments on commit 14556dd

Please sign in to comment.