Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Make sure we close the subhub client if an error happens!
  • Loading branch information
Jamie Turner committed Oct 13, 2011
1 parent fcb36ea commit c6a8740
Showing 1 changed file with 51 additions and 51 deletions.
102 changes: 51 additions & 51 deletions diesel/protocols/redis.py
Expand Up @@ -874,57 +874,57 @@ def __isglob(self, glob):
return '*' in glob or '?' in glob or ('[' in glob and ']' and glob)

def __call__(self):
conn = self.make_client()
subs = self.subs
for sub in subs:
if self.__isglob(sub):
conn.psubscribe(sub)
else:
conn.subscribe(sub)
while True:
new = rm = None
if self.sub_adds:
sa = self.sub_adds[:]
self.sub_adds = []
new_subs, new_glob_subs = set(), set()
for k, q in sa:
new = new_glob_subs if self.__isglob(k) else new_subs

if k not in subs:
new.add(k)
subs[k] = set([q])
else:
subs[k].add(q)

if new_subs:
conn.subscribe(*new_subs)
if new_glob_subs:
conn.psubscribe(*new_glob_subs)

if self.sub_rms:
sr = self.sub_rms[:]
self.sub_rms = []
rm_subs, rm_glob_subs = set(), set()
for k, q in sr:
rm = rm_glob_subs if self.__isglob(k) else rm_subs

subs[k].remove(q)
if not subs[k]:
del subs[k]
rm.add(k)

if rm_subs:
conn.unsubscribe(*rm_subs)
if rm_glob_subs:
conn.punsubscribe(*rm_glob_subs)

if not self.sub_rms and not self.sub_adds:
r = conn.get_from_subscriptions(self.sub_wake_signal)
if r:
cls, key, msg = r
if cls in subs:
for q in subs[cls]:
q.put((key, msg))
with self.make_client() as conn:
subs = self.subs
for sub in subs:
if self.__isglob(sub):
conn.psubscribe(sub)
else:
conn.subscribe(sub)
while True:
new = rm = None
if self.sub_adds:
sa = self.sub_adds[:]
self.sub_adds = []
new_subs, new_glob_subs = set(), set()
for k, q in sa:
new = new_glob_subs if self.__isglob(k) else new_subs

if k not in subs:
new.add(k)
subs[k] = set([q])
else:
subs[k].add(q)

if new_subs:
conn.subscribe(*new_subs)
if new_glob_subs:
conn.psubscribe(*new_glob_subs)

if self.sub_rms:
sr = self.sub_rms[:]
self.sub_rms = []
rm_subs, rm_glob_subs = set(), set()
for k, q in sr:
rm = rm_glob_subs if self.__isglob(k) else rm_subs

subs[k].remove(q)
if not subs[k]:
del subs[k]
rm.add(k)

if rm_subs:
conn.unsubscribe(*rm_subs)
if rm_glob_subs:
conn.punsubscribe(*rm_glob_subs)

if not self.sub_rms and not self.sub_adds:
r = conn.get_from_subscriptions(self.sub_wake_signal)
if r:
cls, key, msg = r
if cls in subs:
for q in subs[cls]:
q.put((key, msg))

@contextmanager
def sub(self, classes):
Expand Down

0 comments on commit c6a8740

Please sign in to comment.