Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: warehouse backend config refactoring #3602

Merged
merged 40 commits into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
b1b0c1b
chore: typo
fracasula Jul 7, 2023
fbc204f
chore: backend-config manager
fracasula Jul 7, 2023
ba0427d
fix: retry
fracasula Jul 7, 2023
1deb4ac
fix: api
fracasula Jul 7, 2023
58f4dfa
fix: admin
fracasula Jul 7, 2023
451348c
chore: updating tests
fracasula Jul 10, 2023
87ff175
chore: removing unused attributes
fracasula Jul 10, 2023
f47aaab
chore: bigquery test fix
fracasula Jul 10, 2023
301e17a
chore: linting
fracasula Jul 10, 2023
0bc99d5
chore: moving constructor
fracasula Jul 10, 2023
44aeb08
chore: refactor wh grpc test
fracasula Jul 18, 2023
7b95baa
chore: removing duplicated code in test
fracasula Jul 19, 2023
cdd1bc8
chore: cleaning up
fracasula Jul 19, 2023
b8d36a0
chore: ssh tunneling and logger
fracasula Jul 19, 2023
8dd4296
chore: removing unused client
fracasula Jul 19, 2023
ccf5134
chore: better routine management
fracasula Jul 19, 2023
a5e75a2
chore: rename variable
fracasula Jul 19, 2023
7e3d25f
chore: removing TODOs
fracasula Jul 19, 2023
2a30339
chore: more efficient workerIdentifier()
fracasula Jul 19, 2023
c70b229
chore: adding TODOs
fracasula Jul 19, 2023
d79319d
chore: adding info to backendConfigSubscriber
fracasula Jul 19, 2023
39cd70b
chore: optimizing concurrent code
fracasula Jul 20, 2023
0ad24f7
chore: using logger
fracasula Jul 21, 2023
ee7b1e1
chore: initialConfigFetched
fracasula Jul 21, 2023
5c756c7
chore: moving var
fracasula Jul 21, 2023
642dd40
chore: TODO
fracasula Jul 21, 2023
b24e638
chore: wrapped db handle
fracasula Jul 21, 2023
077abc3
chore: moving workerChannelMap initialization
fracasula Jul 21, 2023
8f9f564
chore: removing unused field
fracasula Jul 21, 2023
009d557
chore: removing unused variable
fracasula Jul 26, 2023
ca754ee
chore: go-kit v0.15.4
fracasula Jul 26, 2023
b4305f3
fix: alteredSchemaInAtLeastOneTable data race
fracasula Jul 26, 2023
fcd31db
chore: send all warehouses to subscriptions
fracasula Jul 26, 2023
53d0def
chore: formatting
fracasula Jul 26, 2023
99be513
chore: removing unnecessary logic
fracasula Jul 27, 2023
1f0f024
chore: reformat
fracasula Jul 27, 2023
3fc87a8
chore: added test case
achettyiitr Jul 31, 2023
fe6d88a
Merge branch 'master' of github.com:rudderlabs/rudder-server into cho…
achettyiitr Jul 31, 2023
e3a6c6e
chore: added test case for many subscribers data race
achettyiitr Jul 31, 2023
1c8e8fe
Merge branch 'master' of github.com:rudderlabs/rudder-server into cho…
achettyiitr Aug 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 29 additions & 28 deletions warehouse/backend-config.go → warehouse/backend_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"encoding/json"
"sync"

"github.com/samber/lo"

"golang.org/x/exp/slices"

