Skip to content

Commit

Permalink
feat: kafka over ssh
Browse files Browse the repository at this point in the history
  • Loading branch information
fracasula committed Mar 14, 2023
1 parent b9deb4a commit 7aa4ec0
Show file tree
Hide file tree
Showing 16 changed files with 454 additions and 90 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ require (
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/crypto v0.6.0 // indirect
golang.org/x/crypto v0.6.0
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb
golang.org/x/mod v0.8.0 // indirect
golang.org/x/sys v0.5.0 // indirect
Expand Down
33 changes: 33 additions & 0 deletions services/streammanager/kafka/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package client
import (
"context"
"fmt"
"net"
"time"

"github.com/segmentio/kafka-go"
"golang.org/x/crypto/ssh"
)

// Logger specifies a logger used to report internal changes within the consumer
Expand Down Expand Up @@ -64,6 +66,37 @@ func New(network string, addresses []string, conf Config) (*Client, error) {
}
}

if conf.SSHConfig != nil {
signer, err := ssh.ParsePrivateKey([]byte(conf.SSHConfig.PrivateKey))
if err != nil {
return nil, fmt.Errorf("cannot parse SSH private key: %w", err)
}

sshConfig := &ssh.ClientConfig{
User: conf.SSHConfig.User,
Auth: []ssh.AuthMethod{ssh.PublicKeys(signer)},
Timeout: conf.DialTimeout,
}
if conf.SSHConfig.AcceptAnyHostKey {
sshConfig.HostKeyCallback = ssh.InsecureIgnoreHostKey() // skipcq: GSC-G106
}

dialer.DialFunc = func(ctx context.Context, network, address string) (net.Conn, error) {
sshClient, err := ssh.Dial("tcp", conf.SSHConfig.Host, sshConfig)
if err != nil {
return nil, fmt.Errorf("cannot dial SSH host %q: %w", conf.SSHConfig.Host, err)
}

conn, err := sshClient.Dial(network, address)
if err != nil {
return nil, fmt.Errorf(
"cannot dial address %q over SSH (host %q): %w", address, conf.SSHConfig.Host, err,
)
}
return conn, nil
}
}

return &Client{
network: network,
addresses: addresses,
Expand Down
111 changes: 107 additions & 4 deletions services/streammanager/kafka/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ import (
"time"

"github.com/ory/dockertest/v3"
dc "github.com/ory/dockertest/v3/docker"
"github.com/segmentio/kafka-go"
"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-server/services/streammanager/kafka/client/testutil"
"github.com/rudderlabs/rudder-server/testhelper/destination"
"github.com/rudderlabs/rudder-server/testhelper/destination/sshserver"
)

const (
Expand Down Expand Up @@ -344,8 +346,8 @@ func TestWithSASL(t *testing.T) {
{Username: "client1", Password: "password"},
},
CertificatePassword: "password",
KeyStorePath: filepath.Join(path, "/testdata/keystore/kafka.keystore.jks"),
TrustStorePath: filepath.Join(path, "/testdata/truststore/kafka.truststore.jks"),
KeyStorePath: filepath.Join(path, "testdata", "keystore", "kafka.keystore.jks"),
TrustStorePath: filepath.Join(path, "testdata", "truststore", "kafka.truststore.jks"),
}

hashTypes := []ScramHashGenerator{ScramPlainText, ScramSHA256, ScramSHA512}
Expand Down Expand Up @@ -435,8 +437,8 @@ func TestWithSASLBadCredentials(t *testing.T) {
{Username: "client1", Password: "password"},
},
CertificatePassword: "password",
KeyStorePath: filepath.Join(path, "/testdata/keystore/kafka.keystore.jks"),
TrustStorePath: filepath.Join(path, "/testdata/truststore/kafka.truststore.jks"),
KeyStorePath: filepath.Join(path, "testdata", "keystore", "kafka.keystore.jks"),
TrustStorePath: filepath.Join(path, "testdata", "truststore", "kafka.truststore.jks"),
}

