Permalink
Browse files

Merge pull request #1 from technoweenie/master

Graceful shutdown; git ignores
  • Loading branch information...
2 parents 9f6579f + 1a38837 commit 1ee0581063a7b5c2ed515e76fc660af362395a50 @srinikom srinikom committed Nov 23, 2011
Showing with 44 additions and 26 deletions.
  1. +3 −0 .gitignore
  2. +41 −26 leveldb-server.py
View
@@ -0,0 +1,3 @@
+level.db
+*.pyc
+clients/py/build/lib
View
@@ -8,23 +8,32 @@
import leveldb
import json
import optparse
+from time import sleep
class workerThread(threading.Thread):
"""workerThread"""
def __init__(self, context, db):
threading.Thread.__init__ (self)
self.context = context
self.db = db
-
+ self.running = True
+ self.processing = False
+ self.socket = self.context.socket(zmq.XREQ)
+
def run(self):
- socket = self.context.socket(zmq.XREQ)
- socket.connect('inproc://backend')
- while True:
- msg = socket.recv_multipart()
+ self.socket.connect('inproc://backend')
+ while self.running:
+ try:
+ msg = self.socket.recv_multipart()
+ except zmq.ZMQError:
+ self.running = False
+ continue
+
+ self.processing = True
if len(msg) != 3:
value = 'None'
reply = [msg[0], value]
- socket.send_multipart(reply)
+ self.socket.send_multipart(reply)
continue
id = msg[0]
op = msg[1]
@@ -74,10 +83,14 @@ def run(self):
else:
value = ""
reply.append(value)
- #print reply
- socket.send_multipart(reply)
-
- socket.close()
+ self.socket.send_multipart(reply)
+ self.processing = False
+
+ def close(self):
+ self.running = False
+ while self.processing:
+ sleep(1)
+ self.socket.close()
if __name__ == "__main__":
optparser = optparse.OptionParser(
@@ -114,20 +127,22 @@ def run(self):
worker.start()
workers.append(worker)
- while True:
- sockets = dict(poll.poll())
- if frontend in sockets:
- if sockets[frontend] == zmq.POLLIN:
- msg = frontend.recv_multipart()
- backend.send_multipart(msg)
-
- if backend in sockets:
- if sockets[backend] == zmq.POLLIN:
- msg = backend.recv_multipart()
- frontend.send_multipart(msg)
-
- #never here
- frontend.close()
- backend.close()
- context.term()
+ try:
+ while True:
+ sockets = dict(poll.poll())
+ if frontend in sockets:
+ if sockets[frontend] == zmq.POLLIN:
+ msg = frontend.recv_multipart()
+ backend.send_multipart(msg)
+
+ if backend in sockets:
+ if sockets[backend] == zmq.POLLIN:
+ msg = backend.recv_multipart()
+ frontend.send_multipart(msg)
+ except KeyboardInterrupt:
+ for worker in workers:
+ worker.close()
+ frontend.close()
+ backend.close()
+ context.term()

0 comments on commit 1ee0581

Please sign in to comment.