forked from uber-common/cadence-samples
/
activities.go
159 lines (138 loc) · 4.81 KB
/
activities.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package main
import (
"context"
"errors"
"io/ioutil"
"os"
"strings"
"time"
"go.uber.org/cadence/activity"
"go.uber.org/zap"
)
/**
* Sample activities used by file processing sample workflow.
*/
const (
downloadFileActivityName = "downloadFileActivity"
processFileActivityName = "processFileActivity"
uploadFileActivityName = "uploadFileActivity"
)
// This is registration process where you register all your activity handlers.
func init() {
activity.RegisterWithOptions(
downloadFileActivity,
activity.RegisterOptions{Name: downloadFileActivityName},
)
activity.RegisterWithOptions(
processFileActivity,
activity.RegisterOptions{Name: processFileActivityName},
)
activity.RegisterWithOptions(
uploadFileActivity,
activity.RegisterOptions{Name: uploadFileActivityName},
)
}
func downloadFileActivity(ctx context.Context, fileID string) (*fileInfo, error) {
logger := activity.GetLogger(ctx)
logger.Info("Downloading file...", zap.String("FileID", fileID))
data := downloadFile(fileID)
tmpFile, err := saveToTmpFile(data)
if err != nil {
logger.Error("downloadFileActivity failed to save tmp file.", zap.Error(err))
return nil, err
}
fileInfo := &fileInfo{FileName: tmpFile.Name(), HostID: HostID}
logger.Info("downloadFileActivity succeed.", zap.String("SavedFilePath", fileInfo.FileName))
return fileInfo, nil
}
func processFileActivity(ctx context.Context, fInfo fileInfo) (*fileInfo, error) {
logger := activity.GetLogger(ctx).With(zap.String("HostID", HostID))
logger.Info("processFileActivity started.", zap.String("FileName", fInfo.FileName))
// assert that we are running on the same host as the file was downloaded
// this check is not necessary, just to demo the host specific tasklist is working
if fInfo.HostID != HostID {
logger.Error("processFileActivity on wrong host",
zap.String("TargetFile", fInfo.FileName),
zap.String("TargetHostID", fInfo.HostID))
return nil, errors.New("processFileActivity running on wrong host")
}
defer os.Remove(fInfo.FileName) // cleanup temp file
// read downloaded file
data, err := ioutil.ReadFile(fInfo.FileName)
if err != nil {
logger.Error("processFileActivity failed to read file.", zap.String("FileName", fInfo.FileName), zap.Error(err))
return nil, err
}
// process the file
transData := transcodeData(ctx, data)
tmpFile, err := saveToTmpFile(transData)
if err != nil {
logger.Error("processFileActivity failed to save tmp file.", zap.Error(err))
return nil, err
}
processedInfo := &fileInfo{FileName: tmpFile.Name(), HostID: HostID}
logger.Info("processFileActivity succeed.", zap.String("SavedFilePath", processedInfo.FileName))
return processedInfo, nil
}
func uploadFileActivity(ctx context.Context, fInfo fileInfo) error {
logger := activity.GetLogger(ctx).With(zap.String("HostID", HostID))
logger.Info("uploadFileActivity begin.", zap.String("UploadedFileName", fInfo.FileName))
// assert that we are running on the same host as the file was downloaded
// this check is not necessary, just to demo the host specific tasklist is working
if fInfo.HostID != HostID {
logger.Error("uploadFileActivity on wrong host",
zap.String("TargetFile", fInfo.FileName),
zap.String("TargetHostID", fInfo.HostID))
return errors.New("uploadFileActivity running on wrong host")
}
defer os.Remove(fInfo.FileName) // clean up tmp file
err := uploadFile(ctx, fInfo.FileName)
if err != nil {
logger.Error("uploadFileActivity uploading failed.", zap.Error(err))
return err
}
logger.Info("uploadFileActivity succeed.", zap.String("UploadedFileName", fInfo.FileName))
return nil
}
func downloadFile(fileID string) []byte {
// dummy downloader
dummyContent := "dummy content for fileID:" + fileID
return []byte(dummyContent)
}
func uploadFile(ctx context.Context, filename string) error {
// dummy uploader
_, err := ioutil.ReadFile(filename)
for i := 0; i < 5; i++ {
time.Sleep(1 * time.Second)
// Demonstrates that heartbeat accepts progress data.
// In case of a heartbeat timeout it is included into the error.
activity.RecordHeartbeat(ctx, i)
}
if err != nil {
return err
}
return nil
}
func transcodeData(ctx context.Context, data []byte) []byte {
// dummy file processor, just do upper case for the data.
// in real world case, you would want to avoid load entire file content into memory at once.
for i := 0; i < 5; i++ {
time.Sleep(1 * time.Second)
// Demonstrates that heartbeat accepts progress data.
// In case of a heartbeat timeout it is included into the error.
activity.RecordHeartbeat(ctx, i)
}
return []byte(strings.ToUpper(string(data)))
}
func saveToTmpFile(data []byte) (f *os.File, err error) {
tmpFile, err := ioutil.TempFile("", "cadence_sample")
if err != nil {
return nil, err
}
_, err = tmpFile.Write(data)
if err != nil {
os.Remove(tmpFile.Name())
return nil, err
}
return tmpFile, nil
}