diff --git a/internal/endpoint/endpoint.go b/internal/endpoint/endpoint.go index 8adb1a687..83ab6342a 100644 --- a/internal/endpoint/endpoint.go +++ b/internal/endpoint/endpoint.go @@ -66,10 +66,10 @@ type Endpoint struct { Host string Port int TopicName string - SASL bool + Auth string + SSL bool SASLSHA256 bool SASLSHA512 bool - TLS bool CACertFile string CertFile string KeyFile string @@ -422,16 +422,16 @@ func parseEndpoint(s string) (Endpoint, error) { continue } switch key { - case "tls": - endpoint.Kafka.TLS, _ = strconv.ParseBool(val[0]) + case "auth": + endpoint.Kafka.Auth = val[0] + case "ssl": + endpoint.Kafka.SSL, _ = strconv.ParseBool(val[0]) case "cacert": endpoint.Kafka.CACertFile = val[0] case "cert": endpoint.Kafka.CertFile = val[0] case "key": endpoint.Kafka.KeyFile = val[0] - case "sasl": - endpoint.Kafka.SASL, _ = strconv.ParseBool(val[0]) case "sha256": endpoint.Kafka.SASLSHA256, _ = strconv.ParseBool(val[0]) case "sha512": diff --git a/internal/endpoint/kafka.go b/internal/endpoint/kafka.go index d84e0f722..86ed770be 100644 --- a/internal/endpoint/kafka.go +++ b/internal/endpoint/kafka.go @@ -71,18 +71,30 @@ func (conn *KafkaConn) Send(msg string) error { if conn.conn == nil { cfg := sarama.NewConfig() - if conn.ep.Kafka.TLS { - log.Debugf("building kafka tls config") - tlsConfig, err := newKafkaTLSConfig(conn.ep.Kafka.CertFile, conn.ep.Kafka.KeyFile, conn.ep.Kafka.CACertFile) - if err != nil { - cfg.MetricRegistry.UnregisterAll() - return err + cfg.Net.DialTimeout = time.Second + cfg.Net.ReadTimeout = time.Second * 5 + cfg.Net.WriteTimeout = time.Second * 5 + // Fix #333 : fix backward incompatibility introduced by sarama library + cfg.Producer.Return.Successes = true + cfg.Version = sarama.V0_10_0_0 + + switch conn.ep.Kafka.Auth { + case "sasl": + // Other than TLS authentication, SASL does not require SSL + if conn.ep.Kafka.SSL { + tlsConfig := tls.Config{} + + log.Debugf("building kafka tls root config") + caCertPool, err := loadRootTLSCert(conn.ep.Kafka.CACertFile) + if err != nil { + return err + } + tlsConfig.RootCAs = &caCertPool + + cfg.Net.TLS.Enable = true + cfg.Net.TLS.Config = &tlsConfig } - cfg.Net.TLS.Enable = true - cfg.Net.TLS.Config = tlsConfig - } - if conn.ep.Kafka.SASL { log.Debugf("building kafka sasl config") cfg.Net.SASL.Enable = true cfg.Net.SASL.User = os.Getenv("KAFKA_USERNAME") @@ -96,14 +108,29 @@ func (conn *KafkaConn) Send(msg string) error { cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} } cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 } - } + case "tls": + tlsConfig := tls.Config{} - cfg.Net.DialTimeout = time.Second - cfg.Net.ReadTimeout = time.Second * 5 - cfg.Net.WriteTimeout = time.Second * 5 - // Fix #333 : fix backward incompatibility introduced by sarama library - cfg.Producer.Return.Successes = true - cfg.Version = sarama.V0_10_0_0 + log.Debugf("building kafka tls client config") + certificates, err := loadClientTLSCert(conn.ep.Kafka.KeyFile, conn.ep.Kafka.CertFile) + if err != nil { + cfg.MetricRegistry.UnregisterAll() + return err + } + + // TLS authentication requires SSL + // and Tile38 requires certificates to be validated + caCertPool, err := loadRootTLSCert(conn.ep.Kafka.CACertFile) + if err != nil { + return err + } + + cfg.Net.TLS.Enable = true + + tlsConfig.Certificates = certificates + tlsConfig.RootCAs = &caCertPool + cfg.Net.TLS.Config = &tlsConfig + } c, err := sarama.NewSyncProducer([]string{uri}, cfg) if err != nil { @@ -147,24 +174,26 @@ func newKafkaConn(ep Endpoint) *KafkaConn { } } -func newKafkaTLSConfig(CertFile, KeyFile, CACertFile string) (*tls.Config, error) { - tlsConfig := tls.Config{} - - // Load client cert +func loadClientTLSCert(KeyFile, CertFile string) ([]tls.Certificate, error) { + // load client cert cert, err := tls.LoadX509KeyPair(CertFile, KeyFile) + if err != nil { - return &tlsConfig, err + return []tls.Certificate{cert}, err } - tlsConfig.Certificates = []tls.Certificate{cert} + return []tls.Certificate{cert}, err +} + +func loadRootTLSCert(CACertFile string) (x509.CertPool, error) { // Load CA cert caCert, err := ioutil.ReadFile(CACertFile) + if err != nil { - return &tlsConfig, err + return x509.CertPool{}, err } + caCertPool := x509.NewCertPool() caCertPool.AppendCertsFromPEM(caCert) - tlsConfig.RootCAs = caCertPool - - return &tlsConfig, err + return *caCertPool, err }