diff --git a/app/__init__.py b/app/__init__.py index 7fe3a92..a2d78e5 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -1,10 +1,13 @@ import os import logging.config +from celery import Celery from flask import Flask from flask_cors import CORS from flask_sqlalchemy import SQLAlchemy from flask_marshmallow import Marshmallow +from flask_socketio import SocketIO + from config import CONFIG BASE_DIR = os.path.abspath(os.path.dirname(__file__)) @@ -17,31 +20,56 @@ except OSError: pass -# load logging config file +# Load logging config file logging.config.fileConfig('config/logging.conf', disable_existing_loggers=False) -# init file logger +# Init file logger logger = logging.getLogger('CSSI_REST_API') db = SQLAlchemy() ma = Marshmallow() +socketio = SocketIO() +celery = Celery(__name__, + broker=os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379'), + backend=os.environ.get('CELERY_BACKEND', 'redis://localhost:6379')) +celery.config_from_object('celeryconfig') + +# Import models to register them with SQLAlchemy +from app.models import * # noqa + +# Import celery task to register them with Celery workers +from .tasks import run_flask_request # noqa +# Import Socket.IO events to register them with Flask-SocketIO +from . import events # noqa -def create_app(config_name): + +def create_app(config_name=None, main=True): app = Flask(__name__) - CORS(app, support_credentials=True) app.config.from_object(CONFIG[config_name]) + app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False # disabling sqlalchemy event system + CORS(app, support_credentials=True) # Add CORS support + CONFIG[config_name].init_app(app) root = CONFIG[config_name].APPLICATION_ROOT - # flask migrate doesn't recognize the tables without this import - from app.models import Application, Genre, ApplicationType, Session, Questionnaire - # Set up extensions db.init_app(app) + if main: + # Initialize socketio server and attach it to the message queue. + socketio.init_app(app, + message_queue=app.config['SOCKETIO_MESSAGE_QUEUE']) + else: + # Initialize socketio to emit events through through the message queue. + socketio.init_app(None, + message_queue=app.config['SOCKETIO_MESSAGE_QUEUE'], + async_mode='threading') + + celery.conf.update(CONFIG[config_name].CELERY_CONFIG) + # Create app blueprints from app.routes.v1 import main as main_blueprint app.register_blueprint(main_blueprint, url_prefix=root + '/') @@ -55,4 +83,8 @@ def create_app(config_name): from app.routes.v1 import questionnaire as questionnaire_blueprint app.register_blueprint(questionnaire_blueprint, url_prefix=root + '/questionnaires') + # Register async tasks support + from .tasks import tasks as tasks_blueprint + app.register_blueprint(tasks_blueprint, url_prefix='/tasks') + return app diff --git a/app/events.py b/app/events.py new file mode 100644 index 0000000..bcf314e --- /dev/null +++ b/app/events.py @@ -0,0 +1,21 @@ +from . import socketio, celery + +@celery.task +def calculate_latency(): + """Sample celery task that posts a message.""" + from .wsgi_aux import app + with app.app_context(): + print('hi from celery') + + +@socketio.on('post_message') +def on_post_message(): + """Sample post message.""" + calculate_latency.apply_async() + print('post message') + + +@socketio.on('disconnect') +def on_disconnect(): + """A Socket.IO client has disconnected.""" + print('connection hi disconnected') diff --git a/app/tasks.py b/app/tasks.py new file mode 100644 index 0000000..96190f7 --- /dev/null +++ b/app/tasks.py @@ -0,0 +1,56 @@ +from io import BytesIO +from flask import Blueprint, abort, g +from werkzeug.exceptions import InternalServerError +from celery import states + +from . import celery +from .utils import url_for + +text_types = (str, bytes) +try: + text_types += (unicode,) +except NameError: + # no unicode on Python 3 + pass + +tasks = Blueprint('tasks', __name__) + + +@celery.task +def run_flask_request(environ): + from .wsgi_aux import app + + if '_wsgi.input' in environ: + environ['wsgi.input'] = BytesIO(environ['_wsgi.input']) + + # Create a request context similar to that of the original request + # so that the task can have access to flask.g, flask.request, etc. + with app.request_context(environ): + # Record the fact that we are running in the Celery worker now + g.in_celery = True + + # Run the route function and record the response + try: + rv = app.full_dispatch_request() + except: + # If we are in debug mode we want to see the exception + # Else, return a 500 error + if app.debug: + raise + rv = app.make_response(InternalServerError()) + return (rv.get_data(), rv.status_code, rv.headers) + + +@tasks.route('/status/', methods=['GET']) +def get_status(id): + """ + Return status about an asynchronous task. If this request returns a 202 + status code, it means that task hasn't finished yet. Else, the response + from the task is returned. + """ + task = run_flask_request.AsyncResult(id) + if task.state == states.PENDING: + abort(404) + if task.state == states.RECEIVED or task.state == states.STARTED: + return '', 202, {'Location': url_for('tasks.get_status', id=id)} + return task.info diff --git a/app/utils.py b/app/utils.py new file mode 100644 index 0000000..d757746 --- /dev/null +++ b/app/utils.py @@ -0,0 +1,24 @@ +import time + +from flask import url_for as _url_for, current_app, _request_ctx_stack + + +def timestamp(): + """Return the current timestamp as an integer.""" + return int(time.time()) + + +def url_for(*args, **kwargs): + """ + url_for replacement that works even when there is no request context. + """ + if '_external' not in kwargs: + kwargs['_external'] = False + reqctx = _request_ctx_stack.top + if reqctx is None: + if kwargs['_external']: + raise RuntimeError('Cannot generate external URLs without a ' + 'request context.') + with current_app.test_request_context(): + return _url_for(*args, **kwargs) + return _url_for(*args, **kwargs) diff --git a/app/wsgi_aux.py b/app/wsgi_aux.py new file mode 100644 index 0000000..b8b0fcb --- /dev/null +++ b/app/wsgi_aux.py @@ -0,0 +1,7 @@ +import os + +from . import create_app + +# Create an application instance that auxiliary processes such as Celery +# workers can use +app = create_app(os.environ.get('CSSI_CONFIG', 'production'), main=False) \ No newline at end of file diff --git a/celeryconfig.py b/celeryconfig.py new file mode 100644 index 0000000..a77fc13 --- /dev/null +++ b/celeryconfig.py @@ -0,0 +1,7 @@ + +# global Celery options that apply to all configurations + +# enable the pickle serializer +task_serializer = 'pickle' +result_serializer = 'pickle' +accept_content = ['pickle'] \ No newline at end of file diff --git a/config/config.py b/config/config.py index dd7f2c2..e5cd895 100644 --- a/config/config.py +++ b/config/config.py @@ -65,6 +65,10 @@ class Config: APP_NAME = os.environ.get('APP_NAME') or 'CSSI_REST_API' APPLICATION_ROOT = os.environ.get('APPLICATION_ROOT') or '/api/v1' + CELERY_CONFIG = {} + SOCKETIO_MESSAGE_QUEUE = os.environ.get( + 'SOCKETIO_MESSAGE_QUEUE', os.environ.get('CELERY_BROKER_URL', + 'redis://')) if os.environ.get('SECRET_KEY'): SECRET_KEY = os.environ.get('SECRET_KEY') @@ -93,6 +97,8 @@ class DevelopmentConfig(Config): DEBUG = True SQLALCHEMY_DATABASE_URI = os.environ.get('DEV_DATABASE_URL') or \ 'sqlite:///' + os.path.join(BASE_DIR, 'cssi-dev.sqlite') + CELERY_BACKEND = os.environ.get('DEV_CELERY_BACKEND') or \ + 'sqlite:///' + os.path.join(BASE_DIR, 'celery-dev.sqlite') @classmethod def init_app(cls, app): @@ -114,6 +120,10 @@ class TestingConfig(Config): TESTING = True SQLALCHEMY_DATABASE_URI = os.environ.get('TEST_DATABASE_URL') or \ 'sqlite:///' + os.path.join(BASE_DIR, 'cssi-test.sqlite') + CELERY_BACKEND = os.environ.get('TEST_CELERY_BACKEND') or \ + 'sqlite:///' + os.path.join(BASE_DIR, 'celery-test.sqlite') + CELERY_CONFIG = {'CELERY_ALWAYS_EAGER': True} + SOCKETIO_MESSAGE_QUEUE = None @classmethod def init_app(cls, app): @@ -134,6 +144,8 @@ class ProductionConfig(Config): SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or \ 'sqlite:///' + os.path.join(BASE_DIR, 'cssi.sqlite') + CELERY_BACKEND = os.environ.get('CELERY_BACKEND') or \ + 'sqlite:///' + os.path.join(BASE_DIR, 'celery.sqlite') SSL_DISABLE = (os.environ.get('SSL_DISABLE') or 'True') == 'True' @classmethod diff --git a/manage.py b/manage.py index e7c3b85..ee12068 100644 --- a/manage.py +++ b/manage.py @@ -13,19 +13,98 @@ """ import os +import subprocess +import sys +import eventlet from flask_migrate import Migrate, MigrateCommand -from flask_script import Manager +from flask_script import Manager, Command, Server as _Server, Option -from app import create_app, db +from app import create_app, db, socketio -app = create_app(os.getenv('FLASK_CONFIG') or 'default') +eventlet.monkey_patch() + +app = create_app(os.getenv('CSSI_CONFIG') or 'default') manager = Manager(app) migrate = Migrate(app, db) manager.add_command('db', MigrateCommand) + +class Server(_Server): + help = description = 'Runs the Socket.IO web server' + + def get_options(self): + options = ( + Option('-h', '--host', + dest='host', + default=self.host), + + Option('-p', '--port', + dest='port', + type=int, + default=self.port), + + Option('-d', '--debug', + action='store_true', + dest='use_debugger', + help=('enable the Werkzeug debugger (DO NOT use in ' + 'production code)'), + default=self.use_debugger), + Option('-D', '--no-debug', + action='store_false', + dest='use_debugger', + help='disable the Werkzeug debugger', + default=self.use_debugger), + + Option('-r', '--reload', + action='store_true', + dest='use_reloader', + help=('monitor Python files for changes (not 100%% safe ' + 'for production use)'), + default=self.use_reloader), + Option('-R', '--no-reload', + action='store_false', + dest='use_reloader', + help='do not monitor Python files for changes', + default=self.use_reloader), + ) + return options + + def __call__(self, app, host, port, use_debugger, use_reloader): + # override the default runserver command to start a Socket.IO server + if use_debugger is None: + use_debugger = app.debug + if use_debugger is None: + use_debugger = True + if use_reloader is None: + use_reloader = app.debug + socketio.run(app, + host=host, + port=port, + debug=use_debugger, + use_reloader=use_reloader, + **self.server_options) + + +manager.add_command("runserver", Server()) + + +class CeleryWorker(Command): + """Starts the celery worker.""" + name = 'celery' + capture_all_args = True + + def run(self, argv): + ret = subprocess.call( + ['celery', 'worker', '-A', 'app.celery', '--loglevel=info'] + argv) + sys.exit(ret) + + +manager.add_command("celery", CeleryWorker()) + + @manager.command def create_metadata(): """Create the table metadata. @@ -38,6 +117,7 @@ def create_metadata(): Genre.seed() ApplicationType.seed() + @manager.command def test(): """Run the unit tests. @@ -51,15 +131,20 @@ def test(): @manager.command -def recreate_db(): +def recreate_db(drop_first=False): """Recreates a local database Not safe to use in production. """ - db.drop_all() + if drop_first: + db.drop_all() db.create_all() db.session.commit() if __name__ == '__main__': + if sys.argv[1] == 'test' or sys.argv[1] == 'lint': + # small hack, to ensure that Flask-Script uses the testing + # configuration if we are going to run the tests + os.environ['CSSI_CONFIG'] = 'testing' manager.run() diff --git a/migrations/versions/64702ea1834d_.py b/migrations/versions/64702ea1834d_.py deleted file mode 100644 index e823621..0000000 --- a/migrations/versions/64702ea1834d_.py +++ /dev/null @@ -1,36 +0,0 @@ -"""empty message - -Revision ID: 64702ea1834d -Revises: 06614ec763fe -Create Date: 2019-04-19 03:56:38.990243 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '64702ea1834d' -down_revision = '06614ec763fe' -branch_labels = None -depends_on = None - - -def upgrade(): - # ### commands auto generated by Alembic - please adjust! ### - op.create_foreign_key('fk_genre_id', 'application', 'genre', ['genre_id'], ['id'], use_alter=True) - op.create_foreign_key('fk_type_id', 'application', 'application_type', ['type_id'], ['id'], use_alter=True) - op.add_column('session', sa.Column('status', sa.String(length=25), nullable=False)) - op.create_foreign_key('fk_app_id', 'session', 'application', ['app_id'], ['id'], use_alter=True) - op.create_foreign_key('fk_questionnaire_id', 'session', 'questionnaire', ['questionnaire_id'], ['id'], use_alter=True) - # ### end Alembic commands ### - - -def downgrade(): - # ### commands auto generated by Alembic - please adjust! ### - op.drop_constraint('fk_questionnaire_id', 'session', type_='foreignkey') - op.drop_constraint('fk_app_id', 'session', type_='foreignkey') - op.drop_column('session', 'status') - op.drop_constraint('fk_type_id', 'application', type_='foreignkey') - op.drop_constraint('fk_genre_id', 'application', type_='foreignkey') - # ### end Alembic commands ### diff --git a/migrations/versions/06614ec763fe_.py b/migrations/versions/883bbdba07f8_.py similarity index 95% rename from migrations/versions/06614ec763fe_.py rename to migrations/versions/883bbdba07f8_.py index 7935d66..be763e4 100644 --- a/migrations/versions/06614ec763fe_.py +++ b/migrations/versions/883bbdba07f8_.py @@ -1,8 +1,8 @@ """empty message -Revision ID: 06614ec763fe +Revision ID: 883bbdba07f8 Revises: -Create Date: 2019-04-19 02:19:26.253782 +Create Date: 2019-04-21 13:31:10.379141 """ from alembic import op @@ -10,7 +10,7 @@ # revision identifiers, used by Alembic. -revision = '06614ec763fe' +revision = '883bbdba07f8' down_revision = None branch_labels = None depends_on = None @@ -54,6 +54,7 @@ def upgrade(): ) op.create_table('session', sa.Column('id', sa.Integer(), nullable=False), + sa.Column('status', sa.String(length=25), nullable=False), sa.Column('app_id', sa.Integer(), nullable=False), sa.Column('creation_date', sa.TIMESTAMP(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), sa.Column('expected_emotions', sa.JSON(), nullable=False), diff --git a/requirements.txt b/requirements.txt index c901e77..5f71db1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ +eventlet==0.19.0 flask==0.12.2 flask_script==2.0.6 flask_migrate==2.1.1 @@ -6,4 +7,7 @@ flask_sqlalchemy==2.3.2 flask_marshmallow==0.8.0 marshmallow-sqlalchemy==0.13.2 PyMySQL==0.9.3 -flask-cors \ No newline at end of file +flask-cors +flask-socketio +celery==4.3.0 +redis==3.2.0 \ No newline at end of file