Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Add a new protocol opcode: Unsubscribe + fix a small typo

  • Loading branch information...
commit 34a014fcdd35c72e076c6706ac9dc54bdade42b2 1 parent de95316
Franck Guénichot malphx authored
Showing with 23 additions and 1 deletion.
  1. +23 −1 broker/feedbroker.py
24 broker/feedbroker.py
View
@@ -24,6 +24,7 @@
OP_AUTH = 2
OP_PUBLISH = 3
OP_SUBSCRIBE = 4
+OP_UNSUBSCRIBE = 5
MAXBUF = 1024**2
SIZES = {
@@ -32,6 +33,7 @@
OP_AUTH: 5+256+20,
OP_PUBLISH: 5+MAXBUF,
OP_SUBSCRIBE: 5+256*2,
+ OP_UNSUBSCRIBE: 5+256*2,
}
@@ -86,7 +88,7 @@ def sendinfo(self):
self.conn.write(self.msginfo())
def auth(self, ident, hash):
- p = self.db.query('hpfeeds.authkey', {'identifier': str(ident)}, limit=1)
+ p = self.db.query('hpfeeds.auth_key', {'identifier': str(ident)}, limit=1)
p._when(self.checkauth, hash)
def dbexc(e):
@@ -152,6 +154,19 @@ def io_in(self, data):
continue
self._event('subscribe', self, chan)
+ elif opcode == OP_UNSUBSCRIBE:
+ rest = buffer(data, 0)
+ ident, chan = rest[1:1+ord(rest[0])], rest[1+ord(rest[0]):]
+
+ if not ident in self.idents:
+ self.error('identfail.')
+ continue
+
+ if not chan in self.subchans:
+ self.error('accessfail.')
+ continue
+
+ self._event('unsubscribe', self, chan)
elif opcode == OP_AUTH:
rest = buffer(data, 0)
ident, hash = rest[1:1+ord(rest[0])], rest[1+ord(rest[0]):]
@@ -216,6 +231,7 @@ def _newconn(self, c, addr):
self.connections.add(fc)
fc._on('close', self._connclose)
fc._on('subscribe', self._subscribe)
+ fc._on('unsubscribe', self._unsubscribe)
fc._on('publish', self._publish)
def _newconnplain(self, c, addr):
@@ -224,6 +240,7 @@ def _newconnplain(self, c, addr):
self.connections.add(fc)
fc._on('close', self._connclose)
fc._on('subscribe', self._subscribe)
+ fc._on('unsubscribe', self._unsubscribe)
fc._on('publish', self._publish)
def _connclose(self, c):
@@ -242,6 +259,11 @@ def _subscribe(self, c, chan):
logging.debug('broker subscribe to {0} by {1}'.format(chan, c.addr))
self.subscribermap[chan].append(c)
self.conn2chans[c].append(chan)
+
+ def _unsubscribe(self, c, chan):
+ logging.debug('broker unsubscribe to {0} by {1}'.format(chan, c.addr))
+ self.subscribermap[chan].remove(c)
+ self.conn2chans[c].remove(chan)
def main():
fb = FeedBroker()
Please sign in to comment.
Something went wrong with that request. Please try again.