Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

complete fork test, but (perhaps single-process related?) network tes…

…ts are failing
  • Loading branch information...
commit b8863b81c45d48fadce39e731fc76bd2705d0c43 1 parent 7d4c776
@jmoiron authored
View
5 gaspar/__init__.py
@@ -0,0 +1,5 @@
+from producers import Producer, SimpleProducer
+from consumers import Consumer, SimpleConsumer
+
+__all__ = [p for p in dir() if not p.startswith('_')]
+
View
64 gaspar/consumers.py
@@ -0,0 +1,64 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""Gaspar consumers (workers)."""
+
+from multiprocessing import cpu_count, Process
+from gevent import spawn
+from gevent_zeromq import zmq
+from gevent.event import Event
+
+num_cpus = cpu_count()
+
+class Consumer(object):
+ def __init__(self, producer, processes=num_cpus):
+ self.producer = producer
+ self.num_processes = processes
+ self.running = Event()
+ self.stopped = Event()
+ spawn(self.wait_for_start)
+ spawn(self.wait_for_stop)
+
+ def wait_for_start(self):
+ self.producer.start_event.wait()
+ self.start()
+
+ def wait_for_stop(self):
+ self.producer.stop_event.wait()
+ self.stop()
+
+ def start(self):
+ self.processes = [Process(target=self.subprocess) for i in range(self.num_processes)]
+ for process in self.processes:
+ process.start()
+ self.running.set()
+
+ def stop(self):
+ for process in self.processes:
+ if process.is_alive():
+ process.terminate()
+ process.join()
+ self.running.clear()
+ self.stopped.set()
+
+ def subprocess(self):
+ context = zmq.Context()
+ socket = context.socket(zmq.REP)
+ socket.connect("tcp://%s:%s" % (self.producer.host, self.producer.zmq_port))
+ while True:
+ msg = socket.recv()
+ reply = self.handle(msg)
+ socket.send(reply)
+
+ def handle(self, msg):
+ print "Received message length %s" % (len(msg))
+ return "Hello"
+
+
+class SimpleConsumer(Consumer):
+ def __init__(self, producer, handler, processes=num_cpus):
+ self.handler = handler
+ super(SimpleConsumer, self).__init__(producer, processes)
+
+ def handle(self, msg):
+ return self.handler(msg)
View
66 gaspar/producers.py
@@ -0,0 +1,66 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""Gaspar producers."""
+
+from gevent import sleep, socket
+from gevent_zeromq import zmq
+from gevent.pool import Pool
+from gevent.server import StreamServer
+from gevent.event import Event
+
+class Producer(object):
+ def __init__(self, port, host):
+ self.port = port
+ self.host = host
+ self.blocking = False
+ self.server = StreamServer((self.host, self.port), self.handler, spawn=Pool(1000))
+ self.start_event = Event()
+ self.stop_event = Event()
+
+ def setup_zmq(self):
+ context = zmq.Context()
+ socket = context.socket(zmq.REQ)
+ port = socket.bind_to_random_port("tcp://%s" % self.host)
+ self.zmq_port = port
+ self.zmq_socket = socket
+ self.zmq_context = context
+
+ def start(self, blocking=True):
+ self.blocking = blocking
+ self.setup_zmq()
+ if blocking:
+ self.start_event.set()
+ self.server.serve_forever()
+ self.stop()
+ else:
+ self.server.start()
+ self.start_event.set()
+
+ def stop(self):
+ self.server.stop()
+ self.zmq_socket.close()
+ self.stop_event.set()
+ # let event listeners listening to this event run
+ sleep(0)
+
+ def handler(self, sock, address):
+ print "New connection from %s:%s" % address
+ sockfile = sock.makefile()
+ request = sockfile.readline().strip()
+ if not request:
+ return
+ self.zmq_socket.send(request)
+ response = self.zmq_socket.recv()
+ sockfile.write(response)
+
+
+class SimpleProducer(Producer):
+ def __init__(self, port, incoming, host='127.0.0.1'):
+ self.incoming = incoming
+ super(SimpleProducer, self).__init__()
+
+ def handler(self, sock, address):
+ self.incoming(sock, address)
+
+
View
2  setup.cfg
@@ -1,2 +1,2 @@
-[egg_info]
+[upload_docs]
upload-dir = docs/_build/html
View
2  setup.py
@@ -36,7 +36,7 @@
install_requires=[
# -*- Extra requirements: -*-
'gevent',
- 'gevent_zeromq,
+ 'gevent_zeromq',
],
entry_points="""
# -*- Entry points: -*-
View
59 tests/gaspartest.py
@@ -8,6 +8,7 @@
import os
import time
from threading import Thread
+from gevent import sleep
def check_pid(pid):
""" Check For the existence of a unix pid. """
@@ -18,21 +19,23 @@ def check_pid(pid):
else:
return True
+def noop(message): pass
+
def pause_2(message):
time.sleep(2)
return message
-class GasparTest(TestCase):
+class ForkTest(TestCase):
def setUp(self):
- from gaspar import SimpleProducer, SimpleConsumer
- self.producer = SimpleProducer(port=43215)
- self.consumer = SimpleConsumer(self.producer, pause_2, processes=5)
- self.producer_thread = Thread(target=producer.start)
- self.producer_thread.start()
+ from gaspar import Producer, SimpleConsumer
+ self.producer = Producer(0, '127.0.0.1')
+ self.consumer = SimpleConsumer(self.producer, noop, processes=5)
+ self.producer.start(blocking=False)
+ self.consumer.running.wait()
def tearDown(self):
- self.producer.stop()
- self.producer_thread.join()
+ if not self.producer.stop_event.is_set():
+ self.producer.stop()
def test_forking(self):
self.assertEqual(len(self.consumer.processes), 5)
@@ -42,3 +45,43 @@ def test_forking(self):
self.assertTrue(process.pid)
self.assertTrue(check_pid(process.pid))
+ # make sure that stopping the producers stops the processes
+ self.producer.stop()
+ self.consumer.stopped.wait()
+ for process in self.consumer.processes:
+ self.assertFalse(process.is_alive())
+
+
+class CommunicationsTest(TestCase):
+ def recv(self, message):
+ self.received.append(message)
+ return "Hello, %s" % message
+
+ def connect(self, timeout=0.5, bufsize=1):
+ from gevent import socket
+ client = socket.create_connection(('127.0.0.1', self.producer.server.server_port))
+ fobj = client.makefile(bufsize=bufsize)
+ fobj._sock.settimeout(timeout)
+ return fobj
+
+ def setUp(self):
+ from gaspar import Producer, SimpleConsumer
+ self.received = []
+ self.producer = Producer(0, '127.0.0.1')
+ self.consumer = SimpleConsumer(self.producer, self.recv, processes=2)
+ self.producer.start(blocking=False)
+ self.consumer.running.wait()
+
+ def tearDown(self):
+ if not self.producer.stop_event.is_set():
+ self.producer.stop()
+
+ def test_message_retrieval(self):
+ client = self.connect()
+ client.write("Mesage\r\n")
+ client.flush()
+ result = client.read()
+ print "Received result: %s" % result
+ client.close()
+
+
Please sign in to comment.
Something went wrong with that request. Please try again.