Skip to content

Commit

Permalink
fix: handle NULL on upload timings
Browse files Browse the repository at this point in the history
  • Loading branch information
lvrach committed Feb 9, 2023
1 parent 169ab96 commit 939779e
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 38 deletions.
29 changes: 29 additions & 0 deletions warehouse/internal/repo/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,35 @@ func (uploads *Uploads) UploadJobsStats(ctx context.Context, destType string, op
return stats, nil
}

func (uploads *Uploads) UploadTimings(ctx context.Context, uploadID int64) (model.Timings, error) {
var (
rawJSON jsoniter.RawMessage
timings model.Timings
)

err := uploads.db.QueryRowContext(ctx, `
SELECT
COALESCE(timings, '[]')::JSONB
FROM
`+uploadsTableName+`
WHERE
id = $1;
`, uploadID).Scan(&rawJSON)
if err == sql.ErrNoRows {
return timings, model.ErrUploadNotFound
}
if err != nil {
return timings, err
}

err = json.Unmarshal(rawJSON, &timings)
if err != nil {
return timings, err
}

return timings, nil
}

func (uploads *Uploads) DeleteWaiting(ctx context.Context, uploadID int64) error {
_, err := uploads.db.ExecContext(ctx,
`DELETE FROM `+uploadsTableName+` WHERE id = $1 AND status = $2`,
Expand Down
27 changes: 27 additions & 0 deletions warehouse/internal/repo/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package repo_test

import (
"context"
"encoding/json"
"testing"
"time"

Expand Down Expand Up @@ -112,6 +113,31 @@ func TestUploads_Get(t *testing.T) {
PickupWaitTime: 0,
}, uploadStats)
})
t.Run("UploadTimings", func(t *testing.T) {
timings, err := repoUpload.UploadTimings(ctx, id)
require.NoError(t, err)
require.Equal(t, model.Timings{}, timings)

expected := model.Timings{{
"download": time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
}, {
"upload": time.Date(2021, 1, 1, 0, 0, 1, 0, time.UTC),
}}

r, err := json.Marshal(expected)
require.NoError(t, err)

// TODO: implement and use repo method
_, err = db.Exec("UPDATE wh_uploads SET timings = $1 WHERE id = $2", r, id)
require.NoError(t, err)

timings, err = repoUpload.UploadTimings(ctx, id)
require.NoError(t, err)
require.Equal(t, expected, timings)

_, err = repoUpload.UploadTimings(ctx, -1)
require.Equal(t, err, model.ErrUploadNotFound)
})
}

func TestUploads_Processing(t *testing.T) {
Expand Down Expand Up @@ -301,6 +327,7 @@ func TestUploads_UploadMetadata(t *testing.T) {
Priority: 40,
NextRetryTime: time.Date(2021, 1, 1, 0, 0, 1, 0, time.UTC),
}, metadata)

}

func TestUploads_Delete(t *testing.T) {
Expand Down
33 changes: 1 addition & 32 deletions warehouse/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -1326,41 +1326,10 @@ func (job *UploadJobT) processLoadTableResponse(errorMap map[string]error) (erro
return errors, tableUploadErr
}

// getUploadTimings returns timings json column
// e.g. timings: [{exporting_data: 2020-04-21 15:16:19.687716, exported_data: 2020-04-21 15:26:34.344356}]
func (job *UploadJobT) getUploadTimings() (model.Timings, error) {
var (
rawJSON json.RawMessage
timings model.Timings
)
sqlStatement := fmt.Sprintf(`
SELECT
timings
FROM
%s
WHERE
id = %d;
`,
warehouseutils.WarehouseUploadsTable,
job.upload.ID,
)
err := job.dbHandle.QueryRow(sqlStatement).Scan(&rawJSON)
if err != nil {
return timings, err
}

err = json.Unmarshal(rawJSON, &timings)
if err != nil {
return timings, err
}

return timings, nil
}

// getNewTimings appends current status with current time to timings column
// e.g. status: exported_data, timings: [{exporting_data: 2020-04-21 15:16:19.687716] -> [{exporting_data: 2020-04-21 15:16:19.687716, exported_data: 2020-04-21 15:26:34.344356}]
func (job *UploadJobT) getNewTimings(status string) ([]byte, model.Timings) {
timings, err := job.getUploadTimings()
timings, err := repo.NewUploads(job.dbHandle).UploadTimings(context.TODO(), job.upload.ID)
if err != nil {
pkgLogger.Error("error getting timing, scrapping them", err)
}
Expand Down
13 changes: 7 additions & 6 deletions warehouse/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,12 +317,13 @@ var _ = Describe("Upload", Ordered, func() {
It("Get uploads timings", func() {
exportedData, _ := time.ParseDateTime("2020-04-21T15:26:34.344356")
exportingData, _ := time.ParseDateTime("2020-04-21T15:16:19.687716")
Expect(job.getUploadTimings()).To(BeEquivalentTo(model.Timings{
{
"exported_data": exportedData,
"exporting_data": exportingData,
},
}))
Expect(repo.NewUploads(job.dbHandle).UploadTimings(context.TODO(), job.upload.ID)).
To(BeEquivalentTo(model.Timings{
{
"exported_data": exportedData,
"exporting_data": exportingData,
},
}))
})

Describe("Staging files and load files events match", func() {
Expand Down

0 comments on commit 939779e

Please sign in to comment.