-
Notifications
You must be signed in to change notification settings - Fork 1
/
helper.go
35 lines (32 loc) · 1.03 KB
/
helper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package command
// StatsCompleted returns if CoordinatorStatsAck shows processing has finished or not
func (s *CoordinatorStatsAck) StatsCompleted() bool {
// Why I check the number of messages sent is that the number of actives is often incorrect.
// Vertex actor returns its active state with ComputeAck, but then it may receives a message until the next superstep is started
return s.SuperStep > 0 && s.NrOfActiveVertex == 0 && s.NrOfSentMessages == 0
}
// FindWoerkerInfoByPartition returns worker info that owns partition
func (info *ClusterInfo) FindWoerkerInfoByPartition(partitionID uint64) *ClusterInfo_WorkerInfo {
for _, info := range info.WorkerInfo {
for _, p := range info.Partitions {
if p == partitionID {
return info
}
}
}
return nil
}
// NumOfPartitions returns number of partitions
func (info *ClusterInfo) NumOfPartitions() uint64 {
if info == nil {
return 0
}
if info.WorkerInfo == nil {
return 0
}
var size uint64
for _, i := range info.WorkerInfo {
size += uint64(len(i.Partitions))
}
return size
}