-
Notifications
You must be signed in to change notification settings - Fork 71
/
process.go
418 lines (374 loc) · 13.1 KB
/
process.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
package scipipe
import (
"os"
"path/filepath"
"regexp"
"sort"
"strings"
)
// Process is the central component in SciPipe after Workflow. Processes are
// long-running "services" that schedules and executes Tasks based on the IPs
// and parameters received on its in-ports and parameter ports
type Process struct {
	BaseProcess

	// CommandPattern is the shell command pattern whose {i:...}, {o:...},
	// {os:...} and {p:...} placeholders define the process's ports.
	CommandPattern string

	// PathFuncs maps out-port names to functions producing the output file
	// path for a given Task.
	PathFuncs map[string]func(*Task) string

	// CustomExecute is passed on to every Task the process creates.
	// NOTE(review): presumably used instead of running CommandPattern in a
	// shell — confirm in Task.Execute.
	CustomExecute func(*Task)

	// CoresPerTask is passed on to every Task; Run fails if it exceeds the
	// capacity of the workflow's concurrentTasks channel. NewProc sets it to 1.
	CoresPerTask int

	// Prepend is passed on to every Task the process creates.
	// NOTE(review): presumably a prefix for the shell command — confirm in Task.
	Prepend string

	// Spawn is set to true by NewProc.
	// NOTE(review): not read in this file — confirm its meaning where it is used.
	Spawn bool

	// PortInfo holds per-port metadata parsed from the command pattern, keyed
	// by port name.
	PortInfo map[string]*PortInfo
}
// ------------------------------------------------------------------------
// Factory method(s)
// ------------------------------------------------------------------------
// NewProc creates a Process called name and registers it with workflow. Its
// in-, out- and parameter ports are derived from the placeholders in the
// command pattern cmd, and every out-port gets a default path function.
// CoresPerTask starts at 1 and Spawn at true.
func NewProc(workflow *Workflow, name string, cmd string) *Process {
	proc := &Process{
		BaseProcess:    NewBaseProcess(workflow, name),
		CommandPattern: cmd,
		CoresPerTask:   1,
		Spawn:          true,
		PathFuncs:      map[string]func(*Task) string{},
		PortInfo:       make(map[string]*PortInfo),
	}
	workflow.AddProc(proc)

	// Ports must exist before default path functions can be attached to them.
	proc.initPortsFromCmdPattern(cmd, nil)
	proc.initDefaultPathFuncs()

	return proc
}
// PortInfo is a container for various information about process ports
type PortInfo struct {
	// portType is the placeholder type the port was declared with in the
	// command pattern, e.g. "i", "o", "os" or "p".
	portType string

	// extension is the file extension (without the leading dot) set with a
	// `.ext` option; the default out-path functions append it to the path.
	extension string

	// doStream is true for "os" ports, whose output is streamed via a FIFO file.
	doStream bool

	// join and joinSep are set by a `join:SEP` option: SEP is the separator
	// to use when joining multiple files received on a sub-stream.
	join    bool
	joinSep string
}
// initPortsFromCmdPattern is a helper function for NewProc, that sets up in-
// and out-ports based on the shell command pattern used to create the Process.
// Ports are set up in this way:
// `{i:PORTNAME}` specifies an in-port
// `{o:PORTNAME}` specifies an out-port
// `{os:PORTNAME}` specifies an out-port that streams via a FIFO file
// `{p:PORTNAME}` a "parameter (in-)port", which means a port where parameters can be "streamed"
//
// A placeholder may carry |-separated options after the port name:
// `.ext` sets the file extension used by the default out-path functions, and
// `join:SEP` marks the port for joining multiple files received on a
// sub-stream, using SEP as separator.
//
// Parameter ports whose names are keys in params are not created (params may
// be nil). A port name may occur several times in cmd, but always with the
// same port type; using one name with different types is reported via Failf.
func (p *Process) initPortsFromCmdPattern(cmd string, params map[string]string) {
	// Compiled once per call instead of once per option. Anchored, because an
	// option only counts when it *starts* with "." or "join:" (so that e.g. a
	// path modifier like "%.gz" is not mistaken for a file extension).
	fileExtPtn := regexp.MustCompile(`^\.([a-z0-9\.\-\_]+)`)
	joinPtn := regexp.MustCompile(`^join:([^{}|]+)`)

	// Port type each port name has been declared with so far in cmd.
	declaredType := map[string]string{}
	for _, m := range getShellCommandPlaceHolderRegex().FindAllStringSubmatch(cmd, -1) {
		portType := m[1]
		splitParts := strings.Split(m[2], "|")
		portName := splitParts[0]

		prevType, seen := declaredType[portName]
		if seen && prevType != portType {
			p.Failf("Port-name (%s) used in multiple port-types. A name can not be used for e.g. both in-ports and out-ports in the same process.", portName)
		}

		// Repeated placeholders for the same port share one PortInfo, so that
		// options given on one occurrence (e.g. a file extension) are not
		// discarded by a later occurrence without options.
		info := p.PortInfo[portName]
		if !seen || prevType != portType {
			info = &PortInfo{portType: portType}
			p.PortInfo[portName] = info
			declaredType[portName] = portType
		}

		for _, opt := range splitParts[1:] {
			// `.ext`: file extension to use for default out-paths
			if sm := fileExtPtn.FindStringSubmatch(opt); sm != nil {
				info.extension = sm[1]
			}
			// `join:SEP`: separator for joining files received on a sub-stream
			if sm := joinPtn.FindStringSubmatch(opt); sm != nil {
				info.join = true
				info.joinSep = sm[1]
			}
		}
	}

	for portName, info := range p.PortInfo {
		switch info.portType {
		case "o", "os":
			p.InitOutPort(p, portName)
			if info.portType == "os" {
				info.doStream = true
			}
		case "i":
			p.InitInPort(p, portName)
		case "p":
			// Parameters supplied as concrete values need no port. Indexing a
			// nil map is valid in Go, so params == nil needs no special case.
			if _, given := params[portName]; !given {
				p.InitInParamPort(p, portName)
			}
		}
	}
}
// initDefaultPathFuncs installs a default path function for every out-port
// that exists when it is called. These are used unless a path is set
// explicitly through SetOut / SetOutFunc. A default path is the "."-joined
// sequence of:
//   - the base names of all in-IP paths, ordered by in-port name,
//   - the process name, passed through sanitizePathFragment,
//   - "name_value" for every param, then every tag, each ordered by name,
//   - the out-port name, and
//   - the out-port's file extension, if one was configured.
func (p *Process) initDefaultPathFuncs() {
	for name := range p.OutPorts() {
		outName := name // per-iteration copy for the closure (needed before Go 1.22)
		p.PathFuncs[outName] = func(t *Task) string {
			var pieces []string
			for _, inName := range sortedFileIPMapKeys(t.InIPs) {
				pieces = append(pieces, filepath.Base(t.InIP(inName).Path()))
			}
			pieces = append(pieces, sanitizePathFragment(t.Process.Name()))
			for _, key := range sortedStringMapKeys(t.Params) {
				pieces = append(pieces, key+"_"+t.Param(key))
			}
			for _, key := range sortedStringMapKeys(t.Tags) {
				pieces = append(pieces, key+"_"+t.Tag(key))
			}
			pieces = append(pieces, outName)
			if ext := p.PortInfo[outName].extension; ext != "" {
				pieces = append(pieces, ext)
			}
			return strings.Join(pieces, ".")
		}
	}
}
// sortedFileIPMapKeys returns the keys of kv in ascending order. The result
// is never nil, even for an empty or nil map.
func sortedFileIPMapKeys(kv map[string]*FileIP) []string {
	// The final length is known, so allocate once instead of growing.
	keys := make([]string, 0, len(kv))
	for k := range kv {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}
// sortedStringMapKeys returns the keys of kv in ascending order. The result
// is never nil, even for an empty or nil map.
func sortedStringMapKeys(kv map[string]string) []string {
	// The final length is known, so allocate once instead of growing.
	keys := make([]string, 0, len(kv))
	for k := range kv {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}
// sortedFileIPSliceMapKeys returns the keys of kv in ascending order. The
// result is never nil, even for an empty or nil map.
func sortedFileIPSliceMapKeys(kv map[string][]*FileIP) []string {
	// The final length is known, so allocate once instead of growing.
	keys := make([]string, 0, len(kv))
	for k := range kv {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}
// ------------------------------------------------------------------------
// Main API methods: Port accessor methods
// ------------------------------------------------------------------------
// In returns the in-port named portName; it is a short-form for InPort() (of
// BaseProcess), available on Process processes. When portName is empty and
// the process has exactly one in-port, that port is returned.
func (p *Process) In(portName string) *InPort {
	if portName != "" || len(p.InPorts()) != 1 {
		return p.InPort(portName)
	}
	var only *InPort
	for _, port := range p.InPorts() {
		only = port
	}
	return only
}
// Out returns the out-port named portName; it is a short-form for OutPort()
// (of BaseProcess), available on Process processes. When portName is empty
// and the process has exactly one out-port, that port is returned.
func (p *Process) Out(portName string) *OutPort {
	if portName != "" || len(p.OutPorts()) != 1 {
		return p.OutPort(portName)
	}
	var only *OutPort
	for _, port := range p.OutPorts() {
		only = port
	}
	return only
}
// InParam returns the parameter in-port named portName, creating it first if
// it does not exist yet. It is a short-form for InParamPort() (of
// BaseProcess), available on Process processes.
func (p *Process) InParam(portName string) *InParamPort {
	_, exists := p.inParamPorts[portName]
	if !exists {
		p.InitInParamPort(p, portName)
	}
	return p.InParamPort(portName)
}
// OutParam returns the out-parameter port named portName. It is a short-form
// for OutParamPort() (of BaseProcess), available on Process processes.
func (p *Process) OutParam(portName string) *OutParamPort {
	port := p.OutParamPort(portName)
	return port
}
// ------------------------------------------------------------------------
// Main API methods: Configure path formatting
// ------------------------------------------------------------------------
// SetOut initializes a port (if it does not already exist), and takes a
// configuration for its outputs paths via a pattern similar to the command
// pattern used to create new processes, with placeholder tags. Available
// placeholder tags to use are:
// {i:inport_name}   path of the IP on the given in-port
// {o:outport_name}  path produced by the path function of another out-port
// {p:param_name}    value of the given parameter
// {t:tag_name}      value of the given tag
// Text after a "|" in a placeholder is passed to applyPathModifiers to
// transform the substituted value.
// An example might be: {i:foo}.replace_with_{p:replacement}.txt
// ... given that the process contains an in-port named 'foo', and a parameter
// named 'replacement'.
// If an out-port with the specified name does not exist, it will be created.
// This allows to create out-ports for filenames that are created without explicitly
// stating a filename on the commandline, such as when only submitting a prefix.
func (p *Process) SetOut(outPortName string, pathPattern string) {
	// The pattern is fixed for the lifetime of the path function, so locate its
	// placeholders once here rather than re-running the regex for every task.
	matches := getShellCommandPlaceHolderRegex().FindAllStringSubmatch(pathPattern, -1)

	p.SetOutFunc(outPortName, func(t *Task) string {
		// Start from the raw pattern on every call; the closure must not keep
		// state between tasks.
		path := pathPattern
		for _, match := range matches {
			placeHolder, phType := match[0], match[1]
			parts := strings.Split(match[2], "|")
			portName, modifiers := parts[0], parts[1:]

			var replacement string
			switch phType {
			case "i":
				replacement = t.InPath(portName)
			case "o":
				pathFunc, ok := t.Process.PathFuncs[portName]
				if !ok {
					p.Failf("No such pathfunc for out-port %s in task %s", portName, t.Name)
				} else {
					replacement = pathFunc(t)
				}
			case "p":
				replacement = t.Param(portName)
			case "t":
				replacement = t.Tag(portName)
			default:
				p.Failf("Replace failed for placeholder (%s) for path pattern (%s)", placeHolder, pathPattern)
			}
			if len(modifiers) > 0 {
				replacement = applyPathModifiers(replacement, modifiers)
			}
			// Replace placeholder with concrete value
			path = strings.Replace(path, placeHolder, replacement, -1)
		}
		return path
	})
}
// SetOutFunc sets pathFmtFunc as the function that produces the file path of
// the out-port outPortName for each Task. It may use any data available on
// the *Task, such as in-file paths, parameter values and tags. The out-port
// is created first if it does not already exist.
func (p *Process) SetOutFunc(outPortName string, pathFmtFunc func(task *Task) (path string)) {
	_, exists := p.outPorts[outPortName]
	if !exists {
		p.InitOutPort(p, outPortName)
	}
	p.PathFuncs[outPortName] = pathFmtFunc
}
// ------------------------------------------------------------------------
// Run method
// ------------------------------------------------------------------------
// Run runs the process by instantiating and executing Tasks for all inputs
// and parameter values on its in-ports. In the case when there are no inputs
// or parameter values on the in-ports, it will run just once before it
// terminates. Note that the actual execution of shell commands is done inside
// Task.Execute, not here.
//
// Outputs that stream through a FIFO are sent downstream as soon as a task is
// received, before the task starts executing. All other outputs are sent when
// the task signals on its Done channel, in the order the tasks were started.
// The out-ports are closed when Run returns.
func (p *Process) Run() {
	defer p.CloseOutPorts()

	// Check that CoresPerTask is a sane number
	if p.CoresPerTask > cap(p.workflow.concurrentTasks) {
		p.Failf("CoresPerTask (%d) can't be greater than maxConcurrentTasks of workflow (%d)", p.CoresPerTask, cap(p.workflow.concurrentTasks))
	}

	// Using a slice to store unprocessed tasks allows us to receive tasks as
	// they are produced and to maintain the correct order of IPs. This select
	// allows us to process completed tasks as they become available. Waiting
	// for all Tasks to be spawned before processing any can cause deadlock
	// under certain workflow architectures when there are more than getBufsize()
	// Tasks per process, see #81.
	startedTasks := taskQueue{}
	tasks := p.createTasks()
	for tasks != nil || len(startedTasks) > 0 {
		select {
		case t, ok := <-tasks:
			if !ok {
				// All tasks created. A nil channel is never ready, so this
				// disables this select case from now on.
				tasks = nil
			} else {
				// Sending FIFOs for the task
				for oname, oip := range t.OutIPs {
					if oip.doStream {
						if oip.FifoFileExists() {
							p.Failf("Fifo file exists, so exiting (clean up fifo files before restarting the workflow): %s", oip.FifoPath())
						}
						oip.CreateFifo()
						p.Out(oname).Send(oip)
					}
				}
				// Execute task in separate go-routine
				go t.Execute()
				startedTasks = append(startedTasks, t)
			}
		case <-startedTasks.NextTaskDone():
			doneTask := startedTasks[0]
			// Clear the slot so the backing array does not keep finished
			// tasks reachable.
			startedTasks[0] = nil
			startedTasks = startedTasks[1:]
			for oname, oip := range doneTask.OutIPs {
				if !oip.doStream { // Streaming (FIFO) outputs have been sent earlier
					p.Out(oname).Send(oip)
				}
				// Remove any FIFO file
				if oip.doStream && oip.FifoFileExists() {
					if err := os.Remove(oip.FifoPath()); err != nil {
						p.Failf("Could not remove Fifo path %s: %v", oip.FifoPath(), err)
					}
				}
			}
		}
	}
}
// createTasks is a helper method for Run. It starts a goroutine that receives
// IPs on the in-ports and values on the parameter in-ports, builds one Task per
// received set, and sends each Task on the returned channel ch.
//
// ch is closed as soon as either port group reports being closed, i.e. after
// its last values were consumed on the previous round. A process with neither
// in-ports nor parameter in-ports yields exactly one Task.
func (p *Process) createTasks() (ch chan *Task) {
	ch = make(chan *Task)
	go func() {
		defer close(ch)
		var (
			inIPs  = map[string]*FileIP{}
			params = map[string]string{}
			open   bool
		)
		for {
			if len(p.inPorts) > 0 {
				if inIPs, open = p.receiveOnInPorts(); !open {
					return
				}
			}
			if len(p.inParamPorts) > 0 {
				if params, open = p.receiveOnInParamPorts(); !open {
					return
				}
			}

			// A fresh tag map per Task, so later IPs cannot overwrite the tags
			// of tasks already handed out. Keys are "<in-port>.<tag>".
			tags := map[string]string{}
			for inName, ip := range inIPs {
				for key, val := range ip.Tags() {
					tags[inName+"."+key] = val
				}
			}

			ch <- NewTask(p.workflow, p, p.Name(), p.CommandPattern, inIPs, p.PathFuncs, p.PortInfo, params, tags, p.Prepend, p.CustomExecute, p.CoresPerTask)

			// Nothing to receive on: a single run is all there is.
			if len(p.inPorts) == 0 && len(p.inParamPorts) == 0 {
				return
			}
		}
	}()
	return ch
}
// taskQueue holds started tasks. Run appends new tasks at the end and removes
// finished ones from the front.
type taskQueue []*Task

// NextTaskDone returns the Done channel of the task at the head of the queue,
// for use in a select. For an empty queue it returns nil: receiving from a nil
// channel blocks forever, which effectively disables that select case.
func (tq taskQueue) NextTaskDone() chan int {
	if len(tq) == 0 {
		return nil
	}
	head := tq[0]
	return head.Done
}