Skip to content

Commit

Permalink
chore: use upload_id for staging files (#3066)
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 committed Mar 7, 2023
1 parent 4cb5907 commit 3ec2433
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 343 deletions.
12 changes: 6 additions & 6 deletions warehouse/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (uploadsReq *UploadsReqT) TriggerWhUploads() (response *proto.TriggerWhUplo
return
}

func (uploadReq UploadReqT) GetWHUpload() (*proto.WHUploadResponse, error) {
func (uploadReq *UploadReqT) GetWHUpload() (*proto.WHUploadResponse, error) {
err := uploadReq.validateReq()
if err != nil {
return &proto.WHUploadResponse{}, status.Errorf(codes.Code(code.Code_INVALID_ARGUMENT), err.Error())
Expand Down Expand Up @@ -364,7 +364,7 @@ func (uploadReq UploadReqT) GetWHUpload() (*proto.WHUploadResponse, error) {
return &upload, nil
}

func (uploadReq UploadReqT) TriggerWHUpload() (response *proto.TriggerWhUploadsResponse, err error) {
func (uploadReq *UploadReqT) TriggerWHUpload() (response *proto.TriggerWhUploadsResponse, err error) {
err = uploadReq.validateReq()
defer func() {
if err != nil {
Expand Down Expand Up @@ -491,7 +491,7 @@ func (tableUploadReq TableUploadReqT) validateReq() error {
return nil
}

func (uploadReq UploadReqT) generateQuery(selectedFields string) string {
func (uploadReq *UploadReqT) generateQuery(selectedFields string) string {
return fmt.Sprintf(`
SELECT
%s
Expand All @@ -506,7 +506,7 @@ func (uploadReq UploadReqT) generateQuery(selectedFields string) string {
)
}

func (uploadReq UploadReqT) validateReq() error {
func (uploadReq *UploadReqT) validateReq() error {
if !uploadReq.API.enabled || uploadReq.API.log == nil || uploadReq.API.dbHandle == nil {
return errors.New("warehouse api are not initialized")
}
Expand All @@ -516,7 +516,7 @@ func (uploadReq UploadReqT) validateReq() error {
return nil
}

func (uploadReq UploadReqT) authorizeSource(sourceID string) bool {
func (uploadReq *UploadReqT) authorizeSource(sourceID string) bool {
var authorizedSourceIDs []string
var ok bool
sourceIDsByWorkspaceLock.RLock()
Expand All @@ -529,7 +529,7 @@ func (uploadReq UploadReqT) authorizeSource(sourceID string) bool {
return misc.Contains(authorizedSourceIDs, sourceID)
}

func (uploadsReq UploadsReqT) authorizedSources() (sourceIDs []string) {
func (uploadsReq *UploadsReqT) authorizedSources() (sourceIDs []string) {
sourceIDsByWorkspaceLock.RLock()
defer sourceIDsByWorkspaceLock.RUnlock()
var ok bool
Expand Down
14 changes: 0 additions & 14 deletions warehouse/identities.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,20 +130,6 @@ func (wh *HandleT) getPendingPopulateIdentitiesLoad(warehouse warehouseutils.War
}
found = true
upload.UploadSchema = warehouseutils.JSONSchemaToMap(schema)

// TODO: remove this once the migration is complete
if upload.WorkspaceID == "" {
var ok bool
wh.workspaceBySourceIDsLock.RLock()
upload.WorkspaceID, ok = wh.workspaceBySourceIDs[upload.SourceID]
wh.workspaceBySourceIDsLock.RUnlock()

if !ok {
pkgLogger.Warnf("Workspace not found for source id: %q", upload.SourceID)
}

}

return
}

Expand Down
76 changes: 15 additions & 61 deletions warehouse/internal/repo/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,10 @@ func (repo *StagingFiles) Insert(ctx context.Context, stagingFile *model.Staging
}

// parseRows is a helper for mapping rows of tableColumns to model.StagingFile values.
func (*StagingFiles) parseRows(rows *sql.Rows) ([]model.StagingFile, error) {
var stagingFiles []model.StagingFile
func (*StagingFiles) parseRows(rows *sql.Rows) ([]*model.StagingFile, error) {
var stagingFiles []*model.StagingFile

defer func() { _ = rows.Close() }()

for rows.Next() {
var (
stagingFile model.StagingFile
Expand Down Expand Up @@ -222,7 +221,7 @@ func (*StagingFiles) parseRows(rows *sql.Rows) ([]model.StagingFile, error) {
}

m.SetStagingFile(&stagingFile)
stagingFiles = append(stagingFiles, stagingFile)
stagingFiles = append(stagingFiles, &stagingFile)
}

if err := rows.Err(); err != nil {
Expand All @@ -249,7 +248,7 @@ func (repo *StagingFiles) GetByID(ctx context.Context, ID int64) (model.StagingF
return model.StagingFile{}, fmt.Errorf("no staging file found with id: %d", ID)
}

return entries[0], err
return *entries[0], err
}

// GetSchemaByID returns staging file schema field the given ID.
Expand All @@ -270,59 +269,37 @@ func (repo *StagingFiles) GetSchemaByID(ctx context.Context, ID int64) (jsonstd.
return schema, err
}

// GetInRange returns staging files in [startID, endID] range inclusive.
func (repo *StagingFiles) GetInRange(ctx context.Context, sourceID, destinationID string, startID, endID int64) ([]model.StagingFile, error) {
// GetForUploadID returns all the staging files for the given upload ID.
func (repo *StagingFiles) GetForUploadID(ctx context.Context, sourceID, destinationID string, uploadId int64) ([]*model.StagingFile, error) {
query := `SELECT ` + stagingTableColumns + ` FROM ` + stagingTableName + ` ST
WHERE
id >= $1 AND id <= $2
AND source_id = $3
AND destination_id = $4
ORDER BY
id ASC;`

rows, err := repo.db.QueryContext(ctx, query, startID, endID, sourceID, destinationID)
if err != nil {
return nil, fmt.Errorf("querying staging files: %w", err)
}

return repo.parseRows(rows)
}

// GetForUpload returns the staging files belonging to the given upload.
// It delegates to GetInRange using the upload's source ID, destination ID and
// its staging file start/end IDs.
func (repo *StagingFiles) GetForUpload(ctx context.Context, upload model.Upload) ([]model.StagingFile, error) {
	var (
		sourceID      = upload.SourceID
		destinationID = upload.DestinationID
		firstID       = upload.StagingFileStartID
		lastID        = upload.StagingFileEndID
	)
	return repo.GetInRange(ctx, sourceID, destinationID, firstID, lastID)
}

// GetAfterID returns staging files in (startID, +Inf) range.
func (repo *StagingFiles) GetAfterID(ctx context.Context, sourceID, destinationID string, startID int64) ([]model.StagingFile, error) {
query := `SELECT ` + stagingTableColumns + ` FROM ` + stagingTableName + `
WHERE
id > $1
upload_id = $1
AND source_id = $2
AND destination_id = $3
ORDER BY
id ASC;`

rows, err := repo.db.QueryContext(ctx, query, startID, sourceID, destinationID)
rows, err := repo.db.QueryContext(ctx, query, uploadId, sourceID, destinationID)
if err != nil {
return nil, fmt.Errorf("querying staging files: %w", err)
}

return repo.parseRows(rows)
}

func (repo *StagingFiles) Pending(ctx context.Context, sourceID, destinationID string) ([]model.StagingFile, error) {
// GetForUpload returns the staging files associated with the given upload.
// It delegates to GetForUploadID using the upload's source ID, destination ID
// and ID.
func (repo *StagingFiles) GetForUpload(ctx context.Context, upload model.Upload) ([]*model.StagingFile, error) {
	var (
		sourceID      = upload.SourceID
		destinationID = upload.DestinationID
	)
	return repo.GetForUploadID(ctx, sourceID, destinationID, upload.ID)
}

func (repo *StagingFiles) Pending(ctx context.Context, sourceID, destinationID string) ([]*model.StagingFile, error) {
var (
uploadID int64
lastStartStagingFileID int64
lastEndStagingFileID int64
useUploadID sql.NullBool
)
err := repo.db.QueryRowContext(ctx, `
SELECT
id,
start_staging_file_id,
end_staging_file_id,
metadata->>'use_upload_id' AS use_upload_id
start_staging_file_id
FROM
`+uploadsTableName+`
WHERE
Expand All @@ -334,33 +311,10 @@ func (repo *StagingFiles) Pending(ctx context.Context, sourceID, destinationID s
).Scan(
&uploadID,
&lastStartStagingFileID,
&lastEndStagingFileID,
&useUploadID,
)

if err == sql.ErrNoRows {
lastEndStagingFileID = 0
} else if err != nil {
if err != nil && err != sql.ErrNoRows {
return nil, fmt.Errorf("querying uploads: %w", err)
}

// Legacy path:
// staging files are not associated with uploads,
// so we need to get them by range.
if !useUploadID.Bool {
stagingFilesList, err := repo.GetAfterID(
ctx,
sourceID,
destinationID,
lastEndStagingFileID,
)
if err != nil {
return nil, err
}

return stagingFilesList, nil
}

// lastStartStagingFileID is used as an optimization to avoid scanning the whole table.
query := `SELECT ` + stagingTableColumns + ` FROM ` + stagingTableName + `
WHERE
Expand Down
Loading

0 comments on commit 3ec2433

Please sign in to comment.