From 6381c3b66f0a960e17ba9585ce0d8a26ae8af71c Mon Sep 17 00:00:00 2001 From: willgraf <7930703+willgraf@users.noreply.github.com> Date: Thu, 4 Jun 2020 14:18:42 -0700 Subject: [PATCH] fail invalid hashes to remove them from the job queue. (#111) --- redis_consumer/consumers/base_consumer.py | 9 +++++++-- redis_consumer/consumers/base_consumer_test.py | 5 +++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/redis_consumer/consumers/base_consumer.py b/redis_consumer/consumers/base_consumer.py index 54c370cf..c5ceca95 100644 --- a/redis_consumer/consumers/base_consumer.py +++ b/redis_consumer/consumers/base_consumer.py @@ -116,8 +116,13 @@ def get_redis_hash(self): self.logger.warning('Found invalid hash in %s: `%s` with ' 'hvals: %s', self.queue, redis_hash, self.redis.hgetall(redis_hash)) - # self.redis.lrem(self.processing_queue, 1, redis_hash) - self._put_back_hash(redis_hash) + # self._put_back_hash(redis_hash) + self.redis.lrem(self.processing_queue, 1, redis_hash) + # Update redis with failed status + self.update_key(redis_hash, { + 'status': self.failed_status, + 'reason': 'Invalid filetype for "%s" job.'.format(self.queue), + }) def _handle_error(self, err, redis_hash): """Update redis with failure information, and log errors. diff --git a/redis_consumer/consumers/base_consumer_test.py b/redis_consumer/consumers/base_consumer_test.py index b3be6527..412fd791 100644 --- a/redis_consumer/consumers/base_consumer_test.py +++ b/redis_consumer/consumers/base_consumer_test.py @@ -194,8 +194,9 @@ def test_get_redis_hash(self): rhash = consumer.get_redis_hash() assert rhash == items[0] - assert redis_client.work_queue == items[1:] - assert redis_client.processing_queue == items[0:1] + assert not redis_client.work_queue + assert len(redis_client.processing_queue) + assert redis_client.processing_queue == [rhash] def test_purge_processing_queue(self): queue_name = 'q'