Skip to content
This repository has been archived by the owner on Oct 31, 2020. It is now read-only.

Commit

Permalink
Merge pull request #4 from oasiswork/nb-fix-bytes
Browse files Browse the repository at this point in the history
Handle bytes id provided by queue
  • Loading branch information
icgood committed May 15, 2016
2 parents a3599c3 + 010a2ff commit 3d40dfa
Showing 1 changed file with 14 additions and 9 deletions.
23 changes: 14 additions & 9 deletions slimta/redisstorage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,12 @@

from __future__ import absolute_import

import os
import uuid
import time

from six.moves import cPickle

import redis
import gevent
from gevent import socket

from slimta.queue import QueueStorage
Expand Down Expand Up @@ -81,11 +79,18 @@ def __init__(self, host='localhost', port=6379, db=0, password=None,
self.prefix = prefix
self.queue_key = '{0}queue'.format(prefix)

def _get_key(self, id):
try:
id = id.decode('ascii')
except AttributeError:
pass
return self.prefix + id

def write(self, envelope, timestamp):
envelope_raw = cPickle.dumps(envelope, cPickle.HIGHEST_PROTOCOL)
while True:
id = uuid.uuid4().hex
key = self.prefix + id
key = self._get_key(id)
if self.redis.hsetnx(key, 'envelope', envelope_raw):
queue_raw = cPickle.dumps((timestamp, id),
cPickle.HIGHEST_PROTOCOL)
Expand All @@ -98,20 +103,20 @@ def write(self, envelope, timestamp):
return id

def set_timestamp(self, id, timestamp):
self.redis.hset(self.prefix+id, 'timestamp', timestamp)
self.redis.hset(self._get_key(id), 'timestamp', timestamp)
log.update_meta(id, timestamp=timestamp)

def increment_attempts(self, id):
new_attempts = self.redis.hincrby(self.prefix+id, 'attempts', 1)
new_attempts = self.redis.hincrby(self._get_key(id), 'attempts', 1)
log.update_meta(id, attempts=new_attempts)
return new_attempts

def set_recipients_delivered(self, id, rcpt_indexes):
current = self.redis.hget(self.prefix+id, 'delivered_indexes')
current = self.redis.hget(self._get_key(id), 'delivered_indexes')
new_indexes = rcpt_indexes
if current:
new_indexes = cPickle.loads(current) + rcpt_indexes
self.redis.hset(self.prefix+id, 'delivered_indexes',
self.redis.hset(self._get_key(id), 'delivered_indexes',
cPickle.dumps(new_indexes, cPickle.HIGHEST_PROTOCOL))
log.update_meta(id, delivered_indexes=rcpt_indexes)

Expand All @@ -124,7 +129,7 @@ def load(self):

def get(self, id):
envelope_raw, attempts, delivered_indexes_raw = \
self.redis.hmget(self.prefix+id, 'envelope', 'attempts',
self.redis.hmget(self._get_key(id), 'envelope', 'attempts',
'delivered_indexes')
if not envelope_raw:
raise KeyError(id)
Expand All @@ -136,7 +141,7 @@ def get(self, id):
return envelope, int(attempts or 0)

def remove(self, id):
self.redis.delete(self.prefix+id)
self.redis.delete(self._get_key(id))
log.remove(id)

def wait(self):
Expand Down

0 comments on commit 3d40dfa

Please sign in to comment.