Skip to content

Commit

Permalink
chore: added scheduling tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Aug 8, 2023
1 parent c8ea735 commit 0fbd947
Show file tree
Hide file tree
Showing 11 changed files with 824 additions and 285 deletions.
33 changes: 33 additions & 0 deletions warehouse/internal/repo/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,3 +650,36 @@ func (uploads *Uploads) ResetInProgress(ctx context.Context, destType string) er
}
return nil
}

func (uploads *Uploads) LastCreatedAt(ctx context.Context, sourceID, destinationID string) (time.Time, error) {
row := uploads.db.QueryRowContext(ctx, `
SELECT
created_at
FROM
`+uploadsTableName+`
WHERE
source_id = $1 AND
destination_id = $2
ORDER BY
id DESC
LIMIT 1;
`,
sourceID,
destinationID,
)

var createdAt sql.NullTime

err := row.Scan(&createdAt)
if err == sql.ErrNoRows {
return time.Time{}, nil
}
if err != nil {
return time.Time{}, fmt.Errorf("last created at: %w", err)
}
if !createdAt.Valid {
return time.Time{}, nil
}

return createdAt.Time, nil
}
69 changes: 66 additions & 3 deletions warehouse/internal/repo/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1014,11 +1014,12 @@ func TestUploads_ResetInProgress(t *testing.T) {
repoUpload := repo.NewUploads(db, repo.WithNow(func() time.Time {
return now
}))
repoStaging := repo.NewStagingFiles(db, repo.WithNow(func() time.Time {
return now
}))

t.Run("success", func(t *testing.T) {
repoStaging := repo.NewStagingFiles(db, repo.WithNow(func() time.Time {
return now
}))

stagingID, err := repoStaging.Insert(ctx, &model.StagingFileWithSchema{})
require.NoError(t, err)

Expand Down Expand Up @@ -1059,3 +1060,65 @@ func TestUploads_ResetInProgress(t *testing.T) {
require.EqualError(t, err, "reset in progress: context canceled")
})
}

func TestUploads_LastCreatedAt(t *testing.T) {
const (
sourceID = "source_id"
destinationID = "destination_id"
destinationType = "destination_type"
)

db, ctx := setupDB(t), context.Background()

now := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)
repoUpload := repo.NewUploads(db, repo.WithNow(func() time.Time {
return now
}))

t.Run("many uploads", func(t *testing.T) {
for i := 0; i < 5; i++ {
repoStaging := repo.NewStagingFiles(db, repo.WithNow(func() time.Time {
return now
}))
stagingID, err := repoStaging.Insert(ctx, &model.StagingFileWithSchema{})
require.NoError(t, err)

repoUpload := repo.NewUploads(db, repo.WithNow(func() time.Time {
return now.Add(time.Second * time.Duration(i+1))
}))

_, err = repoUpload.CreateWithStagingFiles(ctx, model.Upload{
SourceID: sourceID,
DestinationID: destinationID,
DestinationType: destinationType,
Status: model.Waiting,
}, []*model.StagingFile{
{
ID: stagingID,
SourceID: sourceID,
DestinationID: destinationID,
},
})
require.NoError(t, err)
}

lastCreatedAt, err := repoUpload.LastCreatedAt(ctx, sourceID, destinationID)
require.NoError(t, err)
require.Equal(t, lastCreatedAt.UTC(), now.Add(time.Second*5).UTC())
})

t.Run("no uploads", func(t *testing.T) {
lastCreatedAt, err := repoUpload.LastCreatedAt(ctx, "unknown_source", "unknown_destination")
require.NoError(t, err)
require.Equal(t, lastCreatedAt, time.Time{})
})

t.Run("context cancelled", func(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
cancel()

lastCreatedAt, err := repoUpload.LastCreatedAt(ctx, sourceID, destinationID)
require.EqualError(t, err, "last created at: context canceled")
require.Equal(t, lastCreatedAt, time.Time{})
})
}
Loading

0 comments on commit 0fbd947

Please sign in to comment.