3131import time
3232import six
3333
34+ import threading
35+ import logging
36+ import select
37+ import tempfile
38+
3439# Try to use psycopg2 by default. If psycopg2 isn"t available then use
3540# pg8000 which is slower but much more portable because uses only
3641# pure-Python code
4449
4550
4651registered_nodes = []
52+ util_threads = []
4753last_assigned_port = int (random .random () * 16384 ) + 49152
4854pg_config_data = {}
4955
@@ -74,6 +80,54 @@ def __str__(self):
7480 return '\n ERROR: {0}\n CMD: {1}' .format (self .error_text , self .cmd )
7581
7682
83+ class AsyncFileReader (threading .Thread ):
84+ '''
85+ Helper class to implement asynchronous reading of a file
86+ in a separate thread.
87+ '''
88+
89+ def __init__ (self , node_name , fd ):
90+ assert callable (fd .readline )
91+
92+ threading .Thread .__init__ (self )
93+
94+ self .node_name = node_name
95+ self .fd = fd
96+ self .stop_event = threading .Event ()
97+ self .logger = logging .getLogger (node_name )
98+ self .logger .setLevel (logging .INFO )
99+
100+ def run (self ):
101+ while self .fd in select .select ([self .fd ], [], [], 0 )[0 ]:
102+ line = self .fd .readline ()
103+ if line :
104+ extra = {'node' : self .node_name }
105+ self .logger .info (line .strip (), extra = extra )
106+ elif self .stopped ():
107+ break
108+ else :
109+ time .sleep (0.1 )
110+
111+ def stop (self ):
112+ self .stop_event .set ()
113+
114+ def stopped (self ):
115+ return self .stop_event .isSet ()
116+
117+
118+ def log_watch (node_name , pg_logname ):
119+ ''' Starts thread for node that redirects postgresql logs
120+ to python logging system
121+ '''
122+
123+ reader = AsyncFileReader (node_name , open (pg_logname , 'r' ))
124+ reader .start ()
125+
126+ global util_threads
127+ util_threads .append (reader )
128+ return reader
129+
130+
77131class NodeConnection (object ):
78132
79133 """
@@ -148,7 +202,7 @@ def close(self):
148202
149203class PostgresNode (object ):
150204
151- def __init__ (self , name , port , base_dir = None ):
205+ def __init__ (self , name , port , base_dir = None , use_logging = False ):
152206 self .name = name
153207 self .host = '127.0.0.1'
154208 self .port = port
@@ -160,6 +214,9 @@ def __init__(self, name, port, base_dir=None):
160214 os .makedirs (self .logs_dir )
161215 self .working = False
162216
217+ self .use_logging = use_logging
218+ self .logger = None
219+
163220 @property
164221 def data_dir (self ):
165222 return os .path .join (self .base_dir , "data" )
@@ -316,7 +373,15 @@ def pg_ctl(self, command, params={}, command_options=[]):
316373
317374 def start (self , params = {}):
318375 """ Starts cluster """
319- logfile = os .path .join (self .logs_dir , "postgresql.log" )
376+
377+ if self .use_logging :
378+ tmpfile = tempfile .NamedTemporaryFile ('w' , dir = self .logs_dir , delete = False )
379+ logfile = tmpfile .name
380+
381+ self .logger = log_watch (self .name , logfile )
382+ else :
383+ logfile = os .path .join (self .logs_dir , "postgresql.log" )
384+
320385 _params = {
321386 "-D" : self .data_dir ,
322387 "-w" : None ,
@@ -623,7 +688,7 @@ def version_to_num(version):
623688 return num
624689
625690
626- def get_new_node (name , base_dir = None ):
691+ def get_new_node (name , base_dir = None , use_logging = False ):
627692 global registered_nodes
628693 global last_assigned_port
629694
@@ -647,7 +712,7 @@ def get_new_node(name, base_dir=None):
647712 # socket.SOCK_STREAM,
648713 # socket.getprotobyname("tcp"))
649714
650- node = PostgresNode (name , port , base_dir )
715+ node = PostgresNode (name , port , base_dir , use_logging = use_logging )
651716 registered_nodes .append (node )
652717 last_assigned_port = port
653718
@@ -663,7 +728,12 @@ def clean_all():
663728
664729def stop_all ():
665730 global registered_nodes
731+ global util_threads
732+
666733 for node in registered_nodes :
667734 # stop server if it still working
668735 if node .working :
669736 node .stop ()
737+
738+ for thread in util_threads :
739+ thread .stop ()
0 commit comments