-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
authentication.go
162 lines (140 loc) · 5.54 KB
/
authentication.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package kafka // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
import (
"context"
"crypto/sha256"
"crypto/sha512"
"fmt"
"github.com/IBM/sarama"
"go.opentelemetry.io/collector/config/configtls"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/awsmsk"
)
// Authentication defines authentication.
type Authentication struct {
PlainText *PlainTextConfig `mapstructure:"plain_text"`
SASL *SASLConfig `mapstructure:"sasl"`
TLS *configtls.ClientConfig `mapstructure:"tls"`
Kerberos *KerberosConfig `mapstructure:"kerberos"`
}
// PlainTextConfig defines plaintext authentication.
type PlainTextConfig struct {
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
}
// SASLConfig defines the configuration for the SASL authentication.
type SASLConfig struct {
// Username to be used on authentication
Username string `mapstructure:"username"`
// Password to be used on authentication
Password string `mapstructure:"password"`
// SASL Mechanism to be used, possible values are: (PLAIN, AWS_MSK_IAM, SCRAM-SHA-256 or SCRAM-SHA-512).
Mechanism string `mapstructure:"mechanism"`
// SASL Protocol Version to be used, possible values are: (0, 1). Defaults to 0.
Version int `mapstructure:"version"`
AWSMSK AWSMSKConfig `mapstructure:"aws_msk"`
}
// AWSMSKConfig defines the additional SASL authentication
// measures needed to use AWS_MSK_IAM mechanism
type AWSMSKConfig struct {
// Region is the AWS region the MSK cluster is based in
Region string `mapstructure:"region"`
// BrokerAddr is the client is connecting to in order to perform the auth required
BrokerAddr string `mapstructure:"broker_addr"`
}
// KerberosConfig defines kereros configuration.
type KerberosConfig struct {
ServiceName string `mapstructure:"service_name"`
Realm string `mapstructure:"realm"`
UseKeyTab bool `mapstructure:"use_keytab"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password" json:"-"`
ConfigPath string `mapstructure:"config_file"`
KeyTabPath string `mapstructure:"keytab_file"`
}
// ConfigureAuthentication configures authentication in sarama.Config.
func ConfigureAuthentication(config Authentication, saramaConfig *sarama.Config) error {
if config.PlainText != nil {
configurePlaintext(*config.PlainText, saramaConfig)
}
if config.TLS != nil {
if err := configureTLS(*config.TLS, saramaConfig); err != nil {
return err
}
}
if config.SASL != nil {
if err := configureSASL(*config.SASL, saramaConfig); err != nil {
return err
}
}
if config.Kerberos != nil {
configureKerberos(*config.Kerberos, saramaConfig)
}
return nil
}
func configurePlaintext(config PlainTextConfig, saramaConfig *sarama.Config) {
saramaConfig.Net.SASL.Enable = true
saramaConfig.Net.SASL.User = config.Username
saramaConfig.Net.SASL.Password = config.Password
}
func configureSASL(config SASLConfig, saramaConfig *sarama.Config) error {
if config.Username == "" {
return fmt.Errorf("username have to be provided")
}
if config.Password == "" {
return fmt.Errorf("password have to be provided")
}
saramaConfig.Net.SASL.Enable = true
saramaConfig.Net.SASL.User = config.Username
saramaConfig.Net.SASL.Password = config.Password
switch config.Mechanism {
case "SCRAM-SHA-512":
saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: sha512.New} }
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
case "SCRAM-SHA-256":
saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: sha256.New} }
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
case "PLAIN":
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypePlaintext
case "AWS_MSK_IAM":
saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return awsmsk.NewIAMSASLClient(config.AWSMSK.BrokerAddr, config.AWSMSK.Region, saramaConfig.ClientID)
}
saramaConfig.Net.SASL.Mechanism = awsmsk.Mechanism
default:
return fmt.Errorf(`invalid SASL Mechanism %q: can be either "PLAIN", "AWS_MSK_IAM", "SCRAM-SHA-256" or "SCRAM-SHA-512"`, config.Mechanism)
}
switch config.Version {
case 0:
saramaConfig.Net.SASL.Version = sarama.SASLHandshakeV0
case 1:
saramaConfig.Net.SASL.Version = sarama.SASLHandshakeV1
default:
return fmt.Errorf(`invalid SASL Protocol Version %d: can be either 0 or 1`, config.Version)
}
return nil
}
func configureTLS(config configtls.ClientConfig, saramaConfig *sarama.Config) error {
tlsConfig, err := config.LoadTLSConfig(context.Background())
if err != nil {
return fmt.Errorf("error loading tls config: %w", err)
}
saramaConfig.Net.TLS.Enable = true
saramaConfig.Net.TLS.Config = tlsConfig
return nil
}
func configureKerberos(config KerberosConfig, saramaConfig *sarama.Config) {
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI
saramaConfig.Net.SASL.Enable = true
if config.UseKeyTab {
saramaConfig.Net.SASL.GSSAPI.KeyTabPath = config.KeyTabPath
saramaConfig.Net.SASL.GSSAPI.AuthType = sarama.KRB5_KEYTAB_AUTH
} else {
saramaConfig.Net.SASL.GSSAPI.AuthType = sarama.KRB5_USER_AUTH
saramaConfig.Net.SASL.GSSAPI.Password = config.Password
}
saramaConfig.Net.SASL.GSSAPI.KerberosConfigPath = config.ConfigPath
saramaConfig.Net.SASL.GSSAPI.Username = config.Username
saramaConfig.Net.SASL.GSSAPI.Realm = config.Realm
saramaConfig.Net.SASL.GSSAPI.ServiceName = config.ServiceName
}