diff --git a/.vscode/settings.json b/.vscode/settings.json index 1f1c7bb0..4ec4389e 100755 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,6 +1,8 @@ { "git.enabled": true, "files.exclude": { + "**/build": true, + "**/dist": true, "**/__pycache__": true, "**/.pytest_cache": true, "**/*.egg-info": true, diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 1d329cf9..da1728f0 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -45,6 +45,7 @@ be able to work on TabPy changes: git clone https://github.com/tableau/TabPy.git cd TabPy ``` + Before making any code changes run environment setup script. For Windows run this command from the repository root folder: diff --git a/docs/server-config.md b/docs/server-config.md index 88a806d0..4cf188b3 100755 --- a/docs/server-config.md +++ b/docs/server-config.md @@ -163,21 +163,32 @@ With the feature on additional information is logged for HTTP requests: caller i URL, client infomation (Tableau Desktop\Server), Tableau user name (for Tableau Server) and TabPy user name as shown in the example below: + ``` -2019-04-17,15:20:37 [INFO] (evaluation_plane_handler.py:evaluation_plane_handler:86): - ::1 calls POST http://localhost:9004/evaluate, - Client: Tableau Server 2019.2, - Tableau user: ogolovatyi, - TabPy user: user1 -function to evaluate=def _user_script(tabpy, _arg1, _arg2): +2019-05-02,13:50:08 [INFO] (base_handler.py:base_handler:90): Call ID: 934073bd-0d29-46d3-b693-b1e4b1efa9e4, Caller: ::1, Method: POST, Resource: http://localhost:9004/evaluate, Client: Postman for manual testing, Tableau user: ogolovatyi +2019-05-02,13:50:08 [DEBUG] (base_handler.py:base_handler:120): Checking if need to handle authentication, << +call ID: 934073bd-0d29-46d3-b693-b1e4b1efa9e4>> +2019-05-02,13:50:08 [DEBUG] (base_handler.py:base_handler:120): Handling authentication, <> +2019-05-02,13:50:08 [DEBUG] (base_handler.py:base_handler:120): Checking request headers for authentication d +ata, <> +2019-05-02,13:50:08 [DEBUG] (base_handler.py:base_handler:120): Validating credentials for user name "user1", + <> +2019-05-02,13:50:08 [DEBUG] (state.py:state:484): Collecting Access-Control-Allow-Origin from state file... +2019-05-02,13:50:08 [INFO] (base_handler.py:base_handler:120): function to evaluate=def _user_script(tabpy, _ +arg1, _arg2): res = [] for i in range(len(_arg1)): res.append(_arg1[i] * _arg2[i]) return res +, <> ``` + No passwords are logged. + +NOTE the request context details are logged with INFO level. diff --git a/tabpy-server/tabpy_server/app/app.py b/tabpy-server/tabpy_server/app/app.py index 96a48f12..cb366772 100644 --- a/tabpy-server/tabpy_server/app/app.py +++ b/tabpy-server/tabpy_server/app/app.py @@ -9,9 +9,7 @@ from tabpy_server import __version__ from tabpy_server.app.ConfigParameters import ConfigParameters from tabpy_server.app.SettingsParameters import SettingsParameters -from tabpy_server.app.util import ( - log_and_raise, - parse_pwd_file) +from tabpy_server.app.util import parse_pwd_file from tabpy_server.management.state import TabPyState from tabpy_server.management.util import _get_state_from_file from tabpy_server.psws.callbacks import (init_model_evaluator, init_ps_server) @@ -76,8 +74,9 @@ def run(self): 'keyfile': self.settings[SettingsParameters.KeyFile] }) else: - log_and_raise(f'Unsupported transfer protocol {protocol}.', - RuntimeError) + msg = f'Unsupported transfer protocol {protocol}.' + logger.critical(msg) + raise RuntimeError(msg) logger.info('Web service listening on port {}'.format( str(self.settings[SettingsParameters.Port]))) @@ -244,9 +243,10 @@ def set_parameter(settings_key, ConfigParameters.TABPY_PWD_FILE) if ConfigParameters.TABPY_PWD_FILE in self.settings: if not self._parse_pwd_file(): - log_and_raise('Failed to read passwords file %s' % - self.settings[ConfigParameters.TABPY_PWD_FILE], - RuntimeError) + msg = ('Failed to read passwords file ' + f'{self.settings[ConfigParameters.TABPY_PWD_FILE]}') + logger.critical(msg) + raise RuntimeError(msg) else: logger.info( "Password file is not specified: " @@ -268,8 +268,9 @@ def set_parameter(settings_key, def _validate_transfer_protocol_settings(self): if SettingsParameters.TransferProtocol not in self.settings: - log_and_raise( - 'Missing transfer protocol information.', RuntimeError) + msg = 'Missing transfer protocol information.' + logger.critical(msg) + raise RuntimeError(msg) protocol = self.settings[SettingsParameters.TransferProtocol] @@ -277,8 +278,9 @@ def _validate_transfer_protocol_settings(self): return if protocol != 'https': - log_and_raise('Unsupported transfer protocol: {}.'.format( - protocol), RuntimeError) + msg = f'Unsupported transfer protocol: {protocol}' + logger.critical(msg) + raise RuntimeError(msg) self._validate_cert_key_state( 'The parameter(s) {} must be set.', @@ -309,7 +311,8 @@ def _validate_cert_key_state(msg, cert_valid, key_valid): err = https_error + msg.format(ConfigParameters.TABPY_KEY_FILE) if err is not None: - log_and_raise(err, RuntimeError) + logger.critical(err) + raise RuntimeError(err) def _parse_pwd_file(self): succeeded, self.credentials = parse_pwd_file( diff --git a/tabpy-server/tabpy_server/app/util.py b/tabpy-server/tabpy_server/app/util.py index 4e475035..bd02203c 100644 --- a/tabpy-server/tabpy_server/app/util.py +++ b/tabpy-server/tabpy_server/app/util.py @@ -8,14 +8,6 @@ logger = logging.getLogger(__name__) -def log_and_raise(msg, exception_type): - ''' - Log the message and raise an exception of specified type - ''' - logger.fatal(msg) - raise exception_type(msg) - - def validate_cert(cert_file_path): with open(cert_file_path, 'r') as f: cert_buf = f.read() @@ -31,13 +23,15 @@ def validate_cert(cert_file_path): https_error = 'Error using HTTPS: ' if now < not_before: - log_and_raise(https_error + - 'The certificate provided is not valid until {}.'.format( - not_before), RuntimeError) + msg = (https_error + + f'The certificate provided is not valid until {not_before}.') + logger.critical(msg) + raise RuntimeError(msg) if now > not_after: - log_and_raise(https_error + - f'The certificate provided expired on {not_after}.', - RuntimeError) + msg = (https_error + + f'The certificate provided expired on {not_after}.') + logger.critical(msg) + raise RuntimeError(msg) def parse_pwd_file(pwd_file_name): @@ -58,10 +52,10 @@ def parse_pwd_file(pwd_file_name): credentials : dict Credentials from the file. Empty if succeeded is False. ''' - logger.info('Parsing passwords file {}...'.format(pwd_file_name)) + logger.info(f'Parsing passwords file {pwd_file_name}...') if not os.path.isfile(pwd_file_name): - logger.fatal('Passwords file {} not found'.format(pwd_file_name)) + logger.critical(f'Passwords file {pwd_file_name} not found') return False, {} credentials = {} diff --git a/tabpy-server/tabpy_server/common/endpoint_file_mgr.py b/tabpy-server/tabpy_server/common/endpoint_file_mgr.py index d1b79beb..6eee07cc 100644 --- a/tabpy-server/tabpy_server/common/endpoint_file_mgr.py +++ b/tabpy-server/tabpy_server/common/endpoint_file_mgr.py @@ -13,25 +13,28 @@ import shutil from re import compile as _compile -logger = logging.getLogger(__name__) _name_checker = _compile(r'^[a-zA-Z0-9-_\s]+$') -def _check_endpoint_name(name): +def _check_endpoint_name(name, logger=logging.getLogger(__name__)): """Checks that the endpoint name is valid by comparing it with an RE and checking that it is not reserved.""" if not isinstance(name, str): - log_and_raise("Endpoint name must be a string or unicode", TypeError) + msg = 'Endpoint name must be a string or unicode' + logger.log(logging.CRITICAL, msg) + raise TypeError(msg) if name == '': - log_and_raise("Endpoint name cannot be empty", ValueError) + msg = 'Endpoint name cannot be empty' + logger.log(logging.CRITICAL, msg) + raise ValueError(msg) if not _name_checker.match(name): - log_and_raise( - 'Endpoint name can only contain: a-z, A-Z, 0-9,' - ' underscore, hyphens and spaces.', - ValueError) + msg = ('Endpoint name can only contain: a-z, A-Z, 0-9,' + ' underscore, hyphens and spaces.') + logger.log(logging.CRITICAL, msg) + raise ValueError(msg) def grab_files(directory): @@ -51,12 +54,9 @@ def grab_files(directory): yield full_path -def get_local_endpoint_file_path(name, version, query_path): - _check_endpoint_name(name) - return os.path.join(query_path, name, str(version)) - - -def cleanup_endpoint_files(name, query_path, retain_versions=None): +def cleanup_endpoint_files(name, query_path, + logger=logging.getLogger(__name__), + retain_versions=None): ''' Cleanup the disk space a certain endpiont uses. @@ -70,7 +70,7 @@ def cleanup_endpoint_files(name, query_path, retain_versions=None): folder for the given version, otherwise, all files for that endpoint are removed. ''' - _check_endpoint_name(name) + _check_endpoint_name(name, logger=logger) local_dir = os.path.join(query_path, name) # nothing to clean, this is true for state file path where we load @@ -84,7 +84,7 @@ def cleanup_endpoint_files(name, query_path, retain_versions=None): else: retain_folders = [os.path.join(local_dir, str(version)) for version in retain_versions] - logger.info("Retain folder: %s" % retain_folders) + logger.log(logging.INFO, f'Retain folders: {retain_folders}') for file_or_dir in os.listdir(local_dir): candidate_dir = os.path.join(local_dir, file_or_dir) diff --git a/tabpy-server/tabpy_server/common/util.py b/tabpy-server/tabpy_server/common/util.py index b90d3023..99baf73e 100644 --- a/tabpy-server/tabpy_server/common/util.py +++ b/tabpy-server/tabpy_server/common/util.py @@ -5,11 +5,3 @@ def format_exception(e, context): err_msg = "%s : " % e.__class__.__name__ err_msg += "%s" % str(e) return err_msg - - -def format_exception_DEBUG(e, context, detail=0): - trace = traceback.format_exc() - err_msg = "Traceback\n %s\n" % trace - err_msg += "Error type : %s\n" % e.__class__.__name__ - err_msg += "Error message : %s\n" % str(e) - return "Error when %s: %s" % (context, err_msg) diff --git a/tabpy-server/tabpy_server/handlers/base_handler.py b/tabpy-server/tabpy_server/handlers/base_handler.py index 316bd5f5..47d22996 100644 --- a/tabpy-server/tabpy_server/handlers/base_handler.py +++ b/tabpy-server/tabpy_server/handlers/base_handler.py @@ -1,20 +1,126 @@ import base64 import binascii import concurrent -import tornado.web import json import logging - +import tornado.web from tabpy_server.app.SettingsParameters import SettingsParameters from tabpy_server.handlers.util import hash_password +import uuid + -logger = logging.getLogger(__name__) STAGING_THREAD = concurrent.futures.ThreadPoolExecutor(max_workers=3) -class BaseHandler(tornado.web.RequestHandler): - KEYS_TO_SANITIZE = ("api key", "api_key", "admin key", "admin_key") +class ContextLoggerWrapper(object): + ''' + This class appends request context to logged messages. + ''' + @staticmethod + def _generate_call_id(): + return str(uuid.uuid4()) + + def __init__(self, request: tornado.httputil.HTTPServerRequest): + self.call_id = self._generate_call_id() + self.set_request(request) + + self.tabpy_username = None + self.log_request_context = False + self.request_context_logged = False + + def set_request(self, request: tornado.httputil.HTTPServerRequest): + ''' + Set HTTP(S) request for logger. Headers will be used to + append request data as client information, Tableau user name, etc. + ''' + self.remote_ip = request.remote_ip + self.method = request.method + self.url = request.full_url() + + if 'TabPy-Client' in request.headers: + self.client = request.headers['TabPy-Client'] + else: + self.client = None + + if 'TabPy-User' in request.headers: + self.tableau_username = request.headers['TabPy-User'] + else: + self.tableau_username = None + + def set_tabpy_username(self, tabpy_username: str): + self.tabpy_username = tabpy_username + + def enable_context_logging(self, enable: bool): + ''' + Enable/disable request context information logging. + + Parameters + ---------- + enable: bool + If True request context information will be logged and + every log entry for a request handler will have call ID + with it. + ''' + self.log_request_context = enable + + def _log_context_info(self): + if not self.log_request_context: + return + + context = f'Call ID: {self.call_id}' + + if self.remote_ip is not None: + context += f', Caller: {self.remote_ip}' + + if self.method is not None: + context += f', Method: {self.method}' + + if self.url is not None: + context += f', URL: {self.url}' + + if self.client is not None: + context += f', Client: {self.client}' + if self.tableau_username is not None: + context += f', Tableau user: {self.tableau_username}' + + if self.tabpy_username is not None: + context += f', TabPy user: {self.tabpy_username}' + + logging.getLogger(__name__).log(logging.INFO, context) + self.request_context_logged = True + + def log(self, level: int, msg: str): + ''' + Log message with or without call ID. If call context is logged and + call ID added to any log entry is specified by if context logging + is enabled (see CallContext.enable_context_logging for more details). + + Parameters + ---------- + level: int + Log level: logging.CRITICAL, ERROR, WARNING, INFO, DEBUG, NOTSET. + + msg: str + Message format string. + + args + Same as args in Logger.debug(). + + kwargs + Same as kwargs in Logger.debug(). + ''' + extended_msg = msg + if self.log_request_context: + if not self.request_context_logged: + self._log_context_info() + + extended_msg += f', <>' + + logging.getLogger(__name__).log(level, extended_msg) + + +class BaseHandler(tornado.web.RequestHandler): def initialize(self, app): self.tabpy_state = app.tabpy_state # set content type to application/json @@ -22,9 +128,16 @@ def initialize(self, app): self.port = self.settings[SettingsParameters.Port] self.python_service = app.python_service self.credentials = app.credentials - self.log_request_context =\ - app.settings[SettingsParameters.LogRequestContext] self.username = None + self.password = None + + self.logger = ContextLoggerWrapper(self.request) + self.logger.enable_context_logging( + app.settings[SettingsParameters.LogRequestContext]) + self.logger.log( + logging.DEBUG, + 'Checking if need to handle authentication') + self.not_authorized = not self.handle_authentication("v1") def error_out(self, code, log_message, info=None): self.set_status(code) @@ -35,7 +148,8 @@ def error_out(self, code, log_message, info=None): # loggers are misconfigured or causing the failure # themselves print(info) - logger.error( + self.logger.log( + logging.ERROR, 'Responding with status={}, message="{}", info="{}"'. format(code, log_message, info)) self.finish() @@ -53,25 +167,20 @@ def _add_CORS_header(self): origin = self.tabpy_state.get_access_control_allow_origin() if len(origin) > 0: self.set_header("Access-Control-Allow-Origin", origin) - logger.debug(self.append_request_context( - "Access-Control-Allow-Origin:{}".format(origin))) + self.logger.log(logging.DEBUG, + f'Access-Control-Allow-Origin:{origin}') headers = self.tabpy_state.get_access_control_allow_headers() if len(headers) > 0: self.set_header("Access-Control-Allow-Headers", headers) - logger.debug(self.append_request_context( - "Access-Control-Allow-Headers:{}".format(headers))) + self.logger.log(logging.DEBUG, + f'Access-Control-Allow-Headers:{headers}') methods = self.tabpy_state.get_access_control_allow_methods() if len(methods) > 0: self.set_header("Access-Control-Allow-Methods", methods) - logger.debug(self.append_request_context( - "Access-Control-Allow-Methods:{}".format(methods))) - - def _sanitize_request_data(self, data, keys=KEYS_TO_SANITIZE): - """Remove keys so that we can log safely""" - for key in keys: - data.pop(key, None) + self.logger.log(logging.DEBUG, + f'Access-Control-Allow-Methods:{methods}') def _get_auth_method(self, api_version) -> (bool, str): ''' @@ -96,28 +205,32 @@ def _get_auth_method(self, api_version) -> (bool, str): is not needed. ''' if api_version not in self.settings[SettingsParameters.ApiVersions]: - logger.critical(f'Unknown API version "{api_version}"') + self.logger.log(logging.CRITICAL, + f'Unknown API version "{api_version}"') return False, '' version_settings =\ self.settings[SettingsParameters.ApiVersions][api_version] if 'features' not in version_settings: - logger.info(f'No features configured for API "{api_version}"') + self.logger.log(logging.INFO, + f'No features configured for API "{api_version}"') return True, '' features = version_settings['features'] if 'authentication' not in features or\ not features['authentication']['required']: - logger.info( - f'Authentication is not a required feature for API ' - '"{api_version}"') + self.logger.log( + logging.INFO, + 'Authentication is not a required feature for API ' + f'"{api_version}"') return True, '' auth_feature = features['authentication'] if 'methods' not in auth_feature: - logger.critical( - f'Authentication method is not configured for API ' - '"{api_version}"') + self.logger.log( + logging.INFO, + 'Authentication method is not configured for API ' + f'"{api_version}"') methods = auth_feature['methods'] if 'basic-auth' in auth_feature['methods']: @@ -125,15 +238,16 @@ def _get_auth_method(self, api_version) -> (bool, str): # Add new methods here... # No known methods were found - logger.critical( + self.logger.log( + logging.CRITICAL, f'Unknown authentication method(s) "{methods}" are configured ' - 'for API "{api_version}"') + f'for API "{api_version}"') return False, '' def _get_basic_auth_credentials(self) -> bool: ''' Find credentials for basic access authentication method. Credentials if - found stored in self.username and self.password. + found stored in Credentials.username and Credentials.password. Returns ------- @@ -141,30 +255,35 @@ def _get_basic_auth_credentials(self) -> bool: True if valid credentials were found. False otherwise. ''' - logger.debug('Checking request headers for authentication data') + self.logger.log(logging.DEBUG, + 'Checking request headers for authentication data') if 'Authorization' not in self.request.headers: - logger.info('Authorization header not found') + self.logger.log(logging.INFO, 'Authorization header not found') return False auth_header = self.request.headers['Authorization'] auth_header_list = auth_header.split(' ') if len(auth_header_list) != 2 or\ auth_header_list[0] != 'Basic': - logger.error(f'Unknown authentication method "{auth_header}"') + self.logger.log(logging.ERROR, + f'Unknown authentication method "{auth_header}"') return False try: cred = base64.b64decode(auth_header_list[1]).decode('utf-8') except (binascii.Error, UnicodeDecodeError) as ex: - logger.critical(f'Cannot decode credentials: {str(ex)}') + self.logger.log(logging.CRITICAL, + f'Cannot decode credentials: {str(ex)}') return False login_pwd = cred.split(':') if len(login_pwd) != 2: - logger.error('Invalid string in encoded credentials') + self.logger.log(logging.ERROR, + 'Invalid string in encoded credentials') return False self.username = login_pwd[0] + self.logger.set_tabpy_username(self.username) self.password = login_pwd[1] return True @@ -189,9 +308,10 @@ def _get_credentials(self, method) -> bool: # Add new methods here... # No known methods were found - logger.critical( + self.logger.log( + logging.CRITICAL, f'Unknown authentication method(s) "{method}" are configured ' - 'for API "{api_version}"') + f'for API "{api_version}"') return False def _validate_basic_auth_credentials(self) -> bool: @@ -207,14 +327,17 @@ def _validate_basic_auth_credentials(self) -> bool: otherwise. ''' login = self.username.lower() - logger.debug(f'Validating credentials for user name "{login}"') + self.logger.log(logging.DEBUG, + f'Validating credentials for user name "{login}"') if login not in self.credentials: - logger.error(f'User name "{self.username}" not found') + self.logger.log(logging.ERROR, + f'User name "{self.username}" not found') return False - hashed_pwd = hash_password(self.username, self.password) + hashed_pwd = hash_password(login, self.password) if self.credentials[login].lower() != hashed_pwd.lower(): - logger.error(f'Wrong password for user name "{self.username}"') + self.logger.log(logging.ERROR, + f'Wrong password for user name "{self.username}"') return False return True @@ -240,9 +363,10 @@ def _validate_credentials(self, method) -> bool: # Add new methods here... # No known methods were found - logger.critical( + self.logger.log( + logging.CRITICAL, f'Unknown authentication method(s) "{method}" are configured ' - 'for API "{api_version}"') + f'for API "{api_version}"') return False def handle_authentication(self, api_version) -> bool: @@ -263,7 +387,7 @@ def handle_authentication(self, api_version) -> bool: credentials provided. False otherwise. ''' - logger.debug('Handling authentication') + self.logger.log(logging.DEBUG, 'Handling authentication') found, method = self._get_auth_method(api_version) if not found: return False @@ -290,16 +414,15 @@ def should_fail_with_not_authorized(self): required and validation for credentials passes. True if validation for credentials failed. ''' - logger.debug(self.append_request_context( - 'Checking if need to handle authentication')) - return not self.handle_authentication("v1") + return self.not_authorized def fail_with_not_authorized(self): ''' Prepares server 401 response. ''' - logger.error(self.append_request_context( - 'Failing with 401 for unauthorized request')) + self.logger.log( + logging.ERROR, + 'Failing with 401 for unauthorized request') self.set_status(401) self.set_header('WWW-Authenticate', 'Basic realm="{}"'.format(self.tabpy_state.name)) @@ -307,23 +430,3 @@ def fail_with_not_authorized(self): 401, info="Unauthorized request.", log_message="Invalid credentials provided.") - - def append_request_context(self, msg) -> str: - ''' - Adds request context (caller info) to logged messages. - ''' - context = '' - if self.log_request_context: - # log request details - context = (f'{self.request.remote_ip} calls ' - f'{self.request.method} {self.request.full_url()}') - if 'TabPy-Client' in self.request.headers: - context += f', Client: {self.request.headers["TabPy-Client"]}' - if 'TabPy-User' in self.request.headers: - context +=\ - f', Tableau user: {self.request.headers["TabPy-User"]}' - if self.username is not None and self.username != '': - context += f', TabPy user: {self.username}' - context += '\n' - - return context + msg diff --git a/tabpy-server/tabpy_server/handlers/endpoint_handler.py b/tabpy-server/tabpy_server/handlers/endpoint_handler.py index f5a69a9d..f3234c33 100644 --- a/tabpy-server/tabpy_server/handlers/endpoint_handler.py +++ b/tabpy-server/tabpy_server/handlers/endpoint_handler.py @@ -19,9 +19,6 @@ import shutil -logger = logging.getLogger(__name__) - - class EndpointHandler(ManagementHandler): def initialize(self, app): super(EndpointHandler, self).initialize(app) @@ -31,8 +28,8 @@ def get(self, endpoint_name): self.fail_with_not_authorized() return - logger.debug(self.append_request_context( - f'Processing GET for /endpoints/{endpoint_name}')) + self.logger.log(logging.DEBUG, + f'Processing GET for /endpoints/{endpoint_name}') self._add_CORS_header() if not endpoint_name: @@ -52,8 +49,8 @@ def put(self, name): self.fail_with_not_authorized() return - logger.debug(self.append_request_context( - f'Processing PUT for /endpoints/{name}')) + self.logger.log(logging.DEBUG, + f'Processing PUT for /endpoints/{name}') try: if not self.request.body: @@ -80,8 +77,9 @@ def put(self, name): return new_version = int(endpoints[name]['version']) + 1 - logger.info(self.append_request_context( - 'Endpoint info: %s' % request_data)) + self.logger.log( + logging.INFO, + f'Endpoint info: {request_data}') err_msg = yield self._add_or_update_endpoint( 'update', name, new_version, request_data) if err_msg: @@ -103,8 +101,9 @@ def delete(self, name): self.fail_with_not_authorized() return - logger.debug(self.append_request_context( - 'Processing DELETE for /endpoints/{}'.format(name))) + self.logger.log( + logging.DEBUG, + f'Processing DELETE for /endpoints/{name}') try: endpoints = self.tabpy_state.get_endpoints(name) @@ -143,7 +142,8 @@ def delete(self, name): self.error_out(500, err_msg) self.finish() - on_state_change(self.settings, self.tabpy_state, self.python_service) + on_state_change(self.settings, self.tabpy_state, self.python_service, + self) @gen.coroutine def _delete_po_future(self, delete_path): diff --git a/tabpy-server/tabpy_server/handlers/endpoints_handler.py b/tabpy-server/tabpy_server/handlers/endpoints_handler.py index 88557cea..4181ceea 100644 --- a/tabpy-server/tabpy_server/handlers/endpoints_handler.py +++ b/tabpy-server/tabpy_server/handlers/endpoints_handler.py @@ -14,9 +14,6 @@ from tabpy_server.common.util import format_exception -logger = logging.getLogger(__name__) - - class EndpointsHandler(ManagementHandler): def initialize(self, app): super(EndpointsHandler, self).initialize(app) @@ -36,9 +33,6 @@ def post(self): self.fail_with_not_authorized() return - logger.debug(self.append_request_context( - 'Processing POST for /endpoints')) - try: if not self.request.body: self.error_out(400, "Input body cannot be empty") @@ -70,15 +64,17 @@ def post(self): self.finish() return - logger.debug(self.append_request_context( - "Adding endpoint '{}'".format(name))) + self.logger.log( + logging.DEBUG, + f'Adding endpoint "{name}"') err_msg = yield self._add_or_update_endpoint('add', name, 1, request_data) if err_msg: self.error_out(400, err_msg) else: - logger.debug(self.append_request_context( - "Endpoint {} successfully added".format(name))) + self.logger.log( + logging.DEBUG, + f'Endpoint {name} successfully added') self.set_status(201) self.write(self.tabpy_state.get_endpoints(name)) self.finish() diff --git a/tabpy-server/tabpy_server/handlers/evaluation_plane_handler.py b/tabpy-server/tabpy_server/handlers/evaluation_plane_handler.py index 222d32b0..7b374fa2 100644 --- a/tabpy-server/tabpy_server/handlers/evaluation_plane_handler.py +++ b/tabpy-server/tabpy_server/handlers/evaluation_plane_handler.py @@ -8,16 +8,14 @@ import sys -logger = logging.getLogger(__name__) - - class RestrictedTabPy: - def __init__(self, port): + def __init__(self, port, logger): self.port = port + self.logger = logger def query(self, name, *args, **kwargs): url = f'http://localhost:{self.port}/query/{name}' - logger.debug(f'Querying {url}...') + self.logger.log(logging.DEBUG, f'Querying {url}...') internal_data = {'data': args or kwargs} data = json.dumps(internal_data) headers = {'content-type': 'application/json'} @@ -43,9 +41,6 @@ def post(self): self.fail_with_not_authorized() return - logger.debug(self.append_request_context( - 'Processing POST for /evaluate')) - self._add_CORS_header() try: body = json.loads(self.request.body.decode('utf-8')) @@ -76,14 +71,13 @@ def post(self): 'the format _arg1, _arg2, _argN') return - function_to_evaluate = ( - 'def _user_script(tabpy' - + arguments_str + '):\n') + function_to_evaluate = f'def _user_script(tabpy{arguments_str}):\n' for u in user_code.splitlines(): function_to_evaluate += ' ' + u + '\n' - logger.info(self.append_request_context( - f'function to evaluate={function_to_evaluate}')) + self.logger.log( + logging.INFO, + f'function to evaluate={function_to_evaluate}') result = yield self.call_subprocess(function_to_evaluate, arguments) @@ -110,7 +104,7 @@ def post(self): @gen.coroutine def call_subprocess(self, function_to_evaluate, arguments): - restricted_tabpy = RestrictedTabPy(self.port) + restricted_tabpy = RestrictedTabPy(self.port, self) # Exec does not run the function, so it does not block. if sys.version_info > (3, 0): exec(function_to_evaluate, globals()) diff --git a/tabpy-server/tabpy_server/handlers/management_handler.py b/tabpy-server/tabpy_server/handlers/management_handler.py index ccd914d3..4c4b3bce 100644 --- a/tabpy-server/tabpy_server/handlers/management_handler.py +++ b/tabpy-server/tabpy_server/handlers/management_handler.py @@ -56,16 +56,19 @@ def _add_or_update_endpoint(self, action, name, version, request_data): logging.debug("Adding/updating model {}...".format(name)) _name_checker = _compile('^[a-zA-Z0-9-_\\s]+$') if not isinstance(name, (str, unicode)): - log_and_raise( - "Endpoint name must be a string or unicode", TypeError) + msg = 'Endpoint name must be a string or unicode' + self.logger.log(logging.CRITICAL, msg) + raise TypeError(msg) if not _name_checker.match(name): raise gen.Return('endpoint name can only contain: a-z, A-Z, 0-9,' ' underscore, hyphens and spaces.') if self.settings.get('add_or_updating_endpoint'): - log_and_raise("Another endpoint update is already in progress" - ", please wait a while and try again", RuntimeError) + msg = ('Another endpoint update is already in progress' + ', please wait a while and try again') + self.logger.log(logging.CRITICAL, msg) + raise RuntimeError(msg) request_uuid = random_uuid() self.settings['add_or_updating_endpoint'] = request_uuid @@ -145,7 +148,8 @@ def _add_or_update_endpoint(self, action, name, version, request_data): on_state_change(self.settings, self.tabpy_state, - self.python_service) + self.python_service, + self) finally: self.settings['add_or_updating_endpoint'] = None diff --git a/tabpy-server/tabpy_server/handlers/query_plane_handler.py b/tabpy-server/tabpy_server/handlers/query_plane_handler.py index a28a63fd..e31950e9 100644 --- a/tabpy-server/tabpy_server/handlers/query_plane_handler.py +++ b/tabpy-server/tabpy_server/handlers/query_plane_handler.py @@ -12,27 +12,11 @@ import tornado.web -logger = logging.getLogger(__name__) - - def _get_uuid(): """Generate a unique identifier string""" return str(uuid.uuid4()) -def _sanitize_request_data(data): - if not isinstance(data, dict): - log_and_raise("Expect input data to be a dictionary", RuntimeError) - - if "method" in data: - return {"data": data.get("data"), "method": data.get("method")} - elif "data" in data: - return data.get("data") - else: - log_and_raise("Expect input data is a dictionary with at least a " - "key called 'data'", RuntimeError) - - class QueryPlaneHandler(BaseHandler): def initialize(self, app): super(QueryPlaneHandler, self).initialize(app) @@ -71,8 +55,9 @@ def _query(self, po_name, data, uid, qry): 'utf-8')).hexdigest()) return (QuerySuccessful, response.for_json(), gls_time) else: - logger.error(self.append_request_context( - f'Failed query, response: {response}')) + self.logger.log( + logging.ERROR, + f'Failed query, response: {response}') return (type(response), response.for_json(), gls_time) # handle HTTP Options requests to support CORS @@ -83,8 +68,9 @@ def options(self, pred_name): self.fail_with_not_authorized() return - logger.debug(self.append_request_context( - f'Processing OPTIONS for /query/{pred_name}')) + self.logger.log( + logging.DEBUG, + f'Processing OPTIONS for /query/{pred_name}') # add CORS headers if TabPy has a cors_origin specified self._add_CORS_header() @@ -116,6 +102,21 @@ def _handle_result(self, po_name, data, qry, uid): return (None, None) + def _sanitize_request_data(self, data): + if not isinstance(data, dict): + msg = 'Input data must be a dictionary' + self.logger.log(logging.CRITICAL, msg) + raise RuntimeError(msg) + + if "method" in data: + return {"data": data.get("data"), "method": data.get("method")} + elif "data" in data: + return data.get("data") + else: + msg = 'Input data must be a dictionary with a key called "data"' + self.logger.log(logging.CRITICAL, msg) + raise RuntimeError(msg) + def _process_query(self, endpoint_name, start): try: self._add_CORS_header() @@ -127,7 +128,7 @@ def _process_query(self, endpoint_name, start): request_json = self.request.body.decode('utf-8') # Sanitize input data - data = _sanitize_request_data(json.loads(request_json)) + data = self._sanitize_request_data(json.loads(request_json)) except Exception as e: err_msg = format_exception(e, "Invalid Input Data") self.error_out(400, err_msg) @@ -153,8 +154,9 @@ def _process_query(self, endpoint_name, start): return if po_name != endpoint_name: - logger.info(self.append_request_context( - f'Querying actual model: po_name={po_name}')) + self.logger.log( + logging.INFO, + f'Querying actual model: po_name={po_name}') uid = _get_uuid() @@ -205,9 +207,6 @@ def get(self, endpoint_name): self.fail_with_not_authorized() return - logger.debug(self.append_request_context( - f'Processing GET for /query/{endpoint_name}')) - start = time.time() if sys.version_info > (3, 0): endpoint_name = urllib.parse.unquote(endpoint_name) @@ -221,9 +220,6 @@ def post(self, endpoint_name): self.fail_with_not_authorized() return - logger.debug(self.append_request_context( - f'Processing POST for /query/{endpoint_name}')) - start = time.time() if sys.version_info > (3, 0): endpoint_name = urllib.parse.unquote(endpoint_name) diff --git a/tabpy-server/tabpy_server/handlers/service_info_handler.py b/tabpy-server/tabpy_server/handlers/service_info_handler.py index 74fbc8e8..1d1c3059 100644 --- a/tabpy-server/tabpy_server/handlers/service_info_handler.py +++ b/tabpy-server/tabpy_server/handlers/service_info_handler.py @@ -1,12 +1,8 @@ import json -import logging from tabpy_server.app.SettingsParameters import SettingsParameters from tabpy_server.handlers import ManagementHandler -logger = logging.getLogger(__name__) - - class ServiceInfoHandler(ManagementHandler): def initialize(self, app): super(ServiceInfoHandler, self).initialize(app) @@ -15,8 +11,6 @@ def get(self): # do not check for authentication - this method # is the only way for client to collect info about # supported API versions and required features - logger.debug(self.append_request_context( - 'Processing GET for /info')) self._add_CORS_header() info = {} info['description'] = self.tabpy_state.get_description() diff --git a/tabpy-server/tabpy_server/handlers/status_handler.py b/tabpy-server/tabpy_server/handlers/status_handler.py index 4297f98f..60dd9a70 100644 --- a/tabpy-server/tabpy_server/handlers/status_handler.py +++ b/tabpy-server/tabpy_server/handlers/status_handler.py @@ -3,9 +3,6 @@ from tabpy_server.handlers import BaseHandler -logger = logging.getLogger(__name__) - - class StatusHandler(BaseHandler): def initialize(self, app): super(StatusHandler, self).initialize(app) @@ -17,8 +14,6 @@ def get(self): self._add_CORS_header() - logger.debug(self.append_request_context( - "Obtaining service status")) status_dict = {} for k, v in self.python_service.ps.query_objects.items(): status_dict[k] = { @@ -27,8 +22,9 @@ def get(self): 'status': v['status'], 'last_error': v['last_error']} - logger.debug(self.append_request_context( - f'Found models: {status_dict}')) + self.logger.log( + logging.DEBUG, + f'Found models: {status_dict}') self.write(json.dumps(status_dict)) self.finish() return diff --git a/tabpy-server/tabpy_server/handlers/upload_destination_handler.py b/tabpy-server/tabpy_server/handlers/upload_destination_handler.py index 33a559f1..f94ef630 100644 --- a/tabpy-server/tabpy_server/handlers/upload_destination_handler.py +++ b/tabpy-server/tabpy_server/handlers/upload_destination_handler.py @@ -4,8 +4,6 @@ import os -logger = logging.getLogger(__name__) - _QUERY_OBJECT_STAGING_FOLDER = 'staging' @@ -18,9 +16,6 @@ def get(self): self.fail_with_not_authorized() return - logger.debug(self.append_request_context( - 'Processing GET for /configurations/endpoint_upload_destination')) - path = self.settings[SettingsParameters.StateFilePath] path = os.path.join(path, _QUERY_OBJECT_STAGING_FOLDER) self.write({"path": path}) diff --git a/tabpy-server/tabpy_server/handlers/util.py b/tabpy-server/tabpy_server/handlers/util.py index 99617d87..fb80f673 100755 --- a/tabpy-server/tabpy_server/handlers/util.py +++ b/tabpy-server/tabpy_server/handlers/util.py @@ -1,11 +1,8 @@ import base64 import binascii from hashlib import pbkdf2_hmac -import logging from tabpy_server.app.SettingsParameters import SettingsParameters -logger = logging.getLogger(__name__) - def hash_password(username, pwd): ''' diff --git a/tabpy-server/tabpy_server/management/state.py b/tabpy-server/tabpy_server/management/state.py index 8ec2e492..6c033c45 100644 --- a/tabpy-server/tabpy_server/management/state.py +++ b/tabpy-server/tabpy_server/management/state.py @@ -49,40 +49,6 @@ def wrapper(self, *args, **kwargs): return wrapper -def load_state_from_str(state_string): - ''' - Convert from String to ConfigParser - ''' - if state_string: - try: - config = ConfigParser(allow_no_value=True) - config.optionxform = str - config.readfp(StringIO(state_string)) - return config - except Exception as e: - log_and_raise("Invalid state string %s" % str(e), ValueError) - else: - log_and_raise("State string is empty!", ValueError) - - -def save_state_to_str(config): - ''' - Convert from ConfigParser to String - ''' - if not config: - log_and_raise("Invalid config", ValueError) - value = None - try: - string_f = StringIO() - config.write(string_f) - value = string_f.getvalue() - except Exception: - logger.error("Cannot convert config to string") - finally: - string_f.close() - return value - - def _get_root_path(state_path): if state_path[-1] != '/': return state_path + '/' @@ -127,7 +93,9 @@ def __init__(self, settings, config=None): self.set_config(config, _update=False) @state_lock - def set_config(self, config, _update=True): + def set_config(self, config, + logger=logging.getLogger(__name__), + _update=True): ''' Set the local ConfigParser manually. This new ConfigParser will be used as current state. @@ -136,7 +104,7 @@ def set_config(self, config, _update=True): raise ValueError("Invalid config") self.config = config if _update: - self._write_state() + self._write_state(logger) def get_endpoints(self, name=None): ''' @@ -559,6 +527,7 @@ def _set_revision_number(self, revision_number): logger.error("Unable to set revision number: %s" % e) def _remove_config_option(self, section_name, option_name, + logger=logging.getLogger(__name__), _update_revision=True): if not self.config: raise ValueError("State configuration not yet loaded.") @@ -566,7 +535,7 @@ def _remove_config_option(self, section_name, option_name, # update revision number if _update_revision: self._increase_revision_number() - self._write_state() + self._write_state(logger=logger) def _has_config_value(self, section_name, option_name): if not self.config: @@ -581,19 +550,20 @@ def _increase_revision_number(self): str(cur_rev + 1)) def _set_config_value(self, section_name, option_name, option_value, + logger=logging.getLogger(__name__), _update_revision=True): if not self.config: raise ValueError("State configuration not yet loaded.") if not self.config.has_section(section_name): - logger.debug("Adding config section {}".format(section_name)) + logger.log(logging.DEBUG, f'Adding config section {section_name}') self.config.add_section(section_name) self.config.set(section_name, option_name, option_value) # update revision number if _update_revision: self._increase_revision_number() - self._write_state() + self._write_state(logger=logger) def _get_config_items(self, section_name): if not self.config: @@ -616,9 +586,9 @@ def _get_config_value(self, section_name, option_name, optional=False, raise ValueError("Cannot find option name %s under section %s" % (option_name, section_name)) - def _write_state(self): + def _write_state(self, logger=logging.getLogger(__name__)): ''' Write state (ConfigParser) to Consul ''' - logger.info("Writing state to config") - write_state_config(self.config, self.settings) + logger.log(logging.INFO, 'Writing state to config') + write_state_config(self.config, self.settings, logger=logger) diff --git a/tabpy-server/tabpy_server/management/util.py b/tabpy-server/tabpy_server/management/util.py index 9a05fd39..6bbec130 100644 --- a/tabpy-server/tabpy_server/management/util.py +++ b/tabpy-server/tabpy_server/management/util.py @@ -7,22 +7,18 @@ from datetime import datetime, timedelta, tzinfo from tabpy_server.app.ConfigParameters import ConfigParameters from tabpy_server.app.SettingsParameters import SettingsParameters -from tabpy_server.app.util import log_and_raise from time import mktime -logger = logging.getLogger(__name__) - -def write_state_config(state, settings): +def write_state_config(state, settings, logger=logging.getLogger(__name__)): if SettingsParameters.StateFilePath in settings: state_path = settings[SettingsParameters.StateFilePath] else: - log_and_raise( - '{} is not set'.format( - ConfigParameters.TABPY_STATE_PATH), - ValueError) + msg = f'{ConfigParameters.TABPY_STATE_PATH} is not set' + logger.log(logging.CRITICAL, msg) + raise ValueError(msg) - logger.debug("State path is {}".format(state_path)) + logger.log(logging.DEBUG, f'State path is {state_path}') state_key = os.path.join(state_path, 'state.ini') tmp_state_file = state_key @@ -30,23 +26,24 @@ def write_state_config(state, settings): state.write(f) -def _get_state_from_file(state_path): +def _get_state_from_file(state_path, logger=logging.getLogger(__name__)): state_key = os.path.join(state_path, 'state.ini') tmp_state_file = state_key if not os.path.exists(tmp_state_file): - log_and_raise( - "Missing config file at %r" % - (tmp_state_file,), ValueError) + msg = f'Missing config file at {tmp_state_file}' + logger.log(logging.CRITICAL, msg) + raise ValueError(msg) config = _ConfigParser(allow_no_value=True) config.optionxform = str config.read(tmp_state_file) if not config.has_section('Service Info'): - log_and_raise( - "Config error: Expected 'Service Info' section in %s" % - (tmp_state_file,), ValueError) + msg = ('Config error: Expected [Service Info] section in ' + f'{tmp_state_file}') + logger.log(logging.CRITICAL, msg) + raise ValueError(msg) return config @@ -82,15 +79,3 @@ def __repr__(self): def __str__(self): return "UTC" - - -_utc = _UTC() - - -def _dt_to_utc_timestamp(t): - if t.tzname() == 'UTC': - return (t - datetime(1970, 1, 1, tzinfo=_utc)).total_seconds() - elif not t.tzinfo: - return mktime(t.timetuple()) - else: - log_and_raise('Only local time and UTC time is supported', ValueError) diff --git a/tabpy-server/tabpy_server/psws/callbacks.py b/tabpy-server/tabpy_server/psws/callbacks.py index c8fa327e..c1b13a73 100644 --- a/tabpy-server/tabpy_server/psws/callbacks.py +++ b/tabpy-server/tabpy_server/psws/callbacks.py @@ -1,17 +1,14 @@ import logging import sys -from time import sleep - -from tornado import gen - from tabpy_server.app.SettingsParameters import SettingsParameters from tabpy_server.common.messages import ( LoadObject, DeleteObjects, ListObjects, ObjectList) from tabpy_server.common.endpoint_file_mgr import cleanup_endpoint_files from tabpy_server.common.util import format_exception from tabpy_server.management.state import TabPyState, get_query_object_path - from tabpy_server.management import util +from time import sleep +from tornado import gen logger = logging.getLogger(__name__) @@ -135,11 +132,13 @@ def _get_latest_service_state(settings, @gen.coroutine -def on_state_change(settings, tabpy_state, python_service): +def on_state_change(settings, tabpy_state, python_service, + logger=logging.getLogger(__name__)): try: logger.info("Loading state from state file") config = util._get_state_from_file( - settings[SettingsParameters.StateFilePath]) + settings[SettingsParameters.StateFilePath], + logger=logger) new_ps_state = TabPyState(config=config, settings=settings) (has_changes, changes) = _get_latest_service_state(settings, @@ -161,7 +160,8 @@ def on_state_change(settings, tabpy_state, python_service): python_service.manage_request(DeleteObjects([object_name])) cleanup_endpoint_files(object_name, - settings[SettingsParameters.UploadDir]) + settings[SettingsParameters.UploadDir], + logger=logger) else: endpoint_info = new_endpoints[object_name] @@ -180,8 +180,9 @@ def on_state_change(settings, tabpy_state, python_service): # cleanup old version of endpoint files if object_version > 2: cleanup_endpoint_files( - object_name, settings[SettingsParameters.UploadDir], [ - object_version, object_version - 1]) + object_name, settings[SettingsParameters.UploadDir], + logger=logger, + retain_versions=[object_version, object_version - 1]) except Exception as e: err_msg = format_exception(e, 'on_state_change')