-
-
Notifications
You must be signed in to change notification settings - Fork 113
/
Copy pathapp.py
89 lines (62 loc) · 2.14 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
from celery import Celery, Task
from flask import Flask
from werkzeug.debug import DebuggedApplication
from werkzeug.middleware.proxy_fix import ProxyFix
from hello.extensions import db, debug_toolbar, flask_static_digest
from hello.page.views import page
from hello.up.views import up
def create_celery_app(app=None):
    """
    Build a Celery app that is bound to a Flask app.

    Celery settings are taken from the "CELERY_CONFIG" entry of the Flask
    config (an empty dict is used when that entry is absent), and every task
    body is executed inside the Flask application context.

    :param app: Flask app; when omitted (or falsy) one is built by create_app()
    :return: Celery app
    """
    if not app:
        app = create_app()

    # Task base class whose invocation is wrapped in the Flask app context, so
    # task code can rely on whatever needs an active application context.
    class FlaskTask(Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                result = self.run(*args, **kwargs)
            return result

    flask_celery = Celery(app.import_name, task_cls=FlaskTask)

    celery_settings = app.config.get("CELERY_CONFIG", {})
    flask_celery.conf.update(celery_settings)

    flask_celery.set_default()

    # Keep a reference on the Flask app so the Celery instance can be looked
    # up through app.extensions["celery"].
    app.extensions["celery"] = flask_celery

    return flask_celery
def create_app(settings_override=None):
    """
    Application factory: build and configure a Flask app.

    Configuration is loaded from "config.settings" first; any values in
    settings_override are then applied on top. Middleware is installed before
    the blueprints are registered, and extensions are initialized last.

    :param settings_override: Optional settings applied over config.settings
    :return: Flask app
    """
    flask_app = Flask(__name__, static_folder="../public", static_url_path="")

    config = flask_app.config
    config.from_object("config.settings")
    if settings_override:
        config.update(settings_override)

    middleware(flask_app)

    for blueprint in (up, page):
        flask_app.register_blueprint(blueprint)

    extensions(flask_app)

    return flask_app
def extensions(app):
    """
    Initialize each Flask extension against the given app (mutates the app).

    The extensions are initialized in this order: debug_toolbar, db,
    flask_static_digest.

    :param app: Flask application instance
    :return: None
    """
    for extension in (debug_toolbar, db, flask_static_digest):
        extension.init_app(app)
def middleware(app):
    """
    Wrap the app's WSGI callable with middleware (mutates the app).

    :param app: Flask application instance
    :return: None
    """
    wrapped = app.wsgi_app

    if app.debug:
        # Development only: enable the interactive in-browser debugger.
        wrapped = DebuggedApplication(wrapped, evalex=True)

    # Outermost layer: when running behind a proxy, make request.remote_addr
    # reflect the real client IP address.
    app.wsgi_app = ProxyFix(wrapped)
# Module-level Celery app, created at import time. No Flask app is passed in,
# so create_celery_app() builds one itself via create_app().
celery_app = create_celery_app()