Skip to content
Permalink
Browse files

partial websocket support

  • Loading branch information
lunixbochs committed Jul 29, 2012
1 parent 80f7688 commit ca1483525efc1f2b4b8b57eaf86ccead411bb6e8
Showing with 179 additions and 25 deletions.
  1. +137 −4 Client.py
  2. +7 −1 DataHandler.py
  3. +12 −9 Dispatcher.py
  4. +23 −11 server.py
141 Client.py
@@ -1,10 +1,19 @@
import socket, time, sys, thread, ip2country, errno
import Telnet

import socket, time, thread, ip2country, errno
from collections import defaultdict

from BaseHTTPServer import BaseHTTPRequestHandler
from StringIO import StringIO
import base64
import struct
# TODO: compatibility for python < 2.6?
from hashlib import sha1

import Telnet

class Client:
'this object represents one connected client'
websocket = False
handshake = False

def __init__(self, root, connection, address, session_id):
'initial setup for the connected client'
@@ -79,7 +88,7 @@ def __init__(self, root, connection, address, session_id):
self.data = ''

# holds compatibility flags - will be set by Protocol as necessary
self.compat = defaultdict(False)
self.compat = defaultdict(lambda: False)
self.scriptPassword = None

now = time.time()
@@ -166,6 +175,15 @@ def SendNow(self, msg):
if self.telnet:
msg = Telnet.filter_out(msg)
if not msg: return

if self.websocket:
if self.handshake:
msg = self.NewFrame(msg)
else:
# haven't done a handshake yet, doesn't make sense to send a message
# let's queue it instead just in case they finish the handshake before they disconnect
self.Send(msg)
return
try:
self.conn.send(msg+self.nl)
except socket.error:
@@ -184,6 +202,9 @@ def FlushBuffer(self):
message = self.sendbuffer.pop(0)
self.sendingmessage = message
senddata = self.sendingmessage# [:64] # smaller chunks interpolate better, maybe base this off of number of clients?

if self.websocket:
senddata = self.NewFrame(senddata)
try:
sent = self.conn.send(senddata)
self.sendingmessage = self.sendingmessage[sent:] # only removes the number of bytes sent
@@ -247,3 +268,115 @@ def isAdmin(self):

def isMod(self):
return self.isAdmin() or ('mod' in self.accesslevels) # maybe cache these

class HTTPRequest(BaseHTTPRequestHandler):
def __init__(self, request_text):
self.rfile = StringIO(request_text)
self.raw_requestline = self.rfile.readline()
self.error_code = self.error_message = None
self.parse_request()

def send_error(self, code, message):
self.error_code = code
self.error_message = message

websocket_upgrade_template = '''HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: %s
'''

class WebSocket(Client):
websocket = True

def NewFrame(self, msg):
first = int('10000001', 2)
frame = struct.pack('B', first)

length = len(msg)
if length < 126:
frame += struct.pack('B', length)
elif length < 65535:
frame += struct.pack('!H', length)
else:
frame += struct.pack('!Q', length)

frame += msg
return frame

def ParseFrame(self, frame):
byte = struct.Struct('B')
# first, = byte.unpack(frame[0])
second, = byte.unpack(frame[1])
mask = (second & 128 == 128)
second = second & 127

if second < 126:
length = second
mask_pos = 2
elif second == 126:
length, = struct.unpack('!H', frame[2:6])
mask_pos = 4
else:
length, = struct.unpack('!Q', frame[2:10])
mask_pos = 12

if mask:
masks = frame[mask_pos:mask_pos+4]
frame = frame[mask_pos+4:]
masks = [byte.unpack(b)[0] for b in masks]

msg = ''
for i in xrange(length):
msg += byte.pack(
byte.unpack(frame[i])[0] ^ masks[i % 4]
)
else:
msg = frame[mask_pos:]

return msg

# TODO: handle data after the end of the frame (put in a buffer for the next frame)

def HTTPError(self, req):
self.conn.send(' '.join((req.request_version, str(req.error_code), req.error_message)))
self.conn.send('\r\n\r\n')

def Handle(self, data):
if not self.handshake:
if '\r\n\r\n' in data:
frame, self.data = data.split('\r\n\r\n', 1)
elif '\n\n' in data:
frame, self.data = data.split('\n\n', 1)
else:
self.data = data
return

req = HTTPRequest(frame)
if req.error_code:
self.HTTPError(req)
elif req.command == 'GET' and req.path == '/websocket':
if ('Sec-WebSocket-Key' in req.headers and
'Upgrade' in req.headers and
req.headers['Upgrade'] == 'websocket'
):
if 'Sec-WebSocket-Key' in req.headers:
key = req.headers['Sec-WebSocket-Key'] + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
accept = base64.b64encode(sha1(key).digest())
response = websocket_upgrade_template % accept

if not '\r\n' in response:
response = response.replace('\r', '\n').replace('\n', '\r\n')
self.conn.send(response)
self.handshake = True
return

