diff --git a/redis_consumer/consumers/base_consumer.py b/redis_consumer/consumers/base_consumer.py index 54c370cf..c5ceca95 100644 --- a/redis_consumer/consumers/base_consumer.py +++ b/redis_consumer/consumers/base_consumer.py @@ -116,8 +116,13 @@ def get_redis_hash(self): self.logger.warning('Found invalid hash in %s: `%s` with ' 'hvals: %s', self.queue, redis_hash, self.redis.hgetall(redis_hash)) - # self.redis.lrem(self.processing_queue, 1, redis_hash) - self._put_back_hash(redis_hash) + # self._put_back_hash(redis_hash) + self.redis.lrem(self.processing_queue, 1, redis_hash) + # Update redis with failed status + self.update_key(redis_hash, { + 'status': self.failed_status, + 'reason': 'Invalid filetype for "%s" job.'.format(self.queue), + }) def _handle_error(self, err, redis_hash): """Update redis with failure information, and log errors. diff --git a/redis_consumer/consumers/base_consumer_test.py b/redis_consumer/consumers/base_consumer_test.py index b3be6527..412fd791 100644 --- a/redis_consumer/consumers/base_consumer_test.py +++ b/redis_consumer/consumers/base_consumer_test.py @@ -194,8 +194,9 @@ def test_get_redis_hash(self): rhash = consumer.get_redis_hash() assert rhash == items[0] - assert redis_client.work_queue == items[1:] - assert redis_client.processing_queue == items[0:1] + assert not redis_client.work_queue + assert len(redis_client.processing_queue) + assert redis_client.processing_queue == [rhash] def test_purge_processing_queue(self): queue_name = 'q'