/
cron_workflow.go
72 lines (58 loc) · 2.23 KB
/
cron_workflow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package main
import (
"context"
"time"
"go.uber.org/cadence/activity"
"go.uber.org/cadence/workflow"
"go.uber.org/zap"
)
/**
* This cron sample workflow schedules a job based on a given schedule spec. The schedule spec in this sample demo is
* very simple, but you could have more complicated scheduler logic that meets your needs.
*/
const (
	// activityScheduleToStartTimeout bounds how long an activity task may wait
	// in the queue before it is started.
	activityScheduleToStartTimeout = 10 * time.Second
	// activityStartToCloseTimeout bounds how long an activity may run from
	// start to completion.
	activityStartToCloseTimeout = 1 * time.Minute
	// WorkflowStartToCloseTimeout is the timeout from workflow start to
	// workflow close.
	WorkflowStartToCloseTimeout = 20 * time.Minute
	// DecisionTaskStartToCloseTimeout is the timeout from decision task started
	// to decision task completed; this is usually very short.
	DecisionTaskStartToCloseTimeout = 10 * time.Second
)
// sampleCronActivity is the sample job activity run by the cron workflow.
//
// It logs the time window it was invoked with (the log fields label beginTime
// as excluded and endTime as included) and always returns nil. The real job
// logic would go where the placeholder comment is.
func sampleCronActivity(ctx context.Context, beginTime, endTime time.Time) error {
	logger := activity.GetLogger(ctx)
	window := []zap.Field{
		zap.Time("beginTime_exclude", beginTime),
		zap.Time("endTime_include", endTime),
	}
	logger.Info("Cron job running.", window...)
	// ...
	return nil
}
// SampleCronResult is the value returned by one cron run and handed to the
// next cron run.
type SampleCronResult struct {
	// EndTime is the end of the time window covered by the run that produced
	// this result; the following run uses it as the start of its own window.
	EndTime time.Time
}
// sampleCronWorkflow is the workflow decider for the cron sample.
//
// Each run executes sampleCronActivity over the window from the previous run's
// EndTime (or the zero time when there is no usable previous result) up to the
// workflow's current time. On success it returns a SampleCronResult holding
// that end time so the next cron run can continue from it. On activity failure
// it logs the error and returns it.
func sampleCronWorkflow(ctx workflow.Context) (*SampleCronResult, error) {
	logger := workflow.GetLogger(ctx)
	logger.Info("Cron workflow started.", zap.Time("StartTime", workflow.Now(ctx)))

	ao := workflow.ActivityOptions{
		ScheduleToStartTimeout: activityScheduleToStartTimeout,
		StartToCloseTimeout:    activityStartToCloseTimeout,
	}
	ctx1 := workflow.WithActivityOptions(ctx, ao)

	// Start from the zero time for the first cron job.
	var startTime time.Time
	if workflow.HasLastCompletionResult(ctx) {
		var lastResult SampleCronResult
		if err := workflow.GetLastCompletionResult(ctx, &lastResult); err != nil {
			// Best effort: keep going from the zero time, but make the
			// fallback visible instead of silently discarding the error.
			logger.Warn("Failed to read last completion result; starting from zero time.", zap.Error(err))
		} else {
			startTime = lastResult.EndTime
		}
	}

	endTime := workflow.Now(ctx)
	err := workflow.ExecuteActivity(ctx1, sampleCronActivity, startTime, endTime).Get(ctx, nil)
	if err != nil {
		// cron job failed. but next cron should continue to be scheduled by Cadence server
		logger.Error("Cron job failed.", zap.Error(err))
		return nil, err
	}

	return &SampleCronResult{EndTime: endTime}, nil
}