Skip to content

Commit

Permalink
chore: move warehouse handle to router (#3687)
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Aug 9, 2023
1 parent 3cc17f3 commit e227046
Show file tree
Hide file tree
Showing 17 changed files with 2,683 additions and 1,196 deletions.
2 changes: 0 additions & 2 deletions runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,8 +338,6 @@ func runAllInit() {
jobsdb.Init()
jobsdb.Init2()
warehouse.Init()
warehouse.Init2()
warehouse.Init3()
warehouse.Init4()
warehouse.Init6()
warehousearchiver.Init()
Expand Down
2 changes: 1 addition & 1 deletion warehouse/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func RegisterAdmin(bcManager *backendConfigManager, logger logger.Logger) {

// TriggerUpload sets uploads to start without delay
func (*Admin) TriggerUpload(off bool, reply *string) error {
startUploadAlways = !off
startUploadAlways.Store(!off)
if off {
*reply = "Turned off explicit warehouse upload triggers.\nWarehouse uploads will continue to be done as per schedule in control plane."
} else {
Expand Down
48 changes: 48 additions & 0 deletions warehouse/internal/repo/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,3 +632,51 @@ func (uploads *Uploads) PendingTableUploads(ctx context.Context, namespace strin
}
return pendingTableUploads, nil
}

func (uploads *Uploads) ResetInProgress(ctx context.Context, destType string) error {
_, err := uploads.db.ExecContext(ctx, `
UPDATE
`+uploadsTableName+`
SET
in_progress = FALSE
WHERE
destination_type = $1 AND
in_progress = TRUE;
`,
destType,
)
if err != nil {
return fmt.Errorf("reset in progress: %w", err)
}
return nil
}

func (uploads *Uploads) LastCreatedAt(ctx context.Context, sourceID, destinationID string) (time.Time, error) {
row := uploads.db.QueryRowContext(ctx, `
SELECT
created_at
FROM
`+uploadsTableName+`
WHERE
source_id = $1 AND
destination_id = $2
ORDER BY
id DESC
LIMIT 1;
`,
sourceID,
destinationID,
)

var createdAt sql.NullTime

err := row.Scan(&createdAt)
if err != nil && err != sql.ErrNoRows {
return time.Time{}, fmt.Errorf("last created at: %w", err)
}
if err == sql.ErrNoRows || !createdAt.Valid {
return time.Time{}, nil
}

return createdAt.Time, nil
}
122 changes: 122 additions & 0 deletions warehouse/internal/repo/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1000,3 +1000,125 @@ func TestUploads_PendingTableUploads(t *testing.T) {
require.EqualError(t, err, "pending table uploads: context canceled")
})
}

func TestUploads_ResetInProgress(t *testing.T) {
const (
sourceID = "source_id"
destinationID = "destination_id"
destinationType = "destination_type"
)

db, ctx := setupDB(t), context.Background()

now := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)
repoUpload := repo.NewUploads(db, repo.WithNow(func() time.Time {
return now
}))

t.Run("success", func(t *testing.T) {
repoStaging := repo.NewStagingFiles(db, repo.WithNow(func() time.Time {
return now
}))

stagingID, err := repoStaging.Insert(ctx, &model.StagingFileWithSchema{})
require.NoError(t, err)

uploadID, err := repoUpload.CreateWithStagingFiles(ctx, model.Upload{
SourceID: sourceID,
DestinationID: destinationID,
DestinationType: destinationType,
Status: model.Waiting,
}, []*model.StagingFile{
{
ID: stagingID,
SourceID: sourceID,
DestinationID: destinationID,
},
})
require.NoError(t, err)

_, err = db.ExecContext(ctx, `UPDATE wh_uploads SET in_progress = TRUE WHERE id = $1;`, uploadID)
require.NoError(t, err)

uploadsToProcess, err := repoUpload.GetToProcess(ctx, destinationType, 1, repo.ProcessOptions{})
require.NoError(t, err)
require.Len(t, uploadsToProcess, 0)

err = repoUpload.ResetInProgress(ctx, destinationType)
require.NoError(t, err)

uploadsToProcess, err = repoUpload.GetToProcess(ctx, destinationType, 1, repo.ProcessOptions{})
require.NoError(t, err)
require.Len(t, uploadsToProcess, 1)
})

t.Run("context cancelled", func(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
cancel()

err := repoUpload.ResetInProgress(ctx, destinationType)
require.EqualError(t, err, "reset in progress: context canceled")
})
}

func TestUploads_LastCreatedAt(t *testing.T) {
const (
sourceID = "source_id"
destinationID = "destination_id"
destinationType = "destination_type"
)

db, ctx := setupDB(t), context.Background()

now := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)
repoUpload := repo.NewUploads(db, repo.WithNow(func() time.Time {
return now
}))

t.Run("many uploads", func(t *testing.T) {
for i := 0; i < 5; i++ {
repoStaging := repo.NewStagingFiles(db, repo.WithNow(func() time.Time {
return now
}))
stagingID, err := repoStaging.Insert(ctx, &model.StagingFileWithSchema{})
require.NoError(t, err)

repoUpload := repo.NewUploads(db, repo.WithNow(func() time.Time {
return now.Add(time.Second * time.Duration(i+1))
}))

_, err = repoUpload.CreateWithStagingFiles(ctx, model.Upload{
SourceID: sourceID,
DestinationID: destinationID,
DestinationType: destinationType,
Status: model.Waiting,
}, []*model.StagingFile{
{
ID: stagingID,
SourceID: sourceID,
DestinationID: destinationID,
},
})
require.NoError(t, err)
}

lastCreatedAt, err := repoUpload.LastCreatedAt(ctx, sourceID, destinationID)
require.NoError(t, err)
require.Equal(t, lastCreatedAt.UTC(), now.Add(time.Second*5).UTC())
})

t.Run("no uploads", func(t *testing.T) {
lastCreatedAt, err := repoUpload.LastCreatedAt(ctx, "unknown_source", "unknown_destination")
require.NoError(t, err)
require.Equal(t, lastCreatedAt, time.Time{})
})

t.Run("context cancelled", func(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
cancel()

lastCreatedAt, err := repoUpload.LastCreatedAt(ctx, sourceID, destinationID)
require.EqualError(t, err, "last created at: context canceled")
require.Equal(t, lastCreatedAt, time.Time{})
})
}
Loading

0 comments on commit e227046

Please sign in to comment.