-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
app.py
88 lines (70 loc) · 2.95 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
from __future__ import absolute_import
import logging
from functools import partial
from concurrent.futures import ThreadPoolExecutor
import celery
import tornado.web
from tornado import ioloop
from tornado.httpserver import HTTPServer
from .api import control
from .urls import handlers as default_handlers
from .events import Events
from .options import default_options
from tornado.web import url
logger = logging.getLogger(__name__)
def rewrite_handler(handler, url_prefix):
    """Return *handler* with its URL pattern mounted under *url_prefix*.

    Args:
        handler: Either a tornado ``url`` spec object or a tuple/list whose
            first item is the URL pattern and whose remaining items are the
            handler class and any optional extras (kwargs dict, name).
        url_prefix: Prefix to mount the pattern under. Leading and trailing
            slashes are ignored, so ``"flower"``, ``"/flower"`` and
            ``"/flower/"`` are equivalent.

    Returns:
        A new ``url`` spec (for spec input) or a new tuple (for tuple/list
        input) whose pattern is ``"/<prefix><original pattern>"``. When the
        prefix is empty after stripping slashes, *handler* is returned
        unchanged.
    """
    prefix = url_prefix.strip("/")
    if not prefix:
        # A prefix such as "/" would otherwise produce "//path" patterns.
        return handler

    if isinstance(handler, (tuple, list)):
        # Keep every trailing element (handler class, kwargs, name), not
        # just the handler class, so optional extras are not dropped.
        return ("/{}{}".format(prefix, handler[0]),) + tuple(handler[1:])

    # Anything that is not a plain sequence is treated as a url spec
    # (including subclasses of ``url``).
    return url("/{}{}".format(prefix, handler.regex.pattern),
               handler.handler_class, handler.kwargs, handler.name)
class Flower(tornado.web.Application):
    """Tornado application wired to a Celery app and an ``Events`` object.

    The handler table comes from ``default_handlers``; when the options
    carry a ``url_prefix`` every handler is rewritten to live under it.
    """

    # Executor class used by start() to build the pool behind delay().
    pool_executor_cls = ThreadPoolExecutor
    # ``max_workers`` value handed to the executor class.
    max_workers = 4

    def __init__(self, options=None, capp=None, events=None,
                 io_loop=None, **kwargs):
        """Build the application.

        Args:
            options: Options object; ``default_options`` is used when falsy.
            capp: Celery application; a fresh ``celery.Celery()`` is created
                when falsy.
            events: Events object; one is built from the options when falsy.
            io_loop: IO loop; ``ioloop.IOLoop.instance()`` when falsy.
            **kwargs: Forwarded to ``tornado.web.Application``. Any
                ``handlers`` entry is replaced by the default handler table.
        """
        url_specs = default_handlers
        if options is not None and options.url_prefix:
            url_specs = [rewrite_handler(spec, options.url_prefix)
                         for spec in url_specs]
        kwargs.update(handlers=url_specs)
        super(Flower, self).__init__(**kwargs)

        self.options = options if options else default_options
        self.io_loop = io_loop if io_loop else ioloop.IOLoop.instance()
        self.ssl_options = kwargs.get('ssl_options', None)
        self.capp = capp if capp else celery.Celery()

        if not events:
            # Only construct the default Events object when none was given.
            opts = self.options
            events = Events(
                self.capp,
                db=opts.db,
                persistent=opts.persistent,
                enable_events=opts.enable_events,
                io_loop=self.io_loop,
                max_workers_in_memory=opts.max_workers,
                max_tasks_in_memory=opts.max_tasks)
        self.events = events
        self.started = False

    def start(self):
        """Create the pool, start events, bind the server, run the IO loop."""
        self.pool = self.pool_executor_cls(max_workers=self.max_workers)
        self.events.start()

        opts = self.options
        if opts.unix_socket:
            from tornado.netutil import bind_unix_socket
            http_server = HTTPServer(self)
            # The socket is bound with mode 0o777.
            unix_sock = bind_unix_socket(opts.unix_socket, mode=0o777)
            http_server.add_socket(unix_sock)
        else:
            self.listen(opts.port,
                        address=opts.address,
                        ssl_options=self.ssl_options,
                        xheaders=opts.xheaders)

        def _on_workers_updated(_future):
            logger.debug('Successfully updated worker cache')

        self.io_loop.add_future(
            control.ControlHandler.update_workers(app=self),
            callback=_on_workers_updated)
        self.started = True
        self.io_loop.start()

    def stop(self):
        """Stop events and shut the pool down; no-op unless started."""
        if not self.started:
            return
        self.events.stop()
        self.pool.shutdown(wait=False)
        self.started = False

    def delay(self, method, *args, **kwargs):
        """Submit ``method(*args, **kwargs)`` to the pool.

        Returns whatever the pool's ``submit`` returns (a future for the
        default ``ThreadPoolExecutor``).
        """
        job = partial(method, *args, **kwargs)
        return self.pool.submit(job)

    @property
    def transport(self):
        """``driver_type`` of the Celery connection's transport, or None."""
        connection = self.capp.connection()
        return getattr(connection.transport, 'driver_type', None)