Skip to content

Commit

Permalink
add worker ttl (#78)
Browse files Browse the repository at this point in the history
  • Loading branch information
devppjr committed Jul 19, 2023
1 parent c2f4615 commit 1c2b436
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 3 deletions.
9 changes: 8 additions & 1 deletion remotecv/unique_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from pyres.worker import Worker

from remotecv.timing import get_time, get_interval
from remotecv.utils import context, logger
from remotecv.utils import config, context, logger


class UniqueQueue(ResQ):
Expand Down Expand Up @@ -84,3 +84,10 @@ def reserve(self, timeout=10):
get_interval(start_time, get_time()),
)
return job

def register_worker(self):
super().register_worker()
if config.worker_ttl:
self.resq.redis.expire(
f"resque:worker:{str(self)}:started", config.worker_ttl
)
9 changes: 9 additions & 0 deletions remotecv/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,14 @@ def import_modules():
type=click.INT,
help="Timeout in seconds for image detection",
)
@optgroup.option(
"--worker-ttl",
envvar="WORKER_TTL",
show_envvar=True,
default=None,
type=click.INT,
help="TTL in seconds for worker",
)
@optgroup.option(
"--sentry-url",
envvar="SENTRY_URL",
Expand Down Expand Up @@ -314,6 +322,7 @@ def main(**params):
config.polling_interval = params["polling_interval"]

config.timeout = params["timeout"]
config.worker_ttl = params["worker_ttl"]
config.server_port = params["server_port"]
config.log_level = params["level"].upper()
config.loader = import_module(params["loader"])
Expand Down
20 changes: 18 additions & 2 deletions tests/test_unique_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,10 @@ def test_should_send_metrics(self):

class UniqueWorkerTestCase(TestCase):
def setUp(self):
redis = redis_client()
self.redis = redis_client()
self.unique_queue = UniqueWorker(
queues=["Detect"],
server=redis,
server=self.redis,
timeout=None,
)

Expand All @@ -109,3 +109,19 @@ def test_should_send_queue_metrics(self):
context.metrics.timing.assert_called_once_with(
"worker.read_queue.time", mock.ANY
)

def test_should_create_unique_worker_without_ttl(self):
config.worker_ttl = None
worker = UniqueWorker(server=self.redis, queues=["Detect"])
worker.register_worker()
expect(
self.redis.ttl(f"resque:worker:{str(worker)}:started")
).to_equal(-1)

def test_should_create_unique_worker_with_ttl(self):
config.worker_ttl = 60
worker = UniqueWorker(server=self.redis, queues=["Detect"])
worker.register_worker()
expect(
self.redis.ttl(f"resque:worker:{str(worker)}:started")
).Not.to_equal(-1)

0 comments on commit 1c2b436

Please sign in to comment.