Skip to content

Commit

Permalink
Merge pull request #1 from raymondguo-db/master
Browse files Browse the repository at this point in the history
Main worker should use zadd(xx=True) to update heartbeat. (rq#1550)
  • Loading branch information
rauchy committed Feb 21, 2022
2 parents acceaac + 40707af commit facbc56
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 6 deletions.
4 changes: 2 additions & 2 deletions rq/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,11 +394,11 @@ def set_id(self, value):
raise TypeError('id must be a string, not {0}'.format(type(value)))
self._id = value

def heartbeat(self, heartbeat, ttl, pipeline=None):
def heartbeat(self, heartbeat, ttl, pipeline=None, xx=False):
self.last_heartbeat = heartbeat
connection = pipeline if pipeline is not None else self.connection
connection.hset(self.key, 'last_heartbeat', utcformat(self.last_heartbeat))
self.started_job_registry.add(self, ttl, pipeline=pipeline)
self.started_job_registry.add(self, ttl, pipeline=pipeline, xx=xx)

id = property(get_id, set_id)

Expand Down
6 changes: 3 additions & 3 deletions rq/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,15 @@ def count(self):
self.cleanup()
return self.connection.zcard(self.key)

def add(self, job, ttl=0, pipeline=None):
def add(self, job, ttl=0, pipeline=None, xx=False):
"""Adds a job to a registry with expiry time of now + ttl, unless it's -1 which is set to +inf"""
score = ttl if ttl < 0 else current_timestamp() + ttl
if score == -1:
score = '+inf'
if pipeline is not None:
return pipeline.zadd(self.key, {job.id: score})
return pipeline.zadd(self.key, {job.id: score}, xx=xx)

return self.connection.zadd(self.key, {job.id: score})
return self.connection.zadd(self.key, {job.id: score}, xx=xx)

def remove(self, job, pipeline=None, delete_job=False):
"""Removes job from registry and deletes it if `delete_job == True`"""
Expand Down
2 changes: 1 addition & 1 deletion rq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,7 @@ def monitor_work_horse(self, job, queue):
with self.connection.pipeline() as pipeline:
self.heartbeat(self.job_monitoring_interval + 60, pipeline=pipeline)
ttl = self.get_heartbeat_ttl(job)
job.heartbeat(utcnow(), ttl, pipeline=pipeline)
job.heartbeat(utcnow(), ttl, pipeline=pipeline, xx=True)
pipeline.execute()

except OSError as e:
Expand Down

0 comments on commit facbc56

Please sign in to comment.