-
Notifications
You must be signed in to change notification settings - Fork 21
/
workunit.go
113 lines (102 loc) · 3.24 KB
/
workunit.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package core
import (
"fmt"
"github.com/MG-RAST/AWE/lib/conf"
"os"
"time"
)
// Workunit lifecycle states, stored in Workunit.State.
//
// NewWorkunit starts every workunit in WORK_STAT_QUEUED. The transitions
// between the other states are driven by code outside this file.
const (
	// Initial state assigned by NewWorkunit.
	WORK_STAT_QUEUED      = "queued"
	WORK_STAT_PROXYQUEUED = "proxyqueued"
	WORK_STAT_CHECKOUT    = "checkout"
	WORK_STAT_SUSPEND     = "suspend"
	WORK_STAT_PREPARED    = "prepared"
	WORK_STAT_COMPUTED    = "computed"
	WORK_STAT_DONE        = "done"
	WORK_STAT_FAIL        = "fail"
	WORK_STAT_DISCARDED   = "discarded"
)
// Workunit is one rank of a Task. NewWorkunit builds it from a *Task and a
// rank; its lifecycle state is tracked in State using the WORK_STAT_*
// constants. The struct tags give the BSON and JSON field names.
type Workunit struct {
	// Id is "<task.Id>_<rank>" when built by NewWorkunit. Path() uses its
	// first six bytes as three directory shard levels.
	Id string `bson:"wuid" json:"wuid"`
	// Info, Inputs, Outputs, Predata and Cmd are copied from the parent Task
	// by NewWorkunit.
	Info    *Info   `bson:"info" json:"info"`
	Inputs  IOmap   `bson:"inputs" json:"inputs"`
	Outputs IOmap   `bson:"outputs" json:"outputs"`
	Predata IOmap   `bson:"predata" json:"predata"`
	Cmd     *Command `bson:"cmd" json:"cmd"`
	// Rank is this workunit's position among TotalWork siblings; Part()
	// treats rank 1 as the first part and returns "" for Rank 0.
	Rank int `bson:"rank" json:"rank"`
	// TotalWork is copied from the task (kept for load balancing) and is the
	// divisor Part() uses to split the indexed input.
	TotalWork int `bson:"totalwork" json:"totalwork"`
	// Partition describes the indexed input; read by Part() and IndexType().
	// May be nil when copied from a task without partition info.
	Partition *PartInfo `bson:"part" json:"part"`
	// State is one of the WORK_STAT_* constants; NewWorkunit sets
	// WORK_STAT_QUEUED.
	State string `bson:"state" json:"state"`
	// Failed starts at 0 in NewWorkunit. NOTE(review): presumably a count of
	// failed attempts — confirm against the code that increments it.
	Failed int `bson:"failed" json:"failed"`
	// NOTE(review): CheckoutTime and Client are not set in this file;
	// presumably recorded when a client checks the workunit out — confirm.
	CheckoutTime time.Time `bson:"checkout_time" json:"checkout_time"`
	Client       string    `bson:"client" json:"client"`
	// ComputeTime is not set in this file; its unit is not visible here —
	// TODO confirm (seconds?).
	ComputeTime int `bson:"computetime" json:"computetime"`
	// Notes is excluded from both BSON and JSON serialization (tag "-").
	Notes string `bson:"-" json:"-"`
	// UserAttr is copied from the parent Task by NewWorkunit.
	UserAttr map[string]string `bson:"userattr" json:"userattr"`
}
// NewWorkunit creates the workunit with the given rank for task.
//
// The workunit id is "<task.Id>_<rank>". The workunit starts in
// WORK_STAT_QUEUED with a zero failure count. Its Info, Inputs, Outputs,
// Predata, Cmd, TotalWork, Partition and UserAttr are taken directly from
// task. The pointer and map fields therefore refer to the same values as the
// task's rather than copies.
func NewWorkunit(task *Task, rank int) *Workunit {
	wu := new(Workunit)
	wu.Id = fmt.Sprintf("%s_%d", task.Id, rank)
	wu.Info = task.Info
	wu.Inputs = task.Inputs
	wu.Outputs = task.Outputs
	wu.Predata = task.Predata
	wu.Cmd = task.Cmd
	wu.Rank = rank
	// The task's workunit count is kept on each workunit for load balancing.
	wu.TotalWork = task.TotalWork
	wu.Partition = task.Partition
	wu.State = WORK_STAT_QUEUED
	wu.Failed = 0
	wu.UserAttr = task.UserAttr
	return wu
}
// Mkdir creates a fresh working directory for the workunit at work.Path().
//
// Anything already at that path is removed first, so the workunit starts
// from an empty directory. That removal is best effort: its error is
// deliberately ignored. Per the original note, it will not work if
// awe-client is not in a docker container AND tasks are in a container. Only
// the error from creating the directory (mode 0777, before umask) is
// returned.
func (work *Workunit) Mkdir() (err error) {
	dir := work.Path()
	_ = os.RemoveAll(dir) // best effort; see the doc comment above
	return os.MkdirAll(dir, 0777)
}
// RemoveDir deletes the workunit's working directory (work.Path()) together
// with everything beneath it.
//
// As with os.RemoveAll, a path that does not exist is not an error. Any
// other error is returned unchanged.
func (work *Workunit) RemoveDir() (err error) {
	return os.RemoveAll(work.Path())
}
// Path returns the workunit's working directory:
//
//	<conf.WORK_PATH>/<Id[0:2]>/<Id[2:4]>/<Id[4:6]>/<Id>
//
// The first six bytes of the id form three shard levels. Path therefore
// panics (slice bounds out of range) if work.Id is shorter than six bytes.
//
// NOTE(review): NewWorkunit builds ids as task.Id + "_" + rank. Confirm that
// task ids, and ids loaded from storage, are always long enough.
func (work *Workunit) Path() string {
	wuid := work.Id
	shard1, shard2, shard3 := wuid[0:2], wuid[2:4], wuid[4:6]
	return fmt.Sprintf("%s/%s/%s/%s/%s", conf.WORK_PATH, shard1, shard2, shard3, wuid)
}
// CDworkpath changes the process's current working directory to the
// workunit's working directory, work.Path().
//
// The directory must already exist (see Mkdir). The error from os.Chdir is
// returned unchanged.
func (work *Workunit) CDworkpath() (err error) {
	target := work.Path()
	err = os.Chdir(target)
	return err
}
// IndexType returns the index type of the workunit's input partition
// (work.Partition.Index).
//
// NewWorkunit copies task.Partition verbatim, and a stored workunit may have
// no "part" entry. Partition can therefore be nil. In that case IndexType
// returns "" rather than panicking on a nil dereference.
func (work *Workunit) IndexType() (indextype string) {
	if work.Partition == nil {
		return ""
	}
	return work.Partition.Index
}
// Part returns the 1-based, inclusive range of indexed input parts assigned
// to this workunit.
//
// The range is formatted as "start-end", or as a single number when it holds
// exactly one part. Rank 0 yields "".
//
// The Partition.TotalIndex parts are spread as evenly as possible over
// TotalWork workunits: the first TotalIndex%TotalWork ranks each get one
// extra part. For example, TotalWork=4 and TotalIndex=10 gives ranks 1..4
// the ranges "1-3", "4-6", "7-8" and "9-10" (3, 3, 2 and 2 parts).
//
// For a non-zero Rank this dereferences work.Partition and divides by
// work.TotalWork. It therefore panics if Partition is nil or TotalWork is 0.
//
// NOTE(review): when TotalIndex < TotalWork, ranks above TotalIndex get no
// parts and the result is an inverted range such as "3-2". Confirm that
// callers never create more workunits than there are parts.
func (work *Workunit) Part() (part string) {
	if work.Rank == 0 {
		return ""
	}

	total := work.Partition.TotalIndex
	base := total / work.TotalWork  // parts every rank receives
	extra := total % work.TotalWork // ranks 1..extra receive one more
	idx := work.Rank - 1            // zero-based rank

	var first, last int
	if work.Rank <= extra {
		first = idx*(base+1) + 1
		last = first + base
	} else {
		// Skip the `extra` enlarged ranges, then the normal-sized ones before us.
		first = extra*(base+1) + (idx-extra)*base + 1
		last = first + base - 1
	}

	if first != last {
		return fmt.Sprintf("%d-%d", first, last)
	}
	return fmt.Sprintf("%d", first)
}