From c6a8740523d454ad89aa80ebc3184c08de80163b Mon Sep 17 00:00:00 2001 From: Jamie Turner Date: Thu, 13 Oct 2011 14:21:46 -0700 Subject: [PATCH] Make sure we close the subhub client if an error happens! --- diesel/protocols/redis.py | 102 +++++++++++++++++++------------------- 1 file changed, 51 insertions(+), 51 deletions(-) diff --git a/diesel/protocols/redis.py b/diesel/protocols/redis.py index 908db83..c4dab59 100644 --- a/diesel/protocols/redis.py +++ b/diesel/protocols/redis.py @@ -874,57 +874,57 @@ def __isglob(self, glob): return '*' in glob or '?' in glob or ('[' in glob and ']' and glob) def __call__(self): - conn = self.make_client() - subs = self.subs - for sub in subs: - if self.__isglob(sub): - conn.psubscribe(sub) - else: - conn.subscribe(sub) - while True: - new = rm = None - if self.sub_adds: - sa = self.sub_adds[:] - self.sub_adds = [] - new_subs, new_glob_subs = set(), set() - for k, q in sa: - new = new_glob_subs if self.__isglob(k) else new_subs - - if k not in subs: - new.add(k) - subs[k] = set([q]) - else: - subs[k].add(q) - - if new_subs: - conn.subscribe(*new_subs) - if new_glob_subs: - conn.psubscribe(*new_glob_subs) - - if self.sub_rms: - sr = self.sub_rms[:] - self.sub_rms = [] - rm_subs, rm_glob_subs = set(), set() - for k, q in sr: - rm = rm_glob_subs if self.__isglob(k) else rm_subs - - subs[k].remove(q) - if not subs[k]: - del subs[k] - rm.add(k) - - if rm_subs: - conn.unsubscribe(*rm_subs) - if rm_glob_subs: - conn.punsubscribe(*rm_glob_subs) - - if not self.sub_rms and not self.sub_adds: - r = conn.get_from_subscriptions(self.sub_wake_signal) - if r: - cls, key, msg = r - if cls in subs: - for q in subs[cls]: - q.put((key, msg)) + with self.make_client() as conn: + subs = self.subs + for sub in subs: + if self.__isglob(sub): + conn.psubscribe(sub) + else: + conn.subscribe(sub) + while True: + new = rm = None + if self.sub_adds: + sa = self.sub_adds[:] + self.sub_adds = [] + new_subs, new_glob_subs = set(), set() + for k, q in sa: + new = new_glob_subs if self.__isglob(k) else new_subs + + if k not in subs: + new.add(k) + subs[k] = set([q]) + else: + subs[k].add(q) + + if new_subs: + conn.subscribe(*new_subs) + if new_glob_subs: + conn.psubscribe(*new_glob_subs) + + if self.sub_rms: + sr = self.sub_rms[:] + self.sub_rms = [] + rm_subs, rm_glob_subs = set(), set() + for k, q in sr: + rm = rm_glob_subs if self.__isglob(k) else rm_subs + + subs[k].remove(q) + if not subs[k]: + del subs[k] + rm.add(k) + + if rm_subs: + conn.unsubscribe(*rm_subs) + if rm_glob_subs: + conn.punsubscribe(*rm_glob_subs) + + if not self.sub_rms and not self.sub_adds: + r = conn.get_from_subscriptions(self.sub_wake_signal) + if r: + cls, key, msg = r + if cls in subs: + for q in subs[cls]: + q.put((key, msg)) @contextmanager def sub(self, classes):