Skip to content

Commit

Permalink
Merge pull request #8 from cycloidio/fg-new-flags
Browse files Browse the repository at this point in the history
Added: '-force-prune' to remove workers not on the heartbeat list
  • Loading branch information
xescugc committed Jul 30, 2021
2 parents 46e3911 + 92b4071 commit df57256
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 4 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@

- Flag `-max-age-retries` to remove retried failed jobs after that duration
([PR #9](https://github.com/cycloidio/goworker/pull/9))
- New flag `-force-prune` to remove workers not on the heartbeat list
([PR #8](https://github.com/cycloidio/goworker/pull/8))

## [0.1.8] _2021-06-11_

### Added

- Closed function to be able to wait for the workers to fully finish
([PR #6](https://github.com/cycloidio/goworker/issues/6))
([PR #6](https://github.com/cycloidio/goworker/pull/6))

## [0.1.7] _2021-06-09_

Expand Down
9 changes: 9 additions & 0 deletions flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@
// specified max age/duration (time.Duration).
// By default is disabled if enabled it'll
// check every 1m for old retries.
// -force-prune=false
// — Will prune all workers that are not inside
// of the heartbeat set, not just the expired ones.
// This option is not compatible with older
// versions of Resque (any port) as older versions
// may not have heartbeat so this would delete
// real working workers.
//
// You can also configure your own flags for use
// within your workers. Be sure to set them
Expand Down Expand Up @@ -137,6 +144,8 @@ func init() {
flag.BoolVar(&workerSettings.SkipTLSVerify, "insecure-tls", false, "skip TLS validation")

flag.DurationVar(&workerSettings.MaxAgeRetries, "max-age-retries", 0, "max age of the retried failed jobs before cleaning them")

flag.BoolVar(&workerSettings.ForcePrune, "force-prune", false, "Forced the deletion of workers that are not present on the heartbeat set. WARNING: This is not compatible with older versions of Resque (any port) that do not have heartbeat as it'll then delete working workers.")
}

func flags() error {
Expand Down
11 changes: 11 additions & 0 deletions goworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type WorkerSettings struct {
SkipTLSVerify bool
TLSCertPath string
MaxAgeRetries time.Duration
ForcePrune bool

closed chan struct{}
}
Expand Down Expand Up @@ -174,15 +175,25 @@ func Work() error {
}

var monitor sync.WaitGroup
var wk *worker

for id := 0; id < workerSettings.Concurrency; id++ {
worker, err := newWorker(strconv.Itoa(id), workerSettings.Queues)
if err != nil {
return err
}
if wk != nil {
wk = worker
}
worker.work(jobs, &monitor)
}

// Once all the workers have started we prune the dead ones
// this way we prevent from pruning workers that have just
// started and not registered to the Heartbeat in case
// of ForcePrune is enabled.
wk.pruneDeadWorkers(client)

if hasToCleanRetries() {
cleanExpiredRetryTicker := time.NewTicker(cleaningExpiredRetriesInterval)
waitChan := make(chan struct{})
Expand Down
1 change: 1 addition & 0 deletions process.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func (p *process) open(c *redis.Client) error {
return err
}

// We set the heartbeat as the first thing
err = c.HSet(fmt.Sprintf("%s%s", workerSettings.Namespace, heartbeatKey), p.String(), time.Now().Format(time.RFC3339)).Err()
if err != nil {
return err
Expand Down
10 changes: 7 additions & 3 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,13 @@ func (w *worker) pruneDeadWorkers(c *redis.Client) {

// If a worker is on the expired list kill it
for _, w := range workers {
if _, ok := hearbeatExpiredWorkers[w]; ok {
_, hbeok := hearbeatExpiredWorkers[w]
_, hbok := heartbeatWorkers[w]
// We want to prune workers that:
// * Are expired
// * Are not on the heartbeat set and ForcePrune is set
// If they are neither of those then we do not want to expire them
if hbeok || (!hbok && workerSettings.ForcePrune) {
logger.Infof("Pruning dead worker %q", w)

parts := strings.Split(w, ":")
Expand Down Expand Up @@ -239,8 +245,6 @@ func (w *worker) work(jobs <-chan *Job, monitor *sync.WaitGroup) {
w.startHeartbeat(client)
defer w.heartbeatTicker.Stop()

w.pruneDeadWorkers(client)

monitor.Add(1)

go func() {
Expand Down

0 comments on commit df57256

Please sign in to comment.