Skip to content

Commit

Permalink
Return JobRecordError for bulk v1 & v2 job results
Browse files Browse the repository at this point in the history
This allows us to generalize error handling when retrieving bulk job
results.
  • Loading branch information
nyergler committed Jan 30, 2024
1 parent 7eb8d67 commit 0a6cc4c
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 12 deletions.
20 changes: 20 additions & 0 deletions bulk/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package bulk

var _ error = (*JobRecordError)(nil)

func NewJobRecordError(e error) JobRecordError {
return JobRecordError{e: e}
}

// JobRecordError wraps errors returned when retrieving records for a bulk job.
type JobRecordError struct {
e error
}

func (e JobRecordError) Error() string {
return e.e.Error()
}

func (e JobRecordError) Unwrap() error {
return e.e
}
8 changes: 4 additions & 4 deletions bulk/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,12 +612,12 @@ func (j *Job) FailedRecords(ctx context.Context) ([]FailedRecord, error) {

response, err := j.session.Client().Do(request)
if err != nil {
return nil, err
return nil, NewJobRecordError(err)
}
defer response.Body.Close()

if response.StatusCode != http.StatusOK {
return nil, sfdc.HandleError(response)
return nil, NewJobRecordError(sfdc.HandleError(response))
}

reader := csv.NewReader(response.Body)
Expand All @@ -627,15 +627,15 @@ func (j *Job) FailedRecords(ctx context.Context) ([]FailedRecord, error) {
var records []FailedRecord
fields, err := reader.Read()
if err != nil {
return nil, err
return nil, NewJobRecordError(err)
}
for {
values, err := reader.Read()
if err == io.EOF {
break
}
if err != nil {
return records, err
return records, NewJobRecordError(err)
}
var record FailedRecord
record.Error = values[j.headerPosition(sfError, fields)]
Expand Down
13 changes: 7 additions & 6 deletions bulk/v1/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"strings"

Expand Down Expand Up @@ -198,28 +197,30 @@ func (b *Batch) Results(ctx context.Context) (BatchResult, error) {

response, err := b.session.Client().Do(request)
if err != nil {
return result, err
return result, bulk.NewJobRecordError(err)
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
return result, sfdc.HandleError(response)
}

body, err := ioutil.ReadAll(response.Body)
body, err := io.ReadAll(response.Body)
if err != nil {
return result, err
return result, bulk.NewJobRecordError(err)
}

records := []ResultRecord{}

err = json.Unmarshal(body, &records)
if err != nil {
return result, err
return result, bulk.NewJobRecordError(err)
}
var requestRecords []map[string]interface{}
requestRecords, err = b.requestRecords(ctx)
if err != nil {
return result, fmt.Errorf("error retrieving request: %w", err)
return result, bulk.NewJobRecordError(
fmt.Errorf("error retrieving request: %w", err),
)
}
for i, record := range records {
fields := map[string]interface{}{}
Expand Down
2 changes: 0 additions & 2 deletions bulk/v1/batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ func (b *JobBatches) fetchInfo(ctx context.Context) (err error) {
if err != nil {
return err
}
request.Header.Add("Accept", "application/json")
request.Header.Add("Content-Type", "application/json")
b.session.AuthorizationHeader(request)

b.info, err = b.infoResponse(request)
Expand Down

0 comments on commit 0a6cc4c

Please sign in to comment.