/
evaluate.go
133 lines (114 loc) · 3 KB
/
evaluate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package assigners
import (
"fmt"
"reflect"
"github.com/segmentio/topicctl/pkg/admin"
"github.com/segmentio/topicctl/pkg/config"
)
// EvaluateAssignments reports whether the supplied partition assignments
// satisfy the placement strategy named in placementConfig.
//
// The assignments are validated with admin.CheckAssignments before anything
// else; if that fails, its error is returned unchanged. A strategy that is not
// one of the handled constants also produces an error. In every other case the
// error is nil and the boolean carries the verdict.
func EvaluateAssignments(
	assignments []admin.PartitionAssignment,
	brokers []admin.BrokerInfo,
	placementConfig config.TopicPlacementConfig,
) (bool, error) {
	if checkErr := admin.CheckAssignments(assignments); checkErr != nil {
		return false, checkErr
	}

	// Rack statistics are gathered up front, regardless of which strategy is
	// ultimately evaluated.
	fewestRacks, mostRacks, leadersPerRack := minMaxRacks(assignments, brokers)
	leadersEven := balancedLeaders(leadersPerRack)
	// True when every partition's replicas span exactly one distinct rack.
	singleRack := fewestRacks == 1 && mostRacks == 1

	switch placementConfig.Strategy {
	case config.PlacementStrategyAny:
		// Any layout is acceptable.
		return true, nil

	case config.PlacementStrategyBalancedLeaders:
		return leadersEven, nil

	case config.PlacementStrategyInRack:
		return singleRack, nil

	case config.PlacementStrategyStatic:
		// The current replica layout must match the configured one exactly.
		current, err := admin.AssignmentsToReplicas(assignments)
		if err != nil {
			return false, err
		}
		matches := reflect.DeepEqual(current, placementConfig.StaticAssignments)
		return matches, nil

	case config.PlacementStrategyStaticInRack:
		if !singleRack {
			return false, nil
		}
		wantRacks := placementConfig.StaticRackAssignments
		if len(wantRacks) != len(assignments) {
			return false, nil
		}
		// Each partition sits in a single rack at this point, so the rack of
		// its first replica is compared against the configured rack at the
		// same index.
		rackByBroker := admin.BrokerRacks(brokers)
		for idx := range assignments {
			gotRack := rackByBroker[assignments[idx].Replicas[0]]
			if gotRack != wantRacks[idx] {
				return false, nil
			}
		}
		return true, nil

	case config.PlacementStrategyCrossRack:
		// Every replica of a partition must be in a different rack, i.e. the
		// number of distinct racks equals the number of replicas.
		rackByBroker := admin.BrokerRacks(brokers)
		for _, partition := range assignments {
			distinct := partition.DistinctRacks(rackByBroker)
			if len(distinct) != len(partition.Replicas) {
				return false, nil
			}
		}
		return true, nil
	}

	return false, fmt.Errorf(
		"Unrecognized placementStrategy: %s",
		placementConfig.Strategy,
	)
}
// balancedLeaders reports whether every value in leaderRackCounts is the same,
// which is equivalent to the smallest and largest counts being equal. An empty
// or nil map is considered balanced.
func balancedLeaders(leaderRackCounts map[string]int) bool {
	var reference int
	haveReference := false

	for _, n := range leaderRackCounts {
		if !haveReference {
			// The first value encountered becomes the yardstick for the rest.
			reference, haveReference = n, true
			continue
		}
		if n != reference {
			return false
		}
	}
	return true
}
// minMaxRacks gathers rack statistics for a set of partition assignments.
//
// It returns, in order:
//   - the smallest number of distinct racks spanned by any one partition,
//   - the largest number of distinct racks spanned by any one partition,
//   - a map from rack to the number of partitions whose first replica (treated
//     as the leader) is in that rack. Every rack returned by
//     admin.DistinctRacks(brokers) has an entry, starting at zero.
//
// With no assignments, both rack counts are zero.
func minMaxRacks(
	assignments []admin.PartitionAssignment,
	brokers []admin.BrokerInfo,
) (int, int, map[string]int) {
	rackByBroker := admin.BrokerRacks(brokers)

	// Seed every known rack so racks without leaders still appear.
	leadersPerRack := map[string]int{}
	for _, rackName := range admin.DistinctRacks(brokers) {
		leadersPerRack[rackName] = 0
	}

	var fewest, most int
	for idx, partition := range assignments {
		leadersPerRack[rackByBroker[partition.Replicas[0]]]++

		rackCount := len(partition.DistinctRacks(rackByBroker))
		if idx == 0 {
			// The first partition initialises both bounds.
			fewest, most = rackCount, rackCount
			continue
		}
		if rackCount < fewest {
			fewest = rackCount
		}
		if rackCount > most {
			most = rackCount
		}
	}

	return fewest, most, leadersPerRack
}