Skip to content

Commit

Permalink
Expose buffer size and # dropped actions in ScheduleInfo (#4839)
Browse files Browse the repository at this point in the history
<!-- Describe what has changed in this PR -->
**What changed?**
Expose buffer size and number of dropped actions due to buffer limit in
ScheduleInfo
Related API change: temporalio/api#311


<!-- Tell your future self why have you made these changes -->
**Why?**
Give more visibility into schedule state to the clients.


<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
**How did you test it?**
None so far


<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
**Potential risks**
None

<!-- Is this PR a hotfix candidate or require that a notification be
sent to the broader community? (Yes/No) -->
**Is hotfix candidate?**
No
  • Loading branch information
ShahabT authored Sep 6, 2023
1 parent c0bd30c commit 9684f9f
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 17 deletions.
12 changes: 6 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ require (
go.opentelemetry.io/otel/metric v1.16.0
go.opentelemetry.io/otel/sdk v1.16.0
go.opentelemetry.io/otel/sdk/metric v0.39.0
go.temporal.io/api v1.24.0
go.temporal.io/api v1.24.1-0.20230905121556-e0c76e6dc840
go.temporal.io/sdk v1.24.0
go.temporal.io/version v0.3.0
go.uber.org/atomic v1.11.0
Expand Down Expand Up @@ -127,14 +127,14 @@ require (
golang.org/x/crypto v0.12.0 // indirect
golang.org/x/mod v0.11.0 // indirect
golang.org/x/net v0.14.0 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/text v0.12.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/tools v0.10.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230815205213-6bfd019c3878 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230815205213-6bfd019c3878 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230815205213-6bfd019c3878 // indirect
google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
lukechampine.com/uint128 v1.3.0 // indirect
Expand Down
22 changes: 12 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1306,8 +1306,8 @@ go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI
go.opentelemetry.io/proto/otlp v0.20.0 h1:BLOA1cZBAGSbRiNuGCCKiFrCdYB7deeHDeD1SueyOfA=
go.opentelemetry.io/proto/otlp v0.20.0/go.mod h1:3QgjzPALBIv9pcknj2EXGPXjYPFdUh/RQfF8Lz3+Vnw=
go.temporal.io/api v1.21.0/go.mod h1:xlsUEakkN2vU2/WV7e5NqMG4N93nfuNfvbXdaXUpU8w=
go.temporal.io/api v1.24.0 h1:WWjMYSXNh4+T4Y4jq1e/d9yCNnWoHhq4bIwflHY6fic=
go.temporal.io/api v1.24.0/go.mod h1:4ackgCMjQHMpJYr1UQ6Tr/nknIqFkJ6dZ/SZsGv+St0=
go.temporal.io/api v1.24.1-0.20230905121556-e0c76e6dc840 h1:Pr9NMbXbizxLJLSLiaSEdwECRMVPF0MTdlito/zrQ74=
go.temporal.io/api v1.24.1-0.20230905121556-e0c76e6dc840/go.mod h1:SAfvX1/yC6i+UwAt+Ao1x5tL/u5n7fLThIjyUvYPh6g=
go.temporal.io/sdk v1.24.0 h1:mAk5VFR+z4s8QVzRx3iIpRnHcEO3m10CYNjnRXrhVq4=
go.temporal.io/sdk v1.24.0/go.mod h1:S7vWxU01lGcCny0sWx03bkkYw4VtVrpzeqBTn2A6y+E=
go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig=
Expand Down Expand Up @@ -1621,8 +1621,9 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
Expand Down Expand Up @@ -1650,8 +1651,9 @@ golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc=
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down Expand Up @@ -1961,8 +1963,8 @@ google.golang.org/genproto v0.0.0-20230629202037-9506855d4529/go.mod h1:xZnkP7mR
google.golang.org/genproto v0.0.0-20230706204954-ccb25ca9f130/go.mod h1:O9kGHb51iE/nOGvQaDUuadVYqovW56s5emA88lQnj6Y=
google.golang.org/genproto v0.0.0-20230726155614-23370e0ffb3e/go.mod h1:0ggbjUrZYpy1q+ANUS30SEoGZ53cdfwtbuG7Ptgy108=
google.golang.org/genproto v0.0.0-20230803162519-f966b187b2e5/go.mod h1:oH/ZOT02u4kWEp7oYBGYFFkCdKS/uYR9Z7+0/xuuFp8=
google.golang.org/genproto v0.0.0-20230815205213-6bfd019c3878 h1:Iveh6tGCJkHAjJgEqUQYGDGgbwmhjoAOz8kO/ajxefY=
google.golang.org/genproto v0.0.0-20230815205213-6bfd019c3878/go.mod h1:yZTlhN0tQnXo3h00fuXNCxJdLdIdnVFVBaRJ5LWBbw4=
google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d h1:VBu5YqKPv6XiJ199exd8Br+Aetz+o08F+PLMnwJQHAY=
google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d/go.mod h1:yZTlhN0tQnXo3h00fuXNCxJdLdIdnVFVBaRJ5LWBbw4=
google.golang.org/genproto/googleapis/api v0.0.0-20230525234020-1aefcd67740a/go.mod h1:ts19tUU+Z0ZShN1y3aPyq2+O3d5FUNNgT6FtOzmrNn8=
google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9/go.mod h1:vHYtlOoi6TsQ3Uk2yxR7NI5z8uoV+3pZtR4jmHIkRig=
google.golang.org/genproto/googleapis/api v0.0.0-20230526203410-71b5a4ffd15e/go.mod h1:vHYtlOoi6TsQ3Uk2yxR7NI5z8uoV+3pZtR4jmHIkRig=
Expand All @@ -1971,8 +1973,8 @@ google.golang.org/genproto/googleapis/api v0.0.0-20230629202037-9506855d4529/go.
google.golang.org/genproto/googleapis/api v0.0.0-20230706204954-ccb25ca9f130/go.mod h1:mPBs5jNgx2GuQGvFwUvVKqtn6HsUw9nP64BedgvqEsQ=
google.golang.org/genproto/googleapis/api v0.0.0-20230726155614-23370e0ffb3e/go.mod h1:rsr7RhLuwsDKL7RmgDDCUc6yaGr1iqceVb5Wv6f6YvQ=
google.golang.org/genproto/googleapis/api v0.0.0-20230803162519-f966b187b2e5/go.mod h1:5DZzOUPCLYL3mNkQ0ms0F3EuUNZ7py1Bqeq6sxzI7/Q=
google.golang.org/genproto/googleapis/api v0.0.0-20230815205213-6bfd019c3878 h1:WGq4lvB/mlicysM/dUT3SBvijH4D3sm/Ny1A4wmt2CI=
google.golang.org/genproto/googleapis/api v0.0.0-20230815205213-6bfd019c3878/go.mod h1:KjSP20unUpOx5kyQUFa7k4OJg0qeJ7DEZflGDu2p6Bk=
google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d h1:DoPTO70H+bcDXcd39vOqb2viZxgqeBeSGtZ55yZU4/Q=
google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d/go.mod h1:KjSP20unUpOx5kyQUFa7k4OJg0qeJ7DEZflGDu2p6Bk=
google.golang.org/genproto/googleapis/bytestream v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:ylj+BE99M198VPbBh6A8d9n3w8fChvyLK3wwBOjXBFA=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234015-3fc162c6f38a/go.mod h1:xURIpW9ES5+/GZhnV6beoEtxQrnkRGIfP5VQG2tCBLc=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA=
Expand All @@ -1982,8 +1984,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20230629202037-9506855d4529/go.
google.golang.org/genproto/googleapis/rpc v0.0.0-20230706204954-ccb25ca9f130/go.mod h1:8mL13HKkDa+IuJ8yruA3ci0q+0vsUz4m//+ottjwS5o=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230731190214-cbb8c96f2d6d/go.mod h1:TUfxEVdsvPg18p6AslUXFoLdpED4oBnGwyqk3dV1XzM=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230803162519-f966b187b2e5/go.mod h1:zBEcrKX2ZOcEkHWxBPAIvYUWOKKMIhYcmNiUIu2ji3I=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230815205213-6bfd019c3878 h1:lv6/DhyiFFGsmzxbsUUTOkN29II+zeWHxvT8Lpdxsv0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230815205213-6bfd019c3878/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d h1:uvYuEyMHKNt+lT4K3bN6fGswmK8qSvcreM3BwjDh+y4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M=
google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
Expand Down
2 changes: 2 additions & 0 deletions service/worker/scheduler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,7 @@ func (s *scheduler) handleDescribeQuery() (*schedspb.DescribeResponse, error) {
// this is a query handler, don't modify s.Info directly
infoCopy := *s.Info
infoCopy.FutureActionTimes = s.getFutureActionTimes(false, s.tweakables.FutureActionCount)
infoCopy.BufferSize = int64(len(s.State.BufferedStarts))

return &schedspb.DescribeResponse{
Schedule: s.Schedule,
Expand Down Expand Up @@ -894,6 +895,7 @@ func (s *scheduler) addStart(nominalTime, actualTime time.Time, overlapPolicy en
if s.tweakables.MaxBufferSize > 0 && len(s.State.BufferedStarts) >= s.tweakables.MaxBufferSize {
s.logger.Warn("Buffer too large", "start-time", nominalTime, "overlap-policy", overlapPolicy, "manual", manual)
s.metrics.Counter(metrics.ScheduleBufferOverruns.GetMetricName()).Inc(1)
s.Info.BufferDropped += 1
return
}
s.State.BufferedStarts = append(s.State.BufferedStarts, &schedspb.BufferedStart{
Expand Down
101 changes: 100 additions & 1 deletion service/worker/scheduler/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ func (s *workflowSuite) TestOverlapBufferOne() {
end: time.Date(2022, 6, 1, 0, 29, 0, 0, time.UTC),
result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
},
// skipped over :15, :20
// skipped over :15, :20, :25
{
id: "myid-2022-06-01T00:30:00Z",
start: time.Date(2022, 6, 1, 0, 30, 0, 0, time.UTC),
Expand All @@ -557,10 +557,34 @@ func (s *workflowSuite) TestOverlapBufferOne() {
at: time.Date(2022, 6, 1, 0, 6, 0, 0, time.UTC),
f: func() { s.Equal([]string{"myid-2022-06-01T00:05:00Z"}, s.runningWorkflows()) },
},
{at: time.Date(2022, 6, 1, 0, 11, 0, 0, time.UTC),
f: func() {
s.Equal(int64(1), s.describe().Info.BufferSize)
s.Equal(int64(0), s.describe().Info.OverlapSkipped)
},
},
{at: time.Date(2022, 6, 1, 0, 16, 0, 0, time.UTC),
f: func() {
s.Equal(int64(1), s.describe().Info.BufferSize)
s.Equal(int64(1), s.describe().Info.OverlapSkipped)
},
},
{at: time.Date(2022, 6, 1, 0, 26, 0, 0, time.UTC),
f: func() {
s.Equal(int64(1), s.describe().Info.BufferSize)
s.Equal(int64(3), s.describe().Info.OverlapSkipped)
},
},
{
at: time.Date(2022, 6, 1, 0, 31, 0, 0, time.UTC),
f: func() { s.Equal([]string{"myid-2022-06-01T00:30:00Z"}, s.runningWorkflows()) },
},
{at: time.Date(2022, 6, 1, 0, 32, 0, 0, time.UTC),
f: func() {
s.Equal(int64(0), s.describe().Info.BufferSize)
s.Equal(int64(3), s.describe().Info.OverlapSkipped)
},
},
},
&schedpb.Schedule{
Spec: &schedpb.ScheduleSpec{
Expand Down Expand Up @@ -638,6 +662,81 @@ func (s *workflowSuite) TestOverlapBufferAll() {
)
}

func (s *workflowSuite) TestBufferLimit() {
originalMaxBufferSize := currentTweakablePolicies.MaxBufferSize
currentTweakablePolicies.MaxBufferSize = 2
defer func() { currentTweakablePolicies.MaxBufferSize = originalMaxBufferSize }()

s.runAcrossContinue(
[]workflowRun{
{
id: "myid-2022-06-01T00:05:00Z",
start: time.Date(2022, 6, 1, 0, 5, 0, 0, time.UTC),
end: time.Date(2022, 6, 1, 0, 22, 0, 0, time.UTC),
result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
},
// first buffered one:
{
id: "myid-2022-06-01T00:10:00Z",
start: time.Date(2022, 6, 1, 0, 22, 0, 0, time.UTC),
end: time.Date(2022, 6, 1, 0, 23, 0, 0, time.UTC),
result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
},
// next buffered one, and also one more gets buffered:
{
id: "myid-2022-06-01T00:15:00Z",
start: time.Date(2022, 6, 1, 0, 23, 0, 0, time.UTC),
end: time.Date(2022, 6, 1, 0, 24, 0, 0, time.UTC),
result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
},
// run :20 does not fit in the buffer. finally back on track for :25
{
id: "myid-2022-06-01T00:25:00Z",
start: time.Date(2022, 6, 1, 0, 25, 0, 0, time.UTC),
end: time.Date(2022, 6, 1, 0, 27, 0, 0, time.UTC),
result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
},
},
[]delayedCallback{
{
at: time.Date(2022, 6, 1, 0, 20, 30, 0, time.UTC),
f: func() {
s.Equal([]string{"myid-2022-06-01T00:05:00Z"}, s.runningWorkflows())
s.Equal(int64(2), s.describe().Info.BufferSize)
s.Equal(int64(1), s.describe().Info.BufferDropped)
},
},
{
at: time.Date(2022, 6, 1, 0, 23, 30, 0, time.UTC),
f: func() {
s.Equal([]string{"myid-2022-06-01T00:15:00Z"}, s.runningWorkflows())
s.Equal(int64(0), s.describe().Info.BufferSize)
s.Equal(int64(1), s.describe().Info.BufferDropped)
},
},
{
at: time.Date(2022, 6, 1, 0, 25, 30, 0, time.UTC),
f: func() {
s.Equal([]string{"myid-2022-06-01T00:25:00Z"}, s.runningWorkflows())
s.Equal(int64(0), s.describe().Info.BufferSize)
s.Equal(int64(1), s.describe().Info.BufferDropped)
},
},
},
&schedpb.Schedule{
Spec: &schedpb.ScheduleSpec{
Interval: []*schedpb.IntervalSpec{{
Interval: timestamp.DurationPtr(5 * time.Minute),
}},
},
Policies: &schedpb.SchedulePolicies{
OverlapPolicy: enumspb.SCHEDULE_OVERLAP_POLICY_BUFFER_ALL,
},
},
8,
)
}

func (s *workflowSuite) TestOverlapCancel() {
// written using low-level mocks so we can mock CancelWorkflow without adding support in
// the framework
Expand Down

0 comments on commit 9684f9f

Please sign in to comment.