Skip to content

Commit

Permalink
Merge pull request #74 from nyaruka/report_failed_rollups
Browse files Browse the repository at this point in the history
Include rollups in monthlies failed metric
  • Loading branch information
rowanseymour committed Jul 8, 2022
2 parents 26c2000 + 3f4b674 commit 9add721
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 30 deletions.
43 changes: 13 additions & 30 deletions archives/archives.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,19 +770,21 @@ func createArchives(ctx context.Context, db *sqlx.DB, config *Config, s3Client s
}

// RollupOrgArchives rolls up monthly archives from our daily archives
func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, error) {
func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, error) {
ctx, cancel := context.WithTimeout(ctx, time.Hour*3)
defer cancel()

log := logrus.WithFields(logrus.Fields{"org_id": org.ID, "org_name": org.Name, "archive_type": archiveType})
created := make([]*Archive, 0, 1)

// get our missing monthly archives
archives, err := GetMissingMonthlyArchives(ctx, db, now, org, archiveType)
if err != nil {
return nil, err
return nil, nil, err
}

created := make([]*Archive, 0, len(archives))
failed := make([]*Archive, 0, 1)

// build them from rollups
for _, archive := range archives {
log := log.WithFields(logrus.Fields{"start_date": archive.StartDate})
Expand All @@ -791,20 +793,23 @@ func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *s
err = BuildRollupArchive(ctx, db, config, s3Client, archive, now, org, archiveType)
if err != nil {
log.WithError(err).Error("error building monthly archive")
failed = append(failed, archive)
continue
}

if config.UploadToS3 {
err = UploadArchive(ctx, s3Client, config.S3Bucket, archive)
if err != nil {
log.WithError(err).Error("error writing archive to s3")
failed = append(failed, archive)
continue
}
}

err = WriteArchiveToDB(ctx, db, archive)
if err != nil {
log.WithError(err).Error("error writing record to db")
failed = append(failed, archive)
continue
}

Expand All @@ -820,7 +825,7 @@ func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *s
created = append(created, archive)
}

return created, nil
return created, failed, nil
}

const setArchiveDeleted = `
Expand All @@ -829,21 +834,6 @@ SET needs_deletion = FALSE, deleted_on = $2
WHERE id = $1
`

// helper method to safely execute an IN query in the passed in transaction
func executeInQuery(ctx context.Context, tx *sqlx.Tx, query string, ids []int64) error {
q, vs, err := sqlx.In(query, ids)
if err != nil {
return err
}
q = tx.Rebind(q)

_, err = tx.ExecContext(ctx, q, vs...)
if err != nil {
tx.Rollback()
}
return err
}

var deleteTransactionSize = 100

// DeleteArchivedOrgRecords deletes all the records for the given org based on archives already created
Expand Down Expand Up @@ -909,12 +899,14 @@ func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3
log.WithFields(logrus.Fields{"elapsed": elapsed, "records_per_second": rate}).Info("completed archival for org")
}

monthliesRolledUp, err := RollupOrgArchives(ctx, now, cfg, db, s3Client, org, archiveType)
rollupsCreated, rollupsFailed, err := RollupOrgArchives(ctx, now, cfg, db, s3Client, org, archiveType)
if err != nil {
return nil, nil, nil, nil, nil, errors.Wrapf(err, "error rolling up archives")
}

monthliesCreated = append(monthliesCreated, monthliesRolledUp...)
monthliesCreated = append(monthliesCreated, rollupsCreated...)
monthliesFailed = append(monthliesFailed, rollupsFailed...)
monthliesFailed = removeDuplicates(monthliesFailed) // don't double report monthlies that fail being built from db and rolled up from dailies

// finally delete any archives not yet actually archived
var deleted []*Archive
Expand Down Expand Up @@ -997,12 +989,3 @@ func ArchiveActiveOrgs(db *sqlx.DB, cfg *Config, s3Client s3iface.S3API) error {

return nil
}

// counts the records in the given archives
func countRecords(as []*Archive) int {
n := 0
for _, a := range as {
n += a.RecordCount
}
return n
}
47 changes: 47 additions & 0 deletions archives/utils.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,52 @@
package archives

import (
"context"
"fmt"
"time"

"github.com/jmoiron/sqlx"
)

// helper method to safely execute an IN query in the passed in transaction
func executeInQuery(ctx context.Context, tx *sqlx.Tx, query string, ids []int64) error {
q, vs, err := sqlx.In(query, ids)
if err != nil {
return err
}
q = tx.Rebind(q)

_, err = tx.ExecContext(ctx, q, vs...)
if err != nil {
tx.Rollback()
}
return err
}

// counts the records in the given archives
func countRecords(as []*Archive) int {
n := 0
for _, a := range as {
n += a.RecordCount
}
return n
}

// removes duplicates from a slice of archives
func removeDuplicates(as []*Archive) []*Archive {
unique := make([]*Archive, 0, len(as))
seen := make(map[string]bool)

for _, a := range as {
key := fmt.Sprintf("%s:%s:%s", a.ArchiveType, a.Period, a.StartDate.Format(time.RFC3339))
if !seen[key] {
unique = append(unique, a)
seen[key] = true
}
}
return unique
}

// chunks a slice of in64 IDs
func chunkIDs(ids []int64, size int) [][]int64 {
chunks := make([][]int64, 0, len(ids)/size+1)
Expand Down

0 comments on commit 9add721

Please sign in to comment.