From ea78ee5361091b924a38af2adf4ace4c7140069d Mon Sep 17 00:00:00 2001 From: Akash Chetty Date: Mon, 4 Mar 2024 12:43:25 +0530 Subject: [PATCH] fix: enable archiver for use rudder storage for dtaging file entries (#4433) --- warehouse/archive/archiver.go | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/warehouse/archive/archiver.go b/warehouse/archive/archiver.go index 21e887a1fe..77420ceb57 100644 --- a/warehouse/archive/archiver.go +++ b/warehouse/archive/archiver.go @@ -276,8 +276,6 @@ func (a *Archiver) Do(ctx context.Context) error { continue } - hasUsedRudderStorage := a.usedRudderStorage(u.uploadMetdata) - // archive staging files stagingFileIDs, err := a.getStagingFilesData(ctx, txn, u) if err != nil { @@ -288,20 +286,18 @@ func (a *Archiver) Do(ctx context.Context) error { var storedStagingFilesLocation string if len(stagingFileIDs) > 0 { - if !hasUsedRudderStorage { - filterSQL := fmt.Sprintf(`id IN (%v)`, misc.IntArrayToString(stagingFileIDs, ",")) - storedStagingFilesLocation, err = a.backupRecords(ctx, backupRecordsArgs{ - tableName: warehouseutils.WarehouseStagingFilesTable, - sourceID: u.sourceID, - destID: u.destID, - tableFilterSQL: filterSQL, - uploadID: u.uploadID, - }) - if err != nil { - a.log.Errorf(`[Archiver]: Error backing up staging files for upload: %d: %v`, u.uploadID, err) - _ = txn.Rollback() - continue - } + filterSQL := fmt.Sprintf(`id IN (%v)`, misc.IntArrayToString(stagingFileIDs, ",")) + storedStagingFilesLocation, err = a.backupRecords(ctx, backupRecordsArgs{ + tableName: warehouseutils.WarehouseStagingFilesTable, + sourceID: u.sourceID, + destID: u.destID, + tableFilterSQL: filterSQL, + uploadID: u.uploadID, + }) + if err != nil { + a.log.Errorf(`[Archiver]: Error backing up staging files for upload: %d: %v`, u.uploadID, err) + _ = txn.Rollback() + continue } // delete staging file records @@ -317,6 +313,8 @@ func (a *Archiver) Do(ctx context.Context) error { continue } + hasUsedRudderStorage := a.usedRudderStorage(u.uploadMetdata) + // delete load file records if err := a.deleteLoadFileRecords(ctx, txn, stagingFileIDs, hasUsedRudderStorage); err != nil { a.log.Errorf("[Archiver]: Error while deleting load file records for upload %d: %v", u.uploadID, err)