Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 39 additions & 7 deletions app/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import os
import logging.config

from celery import Celery
from flask import Flask
from flask_cors import CORS
from flask_sqlalchemy import SQLAlchemy
from flask_marshmallow import Marshmallow
from flask_socketio import SocketIO

from config import CONFIG

BASE_DIR = os.path.abspath(os.path.dirname(__file__))
Expand All @@ -17,31 +20,56 @@
except OSError:
pass

# load logging config file
# Load logging config file
logging.config.fileConfig('config/logging.conf', disable_existing_loggers=False)
# init file logger
# Init file logger
logger = logging.getLogger('CSSI_REST_API')

db = SQLAlchemy()
ma = Marshmallow()
socketio = SocketIO()
celery = Celery(__name__,
broker=os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379'),
backend=os.environ.get('CELERY_BACKEND', 'redis://localhost:6379'))
celery.config_from_object('celeryconfig')

# Import models to register them with SQLAlchemy
from app.models import * # noqa

# Import celery task to register them with Celery workers
from .tasks import run_flask_request # noqa

# Import Socket.IO events to register them with Flask-SocketIO
from . import events # noqa

def create_app(config_name):

def create_app(config_name=None, main=True):
app = Flask(__name__)
CORS(app, support_credentials=True)
app.config.from_object(CONFIG[config_name])

app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False # disabling sqlalchemy event system

CORS(app, support_credentials=True) # Add CORS support

CONFIG[config_name].init_app(app)

root = CONFIG[config_name].APPLICATION_ROOT

# flask migrate doesn't recognize the tables without this import
from app.models import Application, Genre, ApplicationType, Session, Questionnaire

# Set up extensions
db.init_app(app)

if main:
# Initialize socketio server and attach it to the message queue.
socketio.init_app(app,
message_queue=app.config['SOCKETIO_MESSAGE_QUEUE'])
else:
# Initialize socketio to emit events through through the message queue.
socketio.init_app(None,
message_queue=app.config['SOCKETIO_MESSAGE_QUEUE'],
async_mode='threading')

celery.conf.update(CONFIG[config_name].CELERY_CONFIG)

# Create app blueprints
from app.routes.v1 import main as main_blueprint
app.register_blueprint(main_blueprint, url_prefix=root + '/')
Expand All @@ -55,4 +83,8 @@ def create_app(config_name):
from app.routes.v1 import questionnaire as questionnaire_blueprint
app.register_blueprint(questionnaire_blueprint, url_prefix=root + '/questionnaires')

# Register async tasks support
from .tasks import tasks as tasks_blueprint
app.register_blueprint(tasks_blueprint, url_prefix='/tasks')

return app
21 changes: 21 additions & 0 deletions app/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from . import socketio, celery

@celery.task
def calculate_latency():
"""Sample celery task that posts a message."""
from .wsgi_aux import app
with app.app_context():
print('hi from celery')


@socketio.on('post_message')
def on_post_message():
"""Sample post message."""
calculate_latency.apply_async()
print('post message')


@socketio.on('disconnect')
def on_disconnect():
"""A Socket.IO client has disconnected."""
print('connection hi disconnected')
56 changes: 56 additions & 0 deletions app/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from io import BytesIO
from flask import Blueprint, abort, g
from werkzeug.exceptions import InternalServerError
from celery import states

from . import celery
from .utils import url_for

text_types = (str, bytes)
try:
text_types += (unicode,)
except NameError:
# no unicode on Python 3
pass

tasks = Blueprint('tasks', __name__)


@celery.task
def run_flask_request(environ):
from .wsgi_aux import app

if '_wsgi.input' in environ:
environ['wsgi.input'] = BytesIO(environ['_wsgi.input'])

# Create a request context similar to that of the original request
# so that the task can have access to flask.g, flask.request, etc.
with app.request_context(environ):
# Record the fact that we are running in the Celery worker now
g.in_celery = True

# Run the route function and record the response
try:
rv = app.full_dispatch_request()
except:
# If we are in debug mode we want to see the exception
# Else, return a 500 error
if app.debug:
raise
rv = app.make_response(InternalServerError())
return (rv.get_data(), rv.status_code, rv.headers)


@tasks.route('/status/<id>', methods=['GET'])
def get_status(id):
"""
Return status about an asynchronous task. If this request returns a 202
status code, it means that task hasn't finished yet. Else, the response
from the task is returned.
"""
task = run_flask_request.AsyncResult(id)
if task.state == states.PENDING:
abort(404)
if task.state == states.RECEIVED or task.state == states.STARTED:
return '', 202, {'Location': url_for('tasks.get_status', id=id)}
return task.info
24 changes: 24 additions & 0 deletions app/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import time

from flask import url_for as _url_for, current_app, _request_ctx_stack


def timestamp():
"""Return the current timestamp as an integer."""
return int(time.time())


def url_for(*args, **kwargs):
"""
url_for replacement that works even when there is no request context.
"""
if '_external' not in kwargs:
kwargs['_external'] = False
reqctx = _request_ctx_stack.top
if reqctx is None:
if kwargs['_external']:
raise RuntimeError('Cannot generate external URLs without a '
'request context.')
with current_app.test_request_context():
return _url_for(*args, **kwargs)
return _url_for(*args, **kwargs)
7 changes: 7 additions & 0 deletions app/wsgi_aux.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import os

from . import create_app

# Create an application instance that auxiliary processes such as Celery
# workers can use
app = create_app(os.environ.get('CSSI_CONFIG', 'production'), main=False)
7 changes: 7 additions & 0 deletions celeryconfig.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@