"github.com/rudderlabs/rudder-go-kit/config"
Expand All @@ -20,32 +22,32 @@ import (

// TODO: add tests
func newBackendConfigManager(
c *config.Config, // TODO possibly use this to get all the needed variables
wrappedDB *sqlquerywrapper.DB,
c *config.Config,
db *sqlquerywrapper.DB,
bc backendconfig.BackendConfig,
l logger.Logger,
log logger.Logger,
) *backendConfigManager {
if c == nil {
c = config.Default
}
if bc == nil {
bc = backendconfig.DefaultBackendConfig
}
if l == nil {
l = logger.NOP
if log == nil {
log = logger.NOP
}
bcm := &backendConfigManager{
conf: c,
db: wrappedDB,
schema: repo.NewWHSchemas(wrappedDB),
db: db,
schema: repo.NewWHSchemas(db),
backendConfig: bc,
logger: l,
logger: log,
initialConfigFetched: make(chan struct{}),
connectionsMap: make(map[string]map[string]model.Warehouse),
}
if config.GetBool("ENABLE_TUNNELLING", true) {
if c.GetBool("ENABLE_TUNNELLING", true) {
bcm.internalControlPlaneClient = cpclient.NewInternalClientWithCache(
backendconfig.GetConfigBackendURL(),
c.GetString("CONFIG_BACKEND_URL", "https://api.rudderstack.com"),
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we keep duplicating the variable name and its default? Isn't GetConfigBackendURL good enough?

Copy link
Member

@achettyiitr achettyiitr Aug 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to avoid calling the global function, because backendconfig was difficult to test.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, because you replaced the bc with the tenant manager. Let's keep it this way for now then 👍

cpclient.BasicAuth{
Username: c.GetString("CP_INTERNAL_API_USERNAME", ""),
Password: c.GetString("CP_INTERNAL_API_PASSWORD", ""),
Expand All @@ -66,8 +68,9 @@ type backendConfigManager struct {

initialConfigFetched chan struct{}
closeInitialConfigFetchedOnce sync.Once
subscriptions []chan []model.Warehouse
subscriptionsMu sync.Mutex

subscriptions []chan []model.Warehouse
subscriptionsMu sync.Mutex

// variables to store the backend configuration
warehouses []model.Warehouse
Expand All @@ -86,7 +89,10 @@ func (s *backendConfigManager) Start(ctx context.Context) {
select {
case <-ctx.Done():
return
case data := <-ch:
case data, ok := <-ch:
if !ok {
return
Comment on lines +91 to +93
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@achettyiitr nice catch 👍

}
s.processData(ctx, data.Data.(map[string]backendconfig.ConfigT))
}
}
Expand Down Expand Up @@ -246,26 +252,20 @@ func (s *backendConfigManager) SourceIDsByWorkspace() map[string][]string {
func (s *backendConfigManager) WarehousesBySourceID(sourceID string) []model.Warehouse {
s.warehousesMu.RLock()
defer s.warehousesMu.RUnlock()
var warehouses []model.Warehouse
for _, wh := range s.warehouses {
if wh.Source.ID == sourceID {
warehouses = append(warehouses, wh)
}
}
return warehouses

return lo.Filter(s.warehouses, func(w model.Warehouse, _ int) bool {
return w.Source.ID == sourceID
})
}

// WarehousesByDestID gets all WHs for the given destination ID
func (s *backendConfigManager) WarehousesByDestID(destID string) []model.Warehouse {
s.warehousesMu.RLock()
defer s.warehousesMu.RUnlock()
var warehouses []model.Warehouse
for _, wh := range s.warehouses {
if wh.Destination.ID == destID {
warehouses = append(warehouses, wh)
}
}
return warehouses

return lo.Filter(s.warehouses, func(w model.Warehouse, _ int) bool {
return w.Destination.ID == destID
})
}

func (s *backendConfigManager) attachSSHTunnellingInfo(
Expand All @@ -277,7 +277,8 @@ func (s *backendConfigManager) attachSSHTunnellingInfo(
return upstream
}

pkgLogger.Debugf("Fetching ssh keys for destination: %s", upstream.ID)
s.logger.Debugf("Fetching ssh keys for destination: %s", upstream.ID)

keys, err := s.internalControlPlaneClient.GetDestinationSSHKeys(ctx, upstream.ID)
if err != nil {
s.logger.Errorf("fetching ssh keys for destination: %s", err.Error())
Expand Down
247 changes: 247 additions & 0 deletions warehouse/backend_config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
package warehouse

import (
"context"
"net/http"
"net/http/httptest"
"testing"

"github.com/golang/mock/gomock"
"github.com/ory/dockertest/v3"
"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
mocksBackendConfig "github.com/rudderlabs/rudder-server/mocks/backend-config"
"github.com/rudderlabs/rudder-server/utils/pubsub"
"github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

// TestBackendConfigManager exercises backendConfigManager against a mocked
// backend-config subscription, a Postgres container and a stub HTTP server
// standing in for the control plane.
//
// It contains two groups of sub-tests that share a single manager instance:
//   - "Subscriptions" checks the accessors before Start is called and again
//     after the single mocked configuration event has been delivered.
//   - "Tunnelling" checks attachSSHTunnellingInfo for destinations with and
//     without the "useSSH" config flag.
//
// The "Subscriptions" sub-test asserts the pre-Start state, so it must run
// first.
func TestBackendConfigManager(t *testing.T) {
	pool, err := dockertest.NewPool("")
	require.NoError(t, err)

	pgResource, err := resource.SetupPostgres(pool, t)
	require.NoError(t, err)

	workspaceID := "test-workspace-id"
	sourceID := "test-source-id"
	destinationID := "test-destination-id"
	namespace := "test-namespace"

	unknownSourceID := "unknown-source-id"
	unknownDestinationID := "unknown-destination-id"
	unknownDestinationName := "unknown-destination-name"

	ctrl := gomock.NewController(t)
	mockBackendConfig := mocksBackendConfig.NewMockBackendConfig(ctrl)
	// Every Subscribe call yields a channel that carries exactly one
	// configuration event and is then closed. The event contains one source
	// wired to an RS destination and one source wired to a destination whose
	// definition name is "unknown-destination-name".
	mockBackendConfig.EXPECT().Subscribe(gomock.Any(), backendconfig.TopicBackendConfig).DoAndReturn(func(ctx context.Context, topic backendconfig.Topic) pubsub.DataChannel {
		ch := make(chan pubsub.DataEvent, 1)
		ch <- pubsub.DataEvent{
			Data: map[string]backendconfig.ConfigT{
				workspaceID: {
					WorkspaceID: workspaceID,
					Sources: []backendconfig.SourceT{
						{
							ID:      sourceID,
							Enabled: true,
							Destinations: []backendconfig.DestinationT{
								{
									ID:      destinationID,
									Enabled: true,
									DestinationDefinition: backendconfig.DestinationDefinitionT{
										Name: warehouseutils.RS,
									},
									Config: map[string]interface{}{
										"namespace": namespace,
									},
								},
							},
						},
						{
							ID:      unknownSourceID,
							Enabled: true,
							Destinations: []backendconfig.DestinationT{
								{
									ID:      unknownDestinationID,
									Enabled: true,
									DestinationDefinition: backendconfig.DestinationDefinitionT{
										Name: unknownDestinationName,
									},
									Config: map[string]interface{}{},
								},
							},
						},
					},
				},
			},
			Topic: string(backendconfig.TopicBackendConfig),
		}
		close(ch)
		return ch
	}).AnyTimes()

	// Stub HTTP server that answers every request with a fixed SSH key pair.
	// CONFIG_BACKEND_URL below points the manager's configuration at it.
	svc := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte(`{"publicKey": "public_key", "privateKey": "private_key"}`))
	}))
	defer svc.Close()

	c := config.New()
	c.Set("CONFIG_BACKEND_URL", svc.URL)
	c.Set("CP_INTERNAL_API_USERNAME", "username")
	c.Set("CP_INTERNAL_API_PASSWORD", "password")

	db := sqlquerywrapper.New(
		pgResource.DB,
		sqlquerywrapper.WithLogger(logger.NOP),
	)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	bcm := newBackendConfigManager(c, db, mockBackendConfig, logger.NOP)

	t.Run("Subscriptions", func(t *testing.T) {
		// State before Start: nothing has been processed yet.
		require.False(t, bcm.IsInitialized())
		require.Equal(t, map[string]map[string]model.Warehouse{}, bcm.Connections())

		csm, ok := bcm.ConnectionSourcesMap(destinationID)
		require.False(t, ok)

		require.Nil(t, csm)
		require.Nil(t, bcm.SourceIDsByWorkspace())
		// The lookups are expected to return empty, non-nil slices.
		require.Equal(t, []model.Warehouse{}, bcm.WarehousesBySourceID(sourceID))
		require.Equal(t, []model.Warehouse{}, bcm.WarehousesByDestID(destinationID))

		// Subscribe before starting so the first update is not missed.
		subscriptionsCh := bcm.Subscribe(ctx)

		go bcm.Start(ctx)

		expectedWarehouse := model.Warehouse{
			WorkspaceID: workspaceID,
			Source: backendconfig.SourceT{
				ID:      sourceID,
				Enabled: true,
				Destinations: []backendconfig.DestinationT{
					{
						ID:      destinationID,
						Enabled: true,
						DestinationDefinition: backendconfig.DestinationDefinitionT{
							Name: warehouseutils.RS,
						},
						Config: map[string]interface{}{
							"namespace": namespace,
						},
					},
				},
			},
			Destination: backendconfig.DestinationT{
				ID:      destinationID,
				Enabled: true,
				DestinationDefinition: backendconfig.DestinationDefinitionT{
					Name: warehouseutils.RS,
				},
				Config: map[string]interface{}{
					"namespace": namespace,
				},
			},
			// Differs from the configured "test-namespace" (hyphen vs
			// underscore); presumably normalized by the manager.
			Namespace:  "test_namespace",
			Type:       warehouseutils.RS,
			Identifier: "RS:test-source-id:test-destination-id",
		}

		// Receiving from the subscription also waits for the manager to have
		// processed the mocked event before the remaining assertions run.
		require.Equal(t, []model.Warehouse{expectedWarehouse}, <-subscriptionsCh)
		require.True(t, bcm.IsInitialized())
		require.Equal(t, map[string]map[string]model.Warehouse{
			destinationID: {
				sourceID: expectedWarehouse,
			},
		}, bcm.Connections())

		csm, ok = bcm.ConnectionSourcesMap(destinationID)
		require.True(t, ok)
		require.Equal(t, map[string]model.Warehouse{
			sourceID: expectedWarehouse,
		}, csm)

		// Both sources are listed for the workspace, but only the RS
		// destination is expected to yield a warehouse.
		require.Equal(t, map[string][]string{
			workspaceID: {sourceID, unknownSourceID},
		}, bcm.SourceIDsByWorkspace())
		require.Equal(t, []model.Warehouse{expectedWarehouse}, bcm.WarehousesBySourceID(sourceID))
		require.Equal(t, []model.Warehouse{expectedWarehouse}, bcm.WarehousesByDestID(destinationID))
		require.Equal(t, []model.Warehouse{}, bcm.WarehousesBySourceID(unknownSourceID))
		require.Equal(t, []model.Warehouse{}, bcm.WarehousesByDestID(unknownDestinationID))
	})

	t.Run("Tunnelling", func(t *testing.T) {
		// input and expected deliberately use separate Config maps so that a
		// mutation of the input cannot make the comparison pass by aliasing.
		testCases := []struct {
			name     string
			input    backendconfig.DestinationT
			expected backendconfig.DestinationT
		}{
			{
				name: "tunnelling disabled",
				input: backendconfig.DestinationT{
					ID:      destinationID,
					Enabled: true,
					DestinationDefinition: backendconfig.DestinationDefinitionT{
						Name: warehouseutils.RS,
					},
					Config: map[string]interface{}{
						"namespace": namespace,
					},
				},
				expected: backendconfig.DestinationT{
					ID:      destinationID,
					Enabled: true,
					DestinationDefinition: backendconfig.DestinationDefinitionT{
						Name: warehouseutils.RS,
					},
					Config: map[string]interface{}{
						"namespace": namespace,
					},
				},
			},
			{
				name: "tunnelling enabled",
				input: backendconfig.DestinationT{
					ID:      destinationID,
					Enabled: true,
					DestinationDefinition: backendconfig.DestinationDefinitionT{
						Name: warehouseutils.RS,
					},
					Config: map[string]interface{}{
						"namespace": namespace,
						"useSSH":    true,
					},
				},
				expected: backendconfig.DestinationT{
					ID:      destinationID,
					Enabled: true,
					DestinationDefinition: backendconfig.DestinationDefinitionT{
						Name: warehouseutils.RS,
					},
					Config: map[string]interface{}{
						"namespace": namespace,
						"useSSH":    true,
						// Private key served by the stub HTTP server above.
						"sshPrivateKey": "private_key",
					},
				},
			},
		}

		for _, tc := range testCases {
			tc := tc
			t.Run(tc.name, func(t *testing.T) {
				require.Equal(t, tc.expected, bcm.attachSSHTunnellingInfo(ctx, tc.input))
			})
		}
	})
}