Skip to content

Commit

Permalink
data packets are now binary websocket messages
Browse files Browse the repository at this point in the history
  • Loading branch information
progrium committed Sep 25, 2011
1 parent ed80ca8 commit b7a4372
Showing 1 changed file with 54 additions and 41 deletions.
95 changes: 54 additions & 41 deletions localtunnel/server.py
Expand Up @@ -15,6 +15,12 @@
from ws4py.client.geventclient import WebSocketClient
from ws4py.server.wsgi.middleware import WebSocketUpgradeMiddleware

def encode_data_packet(conn_id, data):
return ''.join([chr(conn_id), data])

def decode_data_packet(data):
return data[0], str(data[1:])

class CodependentGroup(Group):
"""
A greenlet group that will kill all greenlets if a single one dies.
Expand Down Expand Up @@ -107,7 +113,7 @@ def handle(self):

def _proxy_in(self, socket, conn):
while True:
data = socket.recv(1024)
data = socket.recv(2048)
if not data:
return
conn.send(data)
Expand Down Expand Up @@ -142,15 +148,22 @@ def handle_websocket(self, websocket, environ):
websocket.close()

def _tunnel_in(self, tunnel, websocket):
for msg in tunnel:
websocket.send(msg)
for type, msg in tunnel:
if type == 'binary':
websocket.send(msg, binary=True)
elif type == 'text':
websocket.send(msg)

def _tunnel_out(self, websocket, tunnel):
while True:
msg = websocket.receive()
msg = websocket.receive(msg_obj=True)
if msg is None:
return
tunnel.dispatch(msg)
if msg.is_text:
tunnel.dispatch(message=str(msg))
elif msg.is_binary:
tunnel.dispatch(data=msg.data)



class Tunnel(object):
Expand Down Expand Up @@ -178,20 +191,21 @@ def __iter__(self):
def next(self):
return self.tunnelq.get()

def dispatch(self, message):
def dispatch(self, message=None, data=None):
""" From the tunnel (server) to the proxy (client) """
try:
parsed = json.loads(str(message))
except ValueError:
raise
conn_id, event = parsed[0:2]
if conn_id not in self.connections:
return
if event == 'closed':
conn = self.connections.pop(conn_id)
conn.close()
elif event == 'data':
data = base64.b64decode(parsed[2])
if message:
try:
parsed = json.loads(message)
except ValueError:
raise
conn_id, event = parsed[0:2]
if conn_id not in self.connections:
return
if event == 'closed':
conn = self.connections.pop(conn_id)
conn.close()
elif data:
conn_id, data = decode_data_packet(data)
self.connections[conn_id].recvq.put(data)

class ConnectionProxy(object):
Expand All @@ -208,11 +222,13 @@ def send(self, data=None, open=None):
""" From the proxy (client) to the tunnel (server) """
if open is True:
msg = [self.id, 'open']
self.tunnel.tunnelq.put(('text', json.dumps(msg)))
elif open is False:
msg = [self.id, 'closed']
self.tunnel.tunnelq.put(('text', json.dumps(msg)))
else:
msg = [self.id, 'data', base64.b64encode(data)]
self.tunnel.tunnelq.put(json.dumps(msg))
data = encode_data_packet(self.id, data)
self.tunnel.tunnelq.put(('binary', data))

def close(self):
self.recvq.put(None)
Expand All @@ -225,15 +241,14 @@ class TunnelClient(Service):
hostname = Option('hostname', default="localtunnel.com")

def __init__(self):
self.ws = WebSocketClient('http://127.0.0.1:9999/t/test.localtunnel.local')
#self.ws = WebSocketClient('http://%s:%s/t/test.localtunnel.local' %
#(self.hostname, self.server_port))
self.ws = WebSocketClient('http://%s:%s/t/test.localtunnel.local' %
(self.hostname, self.server_port))
self.connections = {}

def do_start(self):
self.ws.connect()
gevent.spawn(self.listen)
gevent.spawn(self.visual_heartbeat)
#gevent.spawn(self.visual_heartbeat)

def visual_heartbeat(self):
while True:
Expand All @@ -242,19 +257,19 @@ def visual_heartbeat(self):

def listen(self):
while True:
msg = self.ws.receive()
msg = self.ws.receive(msg_obj=True)
if msg is None:
print "Trying to stop"
self.stop()
#print "T>>>", msg
parsed = json.loads(str(msg))
conn_id, event = parsed[0:2]
if event == 'open':
self.local_open(conn_id)
elif event == 'closed':
self.local_close(conn_id)
elif event == 'data':
data = base64.b64decode(parsed[2])
if msg.is_text:
parsed = json.loads(str(msg))
conn_id, event = parsed[0:2]
if event == 'open':
self.local_open(conn_id)
elif event == 'closed':
self.local_close(conn_id)
elif msg.is_binary:
conn_id, data = decode_data_packet(msg.data)
self.local_send(conn_id, data)

def local_open(self, conn_id):
Expand All @@ -272,23 +287,21 @@ def local_close(self, conn_id):

def local_send(self, conn_id, data):
self.connections[conn_id].send(data)
#print "S<<<", len(data)

def local_recv(self, conn_id):
while True:
data = self.connections[conn_id].recv(2048)
data = self.connections[conn_id].recv(4096)
if not data:
break
#print "S>>>", len(data)
self.tunnel_send(conn_id, data)
self.tunnel_send(conn_id, open=False)

def tunnel_send(self, conn_id, data=None, open=None):
if open is False:
msg = [conn_id, 'closed']
self.ws.send(json.dumps(msg))
elif data:
msg = [conn_id, 'data', base64.b64encode(data)]
msg = encode_data_packet(conn_id, data)
self.ws.send(msg, binary=True)
else:
return
#print "T<<<", msg
self.ws.send(json.dumps(msg))
return

0 comments on commit b7a4372

Please sign in to comment.