2828import pwd
2929import tempfile
3030import shutil
31+ import time
3132
3233# Try to use psycopg2 by default. If psycopg2 isn"t available then use
3334# pg8000 which is slower but much more portable because uses only
4546last_assigned_port = int (random .random () * 16384 ) + 49152 ;
4647
4748
48- class ClusterException (Exception ):
49- pass
49+ class ClusterException (Exception ): pass
50+ class QueryException ( Exception ): pass
5051
5152
5253class PostgresNode :
5354 def __init__ (self , name , port ):
5455 self .name = name
56+ self .host = '127.0.0.1'
5557 self .port = port
5658 self .base_dir = tempfile .mkdtemp ()
5759 os .makedirs (self .logs_dir )
@@ -75,6 +77,11 @@ def output_filename(self):
7577 def error_filename (self ):
7678 return "%s/stderr.log" % self .logs_dir
7779
80+ @property
81+ def connstr (self ):
82+ return "port=%s" % self .port
83+ # return "port=%s host=%s" % (self.port, self.host)
84+
7885 def load_pg_config (self ):
7986 """ Loads pg_config output into dict """
8087 pg_config = os .environ .get ("PG_CONFIG" ) \
@@ -116,6 +123,10 @@ def init(self, allows_streaming=False):
116123 "fsync = off\n "
117124 "log_statement = all\n "
118125 "port = %s\n " % self .port )
126+ conf .write (
127+ # "unix_socket_directories = '%s'\n"
128+ # "listen_addresses = ''\n";)
129+ "listen_addresses = '%s'\n " % self .host )
119130
120131 if allows_streaming :
121132 conf .write (
@@ -129,7 +140,7 @@ def init(self, allows_streaming=False):
129140 "max_connections = 10\n " )
130141 self .set_replication_conf ()
131142
132- def init_from_backup (self , root_node , backup_name ):
143+ def init_from_backup (self , root_node , backup_name , has_streaming = False , hba_permit_replication = True ):
133144 """Initializes cluster from backup, made by another node"""
134145
135146 # Copy data from backup
@@ -142,11 +153,25 @@ def init_from_backup(self, root_node, backup_name):
142153 "postgresql.conf" ,
143154 "port = %s" % self .port
144155 )
156+ # Enable streaming
157+ if hba_permit_replication :
158+ self .set_replication_conf ()
159+ if has_streaming :
160+ self .enable_streaming (root_node )
145161
146162 def set_replication_conf (self ):
147163 hba_conf = "%s/pg_hba.conf" % self .data_dir
148164 with open (hba_conf , "a" ) as conf :
149165 conf .write ("local replication all trust\n " )
166+ # conf.write("host replication all 127.0.0.1/32 trust\n")
167+
168+ def enable_streaming (self , root_node ):
169+ config_name = "%s/recovery.conf" % self .data_dir
170+ with open (config_name , "a" ) as conf :
171+ conf .write (
172+ "primary_conninfo='%s application_name=%s'\n "
173+ "standby_mode=on\n "
174+ % (root_node .connstr , self .name ))
150175
151176 def append_conf (self , filename , string ):
152177 """Appends line to a config file like "postgresql.conf"
@@ -254,6 +279,22 @@ def safe_psql(self, dbname, query):
254279 raise ClusterException ("psql failed:\n " + err )
255280 return out
256281
282+ def poll_query_until (self , dbname , query ):
283+ """Runs a query once a second until it returs True"""
284+ max_attemps = 60
285+ attemps = 0
286+
287+ while attemps < max_attemps :
288+ ret = self .safe_psql (dbname , query )
289+
290+ # TODO: fix psql so that it returns result without newline
291+ if ret == "t\n " :
292+ return
293+
294+ time .sleep (1 )
295+ attemps += 1
296+ raise QueryException ("Timeout while waiting for query to return True" )
297+
257298 def execute (self , dbname , query ):
258299 """Executes the query and returns all rows"""
259300 connection = pglib .connect (
@@ -327,3 +368,8 @@ def clean_all():
327368 for node in registered_nodes :
328369 node .cleanup ()
329370 registered_nodes = []
371+
372+ def stop_all ():
373+ global registered_nodes
374+ for node in registered_nodes :
375+ node .stop ()
0 commit comments