Skip to content
Newer
Older
100644 292 lines (227 sloc) 7.16 KB
2e58a01 @rep initial import
authored
1 #!/usr/bin/env python
2
3 import sys
4
5 import struct
6 import hashlib
7 import collections
8 import random
9
10 import logging
11
12 from evnet import loop, unloop, listenplain, EventGen
13 from evnet.mongodb import MongoConn
14
15 FBIP = '0.0.0.0'
84fd0a9 broker - some updates from cloud
root authored
16 FBPORT = 10000
17 FBNAME = '@hp2'
2e58a01 @rep initial import
authored
18 MONGOIP = '127.0.0.1'
19 MONGOPORT = 27017
20
21 OP_ERROR = 0
22 OP_INFO = 1
23 OP_AUTH = 2
24 OP_PUBLISH = 3
25 OP_SUBSCRIBE = 4
34a014f @malphx Add a new protocol opcode: Unsubscribe + fix a small typo
malphx authored
26 OP_UNSUBSCRIBE = 5
2e58a01 @rep initial import
authored
27
84fd0a9 broker - some updates from cloud
root authored
28 MAXBUF = 10* (1024**2)
7ba9966 @rep digest instead of hexdigest
authored
29 SIZES = {
30 OP_ERROR: 5+MAXBUF,
31 OP_INFO: 5+256+20,
32 OP_AUTH: 5+256+20,
33 OP_PUBLISH: 5+MAXBUF,
34 OP_SUBSCRIBE: 5+256*2,
34a014f @malphx Add a new protocol opcode: Unsubscribe + fix a small typo
malphx authored
35 OP_UNSUBSCRIBE: 5+256*2,
7ba9966 @rep digest instead of hexdigest
authored
36 }
37
2e58a01 @rep initial import
authored
38 class BadClient(Exception):
39 pass
40
41 class FeedUnpack(object):
42 def __init__(self):
43 self.buf = bytearray()
44 def __iter__(self):
45 return self
46 def next(self):
47 return self.unpack()
48 def feed(self, data):
49 self.buf.extend(data)
50 def unpack(self):
51 if len(self.buf) < 5:
52 raise StopIteration('No message.')
53
54 ml, opcode = struct.unpack('!iB', buffer(self.buf,0,5))
7ba9966 @rep digest instead of hexdigest
authored
55 if ml > SIZES.get(opcode, MAXBUF):
56 raise BadClient('Not respecting MAXBUF.')
57
2e58a01 @rep initial import
authored
58 if len(self.buf) < ml:
59 raise StopIteration('No message.')
60
61 data = bytearray(buffer(self.buf, 5, ml-5))
62 del self.buf[:ml]
63 return opcode, data
64
65
66 class FeedConn(EventGen):
67 def __init__(self, conn, addr, db):
68 EventGen.__init__(self)
69 self.conn = conn
70 self.addr = addr
71 self.db = db
72 self.pubchans = set()
73 self.subchans = set()
74 self.idents = set()
75 self.delay = False
76
77 self.rand = struct.pack('<I', random.randint(2**31,2**32-1))
78 self.fu = FeedUnpack()
79
80 conn._on('read', self.io_in)
81 conn._on('close', self.closed)
82
83 self.sendinfo()
84
85 def sendinfo(self):
86 self.conn.write(self.msginfo())
87
88 def auth(self, ident, hash):
34a014f @malphx Add a new protocol opcode: Unsubscribe + fix a small typo
malphx authored
89 p = self.db.query('hpfeeds.auth_key', {'identifier': str(ident)}, limit=1)
2e58a01 @rep initial import
authored
90 p._when(self.checkauth, hash)
91
92 def dbexc(e):
93 logging.critical('Database query exception. {0}'.format(e))
94 self.error('Database query exception.')
95
96 p._except(dbexc)
97
98 self.delay = True
99
100 def checkauth(self, r, hash):
101 if len(r) > 0:
102 akobj = r[0]
7ba9966 @rep digest instead of hexdigest
authored
103 akhash = hashlib.sha1('{0}{1}'.format(self.rand, akobj['secret'])).digest()
2e58a01 @rep initial import
authored
104 if akhash == hash:
84fd0a9 broker - some updates from cloud
root authored
105 self.pubchans.update(akobj.get('publish', []))
106 self.subchans.update(akobj.get('subscribe', []))
2e58a01 @rep initial import
authored
107 self.idents.add(akobj['identifier'])
108 logging.info('Auth success by {0}.'.format(akobj['identifier']))
109 else:
110 self.error('authfail.')
111 logging.info('Auth failure by {0}.'.format(akobj['identifier']))
112 else:
113 self.error('authfail.')
114
115 self.delay = False
116 self.io_in(b'')
117
118 def closed(self, reason):
119 logging.debug('Connection closed, {0}'.format(reason))
120 self._event('close', self)
121
204de2e @rep moving broker fns around for the non-auth testbroker (no db)
authored
122 def may_publish(self, chan):
123 return chan in self.pubchans
124
125 def may_subscribe(self, chan):
126 return chan in self.subchans
127
2e58a01 @rep initial import
authored
128 def io_in(self, data):
129 self.fu.feed(data)
130 if self.delay:
131 return
132 try:
133 for opcode, data in self.fu:
134 if opcode == OP_PUBLISH:
135 rest = buffer(data, 0)
136 ident, rest = rest[1:1+ord(rest[0])], buffer(rest, 1+ord(rest[0]))
137 chan, rest = rest[1:1+ord(rest[0])], buffer(rest, 1+ord(rest[0]))
138
139 if not ident in self.idents:
140 self.error('identfail.')
141 continue
142
204de2e @rep moving broker fns around for the non-auth testbroker (no db)
authored
143 if not self.may_publish(chan):
2e58a01 @rep initial import
authored
144 self.error('accessfail.')
145 continue
146
147 self._event('publish', self, chan, data)
148 elif opcode == OP_SUBSCRIBE:
149 rest = buffer(data, 0)
150 ident, chan = rest[1:1+ord(rest[0])], rest[1+ord(rest[0]):]
151
152 if not ident in self.idents:
153 self.error('identfail.')
154 continue
155
e5d1a21 @rep broker - fix subscribe checking on special ..broker chan
authored
156 checkchan = chan
157 if chan.endswith('..broker'): checkchan = chan.rsplit('..broker', 1)[0]
158
204de2e @rep moving broker fns around for the non-auth testbroker (no db)
authored
159 if not self.may_subscribe(checkchan):
2e58a01 @rep initial import
authored
160 self.error('accessfail.')
161 continue
162
5b0ed0d @rep broker - join/leave on special ..broker chan
authored
163 self._event('subscribe', self, chan, ident)
34a014f @malphx Add a new protocol opcode: Unsubscribe + fix a small typo
malphx authored
164 elif opcode == OP_UNSUBSCRIBE:
165 rest = buffer(data, 0)
166 ident, chan = rest[1:1+ord(rest[0])], rest[1+ord(rest[0]):]
167
168 if not ident in self.idents:
169 self.error('identfail.')
170 continue
171
204de2e @rep moving broker fns around for the non-auth testbroker (no db)
authored
172 if not self.may_subscribe(chan):
34a014f @malphx Add a new protocol opcode: Unsubscribe + fix a small typo
malphx authored
173 self.error('accessfail.')
174 continue
175
5b0ed0d @rep broker - join/leave on special ..broker chan
authored
176 self._event('unsubscribe', self, chan, ident)
2e58a01 @rep initial import
authored
177 elif opcode == OP_AUTH:
178 rest = buffer(data, 0)
179 ident, hash = rest[1:1+ord(rest[0])], rest[1+ord(rest[0]):]
180 self.auth(ident, hash)
734e670 @bg6cq fix identfail. error
bg6cq authored
181 if self.delay:
182 return
2e58a01 @rep initial import
authored
183
184 except BadClient:
185 self.conn.close()
186 logging.warn('Disconnecting bad client: {0}'.format(self.addr))
187
188 def forward(self, data):
189 self.conn.write(self.msghdr(OP_PUBLISH, data))
190
191 def error(self, emsg):
192 self.conn.write(self.msgerror(emsg))
193
194 def msgerror(self, emsg):
195 return self.msghdr(OP_ERROR, emsg)
196
197 def msginfo(self):
198 return self.msghdr(OP_INFO, '{0}{1}{2}'.format(chr(len(FBNAME)%0xff), FBNAME, self.rand))
199
200 def msghdr(self, op, data):
201 return struct.pack('!iB', 5+len(data), op) + data
202
5b0ed0d @rep broker - join/leave on special ..broker chan
authored
203 def msgpublish(self, ident, chan, data):
204 return self.msghdr(OP_PUBLISH, struct.pack('!B', len(ident)) + ident + struct.pack('!B', len(chan)) + chan + data)
205
206 def publish(self, ident, chan, data):
207 self.conn.write(self.msgpublish(ident, chan, data))
2e58a01 @rep initial import
authored
208
209 class FeedBroker(object):
210 def __init__(self):
211 self.ready = False
212
213 self.db = None
204de2e @rep moving broker fns around for the non-auth testbroker (no db)
authored
214 self.initdb()
2e58a01 @rep initial import
authored
215
216 self.listener = listenplain(host=FBIP, port=FBPORT)
217 self.listener._on('close', self._lclose)
218 self.listener._on('connection', self._newconn)
219
220 self.connections = set()
221 self.subscribermap = collections.defaultdict(list)
222 self.conn2chans = collections.defaultdict(list)
223
204de2e @rep moving broker fns around for the non-auth testbroker (no db)
authored
224 def initdb(self):
225 self.db = MongoConn(MONGOIP, MONGOPORT)
226 self.db._on('ready', self._dbready)
227 self.db._on('close', self._dbclose)
228
2e58a01 @rep initial import
authored
229 def _dbready(self):
230 self.ready = True
231 logging.info('Database ready.')
232
233 def _dbclose(self, e):
234 logging.critical('Database connection closed ({0}). Exiting.'.format(e))
235 unloop()
236
237 def _lclose(self, e):
238 logging.critical('Listener closed ({0}). Exiting.'.format(e))
239 unloop()
240
241 def _newconn(self, c, addr):
242 logging.debug('Connection from {0}.'.format(addr))
243 fc = FeedConn(c, addr, self.db)
244 self.connections.add(fc)
245 fc._on('close', self._connclose)
246 fc._on('subscribe', self._subscribe)
34a014f @malphx Add a new protocol opcode: Unsubscribe + fix a small typo
malphx authored
247 fc._on('unsubscribe', self._unsubscribe)
2e58a01 @rep initial import
authored
248 fc._on('publish', self._publish)
249
250 def _connclose(self, c):
251 self.connections.remove(c)
252 for chan in self.conn2chans[c]:
253 self.subscribermap[chan].remove(c)
5b0ed0d @rep broker - join/leave on special ..broker chan
authored
254 for ident in c.idents:
255 self._brokerchan(c, chan, ident, 0)
2e58a01 @rep initial import
authored
256
257 def _publish(self, c, chan, data):
258 logging.debug('broker publish to {0} by {1}'.format(chan, c.addr))
259 for c2 in self.subscribermap[chan]:
5b0ed0d @rep broker - join/leave on special ..broker chan
authored
260 if c2 == c: continue
2e58a01 @rep initial import
authored
261 c2.forward(data)
262
5b0ed0d @rep broker - join/leave on special ..broker chan
authored
263 def _subscribe(self, c, chan, ident):
2e58a01 @rep initial import
authored
264 logging.debug('broker subscribe to {0} by {1}'.format(chan, c.addr))
265 self.subscribermap[chan].append(c)
266 self.conn2chans[c].append(chan)
5b0ed0d @rep broker - join/leave on special ..broker chan
authored
267 self._brokerchan(c, chan, ident, 1)
34a014f @malphx Add a new protocol opcode: Unsubscribe + fix a small typo
malphx authored
268
5b0ed0d @rep broker - join/leave on special ..broker chan
authored
269 def _unsubscribe(self, c, chan, ident):
34a014f @malphx Add a new protocol opcode: Unsubscribe + fix a small typo
malphx authored
270 logging.debug('broker unsubscribe to {0} by {1}'.format(chan, c.addr))
271 self.subscribermap[chan].remove(c)
272 self.conn2chans[c].remove(chan)
5b0ed0d @rep broker - join/leave on special ..broker chan
authored
273 self._brokerchan(c, chan, ident, 0)
274
275 def _brokerchan(self, c, chan, ident, subscribe=0):
276 data = 'join' if subscribe else 'leave'
277 if self.subscribermap[chan+'..broker']:
278 for c2 in self.subscribermap[chan+'..broker']:
279 if c2 == c: continue
280 c2.publish(ident, chan+'..broker', data)
2e58a01 @rep initial import
authored
281
282 def main():
283 fb = FeedBroker()
284
285 loop()
286 return 0
287
288 if __name__ == '__main__':
67e38be @rep log level only if __main__, testbroker simpler
authored
289 logging.basicConfig(level=logging.INFO)
2e58a01 @rep initial import
authored
290 sys.exit(main())
291
Something went wrong with that request. Please try again.