From d33fa285f2b1807fe6c8c5a6220e93c6a6bc10d8 Mon Sep 17 00:00:00 2001 From: aquint Date: Wed, 10 Sep 2014 15:43:35 +0200 Subject: [PATCH 01/21] scan_request is a dict now --- brain/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/brain/tasks.py b/brain/tasks.py index f56f172..c62f541 100644 --- a/brain/tasks.py +++ b/brain/tasks.py @@ -209,7 +209,7 @@ def scan(scanid, scan_request): scan.user_id = user.id sql.commit() - for (filename, probelist) in scan_request: + for (filename, probelist) in scan_request.items(): if probelist is None: scan.status = IrmaScanStatus.error_probe_missing sql.commit() From df36a42ba14ab05e9afa6333157aecedf9d4c7c2 Mon Sep 17 00:00:00 2001 From: aquint Date: Wed, 10 Sep 2014 15:50:35 +0200 Subject: [PATCH 02/21] pep8 fixes --- brain/tasks.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/brain/tasks.py b/brain/tasks.py index c62f541..61134bc 100644 --- a/brain/tasks.py +++ b/brain/tasks.py @@ -22,7 +22,7 @@ from datetime import datetime, timedelta from brain.objects import User, Scan from lib.irma.common.utils import IrmaTaskReturn, IrmaScanStatus -from lib.irma.common.exceptions import IrmaTaskError, IrmaDatabaseError +from lib.irma.common.exceptions import IrmaTaskError from lib.irma.database.sqlhandler import SQLDatabase from lib.irma.ftp.handler import FtpTls @@ -225,7 +225,8 @@ def scan(scanid, scan_request): log.debug("{0}: Unknown probe {1}".format(scanid, p)) return IrmaTaskReturn.error(msg) - # Now, create one subtask per file to scan per probe according to quota + # Now, create one subtask per file to + # scan per probe according to quota for probe in probelist: if quota is not None and quota <= 0: break @@ -233,7 +234,9 @@ def scan(scanid, scan_request): quota -= 1 callback_signature = route( results_app.signature("brain.tasks.scan_result", - (user.ftpuser, scanid, filename, probe))) + (user.ftpuser, + scanid, + filename, probe))) jobs_list.append( probe_app.send_task("probe.tasks.probe_scan", args=(user.ftpuser, scanid, filename), From 2568549c823e0ddc1f3c77c0486bccdb3705ecac Mon Sep 17 00:00:00 2001 From: aquint Date: Wed, 10 Sep 2014 18:13:23 +0200 Subject: [PATCH 03/21] add gdedrie --- AUTHORS | 1 + 1 file changed, 1 insertion(+) diff --git a/AUTHORS b/AUTHORS index fa46d68..c83cc15 100644 --- a/AUTHORS +++ b/AUTHORS @@ -13,6 +13,7 @@ The PRIMARY AUTHORS are (and/or have been): * Fernand Lone-Sang - QuarksLab * Bruno Dorsemaine - Orange DSI Groupe * David Carle - QuarksLab + * Guillaume Dedrie - QuarksLab And here is an inevitably incomplete list of MUCH-APPRECIATED CONTRIBUTORS -- people who have submitted patches, reported bugs, helped answer newbie questions, From 145389d1895bee844f3cc36e41db4af8ad00e45b Mon Sep 17 00:00:00 2001 From: aquint Date: Thu, 11 Sep 2014 09:53:28 +0200 Subject: [PATCH 04/21] restruct directories --- brain/controllers/__init__.py | 0 brain/controllers/scanctrl.py | 19 ++++ brain/helpers/__init__.py | 0 brain/helpers/sql.py | 44 ++++++++ brain/models/__init__.py | 0 brain/models/sqlobjects.py | 185 ++++++++++++++++++++++++++++++++++ brain/objects.py | 101 ------------------- brain/tasks.py | 92 +++-------------- lib | 2 +- 9 files changed, 263 insertions(+), 180 deletions(-) create mode 100644 brain/controllers/__init__.py create mode 100644 brain/controllers/scanctrl.py create mode 100644 brain/helpers/__init__.py create mode 100644 brain/helpers/sql.py create mode 100644 brain/models/__init__.py create mode 100644 brain/models/sqlobjects.py delete mode 100644 brain/objects.py diff --git a/brain/controllers/__init__.py b/brain/controllers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/brain/controllers/scanctrl.py b/brain/controllers/scanctrl.py new file mode 100644 index 0000000..ba448e3 --- /dev/null +++ b/brain/controllers/scanctrl.py @@ -0,0 +1,19 @@ +# +# Copyright (c) 2013-2014 QuarksLab. +# This file is part of IRMA project. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License in the top-level directory +# of this distribution and at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# No part of the project, including this file, may be copied, +# modified, propagated, or distributed except according to the +# terms contained in the LICENSE file. + +from datetime import timedelta +from datetime import datetime +from brain.models.sqlobjects import Scan + diff --git a/brain/helpers/__init__.py b/brain/helpers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/brain/helpers/sql.py b/brain/helpers/sql.py new file mode 100644 index 0000000..59e88c5 --- /dev/null +++ b/brain/helpers/sql.py @@ -0,0 +1,44 @@ +import logging +from contextlib import contextmanager +from lib.irma.database.sqlhandler import SQLDatabase +from lib.irma.common.exceptions import IrmaDatabaseError +import config.parser as config + +log = logging.getLogger(__name__) + + +def sql_db_connect(): + try: + engine = config.brain_config['sql_brain'].engine + dbname = config.brain_config['sql_brain'].dbname + SQLDatabase.connect(engine, None, None, None, None, dbname) + except Exception as e: + msg = "SQL: can't connect" + log.debug(msg + " [{0}]".format(e)) + raise IrmaDatabaseError(msg) + + +@contextmanager +def session_transaction(): + """Provide a transactional scope around a series of operations.""" + sql_db_connect() + session = SQLDatabase.get_session() + try: + yield session + session.commit() + except IrmaDatabaseError: + session.rollback() + raise + finally: + session.close() + + +@contextmanager +def session_query(): + """Provide a transactional scope around a series of operations.""" + sql_db_connect() + session = SQLDatabase.get_session() + try: + yield session + except IrmaDatabaseError: + raise diff --git a/brain/models/__init__.py b/brain/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/brain/models/sqlobjects.py b/brain/models/sqlobjects.py new file mode 100644 index 0000000..b1679bc --- /dev/null +++ b/brain/models/sqlobjects.py @@ -0,0 +1,185 @@ +# +# Copyright (c) 2013-2014 QuarksLab. +# This file is part of IRMA project. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License in the top-level directory +# of this distribution and at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# No part of the project, including this file, may be copied, +# modified, propagated, or distributed except according to the +# terms contained in the LICENSE file. + +import os +from brain.helpers.sql import sql_db_connect +from sqlalchemy import Column, Integer, String, \ + event, ForeignKey +import config.parser as config +from sqlalchemy.engine import Engine +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import relationship, backref +from sqlalchemy.orm.exc import NoResultFound, MultipleResultsFound +from lib.irma.common.exceptions import IrmaDatabaseError, \ + IrmaDatabaseResultNotFound +from lib.irma.common.utils import IrmaScanStatus +from lib.common.compat import timestamp +from lib.irma.database.sqlobjects import SQLDatabaseObject + + +# SQLite fix for ForeignKey support +# see http://docs.sqlalchemy.org/en/latest/dialects/sqlite.html +@event.listens_for(Engine, "connect") +def set_sqlite_pragma(dbapi_connection, connection_record): + cursor = dbapi_connection.cursor() + cursor.execute("PRAGMA foreign_keys=ON") + cursor.close() + +# Auto-create directory for sqlite db +db_name = config.brain_config['sql_brain'].dbname +dirname = os.path.dirname(db_name) +dirname = os.path.abspath(dirname) +if not os.path.exists(dirname): + print("SQL directory does not exist {0}" + "..creating".format(dirname)) + os.makedirs(dirname) + os.chmod(dirname, 0777) +elif not (os.path.isdir(dirname)): + print("Error. SQL directory is a not a dir {0}" + "".format(dirname)) + raise IrmaDatabaseError("Can not create Frontend database dir") + +if not os.path.exists(db_name): + # touch like method to create a rw-rw-rw- file for db + open(db_name, 'a').close() + os.chmod(db_name, 0666) + + +sql_db_connect() +Base = declarative_base() + + +class Scan(Base, SQLDatabaseObject): + __tablename__ = 'Scan' + # Fields + id = Column( + Integer, + autoincrement=True, + nullable=False, + primary_key=True, + name='id' + ) + timestamp = Column( + String, + nullable=False, + name='timestamp' + ) + scanid = Column( + String, + index=True, + nullable=False, + name='scanid' + ) + nbfiles = Column( + Integer, + name='nbfiles' + ) + # Many to one Scan <-> User + user_id = Column( + Integer, + ForeignKey('user.id'), + nullable=False, + name="user_id" + ) + user = relationship("User") + + def __repr__(self): + str_repr = ( + "Scan {0}:".format(self.scanid) + + "\tdate: {0}".format(self.timestamp) + + "\t{0} file(s)".format(self.nbfiles) + + "\tstatus: '{0}'".format(IrmaScanStatus.label[self.status]) + + "\tuser_id: {0}\n".format(self.user_id)) + return str_repr + + @staticmethod + def get_scan(scanid, user_id, session): + try: + return session.query(User).filter( + Scan.scanid == scanid and Scan.user_id == user_id + ).one() + except NoResultFound as e: + raise IrmaDatabaseResultNotFound(e) + except MultipleResultsFound as e: + raise IrmaDatabaseError(e) + + +class User(Base, SQLDatabaseObject): + __tablename__ = 'User' + + # Fields + id = Column( + Integer, + autoincrement=True, + nullable=False, + primary_key=True, + name='id' + ) + name = Column( + String, + nullable=False, + name='name' + ) + rmqvhost = Column( + String, + nullable=False, + name='rmqvhost' + ) + ftpuser = Column( + String, + nullable=False, + name='ftpuser' + ) + quota = Column( + Integer, + name='quota' + ) + scans = relationship("Scan", backref="user") + + def __repr__(self): + str_repr = ( + "User {0}:".format(self.name) + + "\trmq_vhost: '{0}'".format(self.rmqvhost) + + "\t ftpuser: '{0}'".format(self.ftpuser) + + "\tquota: '{0}'\n".format(self.quota)) + return str_repr + + @staticmethod + def get_by_rmqvhost(rmqvhost, session): + # FIXME: get rmq_vhost dynamically + if rmqvhost is None: + rmqvhost = config.brain_config['broker_frontend'].vhost + try: + return session.query(User).filter( + User.rmq_vhost == rmqvhost + ).one() + except NoResultFound as e: + raise IrmaDatabaseResultNotFound(e) + except MultipleResultsFound as e: + raise IrmaDatabaseError(e) + + def remaining_quota(self, session): + if self.quota == 0: + # quota=0 means quota disabled + quota = None + else: + # quota are per 24h + min_date = timestamp() - 24 * 60 * 60 + files_consumed = session.query(Scan.nb_files).filter( + Scan.date >= min_date and Scan.user_id == self.id + ).all() + # Quota are set per 24 hours + quota = self.quota - sum(files_consumed) + return quota diff --git a/brain/objects.py b/brain/objects.py deleted file mode 100644 index 7d12220..0000000 --- a/brain/objects.py +++ /dev/null @@ -1,101 +0,0 @@ -# -# Copyright (c) 2013-2014 QuarksLab. -# This file is part of IRMA project. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License in the top-level directory -# of this distribution and at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# No part of the project, including this file, may be copied, -# modified, propagated, or distributed except according to the -# terms contained in the LICENSE file. - -from lib.irma.database.sqlhandler import SQLDatabase -from lib.irma.database.sqlobjects import Base, Column, \ - Integer, String, DateTime -from sqlalchemy import ForeignKey -from sqlalchemy.orm import relationship - - -class Scan(Base): - __tablename__ = 'scans' - id = Column(Integer, primary_key=True) - date = Column(DateTime(timezone=True)) - scanid = Column(String) - status = Column(Integer) - nbfiles = Column(Integer) - taskid = Column(String) - user_id = Column(Integer, ForeignKey('users.id')) - - def __repr__(self): - str_repr = ( - "Scan {0}:".format(self.scanid) + - "\t{0} file(s)".format(self.nbfiles) + - "\t status: '{0}'".format(self.label[self.status]) + - "\ttaskid: '{0}'".format(self.taskid) + - "\tuser_id: {0}\n".format(self.user_id)) - return str_repr - - -class User(Base): - __tablename__ = 'users' - id = Column(Integer, primary_key=True) - name = Column(String) - rmqvhost = Column(String) - ftpuser = Column(String) - quota = Column(Integer) - scan = relationship("Scan") - - def __repr__(self): - str_repr = ( - "User {0}:".format(self.name) + - "\trmq_vhost: '{0}'".format(self.rmqvhost) + - "\t ftpuser: '{0}'".format(self.ftpuser) + - "\tquota: '{0}'\n".format(self.quota)) - return str_repr - - -if __name__ == "__main__": - # create all dbs - import config.parser as config - import sys - import os - - if len(sys.argv) not in (4, 5): - print("usage: {0} [quota]\n" - " with a string\n" - " the rmqvhost used for the frontend\n" - " the ftpuser used by the frontend\n" - " [quota] the number of file scan quota\n" - "example: {0} test1 mqfrontend frontend" - "".format(sys.argv[0])) - sys.exit(1) - - dirname = os.path.dirname(config.brain_config['sql_brain'].dbname) - dirname = os.path.abspath(dirname) - if not os.path.exists(dirname): - print("SQL directory does not exist {0}" - "..creating".format(dirname)) - os.makedirs(dirname) - if not (os.path.isdir(dirname)): - print("Error. SQL directory is a not a dir {0}" - "".format(dirname)) - sys.exit(1) - - # quota is in number of files (0 means disabled) - quota = int(sys.argv[4]) if len(sys.argv) == 5 else 0 - - engine = config.brain_config['sql_brain'].engine - dbname = config.brain_config['sql_brain'].dbname - sql = SQLDatabase(engine + dbname) - metadata = Base.metadata - metadata.create_all(sql._db) - user = User(name=sys.argv[1], - rmqvhost=sys.argv[2], - ftpuser=sys.argv[3], - quota=quota) - sql.add(user) - sql.commit() diff --git a/brain/tasks.py b/brain/tasks.py index 61134bc..343734a 100644 --- a/brain/tasks.py +++ b/brain/tasks.py @@ -19,8 +19,7 @@ from celery import Celery from celery.utils.log import get_task_logger -from datetime import datetime, timedelta -from brain.objects import User, Scan +from brain.models.sqlobjects import User, Scan from lib.irma.common.utils import IrmaTaskReturn, IrmaScanStatus from lib.irma.common.exceptions import IrmaTaskError from lib.irma.database.sqlhandler import SQLDatabase @@ -51,66 +50,6 @@ config.configure_syslog(frontend_app) -# ============= -# SQL Helpers -# ============= - -def get_quota(sql, user): - if user.quota == 0: - # quota=0 means quota disabled - quota = None - else: - # Quota are set per 24 hours - delta = timedelta(hours=24) - what = ("user_id={0} ".format(user.id) + - "and date >= '{0}'".format(datetime.now() - delta)) - quota = user.quota - sql.sum(Scan.nbfiles, what) - return quota - - -def get_groupresult(taskid): - if not taskid: - msg = "SQL: task_id not set" - log.debug(msg) - raise IrmaTaskError() - gr = probe_app.GroupResult.restore(taskid) - if not gr: - msg = "SQL: taskid not valid" - log.debug(msg + " [{0}]".format(taskid)) - raise IrmaTaskError(msg) - return gr - - -def sql_connect(engine, dbname): - try: - return SQLDatabase(engine + dbname) - except Exception as e: - msg = "SQL: can't connect" - log.debug(msg + " [{0}]".format(e)) - raise IrmaTaskError(msg) - - -def sql_get_user(sql, rmqvhost=None): - # FIXME: get rmq_vhost dynamically - try: - if rmqvhost is None: - rmqvhost = config.brain_config['broker_frontend'].vhost - return sql.one_by(User, rmqvhost=rmqvhost) - except Exception as e: - msg = "SQL: user not found" - log.debug(msg + " [{0}]".format(e)) - raise IrmaTaskError(msg) - - -def sql_get_scan(sql, scanid, user): - try: - return sql.one_by(Scan, scanid=scanid, user_id=user.id) - except Exception as e: - msg = "SQL: scan id not found" - log.debug(msg + " [{0}]".format(e)) - raise IrmaTaskError(msg) - - # ================ # Celery Helpers # ================ @@ -232,8 +171,13 @@ def scan(scanid, scan_request): break if quota: quota -= 1 - callback_signature = route( - results_app.signature("brain.tasks.scan_result", + hook_success = route( + results_app.signature("brain.tasks.scan_success", + (user.ftpuser, + scanid, + filename, probe))) + hook_error = route( + results_app.signature("brain.tasks.scan_error", (user.ftpuser, scanid, filename, probe))) @@ -241,20 +185,12 @@ def scan(scanid, scan_request): probe_app.send_task("probe.tasks.probe_scan", args=(user.ftpuser, scanid, filename), queue=probe, - link=callback_signature)) - - if len(jobs_list) != 0: - # Build a result set with all job AsyncResult - # for progress/cancel operations - groupid = str(uuid.uuid4()) - groupres = probe_app.GroupResult(id=groupid, results=jobs_list) - # keep the groupresult object for task status/cancel - groupres.save() - - scan.taskid = groupid - scan.nbfiles = len(jobs_list) - scan.status = IrmaScanStatus.launched - sql.commit() + link=hook_success, + link_error=hook_error)) + + scan.nbfiles = len(jobs_list) + scan.status = IrmaScanStatus.launched + sql.commit() log.debug( "{0}: ".format(scanid) + diff --git a/lib b/lib index e22dc2b..0c3e2a3 160000 --- a/lib +++ b/lib @@ -1 +1 @@ -Subproject commit e22dc2bcbf5163c20590370d497d88fd5d77460b +Subproject commit 0c3e2a3dcadb122b1e2c5a5cc8d61a102c5b2992 From 817efdcb110b6b0a1ae5455c875f083480306fe8 Mon Sep 17 00:00:00 2001 From: aquint Date: Fri, 12 Sep 2014 20:00:48 +0200 Subject: [PATCH 05/21] more sql engine support --- brain/helpers/sql.py | 9 ++++++--- config/brain.ini | 11 ++++++++--- config/parser.py | 43 ++++++++++++++++++++++++++++++++++++++----- 3 files changed, 52 insertions(+), 11 deletions(-) diff --git a/brain/helpers/sql.py b/brain/helpers/sql.py index 59e88c5..02e2062 100644 --- a/brain/helpers/sql.py +++ b/brain/helpers/sql.py @@ -8,10 +8,13 @@ def sql_db_connect(): + """Connection to DB + """ try: - engine = config.brain_config['sql_brain'].engine - dbname = config.brain_config['sql_brain'].dbname - SQLDatabase.connect(engine, None, None, None, None, dbname) + uri_params = config.get_sql_db_uri_params() + # TODO args* style argument + SQLDatabase.connect(uri_params[0], uri_params[1], uri_params[2], + uri_params[3], uri_params[4], uri_params[5]) except Exception as e: msg = "SQL: can't connect" log.debug(msg + " [{0}]".format(e)) diff --git a/config/brain.ini b/config/brain.ini index c740dde..bc15a6f 100644 --- a/config/brain.ini +++ b/config/brain.ini @@ -27,9 +27,14 @@ db = 0 host = 127.0.0.1 db = 1 -[sql_brain] -engine = sqlite:/// -dbname = db/irma_scan.db +[sqldb] +dbms = sqlite +dialect = +username = +passwd = +host = +dbname = db/test +tables_prefix = irma [ftp_brain] host = 127.0.0.1 diff --git a/config/parser.py b/config/parser.py index 8762606..26eb648 100644 --- a/config/parser.py +++ b/config/parser.py @@ -66,10 +66,15 @@ ('port', TemplatedConfiguration.integer, 6379), ('db', TemplatedConfiguration.integer, None), ], - 'sql_brain': [ - ('engine', TemplatedConfiguration.string, None), + 'sqldb': [ + ('dbms', TemplatedConfiguration.string, None), + ('dialect', TemplatedConfiguration.string, None), + ('username', TemplatedConfiguration.string, None), + ('passwd', TemplatedConfiguration.string, None), + ('host', TemplatedConfiguration.string, None), ('dbname', TemplatedConfiguration.string, None), - ], + ('tables_prefix', TemplatedConfiguration.string, None), + ], 'ftp_brain': [ ('host', TemplatedConfiguration.string, None), ('port', TemplatedConfiguration.integer, 21), @@ -78,8 +83,13 @@ ] } -cwd = os.path.abspath(os.path.dirname(__file__)) -cfg_file = "{0}/{1}".format(cwd, "brain.ini") +config_path = os.environ.get('IRMA_BRAIN_CFG_PATH') +if config_path is None: + # Fallback to default path that is + # current working directory + config_path = os.path.abspath(os.path.dirname(__file__)) + +cfg_file = "{0}/{1}".format(config_path, "brain.ini") brain_config = TemplatedConfiguration(cfg_file, template_brain_config) @@ -176,6 +186,10 @@ def get_frontend_broker_uri(): return _get_broker_uri(brain_config.broker_frontend) +def get_frontend_rmqvhost(): + return brain_config.broker_frontend.vhost + + # ================ # Syslog helpers # ================ @@ -200,3 +214,22 @@ def setup_log(**args): hl.setFormatter(formatter) # add new handler to logger args['logger'].addHandler(hl) + + +# ================== +# Database helpers +# ================== + +def get_sql_db_uri_params(): + return ( + brain_config.sqldb.dbms, + brain_config.sqldb.dialect, + brain_config.sqldb.username, + brain_config.sqldb.passwd, + brain_config.sqldb.host, + brain_config.sqldb.dbname, + ) + + +def get_sql_db_tables_prefix(): + return brain_config.sqldb.tables_prefix From 2dedb2dabe373e4008dbbc6a813a57dc661d4b6f Mon Sep 17 00:00:00 2001 From: aquint Date: Fri, 12 Sep 2014 20:01:12 +0200 Subject: [PATCH 06/21] split code into ctrller/model --- brain/controllers/scanctrl.py | 147 ++++++++++++- brain/models/sqlobjects.py | 244 ++++++++++++++++------ brain/tasks.py | 379 +++++++++++----------------------- 3 files changed, 446 insertions(+), 324 deletions(-) diff --git a/brain/controllers/scanctrl.py b/brain/controllers/scanctrl.py index ba448e3..ef1bb88 100644 --- a/brain/controllers/scanctrl.py +++ b/brain/controllers/scanctrl.py @@ -13,7 +13,148 @@ # modified, propagated, or distributed except according to the # terms contained in the LICENSE file. -from datetime import timedelta -from datetime import datetime -from brain.models.sqlobjects import Scan +from brain.models.sqlobjects import Scan, Job +from brain.helpers.sql import session_query, session_transaction +from lib.irma.common.utils import IrmaScanStatus +from lib.irma.common.exceptions import IrmaValueError +from lib.common.compat import timestamp + +def new(frontend_scan_id, user_id, nb_files): + with session_transaction() as session: + scan = Scan(frontend_scan_id, user_id, nb_files) + scan.save(session) + session.commit() + return scan.id + + +def get_scan_id(frontend_scan_id, user_id): + with session_query() as session: + scan = Scan.get_scan(frontend_scan_id, user_id, session) + return scan.id + + +def get_nbjobs(scan_id): + with session_query() as session: + scan = Scan.load(scan_id, session) + return scan.nb_jobs + + +def get_job_info(job_id): + with session_query() as session: + job = Job.load(job_id, session) + frontend_scan_id = job.scan.scan_id + filename = job.filename + probe = job.probename + return (frontend_scan_id, filename, probe) + + +def _set_status(scan_id, code): + with session_transaction() as session: + scan = Scan.load(scan_id, session) + scan.status = code + session.commit() + + +def cancelling(scan_id): + _set_status(scan_id, IrmaScanStatus.cancelling) + + +def cancelled(scan_id): + _set_status(scan_id, IrmaScanStatus.cancelled) + + +def launched(scan_id): + _set_status(scan_id, IrmaScanStatus.launched) + + +def progress(scan_id): + with session_query() as session: + scan = Scan.load(scan_id, session) + if IrmaScanStatus.is_error(scan.status): + status_str = IrmaScanStatus.label[scan.status] + raise IrmaValueError(status_str) + status = IrmaScanStatus.label[scan.status] + progress_details = None + if scan.status == IrmaScanStatus.launched: + (total, finished, success) = scan.progress() + progress_details = {} + progress_details['total'] = total + progress_details['finished'] = finished + progress_details['successful'] = success + return (status, progress_details) + + +def get_pending_jobs(scan_id): + with session_query() as session: + scan = Scan.load(scan_id, session) + return scan.get_pending_jobs_taskid() + + +def check_finished(scan_id): + with session_transaction() as session: + scan = Scan.load(scan_id, session) + if scan.status == IrmaScanStatus.processed: + return True + if scan.finished(): + scan.status = IrmaScanStatus.processed + return True + return False + + +def flush(scan_id): + with session_transaction() as session: + scan = Scan.load(scan_id, session) + if scan.status == IrmaScanStatus.flushed: + return + for job in scan.jobs: + session.delete(job) + scan.status = IrmaScanStatus.flushed + + +def error(scan_id, code): + with session_transaction() as session: + scan = Scan.load(scan_id, session) + scan.status = code + +# ========== +# Job ctrl +# ========== + + +def job_new(scan_id, filename, probe): + with session_transaction() as session: + j = Job(filename, probe, scan_id) + j.save(session) + session.commit() + return j.id + + +def _job_finish(job_id, status): + with session_transaction() as session: + job = Job.load(job_id, session) + job.status = status + job.ts_end = timestamp() + job.update(['status', 'ts_end'], session) + scan_id = job.scan.id + check_finished(scan_id) + + +def job_success(job_id): + _job_finish(job_id, Job.success) + + +def job_error(job_id): + _job_finish(job_id, Job.error) + + +def job_set_taskid(job_id, task_id): + with session_transaction() as session: + job = Job.load(job_id, session) + job.task_id = task_id + + +def job_info(job_id): + with session_transaction() as session: + job = Job.load(job_id, session) + return (job.scan.scan_id, job.filename, job.probename) diff --git a/brain/models/sqlobjects.py b/brain/models/sqlobjects.py index b1679bc..38ad8c6 100644 --- a/brain/models/sqlobjects.py +++ b/brain/models/sqlobjects.py @@ -20,49 +20,56 @@ import config.parser as config from sqlalchemy.engine import Engine from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import relationship, backref +from sqlalchemy.orm import relationship from sqlalchemy.orm.exc import NoResultFound, MultipleResultsFound from lib.irma.common.exceptions import IrmaDatabaseError, \ IrmaDatabaseResultNotFound from lib.irma.common.utils import IrmaScanStatus from lib.common.compat import timestamp +from lib.irma.database.sqlhandler import SQLDatabase from lib.irma.database.sqlobjects import SQLDatabaseObject # SQLite fix for ForeignKey support # see http://docs.sqlalchemy.org/en/latest/dialects/sqlite.html -@event.listens_for(Engine, "connect") -def set_sqlite_pragma(dbapi_connection, connection_record): - cursor = dbapi_connection.cursor() - cursor.execute("PRAGMA foreign_keys=ON") - cursor.close() - -# Auto-create directory for sqlite db -db_name = config.brain_config['sql_brain'].dbname -dirname = os.path.dirname(db_name) -dirname = os.path.abspath(dirname) -if not os.path.exists(dirname): - print("SQL directory does not exist {0}" - "..creating".format(dirname)) - os.makedirs(dirname) - os.chmod(dirname, 0777) -elif not (os.path.isdir(dirname)): - print("Error. SQL directory is a not a dir {0}" - "".format(dirname)) - raise IrmaDatabaseError("Can not create Frontend database dir") - -if not os.path.exists(db_name): - # touch like method to create a rw-rw-rw- file for db - open(db_name, 'a').close() - os.chmod(db_name, 0666) +if config.get_sql_db_uri_params()[0] == 'sqlite': + @event.listens_for(Engine, "connect") + def set_sqlite_pragma(dbapi_connection, connection_record): + cursor = dbapi_connection.cursor() + cursor.execute("PRAGMA foreign_keys=ON") + cursor.close() + + # Auto-create directory for sqlite db + db_name = os.path.abspath(config.get_sql_db_uri_params()[5]) + dirname = os.path.dirname(db_name) + if not os.path.exists(dirname): + print("SQL directory does not exist {0}" + "..creating".format(dirname)) + os.makedirs(dirname) + os.chmod(dirname, 0777) + elif not (os.path.isdir(dirname)): + print("Error. SQL directory is a not a dir {0}" + "".format(dirname)) + raise IrmaDatabaseError("Can not create Frontend database dir") + + if not os.path.exists(db_name): + # touch like method to create a rw-rw-rw- file for db + open(db_name, 'a').close() + os.chmod(db_name, 0666) sql_db_connect() Base = declarative_base() +tables_prefix = '{0}_'.format(config.get_sql_db_tables_prefix()) class Scan(Base, SQLDatabaseObject): - __tablename__ = 'Scan' + __tablename__ = '{0}scan'.format(tables_prefix) + # SQLite fix for auto increment on ids + # see http://docs.sqlalchemy.org/en/latest/dialects/sqlite.html + if config.get_sql_db_uri_params()[0] == 'sqlite': + __table_args__ = {'sqlite_autoincrement': True} + # Fields id = Column( Integer, @@ -71,53 +78,89 @@ class Scan(Base, SQLDatabaseObject): primary_key=True, name='id' ) - timestamp = Column( + scan_id = Column( String, + index=True, nullable=False, - name='timestamp' + name='scan_id' ) - scanid = Column( - String, - index=True, + status = Column( + Integer, + nullable=False, + name='status' + ) + timestamp = Column( + Integer, nullable=False, - name='scanid' + name='timestamp' ) - nbfiles = Column( + nb_files = Column( Integer, - name='nbfiles' + nullable=False, + name='nb_files' ) # Many to one Scan <-> User user_id = Column( Integer, - ForeignKey('user.id'), + ForeignKey('{0}user.id'.format(tables_prefix)), + index=True, nullable=False, - name="user_id" ) - user = relationship("User") + jobs = relationship("Job", backref="scan") - def __repr__(self): - str_repr = ( - "Scan {0}:".format(self.scanid) + - "\tdate: {0}".format(self.timestamp) + - "\t{0} file(s)".format(self.nbfiles) + - "\tstatus: '{0}'".format(IrmaScanStatus.label[self.status]) + - "\tuser_id: {0}\n".format(self.user_id)) - return str_repr + def __init__(self, frontend_scanid, user_id, nb_files): + self.scan_id = frontend_scanid + self.status = IrmaScanStatus.empty + self.timestamp = timestamp() + self.nb_files = nb_files + self.user_id = user_id @staticmethod - def get_scan(scanid, user_id, session): + def get_scan(scan_id, user_id, session): try: - return session.query(User).filter( - Scan.scanid == scanid and Scan.user_id == user_id + return session.query(Scan).filter( + Scan.scan_id == scan_id and Scan.user_id == user_id ).one() except NoResultFound as e: raise IrmaDatabaseResultNotFound(e) except MultipleResultsFound as e: raise IrmaDatabaseError(e) + @property + def nb_jobs(self): + return len(self.jobs) + + def progress(self): + finished = success = total = 0 + for job in self.jobs: + total += 1 + if job.finished(): + finished += 1 + if job.status == Job.success: + success += 1 + return (total, finished, success) + + def get_pending_jobs_taskid(self): + pending_jobs_taskid = [] + for job in self.jobs: + if not job.finished(): + pending_jobs_taskid.append(job.task_id) + return pending_jobs_taskid + + def finished(self): + for job in self.jobs: + if not job.finished(): + return False + return True + class User(Base, SQLDatabaseObject): - __tablename__ = 'User' + __tablename__ = '{0}user'.format(tables_prefix) + + # SQLite fix for auto increment on ids + # see http://docs.sqlalchemy.org/en/latest/dialects/sqlite.html + if config.get_sql_db_uri_params()[0] == 'sqlite': + __table_args__ = {'sqlite_autoincrement': True} # Fields id = Column( @@ -134,6 +177,7 @@ class User(Base, SQLDatabaseObject): ) rmqvhost = Column( String, + index=True, nullable=False, name='rmqvhost' ) @@ -148,13 +192,11 @@ class User(Base, SQLDatabaseObject): ) scans = relationship("Scan", backref="user") - def __repr__(self): - str_repr = ( - "User {0}:".format(self.name) + - "\trmq_vhost: '{0}'".format(self.rmqvhost) + - "\t ftpuser: '{0}'".format(self.ftpuser) + - "\tquota: '{0}'\n".format(self.quota)) - return str_repr + def __init__(self, name, rmqvhost, ftpuser, quota): + self.name = name + self.rmqvhost = rmqvhost + self.ftpuser = ftpuser + self.quota = quota @staticmethod def get_by_rmqvhost(rmqvhost, session): @@ -163,7 +205,7 @@ def get_by_rmqvhost(rmqvhost, session): rmqvhost = config.brain_config['broker_frontend'].vhost try: return session.query(User).filter( - User.rmq_vhost == rmqvhost + User.rmqvhost == rmqvhost ).one() except NoResultFound as e: raise IrmaDatabaseResultNotFound(e) @@ -173,13 +215,87 @@ def get_by_rmqvhost(rmqvhost, session): def remaining_quota(self, session): if self.quota == 0: # quota=0 means quota disabled - quota = None + remaining = None else: # quota are per 24h - min_date = timestamp() - 24 * 60 * 60 - files_consumed = session.query(Scan.nb_files).filter( - Scan.date >= min_date and Scan.user_id == self.id - ).all() + min_ts = timestamp() - 24 * 60 * 60 + scan_list = session.query(Scan).filter( + Scan.user_id == self.id).filter( + Scan.timestamp >= min_ts).all() + consumed = 0 + for scan in scan_list: + consumed += scan.nb_jobs # Quota are set per 24 hours - quota = self.quota - sum(files_consumed) - return quota + remaining = self.quota - consumed + return remaining + + +class Job(Base, SQLDatabaseObject): + __tablename__ = '{0}job'.format(tables_prefix) + success = 1 + running = 0 + error = -1 + + # SQLite fix for auto increment on ids + # see http://docs.sqlalchemy.org/en/latest/dialects/sqlite.html + if config.get_sql_db_uri_params()[0] == 'sqlite': + __table_args__ = {'sqlite_autoincrement': True} + + # Fields + id = Column( + Integer, + autoincrement=True, + nullable=False, + primary_key=True, + name='id' + ) + filename = Column( + String, + nullable=False, + index=True, + name='filename' + ) + probename = Column( + String, + nullable=False, + index=True, + name='probename' + ) + status = Column( + Integer, + nullable=False, + name='status' + ) + ts_start = Column( + Integer, + nullable=False, + name='ts_start' + ) + ts_end = Column( + Integer, + name='ts_end' + ) + task_id = Column( + String, + name="task_id" + ) + # Many to one Job <-> Scan + scan_id = Column( + Integer, + ForeignKey('{0}scan.id'.format(tables_prefix)), + index=True, + nullable=False, + ) + + def __init__(self, filename, probename, scanid): + self.filename = filename + self.probename = probename + self.ts_start = timestamp() + self.status = self.running + self.scan_id = scanid + + def finished(self): + return self.status != self.running + + +Base.metadata.create_all(SQLDatabase.get_engine()) diff --git a/brain/tasks.py b/brain/tasks.py index 343734a..138b128 100644 --- a/brain/tasks.py +++ b/brain/tasks.py @@ -13,98 +13,29 @@ # modified, propagated, or distributed except according to the # terms contained in the LICENSE file. -import uuid -import time import config.parser as config from celery import Celery from celery.utils.log import get_task_logger -from brain.models.sqlobjects import User, Scan +import brain.controllers.userctrl as user_ctrl +import brain.controllers.scanctrl as scan_ctrl +import brain.controllers.probetasks as celery_probe +import brain.controllers.frontendtasks as celery_frontend +import brain.controllers.ftpctrl as ftp_ctrl from lib.irma.common.utils import IrmaTaskReturn, IrmaScanStatus from lib.irma.common.exceptions import IrmaTaskError -from lib.irma.database.sqlhandler import SQLDatabase -from lib.irma.ftp.handler import FtpTls # Get celery's logger log = get_task_logger(__name__) -# Time to cache the probe list -# to avoid asking to rabbitmq -PROBELIST_CACHE_TIME = 60 -cache_probelist = {'list': None, 'time': None} - scan_app = Celery('scantasks') config.conf_brain_celery(scan_app) config.configure_syslog(scan_app) -probe_app = Celery('probetasks') -config.conf_probe_celery(probe_app) -config.configure_syslog(probe_app) - -results_app = Celery('restasks') +results_app = Celery('resultstasks') config.conf_results_celery(results_app) config.configure_syslog(results_app) -frontend_app = Celery('frontendtasks') -config.conf_frontend_celery(frontend_app) -config.configure_syslog(frontend_app) - - -# ================ -# Celery Helpers -# ================ - -def route(sig): - options = sig.app.amqp.router.route( - sig.options, sig.task, sig.args, sig.kwargs, - ) - try: - queue = options.pop('queue') - except KeyError: - pass - else: - options.update(exchange=queue.exchange.name, - routing_key=queue.routing_key) - sig.set(**options) - return sig - - -# =============== -# Tasks Helpers -# =============== - -def get_probelist(): - now = time.time() - result_queue = config.brain_config['broker_probe'].queue - if cache_probelist['time'] is not None: - cache_time = now - cache_probelist['time'] - if cache_probelist['time'] is None or cache_time > PROBELIST_CACHE_TIME: - slist = list() - i = probe_app.control.inspect() - queues = i.active_queues() - if queues: - for infolist in queues.values(): - for info in infolist: - if info['name'] not in slist: - # exclude only predefined result queue - if info['name'] != result_queue: - slist.append(info['name']) - if len(slist) != 0: - # activate cache only on non empty list - cache_probelist['time'] = now - cache_probelist['list'] = slist - return cache_probelist['list'] - - -def flush_dir(ftpuser, scanid): - print("Flushing dir {0}".format(scanid)) - conf_ftp = config.brain_config['ftp_brain'] - with FtpTls(conf_ftp.host, - conf_ftp.port, - conf_ftp.username, - conf_ftp.password) as ftps: - ftps.deletepath("{0}/{1}".format(ftpuser, scanid), deleteParent=True) - # =================== # Tasks declaration @@ -112,223 +43,157 @@ def flush_dir(ftpuser, scanid): @scan_app.task() def probe_list(): - probe_list = get_probelist() + probe_list = celery_probe.get_probelist() if len(probe_list) > 0: - return IrmaTaskReturn.success(get_probelist()) + return IrmaTaskReturn.success(probe_list) else: return IrmaTaskReturn.error("No probe available") @scan_app.task(ignore_result=True) -def scan(scanid, scan_request): - engine = config.brain_config['sql_brain'].engine - dbname = config.brain_config['sql_brain'].dbname - sql = sql_connect(engine, dbname) - available_probelist = get_probelist() - jobs_list = [] +def scan(frontend_scanid, scan_request): + log.info("{0}: scan launched".format(frontend_scanid)) + # TODO user should be identified by RMQ vhost + # read vhost from config as workaround + rmqvhost = config.get_frontend_rmqvhost() + user_id = user_ctrl.get_userid(rmqvhost) + ftpuser = user_ctrl.get_ftpuser(user_id) # create an initial entry to track future errors - scan = Scan(scanid=scanid, - status=IrmaScanStatus.empty, - date=datetime.now()) - sql.add(scan) - sql.commit() - try: - # tell frontend that scanid is now known to brain - # progress available - frontend_app.send_task("frontend.tasks.scan_launched", - args=[scanid]) - user = sql_get_user(sql) - quota = get_quota(sql, user) - if quota is not None: - log.debug("{0} : Found user {1} ".format(scanid, user.name) + - "quota remaining {0}/{1}".format(quota, user.quota)) - else: - log.debug("{0} : Found user {1} quota disabled" - "".format(scanid, user.name)) - scan.user_id = user.id - sql.commit() - - for (filename, probelist) in scan_request.items(): - if probelist is None: - scan.status = IrmaScanStatus.error_probe_missing - sql.commit() - log.debug("{0}: Empty probe list".format(scanid)) - return IrmaTaskReturn.error("BrainTask: Empty probe list") - # first check probelist - for p in probelist: - # check if probe exists - if p not in available_probelist: - scan.status = IrmaScanStatus.error_probe_na - sql.commit() - msg = "BrainTask: Unknown probe {0}".format(p) - log.debug("{0}: Unknown probe {1}".format(scanid, p)) - return IrmaTaskReturn.error(msg) + # tell frontend that frontend_scanid is now known to brain + # progress available + scan_id = scan_ctrl.new(frontend_scanid, user_id, len(scan_request)) + # send a scan launched event to frontend + (remaining, quota) = user_ctrl.get_quota(user_id) + log.info("{0}: Found user".format(frontend_scanid) + + "quota remaining {0}/{1}".format(remaining, quota)) + if remaining <= 0: + scan_ctrl.error(scan_id, IrmaScanStatus.error) + + available_probelist = celery_probe.get_probelist() + for (filename, probe_list) in scan_request.items(): + if probe_list is None: + scan_ctrl.error(scan_id, IrmaScanStatus.error_probe_missing) + log.info("{0}: Empty probe list".format(frontend_scanid)) + return IrmaTaskReturn.error("BrainTask: Empty probe list") + # first check probe_list + for p in probe_list: + # check if probe exists + if p not in available_probelist: + scan_ctrl.error(scan_id, IrmaScanStatus.error_probe_missing) + msg = "BrainTask: Unknown probe {0}".format(p) + log.info("{0}: Unknown probe {1}".format(frontend_scanid, p)) + return IrmaTaskReturn.error(msg) - # Now, create one subtask per file to - # scan per probe according to quota - for probe in probelist: - if quota is not None and quota <= 0: + # Now, create one subtask per file to + # scan per probe according to quota + for probe in probe_list: + if remaining is not None: + if remaining <= 0: break - if quota: - quota -= 1 - hook_success = route( - results_app.signature("brain.tasks.scan_success", - (user.ftpuser, - scanid, - filename, probe))) - hook_error = route( - results_app.signature("brain.tasks.scan_error", - (user.ftpuser, - scanid, - filename, probe))) - jobs_list.append( - probe_app.send_task("probe.tasks.probe_scan", - args=(user.ftpuser, scanid, filename), - queue=probe, - link=hook_success, - link_error=hook_error)) - - scan.nbfiles = len(jobs_list) - scan.status = IrmaScanStatus.launched - sql.commit() - - log.debug( - "{0}: ".format(scanid) + - "{0} files receives / ".format(len(scan_request)) + - "{0} active probe / ".format(len(available_probelist)) + - "{0} probe used / ".format(len(probelist)) + - "{0} jobs launched".format(len(jobs_list))) - - except Exception as e: - scan_flush(scanid) - scan.status = IrmaScanStatus.error - sql.commit() - log.debug("{0} : Error {1}".format(scanid, e)) - return + else: + remaining -= 1 + job_id = scan_ctrl.job_new(scan_id, filename, probe) + task_id = celery_probe.job_launch(ftpuser, + frontend_scanid, + filename, + probe, + job_id) + scan_ctrl.job_set_taskid(job_id, task_id) + scan_ctrl.launched(scan_id) + celery_frontend.scan_launched(frontend_scanid) + log.info( + "{0}: ".format(frontend_scanid) + + "{0} files receives / ".format(len(scan_request)) + + "{0} active probe / ".format(len(available_probelist)) + + "{0} probe used / ".format(len(probe_list)) + + "{0} jobs launched".format(scan_ctrl.get_nbjobs(scan_id))) @scan_app.task() -def scan_progress(scanid): - log.debug("{0}: scan progress".format(scanid)) - engine = config.brain_config['sql_brain'].engine - dbname = config.brain_config['sql_brain'].dbname +def scan_progress(frontend_scanid): try: - sql = sql_connect(engine, dbname) - user = sql_get_user(sql) - scan = sql_get_scan(sql, scanid, user) + log.info("{0}: scan progress".format(frontend_scanid)) + rmqvhost = config.get_frontend_rmqvhost() + user_id = user_ctrl.get_userid(rmqvhost) + scan_id = scan_ctrl.get_scan_id(frontend_scanid, user_id) + (status, progress_details) = scan_ctrl.progress(scan_id) res = {} - if IrmaScanStatus.is_error(scan.status): - status_str = IrmaScanStatus.label[scan.status] - msg = "Brain: scan error ({0})".format(status_str) - return IrmaTaskReturn.error(msg) - res['status'] = IrmaScanStatus.label[scan.status] - res['progress_details'] = None - if scan.status == IrmaScanStatus.launched: - if not scan.taskid: - log.debug("{0}: sql no task_id".format(scanid)) - msg = "Brain: task id not set (SQL)" - return IrmaTaskReturn.error(msg) - gr = get_groupresult(scan.taskid) - nbcompleted = nbsuccessful = 0 - for j in gr: - if j.ready(): - nbcompleted += 1 - if j.successful(): - nbsuccessful += 1 - res['progress_details'] = {} - res['progress_details']['total'] = len(gr) - res['progress_details']['finished'] = nbcompleted - res['progress_details']['successful'] = nbsuccessful + res['status'] = status + res['progress_details'] = progress_details return IrmaTaskReturn.success(res) - except IrmaTaskError as e: + except Exception as e: msg = "Brain: progress error {0}".format(e) return IrmaTaskReturn.error(msg) @scan_app.task() -def scan_cancel(scanid): +def scan_cancel(frontend_scanid): try: - engine = config.brain_config['sql_brain'].engine - dbname = config.brain_config['sql_brain'].dbname - sql = sql_connect(engine, dbname) - user = sql_get_user(sql) - scan = sql_get_scan(sql, scanid, user) + log.info("{0}: scan cancel".format(frontend_scanid)) + rmqvhost = config.get_frontend_rmqvhost() + user_id = user_ctrl.get_userid(rmqvhost) + scan_id = scan_ctrl.get_scan_id(frontend_scanid, user_id) + (status, progress_details) = scan_ctrl.progress(scan_id) + scan_ctrl.cancelling(scan_id) + pending_jobs = scan_ctrl.get_pending_jobs(scan_id) + cancel_details = None + if len(pending_jobs) != 0: + celery_probe.job_cancel(pending_jobs) + cancel_details = {} + cancel_details['total'] = progress_details['total'] + cancel_details['finished'] = progress_details['finished'] + cancel_details['cancelled'] = len(pending_jobs) + scan_ctrl.cancelled(scan_id) + scan_flush(frontend_scanid) res = {} - res['status'] = IrmaScanStatus.label[scan.status] - res['cancel_details'] = None - if IrmaScanStatus.is_error(scan.status): - scan_flush(scanid) - if scan.status == IrmaScanStatus.launched: - scan.status = IrmaScanStatus.cancelling - # commit as soon as possible to avoid cancelling again - sql.commit() - gr = get_groupresult(scan.taskid) - nbcompleted = nbcancelled = 0 - # iterate over jobs in groupresult - for j in gr: - if j.ready(): - nbcompleted += 1 - else: - j.revoke(terminate=True) - nbcancelled += 1 - scan.status = IrmaScanStatus.cancelled - sql.commit() - scan_flush(scanid) - res['cancel_details'] = {} - res['cancel_details']['total'] = len(gr) - res['cancel_details']['finished'] = nbcompleted - res['cancel_details']['cancelled'] = nbcancelled + res['status'] = status + res['cancel_details'] = cancel_details return IrmaTaskReturn.success(res) - except IrmaTaskError as e: + except Exception as e: msg = "Brain: cancel error {0}".format(e) return IrmaTaskReturn.error(msg) @results_app.task(ignore_result=True) -def scan_result(result, ftpuser, scanid, filename, probe): +def job_success(result, jobid): + try: + log.info("{0}: job success".format(jobid)) + (frontend_scanid, filename, probe) = scan_ctrl.job_info(jobid) + log.info("{0}: job success {1}".format(frontend_scanid, probe)) + celery_frontend.scan_result(frontend_scanid, filename, probe, result) + scan_ctrl.job_success(jobid) + except IrmaTaskError as e: + msg = "Brain: result error {0}".format(e) + return IrmaTaskReturn.error(msg) + + +@results_app.task(ignore_result=True) +def job_error(jobid): try: - frontend_app.send_task("frontend.tasks.scan_result", - args=(scanid, filename, probe, result)) - log.debug("{0} sent result {1}".format(scanid, probe)) - engine = config.brain_config['sql_brain'].engine - dbname = config.brain_config['sql_brain'].dbname - sql = sql_connect(engine, dbname) - user = sql_get_user(sql) - scan = sql_get_scan(sql, scanid, user) - gr = get_groupresult(scan.taskid) - nbtotal = len(gr) - nbcompleted = nbsuccessful = 0 - for j in gr: - if j.ready(): - nbcompleted += 1 - if j.successful(): - nbsuccessful += 1 - if nbtotal == nbcompleted: - # update scan status - scan.status = IrmaScanStatus.processed - sql.commit() + log.info("{0}: job error".format(jobid)) + (frontend_scanid, filename, probe) = scan_ctrl.job_info(jobid) + log.info("{0}: job error {1}".format(frontend_scanid, probe)) + result = {} + result[probe] = {} + result[probe]['status'] = -1 + result[probe]['name'] = probe + celery_frontend.scan_result(frontend_scanid, filename, probe, result) + scan_ctrl.job_error(jobid) except IrmaTaskError as e: msg = "Brain: result error {0}".format(e) return IrmaTaskReturn.error(msg) @results_app.task(ignore_result=True) -def scan_flush(scanid): +def scan_flush(frontend_scanid): try: - log.debug("{0} scan flush requested".format(scanid)) - engine = config.brain_config['sql_brain'].engine - dbname = config.brain_config['sql_brain'].dbname - sql = sql_connect(engine, dbname) - user = sql_get_user(sql) - scan = sql_get_scan(sql, scanid, user) - if not IrmaScanStatus.is_error(scan.status): - log.debug("{0} deleting group results entry".format(scanid)) - gr = get_groupresult(scan.taskid) - # delete groupresult - gr.delete() - # remove directory - log.debug("{0} deleting files".format(scanid)) - flush_dir(user.ftpuser, scanid) + log.info("{0} scan flush requested".format(frontend_scanid)) + rmqvhost = config.get_frontend_rmqvhost() + user_id = user_ctrl.get_userid(rmqvhost) + scan_id = scan_ctrl.get_scan_id(frontend_scanid, user_id) + scan_ctrl.flush(scan_id) + ftpuser = user_ctrl.get_ftpuser(user_id) + ftp_ctrl.flush_dir(ftpuser, frontend_scanid) except Exception as e: - log.debug("{0} flush error {1}".format(scanid, e)) + log.info("{0} flush error {1}".format(scan_id, e)) return From 71539ac5782bf413e29cef2ea0eb793b9860ad2a Mon Sep 17 00:00:00 2001 From: aquint Date: Tue, 16 Sep 2014 09:40:05 +0200 Subject: [PATCH 07/21] force refresh of jobs when querying scan.jobs --- brain/models/sqlobjects.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/brain/models/sqlobjects.py b/brain/models/sqlobjects.py index 38ad8c6..49f0f2d 100644 --- a/brain/models/sqlobjects.py +++ b/brain/models/sqlobjects.py @@ -106,7 +106,7 @@ class Scan(Base, SQLDatabaseObject): index=True, nullable=False, ) - jobs = relationship("Job", backref="scan") + jobs = relationship("Job", backref="scan", lazy='subquery') def __init__(self, frontend_scanid, user_id, nb_files): self.scan_id = frontend_scanid From a5bae9a5a9ffa53ab0c7da654e511ab150cb8ac9 Mon Sep 17 00:00:00 2001 From: aquint Date: Tue, 16 Sep 2014 09:43:51 +0200 Subject: [PATCH 08/21] Add sql user creation script --- scripts/create_user.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 scripts/create_user.py diff --git a/scripts/create_user.py b/scripts/create_user.py new file mode 100644 index 0000000..a27853c --- /dev/null +++ b/scripts/create_user.py @@ -0,0 +1,23 @@ +import sys +from brain.models.sqlobjects import User +from brain.helpers.sql import session_transaction + + +if len(sys.argv) not in (4, 5): + print("usage: {0} [quota]\n" + " with a string\n" + " the rmqvhost used for the frontend\n" + " the ftpuser used by the frontend\n" + " [quota] the number of file scan quota\n" + "example: {0} test1 mqfrontend frontend" + "".format(sys.argv[0])) + sys.exit(1) + +# quota is in number of files (0 means disabled) +quota = int(sys.argv[4]) if len(sys.argv) == 5 else 0 +with session_transaction() as session: + user = User(name=sys.argv[1], + rmqvhost=sys.argv[2], + ftpuser=sys.argv[3], + quota=quota) + user.save(session) From 7b8df45999d555ca2d7c88c66909c4e7152ce092 Mon Sep 17 00:00:00 2001 From: aquint Date: Tue, 16 Sep 2014 16:08:44 +0200 Subject: [PATCH 09/21] add helpers and controllers --- brain/controllers/frontendtasks.py | 36 +++++++++++++ brain/controllers/ftpctrl.py | 27 ++++++++++ brain/controllers/probetasks.py | 81 ++++++++++++++++++++++++++++++ brain/controllers/userctrl.py | 39 ++++++++++++++ brain/helpers/celerytasks.py | 60 ++++++++++++++++++++++ 5 files changed, 243 insertions(+) create mode 100644 brain/controllers/frontendtasks.py create mode 100644 brain/controllers/ftpctrl.py create mode 100644 brain/controllers/probetasks.py create mode 100644 brain/controllers/userctrl.py create mode 100644 brain/helpers/celerytasks.py diff --git a/brain/controllers/frontendtasks.py b/brain/controllers/frontendtasks.py new file mode 100644 index 0000000..c84c7d4 --- /dev/null +++ b/brain/controllers/frontendtasks.py @@ -0,0 +1,36 @@ +# +# Copyright (c) 2013-2014 QuarksLab. +# This file is part of IRMA project. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License in the top-level directory +# of this distribution and at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# No part of the project, including this file, may be copied, +# modified, propagated, or distributed except according to the +# terms contained in the LICENSE file. + +import celery +import config.parser as config +from brain.helpers.celerytasks import async_call + +frontend_app = celery.Celery('frontendtasks') +config.conf_frontend_celery(frontend_app) +config.configure_syslog(frontend_app) + + +def scan_launched(frontend_scanid): + async_call(frontend_app, + "frontend.tasks", + "scan_launched", + args=[frontend_scanid]) + + +def scan_result(frontend_scanid, filename, probe, result): + async_call(frontend_app, + "frontend.tasks", + "scan_result", + args=[frontend_scanid, filename, probe, result]) diff --git a/brain/controllers/ftpctrl.py b/brain/controllers/ftpctrl.py new file mode 100644 index 0000000..8070743 --- /dev/null +++ b/brain/controllers/ftpctrl.py @@ -0,0 +1,27 @@ +# +# Copyright (c) 2013-2014 QuarksLab. +# This file is part of IRMA project. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License in the top-level directory +# of this distribution and at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# No part of the project, including this file, may be copied, +# modified, propagated, or distributed except according to the +# terms contained in the LICENSE file. + +from lib.irma.ftp.handler import FtpTls +import config.parser as config + + +def flush_dir(ftpuser, scanid): + print("Flushing dir {0}".format(scanid)) + conf_ftp = config.brain_config['ftp_brain'] + with FtpTls(conf_ftp.host, + conf_ftp.port, + conf_ftp.username, + conf_ftp.password) as ftps: + ftps.deletepath("{0}/{1}".format(ftpuser, scanid), deleteParent=True) diff --git a/brain/controllers/probetasks.py b/brain/controllers/probetasks.py new file mode 100644 index 0000000..96477eb --- /dev/null +++ b/brain/controllers/probetasks.py @@ -0,0 +1,81 @@ +# +# Copyright (c) 2013-2014 QuarksLab. +# This file is part of IRMA project. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License in the top-level directory +# of this distribution and at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# No part of the project, including this file, may be copied, +# modified, propagated, or distributed except according to the +# terms contained in the LICENSE file. + +import celery +import time +import config.parser as config +from brain.helpers.celerytasks import route, async_call + + +results_app = celery.Celery('resultstasks') +config.conf_results_celery(results_app) + +probe_app = celery.Celery('probetasks') +config.conf_results_celery(probe_app) + +# Time to cache the probe list +# to avoid asking to rabbitmq +PROBELIST_CACHE_TIME = 60 +cache_probelist = {'list': None, 'time': None} + + +def get_probelist(): + # get active queues list from probe celery app + now = time.time() + result_queue = config.brain_config['broker_probe'].queue + if cache_probelist['time'] is not None: + cache_time = now - cache_probelist['time'] + if cache_probelist['time'] is None or cache_time > PROBELIST_CACHE_TIME: + slist = list() + i = probe_app.control.inspect() + queues = i.active_queues() + if queues: + for infolist in queues.values(): + for info in infolist: + if info['name'] not in slist: + # exclude only predefined result queue + if info['name'] != result_queue: + slist.append(info['name']) + if len(slist) != 0: + # activate cache only on non empty list + cache_probelist['time'] = now + cache_probelist['list'] = slist + return cache_probelist['list'] + + +# ============ +# Task calls +# ============ + +def job_launch(ftpuser, frontend_scanid, filename, probe, job_id): + """ send a task to the brain to flush the scan files""" + hook_success = route( + results_app.signature("brain.tasks.job_success", + [job_id])) + hook_error = route( + results_app.signature("brain.tasks.job_error", + [job_id])) + task = async_call(probe_app, + "probe.tasks", + "probe_scan", + args=(ftpuser, frontend_scanid, filename), + queue=probe, + link=hook_success, + link_error=hook_error) + return task.id + + +def job_cancel(job_list): + probe_app.control.revoke(job_list, terminate=True) diff --git a/brain/controllers/userctrl.py b/brain/controllers/userctrl.py new file mode 100644 index 0000000..a0a38c2 --- /dev/null +++ b/brain/controllers/userctrl.py @@ -0,0 +1,39 @@ +# +# Copyright (c) 2013-2014 QuarksLab. +# This file is part of IRMA project. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License in the top-level directory +# of this distribution and at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# No part of the project, including this file, may be copied, +# modified, propagated, or distributed except according to the +# terms contained in the LICENSE file. + +from brain.models.sqlobjects import User +from brain.helpers.sql import session_query, session_transaction + + +def get_userid(rmqvhost): + with session_query() as session: + user = User.get_by_rmqvhost(rmqvhost, session) + return user.id + + +def get_quota(user_id): + with session_query() as session: + user = User.load(user_id, session) + quota = user.quota + remaining = None + if quota is not None: + remaining = user.remaining_quota(session) + return (remaining, quota) + + +def get_ftpuser(user_id): + with session_query() as session: + user = User.load(user_id, session) + return user.ftpuser diff --git a/brain/helpers/celerytasks.py b/brain/helpers/celerytasks.py new file mode 100644 index 0000000..f4c3f27 --- /dev/null +++ b/brain/helpers/celerytasks.py @@ -0,0 +1,60 @@ +# +# Copyright (c) 2013-2014 QuarksLab. +# This file is part of IRMA project. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License in the top-level directory +# of this distribution and at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# No part of the project, including this file, may be copied, +# modified, propagated, or distributed except according to the +# terms contained in the LICENSE file. + +import celery +from lib.irma.common.exceptions import IrmaTaskError + + +# ================ +# Celery Helpers +# ================ + +def route(sig): + options = sig.app.amqp.router.route( + sig.options, sig.task, sig.args, sig.kwargs, + ) + try: + queue = options.pop('queue') + except KeyError: + pass + else: + options.update(exchange=queue.exchange.name, + routing_key=queue.routing_key) + sig.set(**options) + return sig + + +def sync_call(celery_app, taskpath, taskname, timeout, **kwargs): + """ send a task to the celery app with specified args + and wait until timeout param for result + """ + try: + full_task_path = "{0}.{1}".format(taskpath, taskname) + task = celery_app.send_task(full_task_path, **kwargs) + (status, res) = task.get(timeout=timeout) + return (status, res) + except celery.exceptions.TimeoutError: + raise IrmaTaskError("Celery timeout - {0}".format(taskname)) + + +def async_call(celery_app, taskpath, taskname, **kwargs): + """ send a task to the celery app with specified args + without waiting for results. + """ + try: + full_task_path = "{0}.{1}".format(taskpath, taskname) + return celery_app.send_task(full_task_path, **kwargs) + except: + raise IrmaTaskError("Celery error - {0}".format(taskname)) From 49980894fba52772f225ba0417e36c678c050a72 Mon Sep 17 00:00:00 2001 From: aquint Date: Tue, 16 Sep 2014 16:24:48 +0200 Subject: [PATCH 10/21] remove backend for probe and frontend, and default to amqp backend for brain synchronous tasks --- config/brain.ini | 8 -------- config/parser.py | 52 ++++++++++++------------------------------------ 2 files changed, 13 insertions(+), 47 deletions(-) diff --git a/config/brain.ini b/config/brain.ini index bc15a6f..d5110d4 100644 --- a/config/brain.ini +++ b/config/brain.ini @@ -19,14 +19,6 @@ username = frontend password = frontend queue = frontend -[backend_brain] -host = 127.0.0.1 -db = 0 - -[backend_probe] -host = 127.0.0.1 -db = 1 - [sqldb] dbms = sqlite dialect = diff --git a/config/parser.py b/config/parser.py index 26eb648..04f037d 100644 --- a/config/parser.py +++ b/config/parser.py @@ -56,16 +56,6 @@ ('password', TemplatedConfiguration.string, None), ('queue', TemplatedConfiguration.string, None) ], - 'backend_brain': [ - ('host', TemplatedConfiguration.string, None), - ('port', TemplatedConfiguration.integer, 6379), - ('db', TemplatedConfiguration.integer, None), - ], - 'backend_probe': [ - ('host', TemplatedConfiguration.string, None), - ('port', TemplatedConfiguration.integer, 6379), - ('db', TemplatedConfiguration.integer, None), - ], 'sqldb': [ ('dbms', TemplatedConfiguration.string, None), ('dialect', TemplatedConfiguration.string, None), @@ -99,7 +89,6 @@ def _conf_celery(app, broker, backend=None, queue=None): app.conf.update(BROKER_URL=broker, - CELERY_RESULT_BACKEND=backend, CELERY_ACCEPT_CONTENT=['json'], CELERY_TASK_SERIALIZER='json', CELERY_RESULT_SERIALIZER='json' @@ -117,6 +106,8 @@ def _conf_celery(app, broker, backend=None, queue=None): def conf_brain_celery(app): broker = get_brain_broker_uri() + # default backend is amqp + # same as broker backend = get_brain_backend_uri() queue = brain_config.broker_brain.queue _conf_celery(app, broker, backend=backend, queue=queue) @@ -124,47 +115,26 @@ def conf_brain_celery(app): def conf_probe_celery(app): broker = get_probe_broker_uri() - backend = get_probe_backend_uri() - _conf_celery(app, broker, backend=backend) + _conf_celery(app, broker, backend=False) def conf_frontend_celery(app): broker = get_frontend_broker_uri() queue = brain_config.broker_frontend.queue - _conf_celery(app, broker, queue=queue) + _conf_celery(app, broker, backend=False, queue=queue) def conf_results_celery(app): broker = get_probe_broker_uri() queue = brain_config.broker_probe.queue - _conf_celery(app, broker, queue=queue) - - -# ================= -# Backend helpers -# ================= - -def _get_backend_uri(backend_config): - host = backend_config.host - port = backend_config.port - db = backend_config.db - return "redis://{host}:{port}/{db}" \ - "".format(host=host, port=port, db=db) - - -def get_brain_backend_uri(): - return _get_backend_uri(brain_config.backend_brain) - - -def get_probe_backend_uri(): - return _get_backend_uri(brain_config.backend_probe) + _conf_celery(app, broker, backend=False, queue=queue) # ================= # Brocker helpers # ================= -def _get_broker_uri(broker_config): +def _get_amqp_uri(broker_config): user = broker_config.username pwd = broker_config.password host = broker_config.host @@ -175,15 +145,19 @@ def _get_broker_uri(broker_config): def get_brain_broker_uri(): - return _get_broker_uri(brain_config.broker_brain) + return _get_amqp_uri(brain_config.broker_brain) + + +def get_brain_backend_uri(): + return _get_amqp_uri(brain_config.broker_brain) def get_probe_broker_uri(): - return _get_broker_uri(brain_config.broker_probe) + return _get_amqp_uri(brain_config.broker_probe) def get_frontend_broker_uri(): - return _get_broker_uri(brain_config.broker_frontend) + return _get_amqp_uri(brain_config.broker_frontend) def get_frontend_rmqvhost(): From 4ea50e3d82e0accf1a559df3f56dc572e2866853 Mon Sep 17 00:00:00 2001 From: aquint Date: Tue, 16 Sep 2014 16:26:04 +0200 Subject: [PATCH 11/21] add unittest for controllers --- tests/brain.ini | 42 ++++++++ tests/test_controllers.py | 219 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 261 insertions(+) create mode 100644 tests/brain.ini create mode 100644 tests/test_controllers.py diff --git a/tests/brain.ini b/tests/brain.ini new file mode 100644 index 0000000..156ac91 --- /dev/null +++ b/tests/brain.ini @@ -0,0 +1,42 @@ +[broker_brain] +host = 127.0.0.1 +vhost = mqbrain +username = brain +password = brain +queue = brain + +[broker_probe] +host = 127.0.0.1 +vhost = mqprobe +username = probe +password = probe +queue = results + +[broker_frontend] +host = 127.0.0.1 +vhost = mqfrontend +username = frontend +password = frontend +queue = frontend + +[backend_brain] +host = 127.0.0.1 +db = 0 + +[backend_probe] +host = 127.0.0.1 +db = 1 + +[sqldb] +dbms = sqlite +dialect = +username = +passwd = +host = +dbname = test +tables_prefix = test + +[ftp_brain] +host = 127.0.0.1 +username=probe +password=probe diff --git a/tests/test_controllers.py b/tests/test_controllers.py new file mode 100644 index 0000000..fb466f2 --- /dev/null +++ b/tests/test_controllers.py @@ -0,0 +1,219 @@ +# +# Copyright (c) 2013-2014 QuarksLab. +# This file is part of IRMA project. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License in the top-level directory +# of this distribution and at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# No part of the project, including this file, may be copied, +# modified, propagated, or distributed except according to the +# terms contained in the LICENSE file. + +import sys +import os +from random import shuffle +pardir = os.path.abspath(os.path.join(__file__, os.path.pardir)) +sys.path.append(os.path.dirname(pardir)) + +import logging +import unittest +# Test config +cwd = os.path.abspath(os.path.dirname(__file__)) +os.environ['IRMA_BRAIN_CFG_PATH'] = cwd +from brain.helpers.sql import session_query, session_transaction + + +from lib.irma.common.utils import IrmaScanStatus +from lib.common.utils import UUID +from lib.common.compat import timestamp + +import brain.controllers.scanctrl as scan_ctrl +import brain.controllers.userctrl as user_ctrl +from brain.models.sqlobjects import Scan, User +from lib.irma.common.exceptions import IrmaValueError, IrmaTaskError, \ + IrmaDatabaseError, IrmaDatabaseResultNotFound + + +# ================= +# Logging options +# ================= + +def enable_logging(level=logging.INFO, + handler=None, + formatter=None): + global log + log = logging.getLogger() + if formatter is None: + formatter = logging.Formatter("%(asctime)s [%(name)s] " + + "%(levelname)s: %(message)s") + if handler is None: + handler = logging.StreamHandler() + handler.setFormatter(formatter) + log.addHandler(handler) + log.setLevel(level) + + +# ============ +# Test cases +# ============ + +class scanctrlTestCase(unittest.TestCase): + def setUp(self): + self.user_name = "test_user" + self.user_rmqvhost = "test_vhost" + self.user_ftpuser = "test_ftpuser" + self.user_quota = 100 + self.scanid = UUID.generate() + try: + self.userid = user_ctrl.get_userid(self.user_rmqvhost) + except IrmaDatabaseResultNotFound: + # user does not exist create one + with session_transaction() as session: + user = User(self.user_name, + self.user_rmqvhost, + self.user_ftpuser, + self.user_quota) + user.save(session) + self.userid = user.id + + def tearDown(self): + pass + + +class TestScanController(scanctrlTestCase): + # ====== + # SCAN + # ====== + def test_scan_new_id(self): + # test we got an id + scanid = scan_ctrl.new(self.scanid, self.userid, 0) + self.assertIsNotNone(scanid) + + def test_scan_new_status(self): + # test we have corrects fields + scanid = scan_ctrl.new(self.scanid, self.userid, 10) + with session_query() as session: + scan = Scan.load(scanid, session) + self.assertIsNotNone(scan.timestamp) + self.assertEqual(scan.status, IrmaScanStatus.empty) + self.assertEqual(scan.nb_files, 10) + self.assertIsNotNone(scan.nb_jobs) + + def test_scan_launched(self): + scan_id = scan_ctrl.new(self.scanid, self.userid, 10) + for i in xrange(0, 10): + for probe in ['probe1', 'probe2']: + scan_ctrl.job_new(scan_id, "file-{0}".format(i), probe) + scan_ctrl.launched(scan_id) + with session_query() as session: + scan = Scan.load(scan_id, session) + self.assertEqual(scan.status, IrmaScanStatus.launched) + self.assertEqual(scan.nb_files, 10) + self.assertEqual(scan.nb_jobs, 20) + + def test_scan_error(self): + scanid = scan_ctrl.new(self.scanid, self.userid, 10) + for code in IrmaScanStatus.label.keys(): + scan_ctrl.error(scanid, code) + with session_query() as session: + scan = Scan.load(scanid, session) + self.assertEqual(scan.status, code) + + def test_scan_job_success(self): + scan_id = scan_ctrl.new(self.scanid, self.userid, 10) + job_ids = [] + for i in xrange(0, 10): + for probe in ['probe1', 'probe2']: + job_ids.append(scan_ctrl.job_new(scan_id, + "file-{0}".format(i), + probe)) + scan_ctrl.launched(scan_id) + shuffle(job_ids) + for job_id in job_ids: + scan_ctrl.job_success(job_id) + self.assertTrue(scan_ctrl.check_finished(scan_id)) + with session_query() as session: + scan = Scan.load(scan_id, session) + self.assertEqual(scan.status, IrmaScanStatus.processed) + self.assertEqual(scan.nb_files, 10) + self.assertEqual(scan.nb_jobs, 20) + + def test_scan_job_error(self): + scan_id = scan_ctrl.new(self.scanid, self.userid, 10) + job_ids = [] + for i in xrange(0, 10): + for probe in ['probe1', 'probe2']: + job_ids.append(scan_ctrl.job_new(scan_id, + "file-{0}".format(i), + probe)) + scan_ctrl.launched(scan_id) + shuffle(job_ids) + for job_id in job_ids: + scan_ctrl.job_error(job_id) + self.assertTrue(scan_ctrl.check_finished(scan_id)) + with session_query() as session: + scan = Scan.load(scan_id, session) + self.assertEqual(scan.status, IrmaScanStatus.processed) + self.assertEqual(scan.nb_files, 10) + self.assertEqual(scan.nb_jobs, 20) + + def test_scan_progress(self): + scan_id = scan_ctrl.new(self.scanid, self.userid, 10) + job_ids = [] + for i in xrange(0, 10): + for probe in ['probe1', 'probe2']: + job_ids.append(scan_ctrl.job_new(scan_id, + "file-{0}".format(i), + probe)) + scan_ctrl.launched(scan_id) + shuffle(job_ids) + for i, job_id in enumerate(job_ids[:-1]): + scan_ctrl.job_success(job_id) + (status, progress_details) = scan_ctrl.progress(scan_id) + self.assertEqual(status, + IrmaScanStatus.label[IrmaScanStatus.launched]) + self.assertIsNotNone(progress_details) + self.assertEqual(progress_details['successful'], i + 1) + + def test_scan_cancel(self): + scanid = scan_ctrl.new(self.scanid, self.userid, 10) + for code in IrmaScanStatus.label.keys(): + scan_ctrl.error(scanid, code) + with session_query() as session: + scan = Scan.load(scanid, session) + self.assertEqual(scan.status, code) + + +class TestUserController(scanctrlTestCase): + + def test_user_quota(self): + # test we have a correct quota + (remaining, quota) = user_ctrl.get_quota(self.userid) + self.assertIsNotNone(quota) + self.assertEqual(type(quota), int) + self.assertIsNotNone(remaining) + self.assertEqual(type(remaining), int) + + def test_user_quota_update(self): + # test update quota + (before, _) = user_ctrl.get_quota(self.userid) + scanid = scan_ctrl.new(self.scanid, self.userid, 2) + for i in xrange(0, 10): + for probe in ['probe1', 'probe2']: + scan_ctrl.job_new(scanid, "file-{0}".format(i), probe) + scan_ctrl.launched(scanid) + (after, _) = user_ctrl.get_quota(self.userid) + self.assertEqual(before - after, 20) + with session_query() as session: + user = User.load(self.userid, session) + scan_ids = [scan.id for scan in user.scans] + self.assertNotEqual(scan_ids, []) + self.assertTrue(scanid in scan_ids) + +if __name__ == '__main__': + enable_logging() + unittest.main() From 9cc16067243a0144696a0ffac8a3425f9414c5d2 Mon Sep 17 00:00:00 2001 From: aquint Date: Tue, 16 Sep 2014 17:27:41 +0200 Subject: [PATCH 12/21] add expires parmater for amqp backend --- config/parser.py | 1 + 1 file changed, 1 insertion(+) diff --git a/config/parser.py b/config/parser.py index 04f037d..29fae06 100644 --- a/config/parser.py +++ b/config/parser.py @@ -95,6 +95,7 @@ def _conf_celery(app, broker, backend=None, queue=None): ) if backend is not None: app.conf.update(CELERY_RESULT_BACKEND=backend) + app.conf.update(CELERY_TASK_RESULT_EXPIRES=300) # 5 minutes if queue is not None: app.conf.update(CELERY_DEFAULT_QUEUE=queue, # delivery_mode=1 enable transient mode From 45c9eb8ab31be3fbc0e534fc889cddc957da63e9 Mon Sep 17 00:00:00 2001 From: aquint Date: Tue, 16 Sep 2014 17:30:33 +0200 Subject: [PATCH 13/21] remove redis packaging --- Makefile | 1 - debian/control | 7 +----- debian/irma-brain-redis.postinst | 41 -------------------------------- 3 files changed, 1 insertion(+), 48 deletions(-) delete mode 100644 debian/irma-brain-redis.postinst diff --git a/Makefile b/Makefile index ab665e9..ecec06e 100644 --- a/Makefile +++ b/Makefile @@ -40,6 +40,5 @@ clean: rm -rf $(CURDIR)/debian/irma-brain-logrotate rm -rf $(CURDIR)/debian/irma-brain-rabbitmq rm -rf $(CURDIR)/debian/irma-brain-ftpd - rm -rf $(CURDIR)/debian/irma-brain-redis rm -rf build/ MANIFEST $(DISTDIR) $(BUILDIR) find . -name '*.pyc' -delete diff --git a/debian/control b/debian/control index 0bb7fa4..3cf2978 100644 --- a/debian/control +++ b/debian/control @@ -32,11 +32,6 @@ Architecture: all Depends: ${misc:Depends}, debconf, pure-ftpd, openssl Description: FTPs server for IRMA Brain -Package: irma-brain-redis -Architecture: all -Depends: ${misc:Depends}, redis-server -Description: Redis server configuration for IRMA Brain - Package: irma-brain-rabbitmq Architecture: all Depends: ${misc:Depends}, debconf, rabbitmq-server @@ -44,6 +39,6 @@ Description: RabbitMQ server configuration for IRMA Brain Package: irma-brain Architecture: all -Depends: ${shlibs:Depends}, ${misc:Depends}, irma-brain-redis, irma-brain-app, irma-brain-rabbitmq, irma-brain-ftpd +Depends: ${shlibs:Depends}, ${misc:Depends}, irma-brain-app, irma-brain-rabbitmq, irma-brain-ftpd Recommends: irma-brain-rsyslog, irma-brain-logrotate Description: IRMA Brain meta-package diff --git a/debian/irma-brain-redis.postinst b/debian/irma-brain-redis.postinst deleted file mode 100644 index 10c7290..0000000 --- a/debian/irma-brain-redis.postinst +++ /dev/null @@ -1,41 +0,0 @@ -#!/bin/sh -# postinst script for irma-brain -# -# see: dh_installdeb(1) - -set -e - -# summary of how this script can be called: -# * `configure' -# * `abort-upgrade' -# * `abort-remove' `in-favour' -# -# * `abort-remove' -# * `abort-deconfigure' `in-favour' -# `removing' -# -# for details, see http://www.debian.org/doc/debian-policy/ or -# the debian-policy package - - -case "$1" in - configure) - sed -i "s/^bind 127.0.0.1/# bind 127.0.0.1/g" /etc/redis/redis.conf - invoke-rc.d redis-server restart - ;; - - abort-upgrade|abort-remove|abort-deconfigure) - ;; - - *) - echo "postinst called with unknown argument \`$1'" >&2 - exit 1 - ;; -esac - -# dh_installdeb will replace this with shell code automatically -# generated by other debhelper scripts. - - - -exit 0 From e198551f8d092a6a4db7f1a7af200b33851900c3 Mon Sep 17 00:00:00 2001 From: aquint Date: Tue, 16 Sep 2014 17:37:05 +0200 Subject: [PATCH 14/21] bump version to 1.1.0 --- Makefile | 2 +- setup.cfg | 6 +++--- setup.py | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Makefile b/Makefile index ecec06e..e8e18a2 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ DESTDIR=/ DISTDIR=$(CURDIR)/deb_dist BUILDIR=$(CURDIR)/debian/irma-brain PROJECT=irma-brain -VERSION=1.0.4 +VERSION=1.1.0 all: @echo "make source - Create source package" diff --git a/setup.cfg b/setup.cfg index 2965299..daab590 100644 --- a/setup.cfg +++ b/setup.cfg @@ -7,8 +7,8 @@ install-purelib = $base install-platlib = $base [build_sphinx] -project = 'irma-probe' -version = 1.0 -release = 1.0.4 +project = 'irma-brain' +version = 1.1 +release = 1.1.0 source-dir = docs/ build-dir = docs/_build diff --git a/setup.py b/setup.py index 8aaf71f..d186ea4 100644 --- a/setup.py +++ b/setup.py @@ -353,7 +353,7 @@ def _configure_application(self): setup( name='irma-brain-app', - version='1.0.4', + version='1.1.0', url='http://irma.quarkslab.com', author='Quarkslab', author_email='irma@quarkslab.com', From 4e85b2439df50c76155dac20b5015a021ab12153 Mon Sep 17 00:00:00 2001 From: aquint Date: Mon, 29 Sep 2014 12:15:33 +0200 Subject: [PATCH 15/21] move job function to job controller --- brain/controllers/jobctrl.py | 66 +++++++++++++++++++++++++++++++++++ brain/controllers/scanctrl.py | 54 +--------------------------- brain/tasks.py | 13 +++---- 3 files changed, 74 insertions(+), 59 deletions(-) create mode 100644 brain/controllers/jobctrl.py diff --git a/brain/controllers/jobctrl.py b/brain/controllers/jobctrl.py new file mode 100644 index 0000000..833bba0 --- /dev/null +++ b/brain/controllers/jobctrl.py @@ -0,0 +1,66 @@ +# +# Copyright (c) 2013-2014 QuarksLab. +# This file is part of IRMA project. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License in the top-level directory +# of this distribution and at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# No part of the project, including this file, may be copied, +# modified, propagated, or distributed except according to the +# terms contained in the LICENSE file. + +from brain.controllers import scanctrl +from brain.models.sqlobjects import Job +from brain.helpers.sql import session_query, session_transaction +from lib.common.compat import timestamp + + +def new(scan_id, filename, probe): + with session_transaction() as session: + j = Job(filename, probe, scan_id) + j.save(session) + session.commit() + return j.id + + +def _finish(job_id, status): + with session_transaction() as session: + job = Job.load(job_id, session) + job.status = status + job.ts_end = timestamp() + job.update(['status', 'ts_end'], session) + scan_id = job.scan.id + scanctrl.check_finished(scan_id) + + +def success(job_id): + _finish(job_id, Job.success) + + +def error(job_id): + _finish(job_id, Job.error) + + +def set_taskid(job_id, task_id): + with session_transaction() as session: + job = Job.load(job_id, session) + job.task_id = task_id + + +def info(job_id): + with session_query() as session: + job = Job.load(job_id, session) + frontend_scan_id = job.scan.scan_id + filename = job.filename + probe = job.probename + return (frontend_scan_id, filename, probe) + + +def duration(job_id): + with session_query() as session: + job = Job.load(job_id, session) + return (job.ts_end - job.ts_start) diff --git a/brain/controllers/scanctrl.py b/brain/controllers/scanctrl.py index ef1bb88..5c2ab9b 100644 --- a/brain/controllers/scanctrl.py +++ b/brain/controllers/scanctrl.py @@ -13,11 +13,10 @@ # modified, propagated, or distributed except according to the # terms contained in the LICENSE file. -from brain.models.sqlobjects import Scan, Job +from brain.models.sqlobjects import Scan from brain.helpers.sql import session_query, session_transaction from lib.irma.common.utils import IrmaScanStatus from lib.irma.common.exceptions import IrmaValueError -from lib.common.compat import timestamp def new(frontend_scan_id, user_id, nb_files): @@ -40,15 +39,6 @@ def get_nbjobs(scan_id): return scan.nb_jobs -def get_job_info(job_id): - with session_query() as session: - job = Job.load(job_id, session) - frontend_scan_id = job.scan.scan_id - filename = job.filename - probe = job.probename - return (frontend_scan_id, filename, probe) - - def _set_status(scan_id, code): with session_transaction() as session: scan = Scan.load(scan_id, session) @@ -116,45 +106,3 @@ def error(scan_id, code): with session_transaction() as session: scan = Scan.load(scan_id, session) scan.status = code - -# ========== -# Job ctrl -# ========== - - -def job_new(scan_id, filename, probe): - with session_transaction() as session: - j = Job(filename, probe, scan_id) - j.save(session) - session.commit() - return j.id - - -def _job_finish(job_id, status): - with session_transaction() as session: - job = Job.load(job_id, session) - job.status = status - job.ts_end = timestamp() - job.update(['status', 'ts_end'], session) - scan_id = job.scan.id - check_finished(scan_id) - - -def job_success(job_id): - _job_finish(job_id, Job.success) - - -def job_error(job_id): - _job_finish(job_id, Job.error) - - -def job_set_taskid(job_id, task_id): - with session_transaction() as session: - job = Job.load(job_id, session) - job.task_id = task_id - - -def job_info(job_id): - with session_transaction() as session: - job = Job.load(job_id, session) - return (job.scan.scan_id, job.filename, job.probename) diff --git a/brain/tasks.py b/brain/tasks.py index 138b128..7f41871 100644 --- a/brain/tasks.py +++ b/brain/tasks.py @@ -19,6 +19,7 @@ from celery.utils.log import get_task_logger import brain.controllers.userctrl as user_ctrl import brain.controllers.scanctrl as scan_ctrl +import brain.controllers.jobctrl as job_ctrl import brain.controllers.probetasks as celery_probe import brain.controllers.frontendtasks as celery_frontend import brain.controllers.ftpctrl as ftp_ctrl @@ -92,13 +93,13 @@ def scan(frontend_scanid, scan_request): break else: remaining -= 1 - job_id = scan_ctrl.job_new(scan_id, filename, probe) + job_id = job_ctrl.new(scan_id, filename, probe) task_id = celery_probe.job_launch(ftpuser, frontend_scanid, filename, probe, job_id) - scan_ctrl.job_set_taskid(job_id, task_id) + job_ctrl.set_taskid(job_id, task_id) scan_ctrl.launched(scan_id) celery_frontend.scan_launched(frontend_scanid) log.info( @@ -158,10 +159,10 @@ def scan_cancel(frontend_scanid): def job_success(result, jobid): try: log.info("{0}: job success".format(jobid)) - (frontend_scanid, filename, probe) = scan_ctrl.job_info(jobid) - log.info("{0}: job success {1}".format(frontend_scanid, probe)) + (frontend_scanid, filename, probe) = job_ctrl.info(jobid) + log.info("{0}: probe {1}".format(frontend_scanid, probe)) celery_frontend.scan_result(frontend_scanid, filename, probe, result) - scan_ctrl.job_success(jobid) + job_ctrl.success(jobid) except IrmaTaskError as e: msg = "Brain: result error {0}".format(e) return IrmaTaskReturn.error(msg) @@ -173,12 +174,12 @@ def job_error(jobid): log.info("{0}: job error".format(jobid)) (frontend_scanid, filename, probe) = scan_ctrl.job_info(jobid) log.info("{0}: job error {1}".format(frontend_scanid, probe)) + job_ctrl.error(jobid) result = {} result[probe] = {} result[probe]['status'] = -1 result[probe]['name'] = probe celery_frontend.scan_result(frontend_scanid, filename, probe, result) - scan_ctrl.job_error(jobid) except IrmaTaskError as e: msg = "Brain: result error {0}".format(e) return IrmaTaskReturn.error(msg) From 68c56b8fc176bd086146dff17e983287f819869d Mon Sep 17 00:00:00 2001 From: aquint Date: Mon, 29 Sep 2014 12:15:50 +0200 Subject: [PATCH 16/21] move timestamp to float --- brain/models/sqlobjects.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/brain/models/sqlobjects.py b/brain/models/sqlobjects.py index 49f0f2d..fe3dfb7 100644 --- a/brain/models/sqlobjects.py +++ b/brain/models/sqlobjects.py @@ -15,7 +15,7 @@ import os from brain.helpers.sql import sql_db_connect -from sqlalchemy import Column, Integer, String, \ +from sqlalchemy import Column, Integer, Float, String, \ event, ForeignKey import config.parser as config from sqlalchemy.engine import Engine @@ -90,7 +90,7 @@ class Scan(Base, SQLDatabaseObject): name='status' ) timestamp = Column( - Integer, + Float(precision=2), nullable=False, name='timestamp' ) From 98514295c7ab0a2f77851d82d6638385595d358b Mon Sep 17 00:00:00 2001 From: aquint Date: Mon, 29 Sep 2014 14:55:51 +0200 Subject: [PATCH 17/21] fix job_error hook parameter and frontend response --- brain/tasks.py | 18 ++++++++++-------- lib | 2 +- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/brain/tasks.py b/brain/tasks.py index 7f41871..436fb01 100644 --- a/brain/tasks.py +++ b/brain/tasks.py @@ -23,7 +23,7 @@ import brain.controllers.probetasks as celery_probe import brain.controllers.frontendtasks as celery_frontend import brain.controllers.ftpctrl as ftp_ctrl -from lib.irma.common.utils import IrmaTaskReturn, IrmaScanStatus +from lib.irma.common.utils import IrmaTaskReturn, IrmaScanStatus, IrmaProbeType from lib.irma.common.exceptions import IrmaTaskError # Get celery's logger @@ -169,16 +169,18 @@ def job_success(result, jobid): @results_app.task(ignore_result=True) -def job_error(jobid): +def job_error(parent_taskid, jobid): try: - log.info("{0}: job error".format(jobid)) - (frontend_scanid, filename, probe) = scan_ctrl.job_info(jobid) - log.info("{0}: job error {1}".format(frontend_scanid, probe)) + log.info("{0}: job error on {1}".format(jobid, parent_taskid)) + (frontend_scanid, filename, probe) = job_ctrl.info(jobid) + log.info("{0}: probe {1}".format(frontend_scanid, probe)) job_ctrl.error(jobid) result = {} - result[probe] = {} - result[probe]['status'] = -1 - result[probe]['name'] = probe + result['status'] = -1 + result['name'] = probe + result['error'] = "Brain job error" + result['type'] = IrmaProbeType.unknown + result['duration'] = job_ctrl.duration(jobid) celery_frontend.scan_result(frontend_scanid, filename, probe, result) except IrmaTaskError as e: msg = "Brain: result error {0}".format(e) diff --git a/lib b/lib index 0c3e2a3..449bed6 160000 --- a/lib +++ b/lib @@ -1 +1 @@ -Subproject commit 0c3e2a3dcadb122b1e2c5a5cc8d61a102c5b2992 +Subproject commit 449bed6f99f45c118de25ed7468836c0bbe6df6e From a9d21f08ac390edbf650ec5b54d5d13ecbfa75fb Mon Sep 17 00:00:00 2001 From: aquint Date: Mon, 29 Sep 2014 15:05:20 +0200 Subject: [PATCH 18/21] remove unknow type in error response --- brain/tasks.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/brain/tasks.py b/brain/tasks.py index 436fb01..10cbcc2 100644 --- a/brain/tasks.py +++ b/brain/tasks.py @@ -23,7 +23,7 @@ import brain.controllers.probetasks as celery_probe import brain.controllers.frontendtasks as celery_frontend import brain.controllers.ftpctrl as ftp_ctrl -from lib.irma.common.utils import IrmaTaskReturn, IrmaScanStatus, IrmaProbeType +from lib.irma.common.utils import IrmaTaskReturn, IrmaScanStatus from lib.irma.common.exceptions import IrmaTaskError # Get celery's logger @@ -179,7 +179,6 @@ def job_error(parent_taskid, jobid): result['status'] = -1 result['name'] = probe result['error'] = "Brain job error" - result['type'] = IrmaProbeType.unknown result['duration'] = job_ctrl.duration(jobid) celery_frontend.scan_result(frontend_scanid, filename, probe, result) except IrmaTaskError as e: From d529b558e69810202a5316391bbeeab87f363578 Mon Sep 17 00:00:00 2001 From: aquint Date: Tue, 30 Sep 2014 11:59:13 +0200 Subject: [PATCH 19/21] fix orange group name --- AUTHORS | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/AUTHORS b/AUTHORS index c83cc15..796762b 100644 --- a/AUTHORS +++ b/AUTHORS @@ -5,13 +5,13 @@ IRMA is a project co-funded by the following actors: * GOVCERT.LU (governmental CERT of Luxembourg) * Airbus Group * QuarksLab - * Orange DSI Groupe + * Orange Group IS&T The PRIMARY AUTHORS are (and/or have been): * Alexandre Quint - Lead Developer, QuarksLab * Fernand Lone-Sang - QuarksLab - * Bruno Dorsemaine - Orange DSI Groupe + * Bruno Dorsemaine - Orange Group IS&T * David Carle - QuarksLab * Guillaume Dedrie - QuarksLab From e0d779a10daec70bd2b02b4a836699e6288a8e5d Mon Sep 17 00:00:00 2001 From: aquint Date: Tue, 7 Oct 2014 10:30:28 +0200 Subject: [PATCH 20/21] turn scripts into module --- scripts/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 scripts/__init__.py diff --git a/scripts/__init__.py b/scripts/__init__.py new file mode 100644 index 0000000..e69de29 From d34d277396e46777f24d856566928301a2073865 Mon Sep 17 00:00:00 2001 From: Alexandre Quint Date: Tue, 7 Oct 2014 13:34:15 +0200 Subject: [PATCH 21/21] sync lib --- lib | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib b/lib index 449bed6..b923c8f 160000 --- a/lib +++ b/lib @@ -1 +1 @@ -Subproject commit 449bed6f99f45c118de25ed7468836c0bbe6df6e +Subproject commit b923c8f395d747914f53efc510cef1788800bed7