chore: move warehouse handle to router #3687

Merged

merged 67 commits on Aug 9, 2023
Changes from 62 commits

Commits (67)
b1b0c1b
chore: typo
fracasula Jul 7, 2023
fbc204f
chore: backend-config manager
fracasula Jul 7, 2023
ba0427d
fix: retry
fracasula Jul 7, 2023
1deb4ac
fix: api
fracasula Jul 7, 2023
58f4dfa
fix: admin
fracasula Jul 7, 2023
451348c
chore: updating tests
fracasula Jul 10, 2023
87ff175
chore: removing unused attributes
fracasula Jul 10, 2023
f47aaab
chore: bigquery test fix
fracasula Jul 10, 2023
301e17a
chore: linting
fracasula Jul 10, 2023
0bc99d5
chore: moving constructor
fracasula Jul 10, 2023
44aeb08
chore: refactor wh grpc test
fracasula Jul 18, 2023
7b95baa
chore: removing duplicated code in test
fracasula Jul 19, 2023
cdd1bc8
chore: cleaning up
fracasula Jul 19, 2023
b8d36a0
chore: ssh tunneling and logger
fracasula Jul 19, 2023
8dd4296
chore: removing unused client
fracasula Jul 19, 2023
ccf5134
chore: better routine management
fracasula Jul 19, 2023
a5e75a2
chore: rename variable
fracasula Jul 19, 2023
7e3d25f
chore: removing TODOs
fracasula Jul 19, 2023
2a30339
chore: more efficient workerIdentifier()
fracasula Jul 19, 2023
c70b229
chore: adding TODOs
fracasula Jul 19, 2023
d79319d
chore: adding info to backendConfigSubscriber
fracasula Jul 19, 2023
39cd70b
chore: optimizing concurrent code
fracasula Jul 20, 2023
0ad24f7
chore: using logger
fracasula Jul 21, 2023
ee7b1e1
chore: initialConfigFetched
fracasula Jul 21, 2023
5c756c7
chore: moving var
fracasula Jul 21, 2023
642dd40
chore: TODO
fracasula Jul 21, 2023
b24e638
chore: wrapped db handle
fracasula Jul 21, 2023
077abc3
chore: moving workerChannelMap initialization
fracasula Jul 21, 2023
8f9f564
chore: removing unused field
fracasula Jul 21, 2023
009d557
chore: removing unused variable
fracasula Jul 26, 2023
ca754ee
chore: go-kit v0.15.4
fracasula Jul 26, 2023
b4305f3
fix: alteredSchemaInAtLeastOneTable data race
fracasula Jul 26, 2023
fcd31db
chore: send all warehouses to subscriptions
fracasula Jul 26, 2023
53d0def
chore: formatting
fracasula Jul 26, 2023
99be513
chore: removing unnecessary logic
fracasula Jul 27, 2023
1f0f024
chore: reformat
fracasula Jul 27, 2023
3fc87a8
chore: added test case
achettyiitr Jul 31, 2023
f770eb4
Merge branch 'master' of github.com:rudderlabs/rudder-server into cho…
achettyiitr Jul 31, 2023
552608e
chore: move handletT to router
achettyiitr Jul 31, 2023
fe6d88a
Merge branch 'master' of github.com:rudderlabs/rudder-server into cho…
achettyiitr Jul 31, 2023
08a42d6
chore: source pull
achettyiitr Jul 31, 2023
9909d35
chore: added test case for many subscribers data race
achettyiitr Jul 31, 2023
2af046c
Merge remote-tracking branch 'origin/chore.whBackendConfig' into chor…
achettyiitr Jul 31, 2023
a813753
chore: temp
achettyiitr Jul 31, 2023
5bbf8d8
chore: added test case for many subscribers data race
achettyiitr Jul 31, 2023
a6bb35c
chore: source pull
achettyiitr Aug 1, 2023
e3a6c6e
chore: added test case for many subscribers data race
achettyiitr Jul 31, 2023
af48890
chore: make fmt
achettyiitr Aug 1, 2023
62dee81
Merge branch 'master' of github.com:rudderlabs/rudder-server into cho…
achettyiitr Aug 1, 2023
11471e1
Merge remote-tracking branch 'origin/chore.whBackendConfig' into chor…
achettyiitr Aug 1, 2023
1c8e8fe
Merge branch 'master' of github.com:rudderlabs/rudder-server into cho…
achettyiitr Aug 1, 2023
3da9c4d
chore: source pull
achettyiitr Aug 3, 2023
3034cde
chore: added test cases
achettyiitr Aug 3, 2023
7e98bdd
chore: added test cases
achettyiitr Aug 3, 2023
b1ad349
chore: master pull
achettyiitr Aug 7, 2023
e031635
chore: addressing review comments from #3602
achettyiitr Aug 7, 2023
1a7a1a3
Merge remote-tracking branch 'origin/chore.whBackendConfig+review-com…
achettyiitr Aug 7, 2023
c239c77
chore: fixing imports
fracasula Aug 7, 2023
6e848c2
chore: master pull and some more changes
achettyiitr Aug 7, 2023
736a1fc
Merge branch 'chore.whBackendConfig+warehouse-handle-to-router' of gi…
achettyiitr Aug 7, 2023
c8ea735
add test for uploads repo reset in progress
achettyiitr Aug 7, 2023
53a1f6d
chore: added scheduling tests.
achettyiitr Aug 8, 2023
3fd347b
chore: some more changes
achettyiitr Aug 8, 2023
4f00a9d
chore: comments for tracker
achettyiitr Aug 8, 2023
fbbc869
chore: review comments
achettyiitr Aug 8, 2023
f6cb961
Merge remote-tracking branch 'origin/chore.whBackendConfig+review-com…
achettyiitr Aug 8, 2023
867d658
chore: review comments
achettyiitr Aug 8, 2023
2 changes: 0 additions & 2 deletions runner/runner.go
@@ -339,8 +339,6 @@ func runAllInit() {
jobsdb.Init()
jobsdb.Init2()
warehouse.Init()
warehouse.Init2()
warehouse.Init3()
warehouse.Init4()
warehouse.Init6()
warehousearchiver.Init()
5 changes: 3 additions & 2 deletions warehouse/backend_config.go
@@ -100,15 +100,15 @@ func (s *backendConfigManager) Start(ctx context.Context) {
func (s *backendConfigManager) Subscribe(ctx context.Context) <-chan []model.Warehouse {
s.subscriptionsMu.Lock()
defer s.subscriptionsMu.Unlock()
s.warehousesMu.Lock()
defer s.warehousesMu.Unlock()

ch := make(chan []model.Warehouse, 10)
s.subscriptions = append(s.subscriptions, ch)

s.warehousesMu.Lock()
if len(s.warehouses) > 0 {
ch <- s.warehouses
}
s.warehousesMu.Unlock()

go func() {
<-ctx.Done()
@@ -249,6 +249,7 @@ func (s *backendConfigManager) namespace(ctx context.Context, source backendconf
logfield.DestinationID, destination.ID,
logfield.DestinationType, destination.DestinationDefinition.Name,
logfield.WorkspaceID, destination.WorkspaceID,
logfield.Error, err.Error(),
)
return ""
}
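Note on the Subscribe change above: both subscriptionsMu and warehousesMu are now held for the whole registration, so a concurrent warehouse update can no longer land between appending the new channel and sending it the initial snapshot. Below is a minimal, self-contained sketch of the resulting pattern; the Warehouse struct stands in for model.Warehouse, and the body of the cleanup goroutine is assumed rather than taken from this diff.

package warehouse

import (
	"context"
	"sync"
)

// Warehouse is a stand-in for model.Warehouse.
type Warehouse struct{ Identifier string }

type backendConfigManager struct {
	subscriptionsMu sync.Mutex
	subscriptions   []chan []Warehouse

	warehousesMu sync.Mutex
	warehouses   []Warehouse
}

// Subscribe registers a buffered channel, replays the current snapshot while
// still holding both locks, and removes the channel once ctx is done.
func (s *backendConfigManager) Subscribe(ctx context.Context) <-chan []Warehouse {
	s.subscriptionsMu.Lock()
	defer s.subscriptionsMu.Unlock()
	s.warehousesMu.Lock()
	defer s.warehousesMu.Unlock()

	ch := make(chan []Warehouse, 10)
	s.subscriptions = append(s.subscriptions, ch)

	if len(s.warehouses) > 0 {
		ch <- s.warehouses
	}

	go func() {
		<-ctx.Done()
		s.subscriptionsMu.Lock()
		defer s.subscriptionsMu.Unlock()
		for i, sub := range s.subscriptions {
			if sub == ch {
				s.subscriptions = append(s.subscriptions[:i], s.subscriptions[i+1:]...)
				close(ch)
				return
			}
		}
	}()

	return ch
}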
25 changes: 11 additions & 14 deletions warehouse/backend_config_test.go
@@ -288,13 +288,15 @@ func TestBackendConfigManager(t *testing.T) {

func TestBackendConfigManager_Namespace(t *testing.T) {
testcases := []struct {
name string
config map[string]interface{}
source backendconfig.SourceT
destination backendconfig.DestinationT
expectedNamespace string
setConfig bool
}{
{
name: "clickhouse with database configured in config",
source: backendconfig.SourceT{},
destination: backendconfig.DestinationT{
Config: map[string]interface{}{
@@ -308,6 +310,7 @@ func TestBackendConfigManager_Namespace(t *testing.T) {
setConfig: false,
},
{
name: "clickhouse without database configured in config",
source: backendconfig.SourceT{},
destination: backendconfig.DestinationT{
Config: map[string]interface{}{},
@@ -319,19 +322,7 @@ func TestBackendConfigManager_Namespace(t *testing.T) {
setConfig: false,
},
{
source: backendconfig.SourceT{},
destination: backendconfig.DestinationT{
Config: map[string]interface{}{
"namespace": "test_namespace",
},
DestinationDefinition: backendconfig.DestinationDefinitionT{
Name: "test-destinationType-1",
},
},
expectedNamespace: "test_namespace",
setConfig: false,
},
{
name: "namespace only contains letters",
source: backendconfig.SourceT{},
destination: backendconfig.DestinationT{
Config: map[string]interface{}{
@@ -345,6 +336,7 @@ func TestBackendConfigManager_Namespace(t *testing.T) {
setConfig: false,
},
{
name: "namespace only contains special characters",
source: backendconfig.SourceT{},
destination: backendconfig.DestinationT{
Config: map[string]interface{}{
@@ -358,6 +350,7 @@ func TestBackendConfigManager_Namespace(t *testing.T) {
setConfig: false,
},
{
name: "namespace contains special characters and letters",
source: backendconfig.SourceT{},
destination: backendconfig.DestinationT{
Config: map[string]interface{}{
@@ -371,6 +364,7 @@ func TestBackendConfigManager_Namespace(t *testing.T) {
setConfig: false,
},
{
name: "empty namespace but config is set",
source: backendconfig.SourceT{},
destination: backendconfig.DestinationT{
Config: map[string]interface{}{},
@@ -382,6 +376,7 @@ func TestBackendConfigManager_Namespace(t *testing.T) {
setConfig: true,
},
{
name: "empty namespace with picking from cache",
source: backendconfig.SourceT{
Name: "test-source",
ID: "test-sourceID",
@@ -397,6 +392,7 @@ func TestBackendConfigManager_Namespace(t *testing.T) {
setConfig: false,
},
{
name: "destination config without namespace configured and custom dataset prefix is not configured",
source: backendconfig.SourceT{
Name: "test-source",
ID: "random-sourceID",
@@ -412,6 +408,7 @@ func TestBackendConfigManager_Namespace(t *testing.T) {
setConfig: false,
},
{
name: "destination config without namespace configured and custom dataset prefix configured",
source: backendconfig.SourceT{
Name: "test-source",
ID: "test-sourceID",
@@ -431,7 +428,7 @@ func TestBackendConfigManager_Namespace(t *testing.T) {
for _, tc := range testcases {
tc := tc

t.Run("should return namespace", func(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
pool, err := dockertest.NewPool("")
require.NoError(t, err)

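Note on the test change above: each table entry now carries a name, and the subtest runs under t.Run(tc.name, ...) instead of the repeated "should return namespace" label, so a failing case is identifiable from the test output alone. A minimal, self-contained sketch of that named table-driven style; the sanitize helper and the cases are placeholders, not the real namespace logic.

package warehouse_test

import (
	"strings"
	"testing"
)

func TestNamedCases(t *testing.T) {
	// placeholder for the code under test
	sanitize := func(s string) string { return strings.ToLower(strings.TrimSpace(s)) }

	testcases := []struct {
		name     string
		input    string
		expected string
	}{
		{name: "already lower case", input: "analytics", expected: "analytics"},
		{name: "mixed case with padding", input: "  Analytics ", expected: "analytics"},
	}

	for _, tc := range testcases {
		tc := tc // capture range variable for the subtest closure
		t.Run(tc.name, func(t *testing.T) {
			if got := sanitize(tc.input); got != tc.expected {
				t.Fatalf("got %q, want %q", got, tc.expected)
			}
		})
	}
}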
51 changes: 51 additions & 0 deletions warehouse/internal/repo/upload.go
@@ -632,3 +632,54 @@ func (uploads *Uploads) PendingTableUploads(ctx context.Context, namespace strin
}
return pendingTableUploads, nil
}

func (uploads *Uploads) ResetInProgress(ctx context.Context, destType string) error {
_, err := uploads.db.ExecContext(ctx, `
UPDATE
`+uploadsTableName+`
SET
in_progress = FALSE
WHERE
destination_type = $1 AND
in_progress = TRUE;
`,
destType,
)
if err != nil {
return fmt.Errorf("reset in progress: %w", err)
}
return nil
}

func (uploads *Uploads) LastCreatedAt(ctx context.Context, sourceID, destinationID string) (time.Time, error) {
row := uploads.db.QueryRowContext(ctx, `
SELECT
created_at
FROM
`+uploadsTableName+`
WHERE
source_id = $1 AND
destination_id = $2
ORDER BY
id DESC
LIMIT 1;
`,
sourceID,
destinationID,
)

var createdAt sql.NullTime

err := row.Scan(&createdAt)
if err == sql.ErrNoRows {
return time.Time{}, nil
}
if err != nil {
return time.Time{}, fmt.Errorf("last created at: %w", err)
}
if !createdAt.Valid {
return time.Time{}, nil
}

return createdAt.Time, nil
}
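ResetInProgress clears stale in_progress flags for a destination type, and LastCreatedAt returns the creation time of the newest upload for a source/destination pair, with the zero time when no upload exists. A hedged sketch of how a warehouse router might consume these methods; the router struct, its field names, and the frequency rule are illustrative assumptions, not the wiring introduced by this PR.

package warehouse

import (
	"context"
	"time"
)

// uploadsRepo captures just the two new methods from warehouse/internal/repo.
type uploadsRepo interface {
	ResetInProgress(ctx context.Context, destType string) error
	LastCreatedAt(ctx context.Context, sourceID, destinationID string) (time.Time, error)
}

type router struct {
	destType   string
	uploadRepo uploadsRepo
	uploadFreq time.Duration
	now        func() time.Time
}

// start releases uploads left marked in_progress by a previous process so
// they become eligible for processing again.
func (r *router) start(ctx context.Context) error {
	return r.uploadRepo.ResetInProgress(ctx, r.destType)
}

// canCreateUpload applies a simple frequency check; a zero LastCreatedAt means
// no upload exists yet, so one can be created immediately.
func (r *router) canCreateUpload(ctx context.Context, sourceID, destinationID string) (bool, error) {
	lastCreatedAt, err := r.uploadRepo.LastCreatedAt(ctx, sourceID, destinationID)
	if err != nil {
		return false, err
	}
	if lastCreatedAt.IsZero() {
		return true, nil
	}
	return r.now().Sub(lastCreatedAt) >= r.uploadFreq, nil
}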
122 changes: 122 additions & 0 deletions warehouse/internal/repo/upload_test.go
@@ -1000,3 +1000,125 @@ func TestUploads_PendingTableUploads(t *testing.T) {
require.EqualError(t, err, "pending table uploads: context canceled")
})
}

func TestUploads_ResetInProgress(t *testing.T) {
const (
sourceID = "source_id"
destinationID = "destination_id"
destinationType = "destination_type"
)

db, ctx := setupDB(t), context.Background()

now := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)
repoUpload := repo.NewUploads(db, repo.WithNow(func() time.Time {
return now
}))

t.Run("success", func(t *testing.T) {
repoStaging := repo.NewStagingFiles(db, repo.WithNow(func() time.Time {
return now
}))

stagingID, err := repoStaging.Insert(ctx, &model.StagingFileWithSchema{})
require.NoError(t, err)

uploadID, err := repoUpload.CreateWithStagingFiles(ctx, model.Upload{
SourceID: sourceID,
DestinationID: destinationID,
DestinationType: destinationType,
Status: model.Waiting,
}, []*model.StagingFile{
{
ID: stagingID,
SourceID: sourceID,
DestinationID: destinationID,
},
})
require.NoError(t, err)

_, err = db.ExecContext(ctx, `UPDATE wh_uploads SET in_progress = TRUE WHERE id = $1;`, uploadID)
require.NoError(t, err)

uploadsToProcess, err := repoUpload.GetToProcess(ctx, destinationType, 1, repo.ProcessOptions{})
require.NoError(t, err)
require.Len(t, uploadsToProcess, 0)

err = repoUpload.ResetInProgress(ctx, destinationType)
require.NoError(t, err)

uploadsToProcess, err = repoUpload.GetToProcess(ctx, destinationType, 1, repo.ProcessOptions{})
require.NoError(t, err)
require.Len(t, uploadsToProcess, 1)
})

t.Run("context cancelled", func(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
cancel()

err := repoUpload.ResetInProgress(ctx, destinationType)
require.EqualError(t, err, "reset in progress: context canceled")
})
}

func TestUploads_LastCreatedAt(t *testing.T) {
const (
sourceID = "source_id"
destinationID = "destination_id"
destinationType = "destination_type"
)

db, ctx := setupDB(t), context.Background()

now := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)
repoUpload := repo.NewUploads(db, repo.WithNow(func() time.Time {
return now
}))

t.Run("many uploads", func(t *testing.T) {
for i := 0; i < 5; i++ {
repoStaging := repo.NewStagingFiles(db, repo.WithNow(func() time.Time {
return now
}))
stagingID, err := repoStaging.Insert(ctx, &model.StagingFileWithSchema{})
require.NoError(t, err)

repoUpload := repo.NewUploads(db, repo.WithNow(func() time.Time {
return now.Add(time.Second * time.Duration(i+1))
}))

_, err = repoUpload.CreateWithStagingFiles(ctx, model.Upload{
SourceID: sourceID,
DestinationID: destinationID,
DestinationType: destinationType,
Status: model.Waiting,
}, []*model.StagingFile{
{
ID: stagingID,
SourceID: sourceID,
DestinationID: destinationID,
},
})
require.NoError(t, err)
}

lastCreatedAt, err := repoUpload.LastCreatedAt(ctx, sourceID, destinationID)
require.NoError(t, err)
require.Equal(t, lastCreatedAt.UTC(), now.Add(time.Second*5).UTC())
})

t.Run("no uploads", func(t *testing.T) {
lastCreatedAt, err := repoUpload.LastCreatedAt(ctx, "unknown_source", "unknown_destination")
require.NoError(t, err)
require.Equal(t, lastCreatedAt, time.Time{})
})

t.Run("context cancelled", func(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
cancel()

lastCreatedAt, err := repoUpload.LastCreatedAt(ctx, sourceID, destinationID)
require.EqualError(t, err, "last created at: context canceled")
require.Equal(t, lastCreatedAt, time.Time{})
})
}
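Both tests pin the repository clock through repo.WithNow, which keeps created_at deterministic; the LastCreatedAt test builds a fresh repo per iteration so each upload gets a distinct timestamp and the newest one can be asserted exactly. A simplified sketch of that functional-option pattern; the Uploads struct and constructor below are stand-ins for warehouse/internal/repo, not its actual definition.

package repo

import "time"

type Uploads struct {
	now func() time.Time
}

type Opt func(*Uploads)

// WithNow overrides the clock so tests can insert rows at known timestamps.
func WithNow(now func() time.Time) Opt {
	return func(u *Uploads) { u.now = now }
}

// NewUploads defaults to the wall clock and applies any overrides.
func NewUploads(opts ...Opt) *Uploads {
	u := &Uploads{now: time.Now}
	for _, opt := range opts {
		opt(u)
	}
	return u
}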