This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
/
status.go
84 lines (69 loc) · 1.74 KB
/
status.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/*
* Copyright (c) 2018 Lyft. All rights reserved.
*/
package arraystatus
import (
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/lyft/flytestdlib/bitarray"
)
type JobID = string
type ArraySummary map[core.Phase]int64
type ArrayStatus struct {
// Summary of the array job. It's a map of phases and how many jobs are in that phase.
Summary ArraySummary `json:"summary"`
// Status of every job in the array.
Detailed bitarray.CompactArray `json:"details"`
}
// This is a status object that is returned after we make Catalog calls to see if subtasks are Cached
type ArrayCachedStatus struct {
CachedJobs *bitarray.BitSet `json:"cachedJobs"`
NumCached uint `json:"numCached"`
}
func deleteOrSet(summary ArraySummary, key core.Phase, value int64) {
if value == 0 {
delete(summary, key)
} else {
summary[key] = value
}
}
func (in ArraySummary) IncByCount(phase core.Phase, count int64) {
if existing, found := in[phase]; !found {
in[phase] = count
} else {
in[phase] = existing + count
}
}
func (in ArraySummary) Inc(phase core.Phase) {
in.IncByCount(phase, 1)
}
func (in ArraySummary) Dec(phase core.Phase) {
// TODO: Error if already 0?
in.IncByCount(phase, -1)
}
func (in ArraySummary) MergeFrom(other ArraySummary) (updated bool) {
// TODO: Refactor using sets
if other == nil {
for key := range in {
delete(in, key)
updated = true
}
return
}
for key, otherValue := range other {
if value, found := in[key]; found {
if value != otherValue {
deleteOrSet(in, key, otherValue)
updated = true
}
} else if otherValue != 0 {
in[key] = otherValue
updated = true
}
}
for key := range in {
if _, found := other[key]; !found {
delete(in, key)
}
}
return
}