diff --git a/.gitignore b/.gitignore index b410dc2a..615fa4d6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ *.pyc -dist \ No newline at end of file +dist +tags diff --git a/testgres/testgres.py b/testgres/testgres.py index 05ee6600..5d41a3fd 100644 --- a/testgres/testgres.py +++ b/testgres/testgres.py @@ -31,6 +31,11 @@ import time import six +import threading +import logging +import select +import tempfile + # Try to use psycopg2 by default. If psycopg2 isn"t available then use # pg8000 which is slower but much more portable because uses only # pure-Python code @@ -44,6 +49,7 @@ registered_nodes = [] +util_threads = [] last_assigned_port = int(random.random() * 16384) + 49152 pg_config_data = {} @@ -74,6 +80,54 @@ def __str__(self): return '\n ERROR: {0}\n CMD: {1}'.format(self.error_text, self.cmd) +class LogWriter(threading.Thread): + ''' + Helper class to implement reading from postgresql.log and redirect + it python logging + ''' + + def __init__(self, node_name, fd): + assert callable(fd.readline) + + threading.Thread.__init__(self) + + self.node_name = node_name + self.fd = fd + self.stop_event = threading.Event() + self.logger = logging.getLogger(node_name) + self.logger.setLevel(logging.INFO) + + def run(self): + while self.fd in select.select([self.fd], [], [], 0)[0]: + line = self.fd.readline() + if line: + extra = {'node': self.node_name} + self.logger.info(line.strip(), extra=extra) + elif self.stopped(): + break + else: + time.sleep(0.1) + + def stop(self): + self.stop_event.set() + + def stopped(self): + return self.stop_event.isSet() + + +def log_watch(node_name, pg_logname): + ''' Starts thread for node that redirects postgresql logs + to python logging system + ''' + + reader = LogWriter(node_name, open(pg_logname, 'r')) + reader.start() + + global util_threads + util_threads.append(reader) + return reader + + class NodeConnection(object): """ @@ -148,7 +202,7 @@ def close(self): class PostgresNode(object): - def __init__(self, name, port, base_dir=None): + def __init__(self, name, port, base_dir=None, use_logging=False): self.name = name self.host = '127.0.0.1' self.port = port @@ -160,6 +214,9 @@ def __init__(self, name, port, base_dir=None): os.makedirs(self.logs_dir) self.working = False + self.use_logging = use_logging + self.logger = None + @property def data_dir(self): return os.path.join(self.base_dir, "data") @@ -316,7 +373,15 @@ def pg_ctl(self, command, params={}, command_options=[]): def start(self, params={}): """ Starts cluster """ - logfile = os.path.join(self.logs_dir, "postgresql.log") + + if self.use_logging: + tmpfile = tempfile.NamedTemporaryFile('w', dir=self.logs_dir, delete=False) + logfile = tmpfile.name + + self.logger = log_watch(self.name, logfile) + else: + logfile = os.path.join(self.logs_dir, "postgresql.log") + _params = { "-D": self.data_dir, "-w": None, @@ -387,6 +452,9 @@ def stop(self, params={}): _params.update(params) self.pg_ctl("stop", _params) + if self.logger: + self.logger.stop() + self.working = False return self @@ -623,7 +691,7 @@ def version_to_num(version): return num -def get_new_node(name, base_dir=None): +def get_new_node(name, base_dir=None, use_logging=False): global registered_nodes global last_assigned_port @@ -647,7 +715,7 @@ def get_new_node(name, base_dir=None): # socket.SOCK_STREAM, # socket.getprotobyname("tcp")) - node = PostgresNode(name, port, base_dir) + node = PostgresNode(name, port, base_dir, use_logging=use_logging) registered_nodes.append(node) last_assigned_port = port @@ -663,7 +731,12 @@ def clean_all(): def stop_all(): global registered_nodes + global util_threads + for node in registered_nodes: # stop server if it still working if node.working: node.stop() + + for thread in util_threads: + thread.stop() diff --git a/testgres/tests/test_simple.py b/testgres/tests/test_simple.py old mode 100644 new mode 100755 index 2fd5525c..f33781c8 --- a/testgres/tests/test_simple.py +++ b/testgres/tests/test_simple.py @@ -1,4 +1,11 @@ +#!/usr/bin/env python + import unittest +import re +import six +import tempfile +import logging.config + from testgres import get_new_node, stop_all @@ -73,7 +80,50 @@ def test_users(self): node.init().start() node.psql('postgres', 'create role test_user login') value = node.safe_psql('postgres', 'select 1', username='test_user') - self.assertEqual(value, '1\n') + self.assertEqual(value, six.b('1\n')) + + def test_logging(self): + regex = re.compile('\w+:\s{1}LOG:.*') + logfile = tempfile.NamedTemporaryFile('w', delete=True) + + log_conf = { + 'version': 1, + 'handlers': { + 'file': { + 'class': 'logging.FileHandler', + 'filename': logfile.name, + 'formatter': 'base_format', + 'level': logging.DEBUG, + }, + }, + 'formatters': { + 'base_format': { + 'format': '%(node)-5s: %(message)s', + }, + }, + 'root': { + 'handlers': ('file', ), + 'level': 'DEBUG', + }, + } + + logging.config.dictConfig(log_conf) + + node = get_new_node('master', use_logging=True) + node1 = get_new_node('slave1', use_logging=True) + node2 = get_new_node('slave2', use_logging=True) + + node.init().start() + node1.init().start() + node2.init().start() + + with open(logfile.name, 'r') as log: + for line in log: + self.assertTrue(regex.match(line)) + + node.stop() + node1.stop() + node2.stop() if __name__ == '__main__':