forked from TykTechnologies/graphql-go-tools
/
config.go
141 lines (121 loc) · 3.98 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package kafka_datasource
import (
"fmt"
"github.com/Shopify/sarama"
)
// Values accepted for the isolation_level option, together with the fallback
// that Sanitize applies when the option is left empty.
const (
	// IsolationLevelReadUncommitted is the "ReadUncommitted" option value.
	IsolationLevelReadUncommitted = "ReadUncommitted"
	// IsolationLevelReadCommitted is the "ReadCommitted" option value.
	IsolationLevelReadCommitted = "ReadCommitted"

	// DefaultIsolationLevel is the isolation level used when none is configured.
	DefaultIsolationLevel = IsolationLevelReadUncommitted
)
// Values accepted for the balance_strategy option, together with the fallback
// that Sanitize applies when the option is left empty.
const (
	// BalanceStrategyRange is the "BalanceStrategyRange" option value.
	BalanceStrategyRange = "BalanceStrategyRange"
	// BalanceStrategySticky is the "BalanceStrategySticky" option value.
	BalanceStrategySticky = "BalanceStrategySticky"
	// BalanceStrategyRoundRobin is the "BalanceStrategyRoundRobin" option value.
	BalanceStrategyRoundRobin = "BalanceStrategyRoundRobin"

	// DefaultBalanceStrategy is the balance strategy used when none is configured.
	DefaultBalanceStrategy = BalanceStrategyRange
)
// Kafka protocol version settings used by GraphQLSubscriptionOptions.
var (
// DefaultKafkaVersion is the version name Sanitize assigns when
// GraphQLSubscriptionOptions.KafkaVersion is empty. It is a key of
// SaramaSupportedKafkaVersions.
DefaultKafkaVersion = "V1_0_0_0"
// SaramaSupportedKafkaVersions maps the version names accepted in the
// kafka_version option to the corresponding sarama.KafkaVersion values.
// Validate rejects any kafka_version that is not a key of this map.
SaramaSupportedKafkaVersions = map[string]sarama.KafkaVersion{
"V0_10_2_0": sarama.V0_10_2_0,
"V0_10_2_1": sarama.V0_10_2_1,
"V0_11_0_0": sarama.V0_11_0_0,
"V0_11_0_1": sarama.V0_11_0_1,
"V0_11_0_2": sarama.V0_11_0_2,
"V1_0_0_0": sarama.V1_0_0_0,
"V1_1_0_0": sarama.V1_1_0_0,
"V1_1_1_0": sarama.V1_1_1_0,
"V2_0_0_0": sarama.V2_0_0_0,
"V2_0_1_0": sarama.V2_0_1_0,
"V2_1_0_0": sarama.V2_1_0_0,
"V2_2_0_0": sarama.V2_2_0_0,
"V2_3_0_0": sarama.V2_3_0_0,
"V2_4_0_0": sarama.V2_4_0_0,
"V2_5_0_0": sarama.V2_5_0_0,
"V2_6_0_0": sarama.V2_6_0_0,
"V2_7_0_0": sarama.V2_7_0_0,
"V2_8_0_0": sarama.V2_8_0_0,
}
)
// SASL holds the SASL authentication settings of a subscription.
// When Enable is true, GraphQLSubscriptionOptions.Validate requires both User
// and Password to be non-empty.
type SASL struct {
// Whether or not to use SASL authentication when connecting to the broker
// (defaults to false).
Enable bool `json:"enable"`
// User is the authentication identity (authcid) to present for
// SASL/PLAIN or SASL/SCRAM authentication
User string `json:"user"`
// Password for SASL/PLAIN authentication
Password string `json:"password"`
}
// GraphQLSubscriptionOptions carries the Kafka settings of a single GraphQL
// subscription. Call Sanitize to fill in defaults for the optional fields and
// Validate to check the result.
type GraphQLSubscriptionOptions struct {
// BrokerAddr must be non-empty (enforced by Validate).
// NOTE(review): presumably the address of the Kafka broker — confirm against the consumer code.
BrokerAddr string `json:"broker_addr"`
// Topic must be non-empty (enforced by Validate).
Topic string `json:"topic"`
// GroupID must be non-empty (enforced by Validate).
GroupID string `json:"group_id"`
// ClientID must be non-empty (enforced by Validate).
ClientID string `json:"client_id"`
// KafkaVersion must be a key of SaramaSupportedKafkaVersions; Sanitize
// sets it to DefaultKafkaVersion when empty.
KafkaVersion string `json:"kafka_version"`
// NOTE(review): the name suggests consumption starts at the latest offset;
// the code that reads this flag is not in this file — confirm.
StartConsumingLatest bool `json:"start_consuming_latest"`
// BalanceStrategy must be one of the BalanceStrategy* constants; Sanitize
// sets it to DefaultBalanceStrategy when empty.
BalanceStrategy string `json:"balance_strategy"`
// IsolationLevel must be one of the IsolationLevel* constants; Sanitize
// sets it to DefaultIsolationLevel when empty.
IsolationLevel string `json:"isolation_level"`
// SASL holds the optional authentication settings; see Validate for the
// checks applied when SASL.Enable is true.
SASL SASL `json:"sasl"`
// startedCallback is unexported and has no JSON tag; nothing in this file
// sets or calls it.
startedCallback func()
}
// Sanitize fills in the package defaults for every optional setting that was
// left empty: KafkaVersion, BalanceStrategy and IsolationLevel. Fields that
// already hold a value are not touched, and no validation is performed here.
func (g *GraphQLSubscriptionOptions) Sanitize() {
	// fillEmpty stores fallback in *field only when *field is the empty string.
	fillEmpty := func(field *string, fallback string) {
		if *field == "" {
			*field = fallback
		}
	}

	fillEmpty(&g.KafkaVersion, DefaultKafkaVersion)
	// Strategy for allocating topic partitions to members (default BalanceStrategyRange).
	fillEmpty(&g.BalanceStrategy, DefaultBalanceStrategy)
	fillEmpty(&g.IsolationLevel, DefaultIsolationLevel)
}
// Validate checks the options and returns an error describing the first
// problem found, or nil when every check passes.
//
// Checks run in this order: broker_addr, topic, group_id and client_id must be
// non-empty; kafka_version must be a key of SaramaSupportedKafkaVersions;
// balance_strategy and isolation_level must each be one of the known
// constants; and, when SASL is enabled, sasl.user and sasl.password must be
// non-empty.
//
// Validate does not apply defaults, so an empty kafka_version,
// balance_strategy or isolation_level is rejected unless Sanitize ran first.
func (g *GraphQLSubscriptionOptions) Validate() error {
	// Required connection settings.
	if g.BrokerAddr == "" {
		return fmt.Errorf("broker_addr cannot be empty")
	}
	if g.Topic == "" {
		return fmt.Errorf("topic cannot be empty")
	}
	if g.GroupID == "" {
		return fmt.Errorf("group_id cannot be empty")
	}
	if g.ClientID == "" {
		return fmt.Errorf("client_id cannot be empty")
	}

	// Settings restricted to a fixed set of values.
	if _, known := SaramaSupportedKafkaVersions[g.KafkaVersion]; !known {
		return fmt.Errorf("kafka_version is invalid: %s", g.KafkaVersion)
	}

	strategy := g.BalanceStrategy
	if strategy != BalanceStrategyRange &&
		strategy != BalanceStrategySticky &&
		strategy != BalanceStrategyRoundRobin {
		return fmt.Errorf("balance_strategy is invalid: %s", g.BalanceStrategy)
	}

	level := g.IsolationLevel
	if level != IsolationLevelReadUncommitted && level != IsolationLevelReadCommitted {
		return fmt.Errorf("isolation_level is invalid: %s", g.IsolationLevel)
	}

	// Credentials are only inspected when SASL is switched on.
	if !g.SASL.Enable {
		return nil
	}
	if g.SASL.User == "" {
		return fmt.Errorf("sasl.user cannot be empty")
	}
	if g.SASL.Password == "" {
		return fmt.Errorf("sasl.password cannot be empty")
	}
	return nil
}
// SubscriptionConfiguration is the subscription section of Configuration.
// It declares the same exported fields, types and JSON tags as
// GraphQLSubscriptionOptions, without the unexported startedCallback field.
// No Sanitize or Validate methods are defined for this type in this file.
type SubscriptionConfiguration struct {
BrokerAddr string `json:"broker_addr"`
Topic string `json:"topic"`
GroupID string `json:"group_id"`
ClientID string `json:"client_id"`
// KafkaVersion is a version name.
// NOTE(review): presumably a key of SaramaSupportedKafkaVersions — confirm where it is consumed.
KafkaVersion string `json:"kafka_version"`
StartConsumingLatest bool `json:"start_consuming_latest"`
// NOTE(review): presumably one of the BalanceStrategy* constants — confirm where it is consumed.
BalanceStrategy string `json:"balance_strategy"`
// NOTE(review): presumably one of the IsolationLevel* constants — confirm where it is consumed.
IsolationLevel string `json:"isolation_level"`
SASL SASL `json:"sasl"`
}
// Configuration is the top-level configuration type declared in this file.
// It currently wraps only the subscription settings.
type Configuration struct {
// Subscription holds the Kafka subscription settings. The field carries no
// JSON tag.
Subscription SubscriptionConfiguration
}