Skip to content

Commit

Permalink
fail invalid hashes to remove them from the job queue.
Browse files Browse the repository at this point in the history
  • Loading branch information
willgraf committed Jun 4, 2020
1 parent 4e268b2 commit a8bf4ff
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
9 changes: 7 additions & 2 deletions redis_consumer/consumers/base_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,13 @@ def get_redis_hash(self):
self.logger.warning('Found invalid hash in %s: `%s` with '
'hvals: %s', self.queue, redis_hash,
self.redis.hgetall(redis_hash))
# self.redis.lrem(self.processing_queue, 1, redis_hash)
self._put_back_hash(redis_hash)
# self._put_back_hash(redis_hash)
self.redis.lrem(self.processing_queue, 1, redis_hash)
# Update redis with failed status
self.update_key(redis_hash, {
'status': self.failed_status,
'reason': 'Invalid filetype for "%s" job.'.format(self.queue),
})

def _handle_error(self, err, redis_hash):
"""Update redis with failure information, and log errors.
Expand Down
5 changes: 3 additions & 2 deletions redis_consumer/consumers/base_consumer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,9 @@ def test_get_redis_hash(self):

rhash = consumer.get_redis_hash()
assert rhash == items[0]
assert redis_client.work_queue == items[1:]
assert redis_client.processing_queue == items[0:1]
assert not redis_client.work_queue
assert len(redis_client.processing_queue)
assert redis_client.processing_queue == [rhash]

def test_purge_processing_queue(self):
queue_name = 'q'
Expand Down

0 comments on commit a8bf4ff

Please sign in to comment.