Skip to content

Commit

Permalink
feat: include error codes in v2 failed-records response payload (#4116)
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum authored Nov 27, 2023
1 parent f97b69c commit e803bf9
Show file tree
Hide file tree
Showing 17 changed files with 1,562 additions and 615 deletions.
20 changes: 10 additions & 10 deletions gateway/handle_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,18 +360,14 @@ func (gw *Handle) StartWebHandler(ctx context.Context) error {
component := "gateway"
srvMux := chi.NewRouter()
// rudder-sources new APIs
rsourcesHandler := rsources_http.NewV1Handler(
rsourcesHandlerV1 := rsources_http.NewV1Handler(
gw.rsourcesService,
gw.logger.Child("rsources"),
)
failedKeysHandler := rsources_http.FailedKeysHandler(
rsourcesHandlerV2 := rsources_http.NewV2Handler(
gw.rsourcesService,
gw.logger.Child("rsources_failed_keys"),
)
jobStatusHandler := rsources_http.JobStatusHandler(
gw.rsourcesService,
gw.logger.Child("rsources_job_status"),
)
srvMux.Use(
chiware.StatMiddleware(ctx, stats.Default, component),
middleware.LimitConcurrentRequests(gw.conf.maxConcurrentRequests),
Expand All @@ -383,11 +379,15 @@ func (gw *Handle) StartWebHandler(ctx context.Context) error {
r.Get("/v1/warehouse/fetch-tables", gw.whProxy.ServeHTTP)
r.Post("/v1/audiencelist", gw.webAudienceListHandler())
r.Post("/v1/replay", gw.webReplayHandler())
r.Mount("/v1/job-status", withContentType("application/json; charset=utf-8", rsourcesHandler.ServeHTTP))
r.Mount("/v2/failed-keys", withContentType("application/json; charset=utf-8", failedKeysHandler.ServeHTTP))
r.Mount("/v2/job-status", withContentType("application/json; charset=utf-8", jobStatusHandler.ServeHTTP))

// TODO: delete this handler once we are ready to remove support for the v1 api
r.Mount("/v1/job-status", withContentType("application/json; charset=utf-8", rsourcesHandlerV1.ServeHTTP))

r.Mount("/v2/job-status", withContentType("application/json; charset=utf-8", rsourcesHandlerV2.ServeHTTP))
})
srvMux.Mount("/v1/job-status", withContentType("application/json; charset=utf-8", rsourcesHandler.ServeHTTP))

// TODO: delete this handler once we are ready to remove support for the v1 api
srvMux.Mount("/v1/job-status", withContentType("application/json; charset=utf-8", rsourcesHandlerV1.ServeHTTP))

srvMux.Route("/v1", func(r chi.Router) {
r.Post("/alias", gw.webAliasHandler())
Expand Down
8 changes: 6 additions & 2 deletions processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"net/http/httptest"
"reflect"
"slices"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -2573,8 +2574,11 @@ var _ = Describe("Processor", Ordered, func() {

Expect(len(m.failedJobs)).To(Equal(2))
Expect(len(m.failedMetrics)).To(Equal(2))
Expect(m.failedMetrics[0].StatusDetail.StatusCode).To(Equal(400))
Expect(m.failedMetrics[1].StatusDetail.StatusCode).To(Equal(299))
slices.SortFunc(m.failedMetrics, func(a, b *types.PUReportedMetric) int {
return a.StatusDetail.StatusCode - b.StatusDetail.StatusCode
})
Expect(m.failedMetrics[0].StatusDetail.StatusCode).To(Equal(299))
Expect(m.failedMetrics[1].StatusDetail.StatusCode).To(Equal(400))
Expect(int(m.failedCountMap[key])).To(Equal(2))

Expect(len(m.filteredJobs)).To(Equal(1))
Expand Down
128 changes: 115 additions & 13 deletions services/rsources/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (*sourcesHandler) IncrementStats(ctx context.Context, tx *sql.Tx, jobRunId
return err
}

func (sh *sourcesHandler) AddFailedRecords(ctx context.Context, tx *sql.Tx, jobRunId string, key JobTargetKey, records []json.RawMessage) error {
func (sh *sourcesHandler) AddFailedRecords(ctx context.Context, tx *sql.Tx, jobRunId string, key JobTargetKey, records []FailedRecord) error {
if sh.config.SkipFailedRecordsCollection {
return nil
}
Expand All @@ -126,7 +126,7 @@ func (sh *sourcesHandler) AddFailedRecords(ctx context.Context, tx *sql.Tx, jobR
return fmt.Errorf("scanning rsources_failed_keys_v2 id: %w", err)
}

stmt, err := tx.Prepare(`INSERT INTO rsources_failed_keys_v2_records (id, record_id) VALUES ($1, $2) ON CONFLICT DO NOTHING`)
stmt, err := tx.Prepare(`INSERT INTO rsources_failed_keys_v2_records (id, record_id, code) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING`)
if err != nil {
return err
}
Expand All @@ -136,22 +136,24 @@ func (sh *sourcesHandler) AddFailedRecords(ctx context.Context, tx *sql.Tx, jobR
if _, err = stmt.ExecContext(
ctx,
id,
records[i]); err != nil {
records[i].Record,
records[i].Code,
); err != nil {
return fmt.Errorf("inserting into rsources_failed_keys_v2_records: %w", err)
}
}
return nil
}

func (sh *sourcesHandler) GetFailedRecords(ctx context.Context, jobRunId string, filter JobFilter, paging PagingInfo) (JobFailedRecords, error) {
func (sh *sourcesHandler) GetFailedRecords(ctx context.Context, jobRunId string, filter JobFilter, paging PagingInfo) (JobFailedRecordsV2, error) {
if sh.config.SkipFailedRecordsCollection {
return JobFailedRecords{ID: jobRunId}, ErrOperationNotSupported
return JobFailedRecordsV2{ID: jobRunId}, ErrOperationNotSupported
}
var nextPageToken NextPageToken
var err error
if paging.Size > 0 {
if nextPageToken, err = NextPageTokenFromString(paging.NextPageToken); err != nil {
return JobFailedRecords{ID: jobRunId}, ErrInvalidPaginationToken
return JobFailedRecordsV2{ID: jobRunId}, ErrInvalidPaginationToken
}
}

Expand All @@ -177,7 +179,7 @@ func (sh *sourcesHandler) GetFailedRecords(ctx context.Context, jobRunId string,
return ids, nil
}()
if err != nil {
return JobFailedRecords{ID: jobRunId}, fmt.Errorf("failed to get failed record ids: %w", err)
return JobFailedRecordsV2{ID: jobRunId}, fmt.Errorf("failed to get failed record ids: %w", err)
}

filters := "WHERE r.id = ANY($1)"
Expand All @@ -194,36 +196,132 @@ func (sh *sourcesHandler) GetFailedRecords(ctx context.Context, jobRunId string,
k.source_id,
k.destination_id,
r.id,
r.record_id
r.record_id,
r.code
FROM "rsources_failed_keys_v2_records" r
JOIN "rsources_failed_keys_v2" k ON r.id = k.id %[1]s
ORDER BY r.id, r.record_id ASC %[2]s`,
filters, limit)

failedRecordsMap := map[JobTargetKey]FailedRecords{}
failedRecordsMap := map[JobTargetKey][]FailedRecord{}
rows, err := sh.readDB().QueryContext(ctx, sqlStatement, params...)
if err != nil {
return JobFailedRecords{ID: jobRunId}, err
return JobFailedRecordsV2{ID: jobRunId}, err
}
defer func() { _ = rows.Close() }()
var queryResultSize int
for rows.Next() {
var key JobTargetKey
var code int
err := rows.Scan(
&key.TaskRunID,
&key.SourceID,
&key.DestinationID,
&nextPageToken.ID,
&nextPageToken.RecordID,
&code,
)
if err != nil {
return JobFailedRecords{ID: jobRunId}, err
return JobFailedRecordsV2{ID: jobRunId}, err
}
failedRecordsMap[key] = append(failedRecordsMap[key], FailedRecord{Record: json.RawMessage(nextPageToken.RecordID), Code: code})
queryResultSize++
}
if err := rows.Err(); err != nil {
return JobFailedRecordsV2{ID: jobRunId}, err
}

res := failedRecordsFromQueryResult(jobRunId, failedRecordsMap)
if limit != "" && queryResultSize == paging.Size {
res.Paging = &PagingInfo{
Size: paging.Size,
NextPageToken: nextPageToken.String(),
}
}
return JobFailedRecordsV2(res), nil
}

func (sh *sourcesHandler) GetFailedRecordsV1(ctx context.Context, jobRunId string, filter JobFilter, paging PagingInfo) (JobFailedRecordsV1, error) {
if sh.config.SkipFailedRecordsCollection {
return JobFailedRecordsV1{ID: jobRunId}, ErrOperationNotSupported
}
var nextPageToken NextPageToken
var err error
if paging.Size > 0 {
if nextPageToken, err = NextPageTokenFromString(paging.NextPageToken); err != nil {
return JobFailedRecordsV1{ID: jobRunId}, ErrInvalidPaginationToken
}
}

// first find the list of ids (postgres query planner uses an inefficient plan if there is one id with millions of records and a few ids with a few records)
ids, err := func() ([]string, error) {
var ids []string
filters, params := sqlFilters(jobRunId, filter)
rows, err := sh.readDB().QueryContext(ctx, `SELECT id FROM "rsources_failed_keys_v2" `+filters, params...)
if err != nil {
return nil, err
}
defer func() { _ = rows.Close() }()
for rows.Next() {
var id string
if err := rows.Scan(&id); err != nil {
return nil, err
}
ids = append(ids, id)
}
if err := rows.Err(); err != nil {
return nil, err
}
return ids, nil
}()
if err != nil {
return JobFailedRecordsV1{ID: jobRunId}, fmt.Errorf("failed to get failed record ids: %w", err)
}

filters := "WHERE r.id = ANY($1)"
params := []interface{}{pq.Array(ids)}
var limit string
if paging.Size > 0 {
filters = filters + fmt.Sprintf(` AND r.id >= $%[1]d AND r.record_id > $%[2]d`, len(params)+1, len(params)+2)
params = append(params, nextPageToken.ID, nextPageToken.RecordID)
limit = fmt.Sprintf(`LIMIT %d`, paging.Size)
}
sqlStatement := fmt.Sprintf(
`SELECT
k.task_run_id,
k.source_id,
k.destination_id,
r.id,
r.record_id
FROM "rsources_failed_keys_v2_records" r
JOIN "rsources_failed_keys_v2" k ON r.id = k.id %[1]s
ORDER BY r.id, r.record_id ASC %[2]s`,
filters, limit)

failedRecordsMap := map[JobTargetKey][]json.RawMessage{}
rows, err := sh.readDB().QueryContext(ctx, sqlStatement, params...)
if err != nil {
return JobFailedRecordsV1{ID: jobRunId}, err
}
defer func() { _ = rows.Close() }()
var queryResultSize int
for rows.Next() {
var key JobTargetKey
err := rows.Scan(
&key.TaskRunID,
&key.SourceID,
&key.DestinationID,
&nextPageToken.ID,
&nextPageToken.RecordID,
)
if err != nil {
return JobFailedRecordsV1{ID: jobRunId}, err
}
failedRecordsMap[key] = append(failedRecordsMap[key], json.RawMessage(nextPageToken.RecordID))
queryResultSize++
}
if err := rows.Err(); err != nil {
return JobFailedRecords{ID: jobRunId}, err
return JobFailedRecordsV1{ID: jobRunId}, err
}

res := failedRecordsFromQueryResult(jobRunId, failedRecordsMap)
Expand All @@ -233,7 +331,7 @@ func (sh *sourcesHandler) GetFailedRecords(ctx context.Context, jobRunId string,
NextPageToken: nextPageToken.String(),
}
}
return res, nil
return JobFailedRecordsV1(res), nil
}

func (sh *sourcesHandler) Delete(ctx context.Context, jobRunId string, filter JobFilter) error {
Expand Down Expand Up @@ -562,6 +660,10 @@ func setupFailedKeysTable(ctx context.Context, db *sql.DB, defaultDbName string,
return err
}
}
if _, err := db.ExecContext(ctx, `alter table rsources_failed_keys_v2_records add column if not exists code numeric(4) not null default 0`); err != nil {
return err
}

return nil
}

Expand Down
Loading

0 comments on commit e803bf9

Please sign in to comment.