Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Newer
Older
100644 237 lines (193 sloc) 6.268 kB
f71f068 @rep add kippo module
authored
1 from kippo.core import dblog
2 from twisted.python import log
3
4 import os
5 import struct
6 import hashlib
7 import json
8 import socket
9 import uuid
10
11 BUFSIZ = 16384
12
13 OP_ERROR = 0
14 OP_INFO = 1
15 OP_AUTH = 2
16 OP_PUBLISH = 3
17 OP_SUBSCRIBE = 4
18
19 MAXBUF = 1024**2
20 SIZES = {
21 OP_ERROR: 5+MAXBUF,
22 OP_INFO: 5+256+20,
23 OP_AUTH: 5+256+20,
24 OP_PUBLISH: 5+MAXBUF,
25 OP_SUBSCRIBE: 5+256*2,
26 }
27
28 KIPPOCHAN = 'kippo.sessions'
29
30 class BadClient(Exception):
31 pass
32
33 # packs a string with 1 byte length field
34 def strpack8(x):
35 if isinstance(x, str): x = x.encode('latin1')
36 return struct.pack('!B', len(x)) + x
37
38 # unpacks a string with 1 byte length field
39 def strunpack8(x):
40 l = x[0]
41 return x[1:1+l], x[1+l:]
42
43 def msghdr(op, data):
44 return struct.pack('!iB', 5+len(data), op) + data
45 def msgpublish(ident, chan, data):
46 return msghdr(OP_PUBLISH, strpack8(ident) + strpack8(chan) + data)
47 def msgsubscribe(ident, chan):
48 if isinstance(chan, str): chan = chan.encode('latin1')
49 return msghdr(OP_SUBSCRIBE, strpack8(ident) + chan)
50 def msgauth(rand, ident, secret):
51 hash = hashlib.sha1(bytes(rand)+secret).digest()
52 return msghdr(OP_AUTH, strpack8(ident) + hash)
53
54 class FeedUnpack(object):
55 def __init__(self):
56 self.buf = bytearray()
57 def __iter__(self):
58 return self
59 def next(self):
60 return self.unpack()
61 def feed(self, data):
62 self.buf.extend(data)
63 def unpack(self):
64 if len(self.buf) < 5:
65 raise StopIteration('No message.')
66
67 ml, opcode = struct.unpack('!iB', buffer(self.buf,0,5))
68 if ml > SIZES.get(opcode, MAXBUF):
69 raise BadClient('Not respecting MAXBUF.')
70
71 if len(self.buf) < ml:
72 raise StopIteration('No message.')
73
74 data = bytearray(buffer(self.buf, 5, ml-5))
75 del self.buf[:ml]
76 return opcode, data
77
78 class hpclient(object):
79 def __init__(self, server, port, ident, secret, debug):
80 print 'hpfeeds client init broker {0}:{1}, identifier {2}'.format(server, port, ident)
81 self.server, self.port = server, int(port)
82 self.ident, self.secret = ident.encode('latin1'), secret.encode('latin1')
83 self.debug = debug
84 self.unpacker = FeedUnpack()
85 self.state = 'INIT'
86
87 self.connect()
88 self.sendfiles = []
89 self.filehandle = None
90
91 def connect(self):
92 self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
93 self.s.settimeout(3)
94 try: self.s.connect((self.server, self.port))
95 except:
96 print 'hpfeeds client could not connect to broker.'
97 self.s = None
98 else:
99 self.s.settimeout(None)
100 self.handle_established()
101
102 def send(self, data):
103 if not self.s: return
104 self.s.send(data)
105
106 def close(self):
107 self.s.close()
108 self.s = None
109
110 def handle_established(self):
111 if self.debug: print 'hpclient established'
112 while self.state != 'GOTINFO':
113 self.read()
114
115 #quickly try to see if there was an error message
116 self.s.settimeout(0.5)
117 self.read()
118 self.s.settimeout(None)
119
120 def read(self):
121 if not self.s: return
122 try: d = self.s.recv(BUFSIZ)
123 except socket.timeout:
124 return
125
126 if not d:
127 if self.debug: log.msg('hpclient connection closed?')
128 self.close()
129 return
130
131 self.unpacker.feed(d)
132 try:
133 for opcode, data in self.unpacker:
134 if self.debug: log.msg('hpclient msg opcode {0} data {1}'.format(opcode, data))
135 if opcode == OP_INFO:
136 name, rand = strunpack8(data)
137 if self.debug: log.msg('hpclient server name {0} rand {1}'.format(name, rand))
138 self.send(msgauth(rand, self.ident, self.secret))
139 self.state = 'GOTINFO'
140
141 elif opcode == OP_PUBLISH:
142 ident, data = strunpack8(data)
143 chan, data = strunpack8(data)
144 if self.debug: log.msg('publish to {0} by {1}: {2}'.format(chan, ident, data))
145
146 elif opcode == OP_ERROR:
147 log.err('errormessage from server: {0}'.format(data))
148 else:
149 log.err('unknown opcode message: {0}'.format(opcode))
150 except BadClient:
151 log.err('unpacker error, disconnecting.')
152 self.close()
153
154 def publish(self, channel, **kwargs):
155 self.send(msgpublish(self.ident, channel, json.dumps(kwargs).encode('latin1')))
156
157 def sendfile(self, filepath):
158 # does not read complete binary into memory, read and send chunks
159 if not self.filehandle:
160 self.sendfileheader(i.file)
161 self.sendfiledata()
162 else: self.sendfiles.append(filepath)
163
164 def sendfileheader(self, filepath):
165 self.filehandle = open(filepath, 'rb')
166 fsize = os.stat(filepath).st_size
167 headc = strpack8(self.ident) + strpack8(UNIQUECHAN)
168 headh = struct.pack('!iB', 5+len(headc)+fsize, OP_PUBLISH)
169 self.send(headh + headc)
170
171 def sendfiledata(self):
172 tmp = self.filehandle.read(BUFSIZ)
173 if not tmp:
174 if self.sendfiles:
175 fp = self.sendfiles.pop(0)
176 self.sendfileheader(fp)
177 else:
178 self.filehandle = None
179 self.handle_io_in(b'')
180 else:
181 self.send(tmp)
182
183
184 class DBLogger(dblog.DBLogger):
185 def start(self, cfg):
186 print 'hpfeeds DBLogger start'
187
188 server = cfg.get('database_hpfeeds', 'server')
189 port = cfg.get('database_hpfeeds', 'port')
190 ident = cfg.get('database_hpfeeds', 'identifier')
191 secret = cfg.get('database_hpfeeds', 'secret')
192 debug = cfg.get('database_hpfeeds', 'debug')
193
194 self.client = hpclient(server, port, ident, secret, debug)
195 self.meta = {}
196
197 # We have to return an unique ID
198 def createSession(self, peerIP, peerPort, hostIP, hostPort):
199 session = uuid.uuid4().hex
200 self.meta[session] = {'peerIP': peerIP, 'peerPort': peerPort,
201 'hostIP': hostIP, 'hostPort': hostPort, 'loggedin': None,
202 'credentials':[], 'version': None, 'ttylog': None }
203 return session
204
205 def handleConnectionLost(self, session, args):
206 log.msg('publishing metadata to hpfeeds')
207 meta = self.meta[session]
208 ttylog = self.ttylog(session)
209 if ttylog: meta['ttylog'] = ttylog.encode('hex')
210 self.client.publish(KIPPOCHAN, **meta)
211
212 def handleLoginFailed(self, session, args):
213 u, p = args['username'], args['password']
214 self.meta[session]['credentials'].append((u,p))
215
216 def handleLoginSucceeded(self, session, args):
217 u, p = args['username'], args['password']
218 self.meta[session]['loggedin'] = (u,p)
219
220 def handleCommand(self, session, args):
221 pass
222
223 def handleUnknownCommand(self, session, args):
224 pass
225
226 def handleInput(self, session, args):
227 pass
228
229 def handleTerminalSize(self, session, args):
230 pass
231
232 def handleClientVersion(self, session, args):
233 v = args['version']
234 self.meta[session]['version'] = v
235
236 # vim: set sw=4 et:
Something went wrong with that request. Please try again.