Permalink
Browse files

broker - join/leave on special ..broker chan

  • Loading branch information...
1 parent 4fa5e29 commit 5b0ed0dd36fe263d4ba83b2e0aa20ac1135b80a0 @rep committed Apr 23, 2012
Showing with 21 additions and 7 deletions.
  1. +21 −7 broker/feedbroker.py
View
28 broker/feedbroker.py
@@ -35,7 +35,6 @@
OP_SUBSCRIBE: 5+256*2,
OP_UNSUBSCRIBE: 5+256*2,
}
-
class BadClient(Exception):
pass
@@ -153,7 +152,7 @@ def io_in(self, data):
self.error('accessfail.')
continue
- self._event('subscribe', self, chan)
+ self._event('subscribe', self, chan, ident)
elif opcode == OP_UNSUBSCRIBE:
rest = buffer(data, 0)
ident, chan = rest[1:1+ord(rest[0])], rest[1+ord(rest[0]):]
@@ -166,7 +165,7 @@ def io_in(self, data):
self.error('accessfail.')
continue
- self._event('unsubscribe', self, chan)
+ self._event('unsubscribe', self, chan, ident)
elif opcode == OP_AUTH:
rest = buffer(data, 0)
ident, hash = rest[1:1+ord(rest[0])], rest[1+ord(rest[0]):]
@@ -191,6 +190,11 @@ def msginfo(self):
def msghdr(self, op, data):
return struct.pack('!iB', 5+len(data), op) + data
+ def msgpublish(self, ident, chan, data):
+ return self.msghdr(OP_PUBLISH, struct.pack('!B', len(ident)) + ident + struct.pack('!B', len(chan)) + chan + data)
+
+ def publish(self, ident, chan, data):
+ self.conn.write(self.msgpublish(ident, chan, data))
class FeedBroker(object):
def __init__(self):
@@ -247,23 +251,33 @@ def _connclose(self, c):
self.connections.remove(c)
for chan in self.conn2chans[c]:
self.subscribermap[chan].remove(c)
+ for ident in c.idents:
+ self._brokerchan(c, chan, ident, 0)
def _publish(self, c, chan, data):
logging.debug('broker publish to {0} by {1}'.format(chan, c.addr))
for c2 in self.subscribermap[chan]:
- if c2 == c:
- continue
+ if c2 == c: continue
c2.forward(data)
- def _subscribe(self, c, chan):
+ def _subscribe(self, c, chan, ident):
logging.debug('broker subscribe to {0} by {1}'.format(chan, c.addr))
self.subscribermap[chan].append(c)
self.conn2chans[c].append(chan)
+ self._brokerchan(c, chan, ident, 1)
- def _unsubscribe(self, c, chan):
+ def _unsubscribe(self, c, chan, ident):
logging.debug('broker unsubscribe to {0} by {1}'.format(chan, c.addr))
self.subscribermap[chan].remove(c)
self.conn2chans[c].remove(chan)
+ self._brokerchan(c, chan, ident, 0)
+
+ def _brokerchan(self, c, chan, ident, subscribe=0):
+ data = 'join' if subscribe else 'leave'
+ if self.subscribermap[chan+'..broker']:
+ for c2 in self.subscribermap[chan+'..broker']:
+ if c2 == c: continue
+ c2.publish(ident, chan+'..broker', data)
def main():
fb = FeedBroker()

0 comments on commit 5b0ed0d

Please sign in to comment.