Skip to content

Commit

Permalink
chore: some more changes
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed May 23, 2023
1 parent 585235a commit 1dd32b2
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 14 deletions.
4 changes: 2 additions & 2 deletions warehouse/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ func (cl *Client) sqlQuery(statement string) (result warehouseutils.QueryResult,

func (cl *Client) bqQuery(statement string) (result warehouseutils.QueryResult, err error) {
query := cl.BQ.Query(statement)
context := context.Background()
it, err := query.Read(context)
ctx := context.Background()
it, err := query.Read(ctx)
if err != nil {
return
}
Expand Down
2 changes: 1 addition & 1 deletion warehouse/identity/identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ func (idr *Identity) createTempGzFile(dirName string) (gzWriter misc.GZipWriter,
}

func (idr *Identity) processMergeRules(fileNames []string) (err error) {
txn, err := idr.db.Begin()
txn, err := idr.db.BeginTx(idr.ctx, &sql.TxOptions{})
if err != nil {
panic(err)
}
Expand Down
22 changes: 11 additions & 11 deletions warehouse/integrations/azure-synapse/azure-synapse.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,13 +258,13 @@ func (as *AzureSynapse) loadTable(ctx context.Context, tableName string, tableSc
break
}
as.Logger.Errorf("AZ: Error while reading csv file %s for loading in staging table:%s: %v", objectFileName, stagingTableName, err)
_ = txn.Rollback()
txn.Rollback()
return
}
if len(sortedColumnKeys) != len(record) {
err = fmt.Errorf(`load file CSV columns for a row mismatch number found in upload schema. Columns in CSV row: %d, Columns in upload schema of table-%s: %d. Processed rows in csv file until mismatch: %d`, len(record), tableName, len(sortedColumnKeys), csvRowsProcessedCount)
as.Logger.Error(err)
_ = txn.Rollback()
txn.Rollback()
return
}
var recordInterface []interface{}
Expand Down Expand Up @@ -352,18 +352,18 @@ func (as *AzureSynapse) loadTable(ctx context.Context, tableName string, tableSc
_, err = stmt.ExecContext(ctx, finalColumnValues...)
if err != nil {
as.Logger.Errorf("AZ: Error in exec statement for loading in staging table:%s: %v", stagingTableName, err)
_ = txn.Rollback()
txn.Rollback()
return
}
csvRowsProcessedCount++
}
_ = gzipReader.Close()
gzipReader.Close()
gzipFile.Close()
}

_, err = stmt.ExecContext(ctx)
if err != nil {
_ = txn.Rollback()
txn.Rollback()
as.Logger.Errorf("AZ: Rollback transaction as there was error while loading staging table:%s: %v", stagingTableName, err)
return

Expand All @@ -386,7 +386,7 @@ func (as *AzureSynapse) loadTable(ctx context.Context, tableName string, tableSc
_, err = txn.ExecContext(ctx, sqlStatement)
if err != nil {
as.Logger.Errorf("AZ: Error deleting from original table for dedup: %v\n", err)
_ = txn.Rollback()
txn.Rollback()
return
}

Expand All @@ -397,7 +397,7 @@ func (as *AzureSynapse) loadTable(ctx context.Context, tableName string, tableSc

if err != nil {
as.Logger.Errorf("AZ: Error inserting into original table: %v\n", err)
_ = txn.Rollback()
txn.Rollback()
return
}

Expand Down Expand Up @@ -521,7 +521,7 @@ func (as *AzureSynapse) loadUserTables(ctx context.Context) (errorMap map[string
_, err = tx.ExecContext(ctx, sqlStatement)
if err != nil {
as.Logger.Errorf("AZ: Error deleting from original table for dedup: %v\n", err)
_ = tx.Rollback()
tx.Rollback()
errorMap[warehouseutils.UsersTable] = err
return
}
Expand All @@ -532,15 +532,15 @@ func (as *AzureSynapse) loadUserTables(ctx context.Context) (errorMap map[string

if err != nil {
as.Logger.Errorf("AZ: Error inserting into users table from staging table: %v\n", err)
_ = tx.Rollback()
tx.Rollback()
errorMap[warehouseutils.UsersTable] = err
return
}

err = tx.Commit()
if err != nil {
as.Logger.Errorf("AZ: Error in transaction commit for users table: %v\n", err)
_ = tx.Rollback()
tx.Rollback()
errorMap[warehouseutils.UsersTable] = err
return
}
Expand Down Expand Up @@ -780,7 +780,7 @@ func (as *AzureSynapse) Cleanup(ctx context.Context) {
if as.DB != nil {
// extra check aside dropStagingTable(table)
as.dropDanglingStagingTables(ctx)
_ = as.DB.Close()
as.DB.Close()
}
}

Expand Down

0 comments on commit 1dd32b2

Please sign in to comment.