Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

easy the API.

  • Loading branch information...
commit 10c156d3b8d6d851645230cb10f2e3c3abc9306d 1 parent c078695
benoitc authored
22 README.rst
View
@@ -12,7 +12,7 @@ Ex::
import time
import urllib2
- from pistil.arbiter import Arbiter, child
+ from pistil.arbiter import Arbiter
from pistil.worker import Worker
from pistil.tcp.sync_worker import TcpSyncWorker
from pistil.tcp.arbiter import TcpArbiter
@@ -26,7 +26,7 @@ Ex::
p = HttpStream(SocketReader(sock))
path = p.path()
- data = "welcome world"
+ data = "welcome wold"
sock.send("".join(["HTTP/1.1 200 OK\r\n",
"Content-Type: text/html\r\n",
"Content-Length:" + str(len(data)) + "\r\n",
@@ -45,16 +45,22 @@ Ex::
self.notify
class MyPoolArbiter(TcpArbiter):
- worker = child(MyTcpWorker, 30, "worker", {})
+ def on_init(self, conf):
+ TcpArbiter.on_init(self, conf)
+ # we return a spec
+ return (MyTcpWorker, 30, "worker", {}, "http_welcome",)
- class MyArbiter(Arbiter):
- pool = child(MyPoolArbiter, 30, "supervisor",
- {"num_workers": 3, "address": ("127.0.0.1", 5000)})
- grabber = child(UrlWorker, 30, "worker", {})
if __name__ == '__main__':
- arbiter = MyArbiter("master", local_conf={"num_workers": 3})
+ conf = {"num_workers": 3, "address": ("127.0.0.1", 5000)}
+
+ specs = [
+ (MyPoolArbiter, 30, "supervisor", {}, "tcp_pool"),
+ (UrlWorker, 30, "worker", {}, "grabber")
+ ]
+
+ arbiter = Arbiter(conf, specs)
arbiter.run()
4 examples/hello.py
View
@@ -20,5 +20,7 @@ def run(self):
if __name__ == "__main__":
conf = {"num_workers": 3}
- a = Arbiter(conf, MyWorker)
+
+ specs = [(MyWorker, 30, "worker", {}, "test")]
+ a = Arbiter(conf, specs)
a.run()
18 examples/multiworker.py
View
@@ -4,7 +4,7 @@
# See the NOTICE for more information.
import time
-from pistil.arbiter import Arbiter, child
+from pistil.arbiter import Arbiter
from pistil.worker import Worker
from http_parser.http import HttpStream
from http_parser.reader import SocketReader
@@ -27,13 +27,17 @@ def run(self):
self.notify()
-class MyArbiter(Arbiter):
- worker = child(MyWorker, 30, "worker", {})
- worker2 = child(MyWorker2, 30, "worker", {})
- worker3 = child(MyWorker2, 30, "kill", {})
+if __name__ == '__main__':
+ conf = {}
-if __name__ == '__main__':
- arbiter = MyArbiter("master")
+ specs = [
+ (MyWorker, 30, "worker", {}, "w1"),
+ (MyWorker2, 30, "worker", {}, "w2"),
+ (MyWorker2, 30, "kill", {}, "w3")
+ ]
+
+
+ arbiter = Arbiter(conf, specs)
arbiter.run()
20 examples/multiworker2.py
View
@@ -6,7 +6,7 @@
import time
import urllib2
-from pistil.arbiter import Arbiter, child
+from pistil.arbiter import Arbiter
from pistil.worker import Worker
from pistil.tcp.sync_worker import TcpSyncWorker
from pistil.tcp.arbiter import TcpArbiter
@@ -39,16 +39,22 @@ def run(self):
self.notify
class MyPoolArbiter(TcpArbiter):
- worker = child(MyTcpWorker, 30, "worker", {})
+ def on_init(self, conf):
+ TcpArbiter.on_init(self, conf)
+ # we return a spec
+ return (MyTcpWorker, 30, "worker", {}, "http_welcome",)
-class MyArbiter(Arbiter):
- pool = child(MyPoolArbiter, 30, "supervisor",
- {"num_workers": 3, "address": ("127.0.0.1", 5000)})
- grabber = child(UrlWorker, 30, "worker", {})
if __name__ == '__main__':
- arbiter = MyArbiter("master", local_conf={"num_workers": 3})
+ conf = {"num_workers": 3, "address": ("127.0.0.1", 5000)}
+
+ specs = [
+ (MyPoolArbiter, 30, "supervisor", {}, "tcp_pool"),
+ (UrlWorker, 30, "worker", {}, "grabber")
+ ]
+
+ arbiter = Arbiter(conf, specs)
arbiter.run()
11 examples/serve_file.py
View
@@ -6,6 +6,12 @@
import mimetypes
import os
+
+from gevent import monkey
+monkey.noisy = False
+monkey.patch_all()
+
+
from http_parser.http import HttpStream
from http_parser.reader import SocketReader
@@ -132,8 +138,9 @@ def handle(self, sock, addr):
def main():
conf = {"address": ("127.0.0.1", 5000), "debug": True,
"num_workers": 3}
- print conf
- arbiter = TcpArbiter(conf, HttpWorker)
+ spec = (HttpWorker, 30, "send_file", {}, "worker",)
+
+ arbiter = TcpArbiter(conf, spec)
arbiter.run()
13 examples/tcp_hello.py
View
@@ -3,7 +3,6 @@
# This file is part of pistil released under the MIT license.
# See the NOTICE for more information.
-from pistil.arbiter import child
from pistil.tcp.sync_worker import TcpSyncWorker
from pistil.tcp.arbiter import TcpArbiter
@@ -16,20 +15,18 @@ def handle(self, sock, addr):
p = HttpStream(SocketReader(sock))
path = p.path()
- data = "welcome wold 2"
+ data = "hello world"
sock.send("".join(["HTTP/1.1 200 OK\r\n",
"Content-Type: text/html\r\n",
"Content-Length:" + str(len(data)) + "\r\n",
"Connection: close\r\n\r\n",
data]))
-
-class MyPoolArbiter(TcpArbiter):
- worker = child(MyTcpWorker, 30, "worker", {})
-
-
if __name__ == '__main__':
- arbiter = MyPoolArbiter("master", local_conf={"num_workers": 3})
+ conf = {"num_workers": 3}
+ spec = (MyTcpWorker, 30, "worker", {}, "worker",)
+
+ arbiter = TcpArbiter(conf, spec)
arbiter.run()
187 pistil/arbiter.py
View
@@ -7,7 +7,6 @@
import errno
import logging
-from logging.config import fileConfig
import os
import select
import signal
@@ -16,7 +15,6 @@
import traceback
from pistil.errors import HaltServer
-from pistil.pidfile import Pidfile
from pistil.workertmp import WorkerTmp
from pistil import util
from pistil import __version__, SERVER_SOFTWARE
@@ -30,51 +28,15 @@
}
DEFAULT_CONF = dict(
- name = __name__,
- pidfile = None,
- worker_connections = 1000,
- num_workers = 1,
- max_requests = 0,
- timeout = 30,
uid = os.geteuid(),
gid = os.getegid(),
umask = 0,
debug = False,
- log_config = None,
- log_file = '-',
- log_level= 'info')
+)
RESTART__WORKERS = ("worker", "supervisor")
-def configure_logging(loglevel='info', logfile='-', logconfig=None):
- """\
- Set the log level and choose the destination for log output.
- """
- logger = logging.getLogger(__name__)
-
- fmt = r"%(asctime)s [%(process)d] [%(levelname)s] %(message)s"
- datefmt = r"%Y-%m-%d %H:%M:%S"
- if not conf.get('log_config'):
- handlers = []
- if logfile != "-":
- handlers.append(logging.FileHandler(logfile))
- else:
- handlers.append(logging.StreamHandler())
-
- loglevel = LOG_LEVELS[loglevel]
- logger.setLevel(loglevel)
- for h in handlers:
- h.setFormatter(logging.Formatter(fmt, datefmt))
- logger.addHandler(h)
- else:
- if os.path.exists(logconfig):
- fileConfig(logconfig)
- else:
- raise RuntimeError("Error: logfile '%s' not found." %
- logconfig)
-
-
log = logging.getLogger(__name__)
@@ -83,67 +45,17 @@ def configure_logging(loglevel='info', logfile='-', logconfig=None):
-class child(object):
+class Child(object):
- def __init__(self, child_class, timeout=30, child_type="worker",
- args={}, name=None):
+ def __init__(self, child_class, timeout, child_type,
+ args, name):
self.child_class= child_class
self.timeout = timeout
self.child_type = child_type
self.args = args
self.name = name
-
-
- def __get__(self, instance, cls):
- if instance is None:
- return self
- return instance._CHILDREN_SPECS[self.name]
-
-
- def __child_config__(self, cls, name):
- if self.name is None:
- self.name = name
-
- def __set__(self, instance, value):
- instance._CHILDREN_SPECS[self.name] = value
-
- def __property_init__(self, document_instance, value):
- """ method used to set value of the property when
- we create the document. Don't check required. """
- if value is not None:
- value = self.to_json(self.validate(value, required=False))
- document_instance._doc[self.name] = value
-
-
-class MetaArbiter(type):
-
- def __new__(cls, name, bases, attrs):
- # init properties
- children = {}
- defined = set()
- for base in bases:
- if hasattr(base, '_CHILDREN_SPECS'):
- child_names = base._CHILDREN_SPECS.keys()
- duplicate_names = defined.intersection(child_names)
- if duplicate_names:
- raise DuplicatePropertyError(
- 'Duplicate children in base class %s already defined: %s' % (base.__name__, list(duplicate_names)))
- defined.update(duplicate_names)
- children.update(base._CHILDREN_SPECS)
-
-
-
- for attr_name, attr in attrs.items():
- # map properties
- if isinstance(attr, child):
- print attr_name
- if attr_name in defined:
- raise DuplicatePropertyError('Duplicate child: %s' % attr_name)
- children[attr_name] = attr
- attr.__child_config__(cls, attr_name)
- attrs['_CHILDREN_SPECS'] = children
- return type.__new__(cls, name, bases, attrs)
+ print self.name
# chaine init worker:
@@ -157,9 +69,8 @@ class Arbiter(object):
via SIGHUP/USR2.
"""
- __metaclass__ = MetaArbiter
-
- _CHILDREN_SPECS = dict()
+ _SPECS_BYNAME = {}
+ _CHILDREN_SPECS = []
# A flag indicating if a worker failed to
# to boot. If a worker process exist with
@@ -180,38 +91,44 @@ class Arbiter(object):
if name[:3] == "SIG" and name[3] != "_"
)
- def __init__(self, name=None, child_type="supervisor", age=0,
- ppid=0, timeout=30, local_conf={}, global_conf={}):
+ def __init__(self, args, specs=[], name=None,
+ child_type="supervisor", age=0, ppid=0,
+ timeout=30):
+
+ # set conf
+ conf = DEFAULT_CONF.copy()
+ conf.update(args)
+ self.conf = conf
+
- self._name = name
+ specs.extend(self.on_init(conf))
+
+ for spec in specs:
+ c = Child(*spec)
+ self._CHILDREN_SPECS.append(c)
+ self._SPECS_BYNAME[c.name] = c
+
+
+ if name is None:
+ name = self.__class__.__name__
+ self.name = name
self.child_type = child_type
self.age = age
self.ppid = ppid
self.timeout = timeout
- self.local_conf = local_conf
- self.global_conf = global_conf
+
self.pid = None
- self.num_children = len(self._CHILDREN_SPECS.keys())
+ self.num_children = len(self._CHILDREN_SPECS)
self.child_age = 0
self.booted = False
self.stopping = False
- self.debug =self.global_conf.get("debug", False)
- self.tmp = WorkerTmp(self.global_conf)
- self.on_init(self.local_conf, self.global_conf)
+ self.debug =self.conf.get("debug", False)
+ self.tmp = WorkerTmp(self.conf)
+
+ def on_init(self, args):
+ return []
- def on_init(self, local_conf, global_conf):
- pass
-
- def __get_name(self):
- try:
- return self._name
- except AttributeError:
- self._name = self.__class__.__name__.lower()
- return self._name
- def __set_name(self, name):
- self._name = name
- name = util.cached_property(__get_name, __set_name)
def on_init_process(self):
""" method executed when we init a process """
@@ -227,8 +144,8 @@ def init_process(self):
# set current pid
self.pid = os.getpid()
- util.set_owner_process(self.global_conf.get("uid", os.geteuid()),
- self.global_conf.get("gid", os.getegid()))
+ util.set_owner_process(self.conf.get("uid", os.geteuid()),
+ self.conf.get("gid", os.getegid()))
# Reseed the random number generator
util.seed()
@@ -424,8 +341,7 @@ def stop(self, graceful=True):
self.kill_workers(signal.SIGKILL)
self.stopping = False
- def on_reload(self, local_conf, old_local_conf, global_conf,
- old_global_conf):
+ def on_reload(self):
""" method executed on reload """
@@ -434,13 +350,8 @@ def reload(self):
used on HUP
"""
- # keep oldconf
- old_global_conf = self.global_conf.copy()
- old_local_conf = self.local_conf.copy()
-
# exec on reload hook
- self.on_reload(self.local_conf, old_local_conf,
- self.global_conf, old_global_conf)
+ self.on_reload()
OLD__WORKERS = self._WORKERS.copy()
@@ -448,7 +359,7 @@ def reload(self):
to_reload = []
# spawn new workers with new app & conf
- for child_name, child in self._CHILDREN_SPECS.items():
+ for child in self._CHILDREN_SPECS:
if child.child_type != "supervisor":
to_reload.append(child)
@@ -522,7 +433,7 @@ def manage_workers(self):
for pid, (child, state) in self._WORKERS.items():
if not state:
- self.spawn_child(self._CHILDREN_SPECS[child.name])
+ self.spawn_child(self._SPECS_BYNAME[child.name])
def pre_fork(self, worker):
""" methode executed on prefork """
@@ -535,13 +446,17 @@ def spawn_child(self, child_spec):
name = child_spec.name
child_type = child_spec.child_type
- child = child_spec.child_class(name,
- age=self.child_age,
- child_type=child_type,
+ child_args = self.conf
+ child_args.update(child_spec.args)
+
+ # initialize child class
+ child = child_spec.child_class(
+ child_args,
+ name = name,
+ child_type = child_type,
+ age = self.child_age,
ppid = self.pid,
- timeout=child_spec.timeout,
- local_conf=child_spec.args,
- global_conf=self.global_conf)
+ timeout = child_spec.timeout)
self.pre_fork(child)
pid = os.fork()
@@ -580,7 +495,7 @@ def spawn_workers(self):
of the master process.
"""
- for child_name, child in self._CHILDREN_SPECS.items():
+ for child in self._CHILDREN_SPECS:
self.spawn_child(child)
def kill_workers(self, sig):
82 pistil/pool.py
View
@@ -7,17 +7,18 @@
import os
import signal
-from pistil.arbiter import Arbiter
+from pistil.errors import HaltServer
+from pistil.arbiter import Arbiter, Child
+from pistil.workertmp import WorkerTmp
from pistil import util
-class MetaPoolArbiter(type):
-
- def __new__(cls, name, bases, attrs):
- if "worker" in attrs:
- attrs['_CHILDREN_SPECS']['worker'] = attrs.get('worker')
- else:
- raise AttributeError("A worker is required")
- return type.__new__(cls, name, bases, attrs)
+DEFAULT_CONF = dict(
+ uid = os.geteuid(),
+ gid = os.getegid(),
+ umask = 0,
+ debug = False,
+ num_workers = 1,
+)
class PoolArbiter(Arbiter):
@@ -28,13 +29,44 @@ class PoolArbiter(Arbiter):
"HUP QUIT INT TERM TTIN TTOU USR1 USR2 WINCH".split()
)
- def __init__(self, name=None, child_type="supervisor", age=0,
- ppid=0, timeout=30, local_conf={}, global_conf={}):
- self.num_workers = local_conf.get('num_workers', 1)
- Arbiter.__init__(self, name=name, child_type=child_type,
- age=age, ppid=ppid, timeout=timeout, local_conf=local_conf,
- global_conf=global_conf)
+ def __init__(self, args, spec=(), name=None,
+ child_type="supervisor", age=0, ppid=0,
+ timeout=30):
+
+ if not isinstance(spec, tuple):
+ raise TypeError("spec should be a tuple")
+
+ # set conf
+ conf = DEFAULT_CONF.copy()
+ conf.update(args)
+ self.conf = conf
+
+ # set number of workers
+ self.num_workers = conf.get('num_workers', 1)
+
+ ret = self.on_init(conf)
+ if ret is not None:
+ self._SPEC = Child(*ret)
+ else:
+ self._SPEC = Child(*spec)
+
+ if name is None:
+ name = self.__class__.__name__
+ self.name = name
+ self.child_type = child_type
+ self.age = age
+ self.ppid = ppid
+ self.timeout = timeout
+
+
+ self.pid = None
+ self.child_age = 0
+ self.booted = False
+ self.stopping = False
+ self.debug =self.conf.get("debug", False)
+ self.tmp = WorkerTmp(self.conf)
+
def handle_ttin(self):
"""\
SIGTTIN handling.
@@ -57,20 +89,13 @@ def reload(self):
"""
used on HUP
"""
-
- # keep oldconf
- old_global_conf = self.global_conf.copy()
- old_local_conf = self.local_conf.copy()
-
- # exec on reload hook
- self.on_reload(self.local_conf, old_local_conf,
- self.global_conf, old_global_conf)
+ # exec on reload hook
+ self.on_reload()
# spawn new workers with new app & conf
- child = self._CHILDREN_SPECS["worker"]
- for i in range(self.local_conf.get("num_workers", 1)):
- self.spawn_child(child)
+ for i in range(self.conf.get("num_workers", 1)):
+ self.spawn_child(self._SPEC)
# set new proc_name
util._setproctitle("master [%s]" % self.name)
@@ -112,7 +137,7 @@ def manage_workers(self):
as required.
"""
if len(self._WORKERS.keys()) < self.num_workers:
- self.spawn_children()
+ self.spawn_workers()
workers = self._WORKERS.items()
workers.sort(key=lambda w: w[1][0].age)
@@ -127,9 +152,8 @@ def spawn_workers(self):
This is where a worker process leaves the main loop
of the master process.
"""
- child = self._CHILDREN_SPECS.get('worker')
for i in range(self.num_workers - len(self._WORKERS.keys())):
- self.spawn_child(child)
+ self.spawn_child(self._SPEC)
def kill_worker(self, pid, sig):
20 pistil/tcp/arbiter.py
View
@@ -16,14 +16,14 @@ class TcpArbiter(PoolArbiter):
_LISTENER = None
- def on_init(self, local_conf, global_conf):
- super(TcpArbiter, self).on_init(local_conf, global_conf)
- self.address = local_conf.get('address', ('127.0.0.1', 8000))
+ def on_init(self, args):
+ self.address = args.get('address', ('127.0.0.1', 8000))
if not self._LISTENER:
- self._LISTENER = create_socket(self.local_conf)
+ self._LISTENER = create_socket(args)
# we want to pass the socket to the worker.
- self.worker.args = {"sock": self._LISTENER}
+ self.conf.update({"sock": self._LISTENER})
+ print self.conf
def when_ready(self):
@@ -35,16 +35,6 @@ def on_reexec(self):
# socket after forking a new master.
os.environ['PISTIL_FD'] = str(self._LISTENER.fileno())
- def on_reload(self, local_conf, old_local_conf, global_conf,
- old_global_conf):
- old_address = old_local_conf.get("address", ('127.0.0.1', 8000))
-
- # do we need to change listener ?
- if old_address != local_conf.get("address", ('127.0.0.1', 8000)):
- self._LISTENER.close()
- self._LISTENER = create_socket(conf)
- log.info("Listening at: %s", self._LISTENER)
-
def on_stop(self, graceful=True):
self._LISTENER = None
11 pistil/tcp/gevent_worker.py
View
@@ -14,6 +14,7 @@
except ImportError:
raise RuntimeError("You need gevent installed to use this worker.")
+
from gevent.pool import Pool
from gevent.server import StreamServer
@@ -40,14 +41,8 @@ def handle(self, sock, addr):
class TcpGeventWorker(TcpSyncWorker):
- @classmethod
- def setup(self, server, conf):
- from gevent import monkey
- monkey.noisy = False
- monkey.patch_all()
-
- def on_init(self):
- self.worker_connections = self.conf.get("worker_connections",
+ def on_init(self, conf):
+ self.worker_connections = conf.get("worker_connections",
10000)
self.pool = Pool(self.worker_connections)
2  pistil/tcp/sync_worker.py
View
@@ -20,7 +20,7 @@
class TcpSyncWorker(Worker):
def on_init_process(self):
- self.socket = self.local_conf.get('sock')
+ self.socket = self.conf.get('sock')
self.address = self.socket.getsockname()
util.close_on_exec(self.socket)
38 pistil/worker.py
View
@@ -27,43 +27,31 @@ class Worker(object):
_PIPE = []
- def __init__(self, name=None, age=0, child_type="worker",
- ppid=0, timeout=30, local_conf={}, global_conf={}):
+ def __init__(self, conf, name=None, child_type="worker",
+ age=0, ppid=0, timeout=30):
+
+ if name is None:
+ name = self.__class__.__name__
+ self.name = name
- self._name = name
self.child_type = child_type
self.age = age
self.ppid = ppid
self.timeout = timeout
+ self.conf = conf
- self.local_conf = local_conf
- self.global_conf = global_conf
# initialize
self.booted = False
self.alive = True
- self.debug =self.global_conf.get("debug", False)
- self.tmp = WorkerTmp(self.global_conf)
+ self.debug =self.conf.get("debug", False)
+ self.tmp = WorkerTmp(self.conf)
- self.on_init(self.local_conf, self.global_conf)
+ self.on_init(self.conf)
- def on_init(self, local_conf, global_conf):
+ def on_init(self, conf):
pass
- def __get_name(self):
- try:
- return self._name
- except AttributeError:
- self._name = self.__class__.__name__.lower()
- return self._name
- def __set_name(self, name):
- self._name = name
- name = property(__get_name, __set_name)
-
- def conf(self):
- conf = self.global_conf.copy()
- conf.update(self.local_conf)
- conf = util.cached_property(conf)
def pid(self):
return os.getpid()
@@ -95,8 +83,8 @@ def init_process(self):
super(MyWorkerClass, self).init_process() so that the ``run()``
loop is initiated.
"""
- util.set_owner_process(self.global_conf.get("uid", os.geteuid()),
- self.global_conf.get("gid", os.getegid()))
+ util.set_owner_process(self.conf.get("uid", os.geteuid()),
+ self.conf.get("gid", os.getegid()))
# Reseed the random number generator
util.seed()
Please sign in to comment.
Something went wrong with that request. Please try again.