# global Celery options that apply to all configurations

# enable the pickle serializer
task_serializer = 'pickle'
result_serializer = 'pickle'
accept_content = ['pickle']
12 changes: 12 additions & 0 deletions config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ class Config:

APP_NAME = os.environ.get('APP_NAME') or 'CSSI_REST_API'
APPLICATION_ROOT = os.environ.get('APPLICATION_ROOT') or '/api/v1'
CELERY_CONFIG = {}
SOCKETIO_MESSAGE_QUEUE = os.environ.get(
'SOCKETIO_MESSAGE_QUEUE', os.environ.get('CELERY_BROKER_URL',
'redis://'))

if os.environ.get('SECRET_KEY'):
SECRET_KEY = os.environ.get('SECRET_KEY')
Expand Down Expand Up @@ -93,6 +97,8 @@ class DevelopmentConfig(Config):
DEBUG = True
SQLALCHEMY_DATABASE_URI = os.environ.get('DEV_DATABASE_URL') or \
'sqlite:///' + os.path.join(BASE_DIR, 'cssi-dev.sqlite')
CELERY_BACKEND = os.environ.get('DEV_CELERY_BACKEND') or \
'sqlite:///' + os.path.join(BASE_DIR, 'celery-dev.sqlite')

@classmethod
def init_app(cls, app):
Expand All @@ -114,6 +120,10 @@ class TestingConfig(Config):
TESTING = True
SQLALCHEMY_DATABASE_URI = os.environ.get('TEST_DATABASE_URL') or \
'sqlite:///' + os.path.join(BASE_DIR, 'cssi-test.sqlite')
CELERY_BACKEND = os.environ.get('TEST_CELERY_BACKEND') or \
'sqlite:///' + os.path.join(BASE_DIR, 'celery-test.sqlite')
CELERY_CONFIG = {'CELERY_ALWAYS_EAGER': True}
SOCKETIO_MESSAGE_QUEUE = None

@classmethod
def init_app(cls, app):
Expand All @@ -134,6 +144,8 @@ class ProductionConfig(Config):

SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or \
'sqlite:///' + os.path.join(BASE_DIR, 'cssi.sqlite')
CELERY_BACKEND = os.environ.get('CELERY_BACKEND') or \
'sqlite:///' + os.path.join(BASE_DIR, 'celery.sqlite')
SSL_DISABLE = (os.environ.get('SSL_DISABLE') or 'True') == 'True'

@classmethod
Expand Down
95 changes: 90 additions & 5 deletions manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,98 @@
"""

import os
import subprocess
import sys
import eventlet

from flask_migrate import Migrate, MigrateCommand
from flask_script import Manager
from flask_script import Manager, Command, Server as _Server, Option

from app import create_app, db
from app import create_app, db, socketio

app = create_app(os.getenv('FLASK_CONFIG') or 'default')
eventlet.monkey_patch()

app = create_app(os.getenv('CSSI_CONFIG') or 'default')

manager = Manager(app)
migrate = Migrate(app, db)

manager.add_command('db', MigrateCommand)


class Server(_Server):
help = description = 'Runs the Socket.IO web server'

def get_options(self):
options = (
Option('-h', '--host',
dest='host',
default=self.host),

Option('-p', '--port',
dest='port',
type=int,
default=self.port),

Option('-d', '--debug',
action='store_true',
dest='use_debugger',
help=('enable the Werkzeug debugger (DO NOT use in '
'production code)'),
default=self.use_debugger),
Option('-D', '--no-debug',
action='store_false',
dest='use_debugger',
help='disable the Werkzeug debugger',
default=self.use_debugger),

Option('-r', '--reload',
action='store_true',
dest='use_reloader',
help=('monitor Python files for changes (not 100%% safe '
'for production use)'),
default=self.use_reloader),
Option('-R', '--no-reload',
action='store_false',
dest='use_reloader',
help='do not monitor Python files for changes',
default=self.use_reloader),
)
return options

def __call__(self, app, host, port, use_debugger, use_reloader):
# override the default runserver command to start a Socket.IO server
if use_debugger is None:
use_debugger = app.debug
if use_debugger is None:
use_debugger = True
if use_reloader is None:
use_reloader = app.debug
socketio.run(app,
host=host,
port=port,
debug=use_debugger,
use_reloader=use_reloader,
**self.server_options)


manager.add_command("runserver", Server())


class CeleryWorker(Command):
"""Starts the celery worker."""
name = 'celery'
capture_all_args = True

def run(self, argv):
ret = subprocess.call(
['celery', 'worker', '-A', 'app.celery', '--loglevel=info'] + argv)
sys.exit(ret)


manager.add_command("celery", CeleryWorker())


@manager.command
def create_metadata():
"""Create the table metadata.
Expand All @@ -38,6 +117,7 @@ def create_metadata():
Genre.seed()
ApplicationType.seed()


@manager.command
def test():
"""Run the unit tests.
Expand All @@ -51,15 +131,20 @@ def test():


@manager.command
def recreate_db():
def recreate_db(drop_first=False):
"""Recreates a local database

Not safe to use in production.
"""
db.drop_all()
if drop_first:
db.drop_all()
db.create_all()
db.session.commit()


if __name__ == '__main__':
if sys.argv[1] == 'test' or sys.argv[1] == 'lint':
# small hack, to ensure that Flask-Script uses the testing
# configuration if we are going to run the tests
os.environ['CSSI_CONFIG'] = 'testing'
manager.run()
Loading