Skip to content

Commit

Permalink
chore: cleanup *jobsdb.Handle.checkIfFullDSInTx(...) (#4634)
Browse files Browse the repository at this point in the history
Co-authored-by: Akash Chetty <achetty.iitr@gmail.com>
  • Loading branch information
Sidddddarth and achettyiitr committed May 3, 2024
1 parent c6a28bb commit c64dd3a
Showing 1 changed file with 13 additions and 11 deletions.
24 changes: 13 additions & 11 deletions jobsdb/jobsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1190,21 +1190,21 @@ func (jd *Handle) doRefreshDSRangeList(l lock.LockToken) error {
return nil
}

func (jd *Handle) getTableRowCount(jobTable string) int {
func (jd *Handle) getTableRowCount(tx *Tx, jobTable string) int {
var count int

sqlStatement := fmt.Sprintf(`SELECT COUNT(*) from %q`, jobTable)
row := jd.dbHandle.QueryRow(sqlStatement)
row := tx.QueryRow(sqlStatement)
err := row.Scan(&count)
jd.assertError(err)
return count
}

func (jd *Handle) getTableSize(jobTable string) int64 {
func (jd *Handle) getTableSize(tx *Tx, jobTable string) int64 {
var tableSize int64

sqlStatement := fmt.Sprintf(`SELECT PG_TOTAL_RELATION_SIZE('%s')`, jobTable)
row := jd.dbHandle.QueryRow(sqlStatement)
row := tx.QueryRow(sqlStatement)
err := row.Scan(&tableSize)
jd.assertError(err)
return tableSize
Expand All @@ -1213,26 +1213,28 @@ func (jd *Handle) getTableSize(jobTable string) int64 {
func (jd *Handle) checkIfFullDSInTx(tx *Tx, ds dataSetT) (bool, error) {
if jd.conf.maxDSRetentionPeriod.Load() > 0 {
var minJobCreatedAt sql.NullTime
sqlStatement := fmt.Sprintf(`SELECT MIN(created_at) FROM %q`, ds.JobTable)
sqlStatement := fmt.Sprintf(`SELECT created_at FROM %q ORDER BY job_id ASC LIMIT 1`, ds.JobTable)
row := tx.QueryRow(sqlStatement)
err := row.Scan(&minJobCreatedAt)
if err != nil && err != sql.ErrNoRows {
if errors.Is(err, sql.ErrNoRows) {
return false, nil
} else if err != nil {
return false, err
}
if err == nil && minJobCreatedAt.Valid && time.Since(minJobCreatedAt.Time) > jd.conf.maxDSRetentionPeriod.Load() {
if minJobCreatedAt.Valid && time.Since(minJobCreatedAt.Time) > jd.conf.maxDSRetentionPeriod.Load() {
return true, nil
}
}

tableSize := jd.getTableSize(ds.JobTable)
tableSize := jd.getTableSize(tx, ds.JobTable)
totalCount := jd.getTableRowCount(tx, ds.JobTable)
if tableSize > jd.conf.maxTableSize.Load() {
jd.logger.Infof("[JobsDB] %s is full in size. Count: %v, Size: %v", ds.JobTable, jd.getTableRowCount(ds.JobTable), tableSize)
jd.logger.Infof("[JobsDB] %s is full in size. Count: %v, Size: %v", ds.JobTable, totalCount, tableSize)
return true, nil
}

totalCount := jd.getTableRowCount(ds.JobTable)
if totalCount > jd.conf.MaxDSSize.Load() {
jd.logger.Infof("[JobsDB] %s is full by rows. Count: %v, Size: %v", ds.JobTable, totalCount, jd.getTableSize(ds.JobTable))
jd.logger.Infof("[JobsDB] %s is full by rows. Count: %v, Size: %v", ds.JobTable, totalCount, tableSize)
return true, nil
}

Expand Down

0 comments on commit c64dd3a

Please sign in to comment.