Skip to content

Commit

Permalink
client enhancement
Browse files Browse the repository at this point in the history
  • Loading branch information
Sun Ning committed May 18, 2012
1 parent 8526db1 commit 8dc0349
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 20 deletions.
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -7,7 +7,7 @@
long_desc=open('README.rst','r').read()

setup(name="slacker-python",
version="0.1.1",
version="0.1.2",
author="Sun Ning",
author_email="sunng@about.me",
description="python client of slacker RPC",
Expand Down
69 changes: 50 additions & 19 deletions slacker/client.py
@@ -1,5 +1,6 @@
import random
import itertools
import struct

import gevent
import gevent.socket
Expand All @@ -10,28 +11,47 @@

class Connection(object):
def __init__(self, addr):
self.host = addr.split(":")[0]
self.port = int(addr.split(":")[1])
self.addr = (addr.split(":")[0], int(addr.split(":")[1]))
self.sock = None
self.connect_lock = gevent.event.Event()
self.transid = itertools.count()
self.reqs = {}

def connect(self):
self.sock = gevent.socket.create_connection((self.host, self.port))
self.clientLoop = gevent.spawn(self.readLoop)
while True:
try:
self.sock = gevent.socket.create_connection(self.addr)
self.connect_lock.set()
self.eloop = gevent.spawn(self.readLoop)
break
except:
gevent.sleep(5)

def reconnect(self):
self.close()
self.connect()


def readLoop(self):
while True:
_, tid, packetType = readHeader(self.sock)
resp = None
if packetType == PROTOCOL_PACKET_TYPE_RESPONSE:
resp = readResponse(self.sock)

elif packetType == PROTOCOL_PACKET_TYPE_ERROR:
resp = readError(self.sock)
cb = self.reqs[tid]
cb.set(resp)
del self.reqs[tid]
try:
_, tid, packetType = readHeader(self.sock)
resp = None
if packetType == PROTOCOL_PACKET_TYPE_RESPONSE:
resp = readResponse(self.sock)

elif packetType == PROTOCOL_PACKET_TYPE_ERROR:
resp = readError(self.sock)

cb = self.reqs[tid]
cb.set(resp)
del self.reqs[tid]
except struct.error:
self.reconnect()
break
except IOError:
self.reconnect()
break

def send(self, request):
transid = self.transid.next()
Expand All @@ -43,15 +63,23 @@ def send(self, request):
writeRequest(buf, request)
data = buf.getvalue()
buf.close()

self.sock.send(data)

try:
self.connect_lock.wait()
self.sock.send(data)
except:
## reconnect
self.reconnect()

return cb

def close(self):
self.clientLoop.kill()
self.sock.close()
if self.eloop:
gevent.kill(self.eloop)
if self.sock:
self.sock.close()
self.reqs.clear()
self.connect_lock.clear()

class Client(object):
def __init__(self, addrs):
Expand All @@ -64,7 +92,10 @@ def call(self, fname, args):
req.serialize()
conn = random.choice(self.connections)
cb = conn.send(req)
result = cb.get()
try:
result = cb.get(timeout=10)
except Timeout, t:
raise RuntimeError("Timeout")
if isinstance(result, SlackerResponse):
if result.code == PROTOCOL_RESULT_CODE_SUCCESS:
result.desrialize()
Expand Down

0 comments on commit 8dc0349

Please sign in to comment.