pool, err := dockertest.NewPool("")
Expand Down Expand Up @@ -724,6 +726,107 @@ func TestAzureEventHubsCloud(t *testing.T) {
require.Contains(t, err.Error(), "SASL Authentication failed")
}

func TestSSH(t *testing.T) {
pool, err := dockertest.NewPool("")
require.NoError(t, err)

// Start shared Docker network
network, err := pool.Client.CreateNetwork(dc.CreateNetworkOptions{Name: "kafka_network"})
require.NoError(t, err)
t.Cleanup(func() {
if err := pool.Client.RemoveNetwork(network.ID); err != nil {
t.Logf("Error while removing Docker network: %v", err)
}
})

// Start Kafka cluster with ZooKeeper and one broker
_, err = destination.SetupKafka(pool, &testCleanup{t},
destination.WithBrokers(3),
destination.WithLogger(t),
destination.WithNetwork(network),
destination.WithoutDockerHostListeners(),
)
require.NoError(t, err)

// Let's setup the SSH server
publicKeyPath, err := filepath.Abs("./testdata/ssh/test_key.pub")
require.NoError(t, err)
sshServer, err := sshserver.Setup(pool, &testCleanup{t},
sshserver.WithPublicKeyPath(publicKeyPath),
sshserver.WithCredentials("linuxserver.io", ""),
sshserver.WithDockerNetwork(network),
sshserver.WithLogger(t),
)
require.NoError(t, err)
sshServerHost := fmt.Sprintf("localhost:%s", sshServer.Port)
t.Logf("SSH server is listening on %s", sshServerHost)

// Read private key
privateKey, err := os.ReadFile("./testdata/ssh/test_key")
require.NoError(t, err)

// Setup client and ping
ctx := context.Background()
c, err := New("tcp", []string{"kafka1:9092"}, Config{
SSHConfig: &SSHConfig{
User: "linuxserver.io",
Host: sshServerHost,
PrivateKey: string(privateKey),
AcceptAnyHostKey: true,
},
})
require.NoError(t, err)
require.Eventuallyf(t, func() bool { err = c.Ping(ctx); return err == nil }, 30*time.Second, time.Second,
"could not ping kafka: %v", err,
)

// Create topic for test
tc := testutil.NewWithDialer(c.dialer, c.network, c.addresses...)
require.Eventually(t, func() bool {
err := tc.CreateTopic(ctx, t.Name(), 1, 1) // partitions = 1, replication factor = 1
if err != nil {
t.Logf("Could not create topic: %v", err)
}
return err == nil
}, defaultTestTimeout, time.Second)

// Check that the topic has been created with the right number of partitions
topics, err := tc.ListTopics(ctx)
require.NoError(t, err)
require.Equal(t, []testutil.TopicPartition{{Topic: t.Name(), Partition: 0}}, topics)

// Check producer
producerConf := ProducerConfig{ClientID: "producer-01"}
if testing.Verbose() {
producerConf.Logger = &testLogger{t}
producerConf.ErrorLogger = producerConf.Logger
}
p, err := c.NewProducer(producerConf)
require.NoError(t, err)
t.Cleanup(func() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := p.Close(ctx); err != nil {
t.Logf("Error closing producer: %v", err)
}
})

pubCtx, pubCancel := context.WithTimeout(ctx, 30*time.Second)
defer pubCancel()
require.NoError(t, p.Publish(pubCtx,
Message{Key: []byte("key-01"), Value: []byte("value-01"), Topic: t.Name()},
))

// Verify that the message has been published and it's readable
consumer := c.NewConsumer(t.Name(), ConsumerConfig{})
consumerCtx, consumerCancel := context.WithTimeout(ctx, 10*time.Second)
defer consumerCancel()
msg, err := consumer.Receive(consumerCtx)
require.NoError(t, err)
require.Equal(t, "key-01", string(msg.Key))
require.Equal(t, "value-01", string(msg.Value))
}

