Skip to content

Commit

Permalink
chore: getSSHConfig improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
fracasula committed Mar 14, 2023
1 parent 7aa4ec0 commit 9a49c25
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 6 deletions.
38 changes: 32 additions & 6 deletions services/streammanager/kafka/kafkamanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kafka

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -270,9 +271,14 @@ func NewProducer(destination *backendconfig.DestinationT, o common.Opts) (*Produ
}
}

sshConfig, err := getSSHConfig(destination.ID, config.Default)
if err != nil {
return nil, fmt.Errorf("[Kafka] invalid SSH configuration: %w", err)
}

clientConf := client.Config{
DialTimeout: kafkaDialTimeout,
SSHConfig: getSSHConfig(config.Default),
SSHConfig: sshConfig,
}
if destConfig.SslEnabled {
if destConfig.CACertificate != "" {
Expand Down Expand Up @@ -661,14 +667,34 @@ func getStatusCodeFromError(err error) int {
return 400
}

func getSSHConfig(c *config.Config) *client.SSHConfig {
if !c.GetBool("ROUTER_KAFKA_SSH_ENABLED", false) {
return nil
func getSSHConfig(destinationID string, c *config.Config) (*client.SSHConfig, error) {
enabled := strings.Split(c.GetString("ROUTER_KAFKA_SSH_ENABLED", ""), ",")

var found bool
for _, id := range enabled {
if id == destinationID {
found = true
break
}
}
if !found {
return nil, nil
}

privateKey := c.GetString("ROUTER_KAFKA_SSH_PRIVATE_KEY", "")
if privateKey == "" {
return nil, fmt.Errorf("kafka SSH private key is not set")
}

rawPrivateKey, err := base64.StdEncoding.DecodeString(privateKey)
if err != nil {
return nil, fmt.Errorf("failed to decode base64 private key: %w", err)
}

return &client.SSHConfig{
User: c.GetString("ROUTER_KAFKA_SSH_USER", ""),
Host: c.GetString("ROUTER_KAFKA_SSH_HOST", ""),
PrivateKey: c.GetString("ROUTER_KAFKA_SSH_PRIVATE_KEY", ""),
PrivateKey: string(rawPrivateKey),
AcceptAnyHostKey: c.GetBool("ROUTER_KAFKA_SSH_ACCEPT_ANY_HOST_KEY", false),
}
}, nil
}
52 changes: 52 additions & 0 deletions services/streammanager/kafka/kafkamanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/segmentio/kafka-go"
"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-server/config"
backendconfig "github.com/rudderlabs/rudder-server/config/backend-config"
mockStats "github.com/rudderlabs/rudder-server/mocks/services/stats"
"github.com/rudderlabs/rudder-server/services/streammanager/common"
Expand Down Expand Up @@ -838,6 +839,57 @@ func TestPublish(t *testing.T) {
})
}

func TestSSHConfig(t *testing.T) {
t.Run("not enabled", func(t *testing.T) {
c := config.New()
conf, err := getSSHConfig("some id", c)
require.NoError(t, err)
require.Nil(t, conf)
})

t.Run("enabled for another destination", func(t *testing.T) {
c := config.New()
c.Set("ROUTER_KAFKA_SSH_ENABLED", "dest1,dest3")
conf, err := getSSHConfig("dest2", c)
require.NoError(t, err)
require.Nil(t, conf)
})

t.Run("no private key", func(t *testing.T) {
c := config.New()
c.Set("ROUTER_KAFKA_SSH_ENABLED", "dest0,dest1,dest5")
conf, err := getSSHConfig("dest1", c)
require.ErrorContains(t, err, "kafka SSH private key is not set")
require.Nil(t, conf)
})

t.Run("no base64 private key", func(t *testing.T) {
c := config.New()
c.Set("ROUTER_KAFKA_SSH_ENABLED", "dest0,dest1,dest5")
c.Set("ROUTER_KAFKA_SSH_PRIVATE_KEY", "not base64 encoded")
conf, err := getSSHConfig("dest1", c)
require.ErrorContains(t, err, "failed to decode base64 private key")
require.Nil(t, conf)
})

t.Run("ok", func(t *testing.T) {
c := config.New()
c.Set("ROUTER_KAFKA_SSH_ENABLED", "dest0,dest1,dest5")
c.Set("ROUTER_KAFKA_SSH_PRIVATE_KEY", "a2V5IGNvbnRlbnQ=")
c.Set("ROUTER_KAFKA_SSH_USER", "some-user")
c.Set("ROUTER_KAFKA_SSH_HOST", "1.2.3.4:22")
c.Set("ROUTER_KAFKA_SSH_ACCEPT_ANY_HOST_KEY", "true")
conf, err := getSSHConfig("dest1", c)
require.NoError(t, err)
require.Equal(t, &client.SSHConfig{
User: "some-user",
Host: "1.2.3.4:22",
PrivateKey: "key content",
AcceptAnyHostKey: true,
}, conf)
})
}

func getMockedTimer(t *testing.T, ctrl *gomock.Controller) *mockStats.MockMeasurement {
t.Helper()
mockedTimer := mockStats.NewMockMeasurement(ctrl)
Expand Down

0 comments on commit 9a49c25

Please sign in to comment.