req.error_code = 500
req.error_message = 'Unable to open websocket'
self.HTTPError(req)
self.Remove()
else:
# parse websocket frame
frame = self.ParseFrame(data)
Client.Handle(self, frame)
@@ -20,6 +20,7 @@ def __init__(self):
self.console_buffer = []
self.port = 8200
self.natport = self.port+1
self.webport = self.port+2
self.dbtype = 'lan'
self.lanadmin = {'username':'', 'password':''}
self.latestspringversion = '*'
@@ -100,6 +101,8 @@ def parseArgv(self, argv):
print ' { Server will host on this port (default is 8200) }'
print ' -n, --natport number'
print ' { Server will use this port for NAT transversal (default is 8201) }'
print ' -w, --webport number'
print ' { Server will use this port for WebSocket clients (default is 8202) }'
print ' -l, --lan'
print ' { Users do not need to be registered to login - breaks rudimentary features like channel ops/founders, channel/battle bans, etc. }'
print ' -a, --lanadmin username password [hash] }'
@@ -158,6 +161,9 @@ def parseArgv(self, argv):
elif arg in ['n', 'natport']:
try: self.natport = int(argp[0])
except: print 'Invalid NAT port specification'
elif arg in ['w', 'webport']:
try: self.natport = int(argp[0])
except: print 'Invalid WebSocket port specification'
elif arg in ['l', 'lan']:
self.dbtype = 'lan'
elif arg in ['a', 'lanadmin']:
@@ -440,7 +446,7 @@ def error(self, error):
except KeyError: pass # the user was removed

def console_write(self, lines=''):
if type(lines) in(str, unicode):
if type(lines) in (str, unicode):
lines = lines.split('\n')
elif not type(lines) in (list, tuple, set):
try: lines = [lines.__repr__()]
@@ -1,10 +1,10 @@
import Multiplexer, Protocol, Client
import Multiplexer, Protocol
import socket, thread, traceback

class Dispatcher:
def __init__(self, root, server):
def __init__(self, root, servers):
self._root = root
self.server = server
self.servers = servers
self.poller = Multiplexer.BestMultiplexer()
self.socketmap = {}
self.workers = []
@@ -14,30 +14,33 @@ def __init__(self, root, server):
self.num = 0

def pump(self):
self.poller.register(self.server)
for s in self.servers.keys():
self.poller.register(s)

self.poller.pump(self.callback)

def callback(self, inputs, outputs, errors):
try:
for s in inputs:
if s == self.server:
if s in self.servers:
new_client = self.servers[s]
try:
conn, addr = self.server.accept()
conn, addr = s.accept()
except socket.error, e:
if e[0] == 24: # ulimit maxfiles, need to raise ulimit
self._root.console_write('Maximum files reached, refused new connection.')
else:
raise socket.error, e
client = Client.Client(self._root, conn, addr, self._root.session_id)
client = new_client(self._root, conn, addr, self._root.session_id)
self.addClient(client)
else:
try:
data = s.recv(1024)
if data:
if s in self.socketmap: # for threading, just need to pass this to a worker thread... remember to fix the problem for any calls to handler, and fix msg ids (handler.thread)
self.socketmap[s].Handle(data)
self.socketmap[s].Handle(data)
else:
print 'Problem, sockets are not being cleaned up properly.'
self._root.console_write('Problem: sockets are not being cleaned up properly.')
else:
raise socket.error, 'Connection closed.'
except socket.error:
@@ -5,10 +5,11 @@
from urllib import urlopen

from DataHandler import DataHandler
from Client import Client
from NATServer import NATServer
from Dispatcher import Dispatcher

import Client

import ip2country # just to make sure it's downloaded
import ChanServ

@@ -27,19 +28,25 @@ def sighup(sig, frame):
except AttributeError:
pass

def new_server(host, port, backlog=100):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
s.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) | 1)
# fixes TIME_WAIT :D
s.bind((host, port))
s.listen(backlog)
return s

_root.console_write('-'*40)
_root.console_write('Starting uberserver...\n')

host = ''
port = _root.port
natport = _root.natport
backlog = 100
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt( socket.SOL_SOCKET, socket.SO_REUSEADDR,
server.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) | 1 )
# fixes TIME_WAIT :D
server.bind((host,port))
server.listen(backlog)
webport = _root.webport

server = new_server(host, port)
websocket = new_server(host, webport)

try:
natserver = NATServer(natport)
@@ -69,10 +76,15 @@ def sighup(sig, frame):
_root.local_ip = local_addr
_root.online_ip = web_addr

_root.console_write('Listening for clients on port %i'%port)
_root.console_write('Using %i client handling thread(s).'%_root.max_threads)
_root.console_write('Listening for WebSocket connections on port %i' % webport)
_root.console_write('Listening for clients on port %i' % port)
_root.console_write('Using %i client handling thread(s).' % _root.max_threads)

dispatcher = Dispatcher(_root, server)
servers = {
server: (lambda root, conn, addr, sid: Client.Client(root, conn, addr, sid)),
websocket: (lambda root, conn, addr, sid: Client.WebSocket(root, conn, addr, sid))
}
dispatcher = Dispatcher(_root, servers)
_root.dispatcher = dispatcher

chanserv = True

0 comments on commit ca14835

Please sign in to comment.
You can’t perform that action at this time.