fix(warehouse): grouping uploads for processing pickup (#3039)
achettyiitr committed Feb 27, 2023
1 parent 63fd288 commit 4832630
Showing 2 changed files with 364 additions and 2 deletions.
5 changes: 3 additions & 2 deletions warehouse/internal/repo/upload.go
@@ -247,7 +247,7 @@ func (uploads *Uploads) GetToProcess(ctx context.Context, destType string, limit
`+uploadColumns+`
FROM (
SELECT
-ROW_NUMBER() OVER (PARTITION BY %s ORDER BY COALESCE(metadata->>'priority', '100')::int ASC, id ASC) AS row_number,
+ROW_NUMBER() OVER (PARTITION BY %s ORDER BY COALESCE(metadata->>'priority', '100')::int ASC, COALESCE(first_event_at, NOW()) ASC, id ASC) AS row_number,
t.*
FROM
`+uploadsTableName+` t
@@ -263,7 +263,8 @@ func (uploads *Uploads) GetToProcess(ctx context.Context, destType string, limit
grouped_uploads.row_number = 1
ORDER BY
COALESCE(metadata->>'priority', '100')::int ASC,
-COALESCE(first_event_at, NOW()) ASC
+COALESCE(first_event_at, NOW()) ASC,
+id ASC
LIMIT %d;
`,
partitionIdentifierSQL,
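For context, a minimal sketch of the query this fragment renders to once the format placeholders are filled in. The source_id partition column, the wh_uploads table name, the elided filters, and the limit of 10 are assumptions for illustration; the real values come from partitionIdentifierSQL, uploadsTableName, and the caller. The two new sort keys, COALESCE(first_event_at, NOW()) in the window and the trailing id, pick the upload with the oldest data within each group and keep the overall order stable when the other keys tie:

-- Illustrative rendering of the generated pickup query; table name,
-- partition column, filters, and limit are assumed for this sketch.
SELECT <upload columns>
FROM (
    SELECT
        ROW_NUMBER() OVER (
            PARTITION BY source_id
            ORDER BY
                COALESCE(metadata->>'priority', '100')::int ASC, -- lower value = higher priority
                COALESCE(first_event_at, NOW()) ASC,             -- oldest data first (new)
                id ASC                                           -- stable tie-breaker
        ) AS row_number,
        t.*
    FROM wh_uploads t
    WHERE ... -- pending-status and destination filters elided
) grouped_uploads
WHERE
    grouped_uploads.row_number = 1
ORDER BY
    COALESCE(metadata->>'priority', '100')::int ASC,
    COALESCE(first_event_at, NOW()) ASC,
    id ASC -- new: deterministic order across groups as well
LIMIT 10;

Previously the window broke ties by id alone, so the lowest-id row won its group even if a sibling upload held older data, and the outer ORDER BY had no final tie-breaker; the change prefers the oldest first_event_at within each group and appends id so the overall order is deterministic.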
361 changes: 361 additions & 0 deletions warehouse/internal/repo/upload_test.go
@@ -2,7 +2,10 @@ package repo_test

import (
"context"
"database/sql"
"encoding/json"
"fmt"
"sync/atomic"
"testing"
"time"

@@ -140,6 +143,364 @@ func TestUploads_Get(t *testing.T) {
})
}

func TestUploads_GetToProcess(t *testing.T) {
var (
workspaceID = "workspace_id"
namespace = "namespace"
sourceID = "source_id"
destID = "dest_id"
sourceTaskRunID = "source_task_run_id"
sourceJobID = "source_job_id"
sourceJobRunID = "source_job_run_id"
loadFileType = "csv"
destType = warehouseutils.RS
ctx = context.Background()
)

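// prepareUpload creates one upload together with its two backing staging
// files, pinning the repo clock to "now" so that first_event_at (30 minutes
// before "now") and the ordering assertions in the subtests below stay
// deterministic.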
prepareUpload := func(db *sql.DB, sourceID string, status model.UploadStatus, priority int, now, nextRetryTime time.Time) model.Upload {
stagingFileID := int64(0)
repoUpload := repo.NewUploads(db, repo.WithNow(func() time.Time {
return now
}))
repoStaging := repo.NewStagingFiles(db, repo.WithNow(func() time.Time {
return now
}))

var (
startStagingFileID = atomic.AddInt64(&stagingFileID, 1)
endStagingFileID = atomic.AddInt64(&stagingFileID, 1)
firstEventAt = now.Add(-30 * time.Minute)
lastEventAt = now
)

ogUpload := model.Upload{
WorkspaceID: workspaceID,
Namespace: namespace,
SourceID: sourceID,
DestinationID: destID,
DestinationType: destType,
Status: status,
Error: []byte("{}"),
FirstEventAt: firstEventAt,
LastEventAt: lastEventAt,
LoadFileType: loadFileType,
Priority: priority,
NextRetryTime: nextRetryTime,
Retried: true,

StagingFileStartID: startStagingFileID,
StagingFileEndID: endStagingFileID,
LoadFileStartID: 0,
LoadFileEndID: 0,
Timings: nil,
FirstAttemptAt: time.Time{},
LastAttemptAt: time.Time{},
Attempts: 0,
UploadSchema: model.Schema{},
MergedSchema: model.Schema{},

UseRudderStorage: true,
SourceTaskRunID: sourceTaskRunID,
SourceJobID: sourceJobID,
SourceJobRunID: sourceJobRunID,
}

files := []model.StagingFile{
{
ID: startStagingFileID,
FirstEventAt: firstEventAt,

UseRudderStorage: true,
SourceTaskRunID: sourceTaskRunID,
SourceJobID: sourceJobID,
SourceJobRunID: sourceJobRunID,
},
{
ID: endStagingFileID,
LastEventAt: lastEventAt,
},
}
for _, file := range files {
s := file.WithSchema(nil)
_, err := repoStaging.Insert(ctx, &s)
require.NoError(t, err)
}

id, err := repoUpload.CreateWithStagingFiles(ctx, ogUpload, files)
require.NoError(t, err)
ogUpload.ID = id

return ogUpload
}

t.Run("none present", func(t *testing.T) {
t.Parallel()

var (
db = setupDB(t)
repoUpload = repo.NewUploads(db)
priority = 100

uploads []model.Upload
)

uploads = append(uploads,
prepareUpload(db, sourceID, model.Aborted, priority,
time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
time.Date(2021, 1, 1, 1, 0, 0, 0, time.UTC),
),
prepareUpload(db, sourceID, model.ExportedData, priority,
time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
time.Date(2021, 1, 1, 1, 0, 0, 0, time.UTC),
),
)
require.Len(t, uploads, 2)

toProcess, err := repoUpload.GetToProcess(ctx, destType, 10, repo.ProcessOptions{})
require.NoError(t, err)
require.Len(t, toProcess, 0)
})

t.Run("skip identifier", func(t *testing.T) {
t.Parallel()

var (
db = setupDB(t)
repoUpload = repo.NewUploads(db)
priority = 100

uploads []model.Upload
)

uploads = append(uploads,
prepareUpload(db, sourceID, model.Waiting, priority,
time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
time.Date(2021, 1, 1, 1, 0, 0, 0, time.UTC),
),
prepareUpload(db, sourceID, model.ExportingDataFailed, priority,
time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
time.Date(2021, 1, 1, 1, 0, 0, 0, time.UTC),
),
)
require.Len(t, uploads, 2)

toProcess, err := repoUpload.GetToProcess(ctx, destType, 10, repo.ProcessOptions{})
require.NoError(t, err)
require.Len(t, toProcess, 1)
require.Equal(t, uploads[0].ID, toProcess[0].ID)

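// by default uploads are grouped per destination: the skip identifier is
// "<destinationID>_<namespace>".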
skipIdentifier := fmt.Sprintf("%s_%s", destID, namespace)
toProcess, err = repoUpload.GetToProcess(ctx, destType, 10, repo.ProcessOptions{
SkipIdentifiers: []string{skipIdentifier},
})
require.NoError(t, err)
require.Len(t, toProcess, 0)
})

t.Run("skip workspaces", func(t *testing.T) {
t.Parallel()

var (
db = setupDB(t)
repoUpload = repo.NewUploads(db)
priority = 100

uploads []model.Upload
)

uploads = append(uploads,
prepareUpload(db, sourceID, model.Waiting, priority,
time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
time.Date(2021, 1, 1, 1, 0, 0, 0, time.UTC),
),
prepareUpload(db, sourceID, model.ExportingDataFailed, priority,
time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
time.Date(2021, 1, 1, 1, 0, 0, 0, time.UTC),
),
)
require.Len(t, uploads, 2)

toProcess, err := repoUpload.GetToProcess(ctx, destType, 10, repo.ProcessOptions{})
require.NoError(t, err)
require.Len(t, toProcess, 1)
require.Equal(t, uploads[0].ID, toProcess[0].ID)

toProcess, err = repoUpload.GetToProcess(ctx, destType, 10, repo.ProcessOptions{
SkipWorkspaces: []string{workspaceID},
})
require.NoError(t, err)
require.Len(t, toProcess, 0)
})

t.Run("ordering by priority", func(t *testing.T) {
t.Parallel()

var (
db = setupDB(t)
repoUpload = repo.NewUploads(db)
lowPriority = 100
highPriority = 0
differentSourceID = "source_id_2"

uploads []model.Upload
)

uploads = append(uploads,
prepareUpload(db, sourceID, model.Waiting, lowPriority,
time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
time.Date(2021, 1, 1, 1, 0, 0, 0, time.UTC),
),
prepareUpload(db, sourceID, model.Waiting, highPriority,
time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
time.Date(2021, 1, 1, 1, 0, 0, 0, time.UTC),
),
prepareUpload(db, differentSourceID, model.Waiting, lowPriority,
time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
time.Date(2021, 1, 1, 1, 0, 0, 0, time.UTC),
),
prepareUpload(db, differentSourceID, model.Waiting, highPriority,
time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
time.Date(2021, 1, 1, 1, 0, 0, 0, time.UTC),
),
)
require.Len(t, uploads, 4)

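// by default only one upload is returned per destination_namespace group,
// and the high-priority (lower value) upload wins within it.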
toProcess, err := repoUpload.GetToProcess(ctx, destType, 10, repo.ProcessOptions{})
require.NoError(t, err)
require.Len(t, toProcess, 1)
require.Equal(t, uploads[1].ID, toProcess[0].ID)

toProcess, err = repoUpload.GetToProcess(ctx, destType, 10, repo.ProcessOptions{
AllowMultipleSourcesForJobsPickup: true,
})
require.NoError(t, err)
require.Len(t, toProcess, 2)
require.Equal(t, uploads[1].ID, toProcess[0].ID)
require.Equal(t, uploads[3].ID, toProcess[1].ID)
})

t.Run("ordering by first event at", func(t *testing.T) {
t.Parallel()

var (
db = setupDB(t)
repoUpload = repo.NewUploads(db)
lowPriority = 100
highPriority = 0
differentSourceID = "source_id_2"

uploads []model.Upload
)

uploads = append(uploads,
prepareUpload(db, sourceID, model.ExportingDataFailed, lowPriority,
time.Date(2021, 1, 1, 0, 40, 0, 0, time.UTC),
time.Date(2021, 1, 1, 1, 0, 0, 0, time.UTC),
),
prepareUpload(db, sourceID, model.Waiting, highPriority,
time.Date(2021, 1, 1, 0, 25, 0, 0, time.UTC),
time.Date(2021, 1, 1, 1, 0, 0, 0, time.UTC),
),
prepareUpload(db, differentSourceID, model.GeneratedUploadSchema, lowPriority,
time.Date(2021, 1, 1, 0, 30, 0, 0, time.UTC),
time.Date(2021, 1, 1, 1, 0, 0, 0, time.UTC),
),
prepareUpload(db, differentSourceID, model.GeneratingLoadFiles, highPriority,
time.Date(2021, 1, 1, 0, 15, 0, 0, time.UTC),
time.Date(2021, 1, 1, 1, 0, 0, 0, time.UTC),
),
)
require.Len(t, uploads, 4)

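// uploads[3] combines the highest priority (0) with the earliest
// first_event_at, so it is picked first.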
toProcess, err := repoUpload.GetToProcess(ctx, destType, 10, repo.ProcessOptions{})
require.NoError(t, err)
require.Len(t, toProcess, 1)
require.Equal(t, uploads[3].ID, toProcess[0].ID)

toProcess, err = repoUpload.GetToProcess(ctx, destType, 10, repo.ProcessOptions{
AllowMultipleSourcesForJobsPickup: true,
})
require.NoError(t, err)
require.Len(t, toProcess, 2)
require.Equal(t, uploads[3].ID, toProcess[0].ID)
require.Equal(t, uploads[1].ID, toProcess[1].ID)
})

t.Run("allow multiple sources for jobs pickup", func(t *testing.T) {
t.Parallel()

var (
db = setupDB(t)
repoUpload = repo.NewUploads(db)
priority = 100
differentSourceID = "source_id_2"

uploads []model.Upload
)

uploads = append(uploads,
prepareUpload(db, sourceID, model.Waiting, priority,
time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
time.Date(2021, 1, 1, 1, 0, 0, 0, time.UTC),
),
prepareUpload(db, sourceID, model.ExportingDataFailed, priority,
time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
time.Date(2021, 1, 1, 1, 0, 0, 0, time.UTC),
),
prepareUpload(db, differentSourceID, model.Waiting, priority,
time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
time.Date(2021, 1, 1, 1, 0, 0, 0, time.UTC),
),
prepareUpload(db, differentSourceID, model.ExportingDataFailed, priority,
time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
time.Date(2021, 1, 1, 1, 0, 0, 0, time.UTC),
),
)
require.Len(t, uploads, 4)

t.Run("single source", func(t *testing.T) {
toProcess, err := repoUpload.GetToProcess(ctx, destType, 10, repo.ProcessOptions{})
require.NoError(t, err)
require.Len(t, toProcess, 1)
require.Equal(t, uploads[0].ID, toProcess[0].ID)
})

t.Run("multiple sources", func(t *testing.T) {
toProcess, err := repoUpload.GetToProcess(ctx, destType, 10, repo.ProcessOptions{
AllowMultipleSourcesForJobsPickup: true,
})
require.NoError(t, err)
require.Len(t, toProcess, 2)
require.Equal(t, uploads[0].ID, toProcess[0].ID)
require.Equal(t, uploads[2].ID, toProcess[1].ID)
})

t.Run("skip few identifiers", func(t *testing.T) {
toProcess, err := repoUpload.GetToProcess(ctx, destType, 10, repo.ProcessOptions{
SkipIdentifiers: []string{
fmt.Sprintf("%s_%s_%s", sourceID, destID, namespace),
},
AllowMultipleSourcesForJobsPickup: true,
})
require.NoError(t, err)
require.Len(t, toProcess, 1)
require.Equal(t, uploads[2].ID, toProcess[0].ID)
})

t.Run("skip all identifiers", func(t *testing.T) {
toProcess, err := repoUpload.GetToProcess(ctx, destType, 10, repo.ProcessOptions{
SkipIdentifiers: []string{
fmt.Sprintf("%s_%s_%s", sourceID, destID, namespace),
fmt.Sprintf("%s_%s_%s", differentSourceID, destID, namespace),
},
AllowMultipleSourcesForJobsPickup: true,
})
require.NoError(t, err)
require.Len(t, toProcess, 0)
})
})
}

func TestUploads_Processing(t *testing.T) {
t.Parallel()
