-
Notifications
You must be signed in to change notification settings - Fork 7
/
files.go
151 lines (131 loc) · 4.54 KB
/
files.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package main
import (
"encoding/json"
"fmt"
"net/url"
"path/filepath"
"strconv"
"time"
"github.com/pcelvng/task"
"github.com/pcelvng/task-tools/file/stat"
"github.com/pcelvng/task-tools/tmpl"
"github.com/pcelvng/task-tools/workflow"
)
// gcsEvent is the payload of a Google Cloud Storage object event. Its fields
// mirror (a subset of) the object resource documented at
// https://cloud.google.com/storage/docs/json_api/v1/objects#resource;
// fields that are not needed are kept below as commented-out declarations.
//
// Note: Path is decoded from the JSON key "name" (the object name within
// Bucket), not a full URL. Size, Generation and Metageneration are decoded
// as strings, so the JSON is expected to carry them as quoted values.
type gcsEvent struct {
	Kind string `json:"kind"`
	ID string `json:"id"`
	SelfLink string `json:"selfLink"`
	Path string `json:"name"` // object name inside Bucket
	Bucket string `json:"bucket"`
	Generation string `json:"generation"`
	Metageneration string `json:"metageneration"`
	ContentType string `json:"contentType"`
	TimeCreated time.Time `json:"timeCreated"`
	Updated time.Time `json:"updated"`
	TemporaryHold bool `json:"temporaryHold"`
	//EventBasedHold bool `json:"eventBasedHold"`
	//RetentionExpirationTime time.Time `json:"retentionExpirationTime"`
	StorageClass string `json:"storageClass"`
	TimeStorageClassUpdated time.Time `json:"timeStorageClassUpdated"`
	Size string `json:"size"` // decimal string; parsed by Stat
	MD5Hash string `json:"md5Hash"` // copied verbatim into stat.Stats.Checksum by Stat
	MediaLink string `json:"mediaLink"`
	//ContentEncoding string `json:"contentEncoding"`
	//ContentDisposition string `json:"contentDisposition"`
	//CacheControl string `json:"cacheControl"`
	//Metadata map[string]interface{} `json:"metadata"`
	CRC32C string `json:"crc32c"`
	//ComponentCount int `json:"componentCount"`
	Etag string `json:"etag"`
	/*CustomerEncryption struct {
		EncryptionAlgorithm string `json:"encryptionAlgorithm"`
		KeySha256 string `json:"keySha256"`
	}
	KMSKeyName string `json:"kmsKeyName"`
	ResourceState string `json:"resourceState"`*/
}
// fileRule describes how flowlord reacts to files whose path matches
// SrcPattern. The embedded workflow.Phase supplies the task details used
// when a match is found (its Topic, Template and Rule query string).
// workflowFile is published as the task's "workflow" meta value —
// presumably the workflow file the rule was loaded from; confirm with the
// code that builds the rules.
type fileRule struct {
	SrcPattern string `uri:"file"` // source file path pattern to match (filepath.Match glob syntax)
	workflowFile string
	workflow.Phase
	// checks for rules that checks on groups of files instead of responding
	// immediately to an individual file.
	// NOTE(review): in the code visible here, CronCheck and CountCheck are only
	// referenced by the commented-out match function; matchFile ignores them.
	CronCheck string `uri:"cron"` // optional cron parsable string representing when to check src pattern matching files
	CountCheck int `uri:"count"` // optional int representing how many files matching that rule to wait for until the rule is exercised
}
// Stat converts the GCS object event into the stat.Stats form used by the
// rest of flowlord.
//
// Only values available from the event are populated:
//   - Size: the object size parsed from the decimal string e.Size
//     (0 if it cannot be parsed)
//   - Checksum: e.MD5Hash, copied verbatim
//   - Path: a "gs://<bucket>/<object>" URL
//   - Created: e.TimeCreated converted to UTC and formatted as RFC 3339
//
// Line/byte counts, IsDir and Files are left at their zero values.
func (e gcsEvent) Stat() stat.Stats {
	// Parse as a 64-bit value rather than via Atoi: int is 32 bits on some
	// platforms, which would corrupt the size of objects larger than 2 GiB.
	// Parse errors are deliberately ignored and leave size at 0.
	size, _ := strconv.ParseInt(e.Size, 10, 64)
	return stat.Stats{
		Size:     size,
		Checksum: e.MD5Hash,
		Path:     "gs://" + e.Bucket + "/" + e.Path,
		Created:  e.TimeCreated.In(time.UTC).Format(time.RFC3339),
	}
}
// unmarshalStat decodes the JSON message b into a stat.Stats.
//
// b is first decoded as a GCS object event (gcsEvent). If that yields both
// an id and an object name, the event is converted with its Stat method.
// Otherwise b is decoded directly as a stat.Stats document.
//
// Decode errors are intentionally ignored: malformed input produces whatever
// fields could be decoded, which may be the zero stat.Stats.
func unmarshalStat(b []byte) (sts stat.Stats) {
	var evt gcsEvent
	_ = json.Unmarshal(b, &evt)

	if isGCS := evt.ID != "" && evt.Path != ""; !isGCS {
		_ = json.Unmarshal(b, &sts)
		return sts
	}
	return evt.Stat()
}
// matchFile compares sts.Path against the SrcPattern of every file rule
// registered with flowlord. For each matching rule it builds a task and sends
// it through tm.producer.
//
// The task's info string is the rule's Template parsed with the time taken
// from the file path (tmpl.PathTime), then passed through tmpl.Meta with a
// set of meta values. Those values start from the rule's Rule query string,
// with "file", "filename" and "workflow" set on top. The same meta values,
// URL-unescaped, are stored as the task's Meta.
//
// It returns the first error from the producer (any remaining rules are not
// evaluated), or an error when no rule matched sts.Path.
func (tm *taskMaster) matchFile(sts stat.Stats) error {
	matched := false
	for _, fr := range tm.files {
		// A malformed pattern makes filepath.Match return an error; it is
		// ignored, so such a rule simply never matches.
		ok, _ := filepath.Match(fr.SrcPattern, sts.Path)
		if !ok {
			continue
		}
		matched = true

		// Rule-level values come from the rule's query string; per-file
		// values are set afterwards so they always describe this file.
		meta, _ := url.ParseQuery(fr.Rule)
		meta.Set("file", sts.Path)
		meta.Set("filename", filepath.Base(sts.Path))
		meta.Set("workflow", fr.workflowFile)

		// todo: add job if provided in task name ex -> task:job
		pathTime := tmpl.PathTime(sts.Path)
		info := tmpl.Meta(tmpl.Parse(fr.Template, pathTime), meta)

		tsk := task.New(fr.Topic(), info)
		tsk.Meta, _ = url.QueryUnescape(meta.Encode())
		if err := tm.producer.Send(tsk.Type, tsk.JSONBytes()); err != nil {
			return err
		}
	}

	if !matched {
		return fmt.Errorf("no match found for %q", sts.Path)
	}
	return nil
}
/*
func (tm *taskMaster) match(sts *stat.Stats, rule *Rule) {
if isMatch, _ := filepath.Match(rule.SrcPattern, sts.Path); !isMatch {
return
}
// goes to a rule bucket?
if rule.CountCheck > 0 || rule.CronCheck != "" {
tm.addSts(rule, sts)
// count check - send tsk if count is full
if rule.CountCheck > 0 {
tm.countCheck(rule)
}
} else {
// does not go to a rule bucket so
// create task and send immediately
info := genInfo(rule.InfoTemplate, sts)
tsk := task.New(rule.TaskType, info)
tm.sendTsk(tsk, rule)
}
}
*/