func publishMessages(ctx context.Context, t *testing.T, p *Producer, noOfMessages int) {
t.Helper()
t.Cleanup(func() {
Expand Down
6 changes: 6 additions & 0 deletions services/streammanager/kafka/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ type Config struct {
DialTimeout time.Duration
TLS *TLS
SASL *SASL
SSHConfig *SSHConfig
}

type SSHConfig struct {
User, Host, PrivateKey string
AcceptAnyHostKey bool
}

func (c *Config) defaults() {
Expand Down
6 changes: 1 addition & 5 deletions services/streammanager/kafka/client/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"io"
"net"
"os"
"syscall"
"time"
Expand Down Expand Up @@ -40,12 +39,9 @@ type Producer struct {
func (c *Client) NewProducer(producerConf ProducerConfig) (p *Producer, err error) { // skipcq: CRT-P0003
producerConf.defaults()

dialer := &net.Dialer{
Timeout: c.config.DialTimeout,
}
transport := &kafka.Transport{
DialTimeout: c.config.DialTimeout,
Dial: dialer.DialContext,
Dial: c.dialer.DialFunc,
}
if producerConf.ClientID != "" {
transport.ClientID = producerConf.ClientID
Expand Down
39 changes: 39 additions & 0 deletions services/streammanager/kafka/client/testdata/ssh/test_key
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
-----BEGIN OPENSSH PRIVATE KEY-----
b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAABlwAAAAdzc2gtcn
NhAAAAAwEAAQAAAYEA0f/mqkkZ3c9qw8MTz5FoEO3PGecO/dtUFfJ4g1UBu9E7hi/pyVYY
fLfdsd5bqA2pXdU0ROymyVe683I1VzJcihUtwB1eQxP1mUhmoo0ixK0IUUGm4PRieCGv+r
0/gMvaYbVGUPCi5tAUVh02vZB7p2cTIaz872lvCnRhYbhGUHSbhNSSQOjnCtZfjuZZnE0l
PKjWV/wbJ7Pvoc/FZMlWOqL1AjAKuwFH5zs1RMrPDDv5PCZksq4a7DDxziEdq39jvA3sOm
pQXvzBBBLBOzu7rM3/MPJb6dvAGJcYxkptfL4YXTscIMINr0g24cn+Thvt9yqA93rkb9RB
kw6RIEwMlQKqserA+pfsaoW0SkvnlDKzS1DLwXioL4Uc1Jpr/9jTMEfR+W7v7gJPB1JDnV
gen5FBfiMqbsG1amUS+mjgNfC8I00tR+CUHxpqUWANtcWTinhSnLJ2skj/2QnciPHkHurR
EKyEwCVecgn+xVKyRgVDCGsJ+QnAdn51+i/kO3nvAAAFqENNbN9DTWzfAAAAB3NzaC1yc2
EAAAGBANH/5qpJGd3PasPDE8+RaBDtzxnnDv3bVBXyeINVAbvRO4Yv6clWGHy33bHeW6gN
qV3VNETspslXuvNyNVcyXIoVLcAdXkMT9ZlIZqKNIsStCFFBpuD0Ynghr/q9P4DL2mG1Rl
DwoubQFFYdNr2Qe6dnEyGs/O9pbwp0YWG4RlB0m4TUkkDo5wrWX47mWZxNJTyo1lf8Gyez
76HPxWTJVjqi9QIwCrsBR+c7NUTKzww7+TwmZLKuGuww8c4hHat/Y7wN7DpqUF78wQQSwT
s7u6zN/zDyW+nbwBiXGMZKbXy+GF07HCDCDa9INuHJ/k4b7fcqgPd65G/UQZMOkSBMDJUC
qrHqwPqX7GqFtEpL55Qys0tQy8F4qC+FHNSaa//Y0zBH0flu7+4CTwdSQ51YHp+RQX4jKm
7BtWplEvpo4DXwvCNNLUfglB8aalFgDbXFk4p4UpyydrJI/9kJ3Ijx5B7q0RCshMAlXnIJ
/sVSskYFQwhrCfkJwHZ+dfov5Dt57wAAAAMBAAEAAAGAd9pxr+ag2LO0353LBMCcgGz5sn
LpX4F6cDw/A9XUc3lrW56k88AroaLe6NFbxoJlk6RHfL8EQg3MKX2Za/bWUgjcX7VjQy11
EtL7oPKkUVPgV1/8+o8AVEgFxDmWsM+oB/QJ+dAdaVaBBNUPlQmNSXHOvX2ZrpqiQXlCyx
79IpYq3JjmEB3dH5ZSW6CkrExrYD+MdhLw/Kv5rISEyI0Qpc6zv1fkB+8nNpXYRTbrDLR9
/xJ6jnBH9V3J5DeKU4MUQ39nrAp6iviyWydB973+MOygpy41fXO6hHyVZ2aSCysn1t6J/K
QdeEjqAOI/5CbdtiFGp06et799EFyzPItW0FKetW1UTOL2YHqdb+Q9sNjiNlUSzgxMbJWJ
RGO6g9B1mJsHl5mJZUiHQPsG/wgBER8VOP4bLOEB6gzVO2GE9HTJTOh5C+eEfrl52wPfXj
TqjtWAnhssxtgmWjkS0ibi+u1KMVXKHfaiqJ7nH0jMx+eu1RpMvuR8JqkU8qdMMGChAAAA
wHkQMfpCnjNAo6sllEB5FwjEdTBBOt7gu6nLQ2O3uGv0KNEEZ/BWJLQ5fKOfBtDHO+kl+5
Qoxc0cE7cg64CyBF3+VjzrEzuX5Tuh4NwrsjT4vTTHhCIbIynxEPmKzvIyCMuglqd/nhu9
6CXhghuTg8NrC7lY+cImiBfhxE32zqNITlpHW7exr95Gz1sML2TRJqxDN93oUFfrEuInx8
HpXXnvMQxPRhcp9nDMU9/ahUamMabQqVVMwKDi8n3sPPzTiAAAAMEA+/hm3X/yNotAtMAH
y11parKQwPgEF4HYkSE0bEe+2MPJmEk4M4PGmmt/MQC5N5dXdUGxiQeVMR+Sw0kN9qZjM6
SIz0YHQFMsxVmUMKFpAh4UI0GlsW49jSpVXs34Fg95AfhZOYZmOcGcYosp0huCeRlpLeIH
7Vv2bkfQaic3uNaVPg7+cXg7zdY6tZlzwa/4Fj0udfTjGQJOPSzIihdMLHnV81rZ2cUOZq
MSk6b02aMpVB4TV0l1w4j2mlF2eGD9AAAAwQDVW6p2VXKuPR7SgGGQgHXpAQCFZPGLYd8K
duRaCbxKJXzUnZBn53OX5fuLlFhmRmAMXE6ztHPN1/5JjwILn+O49qel1uUvzU8TaWioq7
Are3SJR2ZucR4AKUvzUHGP3GWW96xPN8lq+rgb0th1eOSU2aVkaIdeTJhV1iPfaUUf+15S
YcJlSHLGgeqkok+VfuudZ73f3RFFhjoe1oAjlPB4leeMsBD9UBLx2U3xAevnfkecF4Lm83
4sVswWATSFAFsAAAAsYWJoaW1hbnl1YmFiYmFyQEFiaGltYW55dXMtTWFjQm9vay1Qcm8u
bG9jYWwBAgMEBQYH
-----END OPENSSH PRIVATE KEY-----
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQDR/+aqSRndz2rDwxPPkWgQ7c8Z5w7921QV8niDVQG70TuGL+nJVhh8t92x3luoDald1TRE7KbJV7rzcjVXMlyKFS3AHV5DE/WZSGaijSLErQhRQabg9GJ4Ia/6vT+Ay9phtUZQ8KLm0BRWHTa9kHunZxMhrPzvaW8KdGFhuEZQdJuE1JJA6OcK1l+O5lmcTSU8qNZX/Bsns++hz8VkyVY6ovUCMAq7AUfnOzVEys8MO/k8JmSyrhrsMPHOIR2rf2O8Dew6alBe/MEEEsE7O7uszf8w8lvp28AYlxjGSm18vhhdOxwgwg2vSDbhyf5OG+33KoD3euRv1EGTDpEgTAyVAqqx6sD6l+xqhbRKS+eUMrNLUMvBeKgvhRzUmmv/2NMwR9H5bu/uAk8HUkOdWB6fkUF+IypuwbVqZRL6aOA18LwjTS1H4JQfGmpRYA21xZOKeFKcsnaySP/ZCdyI8eQe6tEQrITAJV5yCf7FUrJGBUMIawn5CcB2fnX6L+Q7ee8= abhimanyubabbar@Abhimanyus-MacBook-Pro.local
41 changes: 2 additions & 39 deletions services/streammanager/kafka/client/testutil/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package testutil
import (
"context"
"fmt"
"net"
"strconv"
"time"

"github.com/segmentio/kafka-go"
Expand Down Expand Up @@ -58,44 +56,9 @@ func (c *Client) CreateTopic(ctx context.Context, topic string, numPartitions, r
return err
}

var (
errors = make(chan error, 1)
brokers = make(chan kafka.Broker, 1)
)

go func() { // doing it asynchronously because conn.Controller() does not honour the context
b, err := conn.Controller()
if err != nil {
errors <- fmt.Errorf("could not get controller: %w", err)
return
}
if b.Host == "" {
errors <- fmt.Errorf("create topic: empty host")
return
}
brokers <- b
}()

var broker kafka.Broker
select {
case <-ctx.Done():
return ctx.Err()
case err = <-errors:
return err
case broker = <-brokers:
}

controllerConn, err := kafka.DialContext(ctx, c.network, net.JoinHostPort(broker.Host, strconv.Itoa(broker.Port)))
if err != nil {
return fmt.Errorf("could not dial via controller: %w", err)
}
defer func() {
// close asynchronously, if we block we might not respect the context
go func() { _ = controllerConn.Close() }()
}()

errors := make(chan error, 1)
go func() { // doing it asynchronously because controllerConn.CreateTopics() does not honour the context
errors <- controllerConn.CreateTopics(kafka.TopicConfig{
errors <- conn.CreateTopics(kafka.TopicConfig{
Topic: topic,
NumPartitions: numPartitions,
ReplicationFactor: replicationFactor,
Expand Down
13 changes: 13 additions & 0 deletions services/streammanager/kafka/kafkamanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ func NewProducer(destination *backendconfig.DestinationT, o common.Opts) (*Produ

clientConf := client.Config{
DialTimeout: kafkaDialTimeout,
SSHConfig: getSSHConfig(config.Default),
}
if destConfig.SslEnabled {
if destConfig.CACertificate != "" {
Expand Down Expand Up @@ -659,3 +660,15 @@ func getStatusCodeFromError(err error) int {
}
return 400
}

func getSSHConfig(c *config.Config) *client.SSHConfig {
if !c.GetBool("ROUTER_KAFKA_SSH_ENABLED", false) {
return nil
}
return &client.SSHConfig{
User: c.GetString("ROUTER_KAFKA_SSH_USER", ""),
Host: c.GetString("ROUTER_KAFKA_SSH_HOST", ""),
PrivateKey: c.GetString("ROUTER_KAFKA_SSH_PRIVATE_KEY", ""),
AcceptAnyHostKey: c.GetBool("ROUTER_KAFKA_SSH_ACCEPT_ANY_HOST_KEY", false),
}
}
14 changes: 14 additions & 0 deletions testhelper/destination/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package destination

type Logger interface {
Log(...interface{})
}

type Cleaner interface {
Cleanup(func())
Logger
}

type NOPLogger struct{}

func (*NOPLogger) Log(...interface{}) {}
Loading

0 comments on commit 7aa4ec0

Please sign in to comment.