-
Notifications
You must be signed in to change notification settings - Fork 817
/
postgresql.py
1619 lines (1382 loc) · 70.5 KB
/
postgresql.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import logging
import errno
import os
import psycopg2
import psutil
import re
import shlex
import shutil
import signal
import subprocess
import tempfile
import time
from collections import defaultdict
from contextlib import contextmanager
from patroni import call_self
from patroni.callback_executor import CallbackExecutor
from patroni.exceptions import PostgresConnectionException, PostgresException
from patroni.utils import compare_values, parse_bool, parse_int, Retry, RetryFailedError, polling_loop, null_context
from six import string_types
from six.moves.urllib.parse import quote_plus
from threading import current_thread, Lock, Event
logger = logging.getLogger(__name__)

# Event names passed as the first extra argument to user-configured callback scripts.
ACTION_ON_START = "on_start"
ACTION_ON_STOP = "on_stop"
ACTION_ON_RESTART = "on_restart"
ACTION_ON_RELOAD = "on_reload"
ACTION_ON_ROLE_CHANGE = "on_role_change"

# Server states derived from pg_isready exit codes (see Postgresql.pg_isready).
STATE_RUNNING = 'running'
STATE_REJECT = 'rejecting connections'
STATE_NO_RESPONSE = 'not responding'
STATE_UNKNOWN = 'unknown'

# Shutdown mode -> signal delivered to postmaster, mirroring pg_ctl's
# smart/fast/immediate shutdown modes.
STOP_SIGNALS = {
    'smart': signal.SIGTERM,
    'fast': signal.SIGINT,
    'immediate': signal.SIGQUIT,
}
STOP_POLLING_INTERVAL = 1  # seconds between polls while waiting for postmaster to stop

# Poor-man's enum describing the progress of a pg_rewind decision/run.
REWIND_STATUS = type('Enum', (), {'INITIAL': 0, 'CHECK': 1, 'NEED': 2, 'NOT_NEED': 3, 'SUCCESS': 4, 'FAILED': 5})
def slot_name_from_member_name(member_name):
    """Map an arbitrary member name onto a valid PostgreSQL replication slot name.

    The name is lowercased; dashes and periods (common in hostnames) become
    underscores; any other character outside [a-z0-9_] is replaced by 'u'
    followed by its (at least 4-digit) unicode codepoint. The result is
    truncated to the 64-character PostgreSQL identifier limit, so distinct
    member names may collide on the same slot name.
    """
    def escape(match):
        char = match.group(0)
        if char in '-.':
            return '_'
        return "u{:04d}".format(ord(char))

    return re.sub('[^a-z0-9_]', escape, member_name.lower())[:64]
class Postgresql(object):

    # List of parameters which must be always passed to postmaster as command line options
    # to make it not possible to change them with 'ALTER SYSTEM'.
    # Some of these parameters have a sane default value assigned and Patroni doesn't allow
    # to decrease this value. E.g. 'wal_level' can't be lower than 'hot_standby' and so on.
    # These parameters could be changed only globally, i.e. via DCS.
    # P.S. 'listen_addresses' and 'port' are added here just for convenience, to mark them
    # as parameters which should always be passed through the command line.
    #
    # Format:
    #   key   - parameter name
    #   value - tuple(default_value, check_function, min_version)
    #     default_value  -- some sane default value
    #     check_function -- if the new value is not correct must return `!False`
    #                       (a constant `lambda _: False` means the value is fixed
    #                       and user overrides are always rejected)
    #     min_version    -- major version of PostgreSQL when the parameter was introduced
    CMDLINE_OPTIONS = {
        'listen_addresses': (None, lambda _: False, 90100),
        'port': (None, lambda _: False, 90100),
        'cluster_name': (None, lambda _: False, 90500),
        'wal_level': ('hot_standby', lambda v: v.lower() in ('hot_standby', 'replica', 'logical'), 90100),
        'hot_standby': ('on', lambda _: False, 90100),
        'max_connections': (100, lambda v: int(v) >= 100, 90100),
        'max_wal_senders': (5, lambda v: int(v) >= 5, 90100),
        'wal_keep_segments': (8, lambda v: int(v) >= 8, 90100),
        'max_prepared_transactions': (0, lambda v: int(v) >= 0, 90100),
        'max_locks_per_transaction': (64, lambda v: int(v) >= 64, 90100),
        'track_commit_timestamp': ('off', lambda v: parse_bool(v) is not None, 90500),
        'max_replication_slots': (5, lambda v: int(v) >= 5, 90400),
        'max_worker_processes': (8, lambda v: int(v) >= 8, 90400),
        'wal_log_hints': ('on', lambda _: False, 90400)
    }
def __init__(self, config):
    """Set up a Postgresql manager from the `postgresql` section of the Patroni config.

    Resolves file locations inside the data directory, reads the current major
    version from PG_VERSION, prepares connection addresses and retry policies,
    and — if a postmaster is already running — adopts it ("joins" it).
    """
    self.config = config
    self.name = config['name']
    self.scope = config['scope']
    self._bin_dir = config.get('bin_dir') or ''
    self._database = config.get('database', 'postgres')
    self._data_dir = config['data_dir']
    self._pending_restart = False
    # remember the thread that created us; presumably used to detect cross-thread
    # usage elsewhere in the class -- TODO confirm against the rest of the file
    self.__thread_ident = current_thread().ident
    self._version_file = os.path.join(self._data_dir, 'PG_VERSION')
    self._major_version = self.get_major_version()
    self._synchronous_standby_names = None
    self._server_parameters = self.get_server_parameters(config)
    self._connect_address = config.get('connect_address')
    self._superuser = config['authentication'].get('superuser', {})
    self.resolve_connection_addresses()
    self._rewind_state = REWIND_STATUS.INITIAL
    self._use_slots = config.get('use_slots', True)
    self._schedule_load_slots = self.use_slots
    self._pgpass = config.get('pgpass') or os.path.join(os.path.expanduser('~'), 'pgpass')
    self._callback_executor = CallbackExecutor()
    self.__cb_called = False
    self.__cb_pending = None
    config_base_name = config.get('config_base_name', 'postgresql')
    self._postgresql_conf = os.path.join(self._data_dir, config_base_name + '.conf')
    self._postgresql_base_conf_name = config_base_name + '.base.conf'
    self._postgresql_base_conf = os.path.join(self._data_dir, self._postgresql_base_conf_name)
    self._recovery_conf = os.path.join(self._data_dir, 'recovery.conf')
    self._postmaster_pid = os.path.join(self._data_dir, 'postmaster.pid')
    # trigger file is interpreted relative to the data directory unless absolute
    self._trigger_file = config.get('recovery_conf', {}).get('trigger_file') or 'promote'
    self._trigger_file = os.path.abspath(os.path.join(self._data_dir, self._trigger_file))
    self._connection_lock = Lock()
    self._connection = None
    self._cursor_holder = None
    self._sysid = None
    self._replication_slots = []  # list of already existing replication slots
    self.retry = Retry(max_tries=-1, deadline=config['retry_timeout']/2.0, max_delay=1,
                       retry_exceptions=PostgresConnectionException)
    # Retry 'pg_is_in_recovery()' only once
    self._is_leader_retry = Retry(max_tries=1, deadline=config['retry_timeout']/2.0, max_delay=1,
                                  retry_exceptions=PostgresConnectionException)
    self._state_lock = Lock()
    self.set_state('stopped')
    self._role_lock = Lock()
    self.set_role(self.get_postgres_role_from_data_directory())
    self._state_entry_timestamp = None
    # This event is set to true when no backends are running. Could be set in parallel by
    # multiple processes, like when demote is racing with async restart. Needs to be cleared
    # before invoking stop if wait for this event is desired.
    self.stop_safepoint_reached = Event()
    self.stop_safepoint_reached.set()
    if self.is_running():
        self.set_state('running')
        self.set_role('master' if self.is_leader() else 'replica')
        self._write_postgresql_conf()  # we are "joining" already running postgres
@property
def _configuration_to_save(self):
configuration = [self._postgresql_conf]
if 'custom_conf' not in self.config:
configuration.append(self._postgresql_base_conf)
if not self.config['parameters'].get('hba_file'):
configuration.append(os.path.join(self._data_dir, 'pg_hba.conf'))
return configuration
@property
def use_slots(self):
    # replication slots exist only since PostgreSQL 9.4
    return self._use_slots and self._major_version >= 90400

@property
def _replication(self):
    # credentials for the replication user
    return self.config['authentication']['replication']

@property
def callback(self):
    # user-configured callback scripts, keyed by ACTION_* name
    return self.config.get('callbacks') or {}

@staticmethod
def _wal_name(version):
    # 'xlog' was renamed to 'wal' in PostgreSQL 10 (version int 100000)
    return 'wal' if version >= 100000 else 'xlog'

@property
def wal_name(self):
    return self._wal_name(self._major_version)

@property
def lsn_name(self):
    # 'location' was renamed to 'lsn' in PostgreSQL 10 function/column names
    return 'lsn' if self._major_version >= 100000 else 'location'
def _version_file_exists(self):
return not self.data_directory_empty() and os.path.isfile(self._version_file)
def get_major_version(self):
    """Read PG_VERSION from the data directory and convert it to an int.

    :returns: integer major version (e.g. '9.6' -> 90600 via
        postgres_major_version_to_int) or 0 when the data directory is not
        initialized or the file cannot be read."""
    if self._version_file_exists():
        try:
            with open(self._version_file) as f:
                return self.postgres_major_version_to_int(f.read().strip())
        except Exception:
            logger.exception('Failed to read PG_VERSION from %s', self._data_dir)
    return 0
def get_server_parameters(self, config):
parameters = config['parameters'].copy()
listen_addresses, port = (config['listen'] + ':5432').split(':')[:2]
parameters.update({'cluster_name': self.scope, 'listen_addresses': listen_addresses, 'port': port})
if config.get('synchronous_mode', False):
if self._synchronous_standby_names is None:
if config.get('synchronous_mode_strict', False):
parameters['synchronous_standby_names'] = '*'
else:
parameters.pop('synchronous_standby_names', None)
else:
parameters['synchronous_standby_names'] = self._synchronous_standby_names
if self._major_version >= 90600 and parameters['wal_level'] == 'hot_standby':
parameters['wal_level'] = 'replica'
return {k: v for k, v in parameters.items() if not self._major_version or
self._major_version >= self.CMDLINE_OPTIONS.get(k, (0, 1, 90100))[2]}
def resolve_connection_addresses(self):
port = self._server_parameters['port']
tcp_local_address = self._get_tcp_local_address()
self._local_address = {'host': self._get_unix_local_address() or tcp_local_address, 'port': port}
self._local_replication_address = {'host': tcp_local_address, 'port': port}
self.connection_string = 'postgres://{0}/{1}'.format(
self._connect_address or tcp_local_address + ':' + port, self._database)
def _pgcommand(self, cmd):
    """Returns path to the specified PostgreSQL command (honors configured bin_dir)"""
    return os.path.join(self._bin_dir, cmd)

def pg_ctl(self, cmd, *args, **kwargs):
    """Builds and executes pg_ctl command

    :param cmd: pg_ctl subcommand, e.g. 'initdb' or 'stop'
    :param args: extra command-line arguments appended after '-D <data_dir>'
    :param kwargs: passed through to subprocess.call (e.g. env, stdout)
    :returns: `!True` when return_code == 0, otherwise `!False`"""
    pg_ctl = [self._pgcommand('pg_ctl'), cmd]
    return subprocess.call(pg_ctl + ['-D', self._data_dir] + list(args), **kwargs) == 0
def pg_isready(self):
    """Runs pg_isready to see if PostgreSQL is accepting connections.

    :returns: one of the STATE_* constants: STATE_RUNNING when up,
        STATE_REJECT while starting up, STATE_NO_RESPONSE when not answering,
        STATE_UNKNOWN for any other exit code (e.g. invalid parameters)."""
    cmd = [self._pgcommand('pg_isready'),
           '-h', self._local_address['host'],
           '-p', self._local_address['port'],
           '-d', self._database]
    # We only need the username because pg_isready does not try to authenticate
    if 'username' in self._superuser:
        cmd.extend(['-U', self._superuser['username']])
    ret = subprocess.call(cmd)
    # pg_isready exit statuses: 0=accepting, 1=rejecting (startup/shutdown),
    # 2=no response, 3=no attempt made (bad parameters)
    return_codes = {0: STATE_RUNNING,
                    1: STATE_REJECT,
                    2: STATE_NO_RESPONSE,
                    3: STATE_UNKNOWN}
    return return_codes.get(ret, STATE_UNKNOWN)
def reload_config(self, config):
    """Apply a new configuration to a (possibly running) PostgreSQL.

    Diffs the desired server parameters against pg_settings to decide whether
    a plain reload suffices or a postmaster restart is required (recorded in
    self._pending_restart), rewrites postgresql.conf and triggers a reload
    when needed, and refreshes connection addresses and retry deadlines.
    """
    self._superuser = config['authentication'].get('superuser', {})
    server_parameters = self.get_server_parameters(config)
    local_connection_address_changed = pending_reload = pending_restart = False
    if self.state == 'running':
        # candidate changes: new values for plain parameters, None for removed ones;
        # parameters with a period are extension GUCs handled separately below
        changes = {p: v for p, v in server_parameters.items() if '.' not in p}
        changes.update({p: None for p, v in self._server_parameters.items() if not ('.' in p or p in changes)})
        if changes:
            # always fetch wal_segment_size -- it is the unit for min/max_wal_size
            if 'wal_segment_size' not in changes:
                changes['wal_segment_size'] = '16384kB'
            # XXX: query can raise an exception
            for r in self.query("""SELECT name, setting, unit, vartype, context
                                     FROM pg_settings
                                    WHERE name IN (""" + ', '.join(['%s'] * len(changes)) + """)
                                    ORDER BY 1 DESC""", *(list(changes.keys()))):
                if r[4] == 'internal':
                    if r[0] == 'wal_segment_size':
                        server_parameters.pop(r[0], None)
                        wal_segment_size = parse_int(r[2], 'kB')
                        if wal_segment_size is not None:
                            changes['wal_segment_size'] = '{0}kB'.format(int(r[1]) * wal_segment_size)
                elif r[0] in changes:
                    # min/max_wal_size are expressed in wal segments, other
                    # parameters carry their own unit from pg_settings
                    unit = changes['wal_segment_size'] if r[0] in ('min_wal_size', 'max_wal_size') else r[2]
                    new_value = changes.pop(r[0])
                    if new_value is None or not compare_values(r[3], unit, r[1], new_value):
                        if r[4] == 'postmaster':
                            pending_restart = True
                            if r[0] in ('listen_addresses', 'port') or\
                                    config.get('use_unix_socket', True) and r[0] == 'unix_socket_directories':
                                local_connection_address_changed = True
                        else:
                            pending_reload = True
            # anything left in `changes` was not found in pg_settings at all
            for param in changes:
                if param in server_parameters:
                    logger.warning('Removing invalid parameter `%s` from postgresql.parameters', param)
                    server_parameters.pop(param)
        # Check that user-defined parameters have changed (parameters with period in name)
        if not pending_reload:
            for p, v in server_parameters.items():
                if '.' in p and (p not in self._server_parameters or str(v) != str(self._server_parameters[p])):
                    pending_reload = True
                    break
        if not pending_reload:
            for p, v in self._server_parameters.items():
                if '.' in p and (p not in server_parameters or str(v) != str(server_parameters[p])):
                    pending_reload = True
                    break
    self.config = config
    self._pending_restart = pending_restart
    self._server_parameters = server_parameters
    self._connect_address = config.get('connect_address')
    # keep the old local addresses if a restart-requiring address change is pending
    if not local_connection_address_changed:
        self.resolve_connection_addresses()
    if pending_reload:
        self._write_postgresql_conf()
        self.reload()
    self._is_leader_retry.deadline = self.retry.deadline = config['retry_timeout']/2.0
@property
def pending_restart(self):
    # set by reload_config() when a changed parameter requires a postmaster restart
    return self._pending_restart
@staticmethod
def configuration_allows_rewind(data):
return data.get('wal_log_hints setting', 'off') == 'on' \
or data.get('Data page checksum version', '0') != '0'
@property
def can_rewind(self):
    """Check if the pg_rewind executable is available and pg_controldata indicates
    we have either wal_log_hints or data checksums turned on.

    :returns: `!False` unless use_pg_rewind is enabled, full superuser
        credentials are configured, `pg_rewind --help` runs successfully and
        the cluster configuration allows rewinding."""
    # low-hanging fruit: check if pg_rewind configuration is there
    if not (self.config.get('use_pg_rewind') and all(self._superuser.get(n) for n in ('username', 'password'))):
        return False
    cmd = [self._pgcommand('pg_rewind'), '--help']
    try:
        # close the devnull handle deterministically instead of leaking it to the GC
        with open(os.devnull, 'w') as devnull:
            ret = subprocess.call(cmd, stdout=devnull, stderr=subprocess.STDOUT)
        if ret != 0:  # pg_rewind is not there, close up the shop and go home
            return False
    except OSError:
        return False
    return self.configuration_allows_rewind(self.controldata())
@property
def sysid(self):
    """Database system identifier from pg_controldata, cached after the first read."""
    if not self._sysid:
        data = self.controldata()
        self._sysid = data.get('Database system identifier', "")
    return self._sysid
def _get_unix_local_address(self):
for d in self._server_parameters.get('unix_socket_directories', '').split(','):
d = d.strip()
if d.startswith('/'): # Only absolute path can be used to connect via unix-socket
return d
def _get_tcp_local_address(self):
listen_addresses = self._server_parameters['listen_addresses'].split(',')
for la in listen_addresses:
if la.strip().lower() in ('*', '0.0.0.0', '127.0.0.1', 'localhost'): # we are listening on '*' or localhost
return 'localhost' # connection via localhost is preferred
return listen_addresses[0].strip() # can't use localhost, take first address from listen_addresses
def get_postgres_role_from_data_directory(self):
if self.data_directory_empty():
return 'uninitialized'
elif os.path.exists(self._recovery_conf):
return 'replica'
else:
return 'master'
@property
def _local_connect_kwargs(self):
ret = self._local_address.copy()
ret.update({'database': self._database,
'fallback_application_name': 'Patroni',
'connect_timeout': 3,
'options': '-c statement_timeout=2000'})
if 'username' in self._superuser:
ret['user'] = self._superuser['username']
if 'password' in self._superuser:
ret['password'] = self._superuser['password']
return ret
def connection(self):
    """Return the cached local superuser connection, (re)connecting when needed."""
    with self._connection_lock:
        if not self._connection or self._connection.closed != 0:
            self._connection = psycopg2.connect(**self._local_connect_kwargs)
            self._connection.autocommit = True
            self.server_version = self._connection.server_version
    return self._connection

def _cursor(self):
    """Return the shared cursor, making a fresh connection/cursor when the old one died."""
    if not self._cursor_holder or self._cursor_holder.closed or self._cursor_holder.connection.closed != 0:
        logger.info("establishing a new patroni connection to the postgres cluster")
        self._cursor_holder = self.connection().cursor()
    return self._cursor_holder

def close_connection(self):
    """Close the cached connection (if still open) and drop the cached cursor."""
    if self._connection and self._connection.closed == 0:
        self._connection.close()
        logger.info("closed patroni connection to the postgresql cluster")
    self._cursor_holder = self._connection = None
def _query(self, sql, *params):
    """Execute `sql` on the shared cursor and return the cursor.

    We are always using the same cursor, therefore this method is not thread-safe!!!
    You can call it from different threads only if you are holding explicit `AsyncExecutor` lock,
    because the main thread is always holding this lock when running HA cycle.

    :raises RetryFailedError: when the cluster is being restarted
    :raises PostgresConnectionException: on connection-level failures"""
    cursor = None
    try:
        cursor = self._cursor()
        cursor.execute(sql, params)
        return cursor
    except psycopg2.Error as e:
        if cursor and cursor.connection.closed == 0:
            # When connected via unix socket, psycopg2 can't recognize 'connection lost'
            # and leaves `_cursor_holder.connection.closed == 0`, but psycopg2.OperationalError
            # is still raised (which is correct). It doesn't make sense to continue with existing
            # connection and we will close it, to avoid its reuse by the `_cursor` method.
            if isinstance(e, psycopg2.OperationalError):
                self.close_connection()
            else:
                raise e
        if self.state == 'restarting':
            raise RetryFailedError('cluster is being restarted')
        raise PostgresConnectionException('connection problems')
def query(self, sql, *params):
    """Execute a query with retries; retry exhaustion surfaces as PostgresConnectionException."""
    try:
        return self.retry(self._query, sql, *params)
    except RetryFailedError as e:
        raise PostgresConnectionException(str(e))
def data_directory_empty(self):
return not os.path.exists(self._data_dir) or os.listdir(self._data_dir) == []
@staticmethod
def initdb_allowed_option(name):
if name in ['pgdata', 'nosync', 'pwfile', 'sync-only']:
raise Exception('{0} option for initdb is not allowed'.format(name))
return True
def get_initdb_options(self, config):
    """Convert the initdb configuration list into command-line options.

    Each entry is either a bare string ('--flag') or a one-key dict
    ('--key=value'); forbidden options raise via initdb_allowed_option.
    """
    options = []
    for entry in config:
        if isinstance(entry, string_types):
            if self.initdb_allowed_option(entry):
                options.append('--{0}'.format(entry))
        elif isinstance(entry, dict):
            keys = list(entry.keys())
            if len(keys) != 1 or not isinstance(keys[0], string_types) or not self.initdb_allowed_option(keys[0]):
                raise Exception('Invalid option: {0}'.format(entry))
            options.append('--{0}={1}'.format(keys[0], entry[keys[0]]))
        else:
            raise Exception('Unknown type of initdb option: {0}'.format(entry))
    return options
def _initialize(self, config):
    """Run initdb to create a brand new cluster.

    :returns: `!True` on success; on failure the state is set to 'initdb failed'."""
    # NOTE(review): the 'initalizing' typo is part of the runtime state value;
    # other components may match on this exact string, so it must not be "fixed" here
    self.set_state('initalizing new cluster')
    options = self.get_initdb_options(config.get('initdb') or [])
    pwfile = None
    if self._superuser:
        if 'username' in self._superuser:
            options.append('--username={0}'.format(self._superuser['username']))
        if 'password' in self._superuser:
            # initdb reads the superuser password from a throw-away temp file
            (fd, pwfile) = tempfile.mkstemp()
            os.write(fd, self._superuser['password'].encode('utf-8'))
            os.close(fd)
            options.append('--pwfile={0}'.format(pwfile))
    options = ['-o', ' '.join(options)] if options else []
    ret = self.pg_ctl('initdb', *options)
    if pwfile:
        os.remove(pwfile)
    if ret:
        self.write_pg_hba(config.get('pg_hba', []))
        # re-read version/parameters now that the data directory exists
        self._major_version = self.get_major_version()
        self._server_parameters = self.get_server_parameters(self.config)
    else:
        self.set_state('initdb failed')
    return ret
def run_bootstrap_post_init(self, config):
    """
    runs a script after initdb or custom bootstrap script is called and waits until completion.

    The script receives a single argument: a connection string for the freshly
    bootstrapped cluster. Returns `!True` when no script is configured or the
    script exits with code 0.
    """
    cmd = config.get('post_bootstrap') or config.get('post_init')
    if cmd:
        r = self._local_connect_kwargs
        # '/tmp' => '%2Ftmp' for unix socket path
        host = quote_plus(r['host']) if r['host'].startswith('/') else r['host']
        if 'user' in r:
            user = r['user'] + '@'
        else:
            user = ''
            if 'password' in r:
                # a password without a user: fill in the user for the pgpass record
                import getpass
                r.setdefault('user', os.environ.get('PGUSER', getpass.getuser()))
        connstring = 'postgres://{0}{1}:{2}/{3}'.format(user, host, r['port'], r['database'])
        # pass the password via PGPASSFILE rather than on the command line
        env = self.write_pgpass(r) if 'password' in r else None
        try:
            ret = subprocess.call(shlex.split(cmd) + [connstring], env=env)
        except OSError:
            logger.error('post_init script %s failed', cmd)
            return False
        if ret != 0:
            logger.error('post_init script %s returned non-zero code %d', cmd, ret)
            return False
    return True
def delete_trigger_file(self):
if os.path.exists(self._trigger_file):
os.unlink(self._trigger_file)
def write_pgpass(self, record):
if 'user' not in record or 'password' not in record:
return os.environ.copy()
with open(self._pgpass, 'w') as f:
os.fchmod(f.fileno(), 0o600)
f.write('{host}:{port}:*:{user}:{password}\n'.format(**record))
env = os.environ.copy()
env['PGPASSFILE'] = self._pgpass
return env
def replica_method_can_work_without_replication_connection(self, method):
return method != 'basebackup' and self.config and self.config.get(method, {}).get('no_master')
def can_create_replica_without_replication_connection(self):
""" go through the replication methods to see if there are ones
that does not require a working replication connection.
"""
replica_methods = self.config.get('create_replica_method', [])
return any(self.replica_method_can_work_without_replication_connection(method) for method in replica_methods)
def create_replica(self, clone_member):
    """
    create the replica according to the replica_method
    defined by the user. this is a list, so we need to
    loop through all methods the user supplies

    :param clone_member: the member to clone from (may be None when there is no source)
    :returns: 0 on success, non-zero when every method failed
    """
    self.set_state('creating replica')
    self._sysid = None  # forget the cached system identifier: it will change
    # get list of replica methods from config.
    # If there is no configuration key, or no value is specified, use basebackup
    replica_methods = self.config.get('create_replica_method') or ['basebackup']
    if clone_member and clone_member.conn_url:
        r = clone_member.conn_kwargs(self._replication)
        connstring = 'postgres://{user}@{host}:{port}/{database}'.format(**r)
        # add the credentials to connect to the replica origin to pgpass.
        env = self.write_pgpass(r)
    else:
        connstring = ''
        env = os.environ.copy()
        # if we don't have any source, leave only replica methods that work without it
        replica_methods = \
            [r for r in replica_methods if self.replica_method_can_work_without_replication_connection(r)]
    # go through them in priority order
    ret = 1
    for replica_method in replica_methods:
        # if the method is basebackup, then use the built-in
        if replica_method == "basebackup":
            ret = self.basebackup(connstring, env)
            if ret == 0:
                logger.info("replica has been created using basebackup")
                # if basebackup succeeds, exit with success
                break
        else:
            # custom method: start from a clean data directory
            if not self.data_directory_empty():
                self.remove_data_directory()
            cmd = replica_method
            method_config = {}
            # user-defined method; check for configuration
            # not required, actually
            if replica_method in self.config:
                method_config = self.config[replica_method].copy()
                # look to see if the user has supplied a full command path
                # if not, use the method name as the command
                cmd = method_config.pop('command', cmd)
            # add the default parameters
            method_config.update({"scope": self.scope,
                                  "role": "replica",
                                  "datadir": self._data_dir,
                                  "connstring": connstring})
            params = ["--{0}={1}".format(arg, val) for arg, val in method_config.items()]
            try:
                # call script with the full set of parameters
                ret = subprocess.call(shlex.split(cmd) + params, env=env)
                # if we succeeded, stop
                if ret == 0:
                    logger.info('replica has been created using %s', replica_method)
                    break
                else:
                    logger.error('Error creating replica using method %s: %s exited with code=%s',
                                 replica_method, cmd, ret)
            except Exception:
                logger.exception('Error creating replica using method %s', replica_method)
                ret = 1
    self.set_state('stopped')
    return ret
def is_leader(self):
    """Return True when the local instance is not in recovery (i.e. is a master).

    :raises PostgresConnectionException: when pg_is_in_recovery() cannot be executed"""
    try:
        return not self._is_leader_retry(self._query, 'SELECT pg_is_in_recovery()').fetchone()[0]
    except RetryFailedError as e:  # SELECT pg_is_in_recovery() failed two times
        # postgres answers but rejects connections: it is most likely still starting up
        if not self.is_starting() and self.pg_isready() == STATE_REJECT:
            self.set_state('starting')
        raise PostgresConnectionException(str(e))
def is_running(self):
    """Cheap liveness check: PG_VERSION + postmaster.pid present and the pid alive."""
    if not (self._version_file_exists() and os.path.isfile(self._postmaster_pid)):
        # XXX: This is dangerous in case somebody deletes the data directory while PostgreSQL is still running.
        return False
    return self.is_pid_running(self.get_pid())
def read_pid_file(self):
"""Reads and parses postmaster.pid from the data directory
:returns dictionary of values if successful, empty dictionary otherwise
"""
pid_line_names = ['pid', 'data_dir', 'start_time', 'port', 'socket_dir', 'listen_addr', 'shmem_key']
try:
with open(self._postmaster_pid) as f:
return {name: line.rstrip("\n") for name, line in zip(pid_line_names, f)}
except IOError:
return {}
def get_pid(self):
"""Fetches pid value from postmaster.pid using read_pid_file
:returns pid if successful, 0 if pid file is not present"""
# TODO: figure out what to do on permission errors
pid = self.read_pid_file().get('pid', 0)
try:
return int(pid)
except ValueError:
logger.warning("Garbage pid in postmaster.pid: {0!r}".format(pid))
return 0
@staticmethod
def is_pid_running(pid):
try:
if pid < 0:
pid = -pid
return pid > 0 and pid != os.getpid() and pid != os.getppid() and (os.kill(pid, 0) or True)
except Exception:
return False
@property
def cb_called(self):
    # True once any state-change callback has been dispatched at least once
    return self.__cb_called

def call_nowait(self, cb_name):
    """ pick a callback command and call it without waiting for it to finish """
    if cb_name in (ACTION_ON_START, ACTION_ON_STOP, ACTION_ON_RESTART, ACTION_ON_ROLE_CHANGE):
        self.__cb_called = True
    if self.callback and cb_name in self.callback:
        cmd = self.callback[cb_name]
        try:
            # callback scripts receive: <action> <role> <scope> as extra arguments
            cmd = shlex.split(self.callback[cb_name]) + [cb_name, self.role, self.scope]
            self._callback_executor.call(cmd)
        except Exception:
            logger.exception('callback %s %s %s %s failed', cmd, cb_name, self.role, self.scope)
@property
def role(self):
    """Current role (e.g. 'master'/'replica'); lock-guarded for cross-thread reads."""
    with self._role_lock:
        return self._role

def set_role(self, value):
    with self._role_lock:
        self._role = value

@property
def state(self):
    """Current lifecycle state ('running', 'stopped', 'starting', ...); lock-guarded."""
    with self._state_lock:
        return self._state

def set_state(self, value):
    with self._state_lock:
        self._state = value
        self._state_entry_timestamp = time.time()  # remember when this state began

def time_in_state(self):
    """Seconds elapsed since the last state change."""
    return time.time() - self._state_entry_timestamp

def is_starting(self):
    return self.state == 'starting'
def wait_for_port_open(self, pid, initiated, timeout):
    """Waits until PostgreSQL opens ports.

    :param pid: pid of the postmaster process we spawned
    :param initiated: unix timestamp taken around the time postmaster was started
    :param timeout: seconds to keep polling
    :returns: True once pg_isready gets any answer from the server; False on
        timeout or when the postmaster died."""
    for _ in polling_loop(timeout):
        pid_file = self.read_pid_file()
        # a sufficiently populated pid file has more than 5 fields
        if len(pid_file) > 5:
            try:
                pmpid = int(pid_file['pid'])
                pmstart = int(pid_file['start_time'])
                # make sure the pid file belongs to the postmaster we started
                # (2 seconds of slack for timestamp granularity)
                if pmstart >= initiated - 2 and pmpid == pid:
                    isready = self.pg_isready()
                    if isready != STATE_NO_RESPONSE:
                        if isready not in [STATE_REJECT, STATE_RUNNING]:
                            logger.warning("Can't determine PostgreSQL startup status, assuming running")
                        return True
            except ValueError:
                # Garbage in the pid file
                pass
        if not self.is_pid_running(pid):
            logger.error('postmaster is not running')
            self.set_state('start failed')
            return False
    logger.warning("Timed out waiting for PostgreSQL to start")
    return False
def start(self, timeout=None, block_callbacks=False, task=None):
    """Start PostgreSQL

    Waits for postmaster to open ports or terminate so pg_isready can be used to check startup completion
    or failure.

    :param timeout: seconds to wait for startup; falls back to pg_ctl_timeout (default 60)
    :param block_callbacks: suppress the on_start callback (used during restart)
    :param task: optional cancellable task wrapper used by the async executor
    :returns: True if start was initiated and postmaster ports are open, False if start failed,
        None when startup is still in progress and no explicit timeout was given"""
    # make sure we close all connections established against
    # the former node, otherwise, we might get a stalled one
    # after kill -9, which would report incorrect data to
    # patroni.
    self.close_connection()
    if self.is_running():
        logger.error('Cannot start PostgreSQL because one is already running.')
        return True
    if not block_callbacks:
        self.__cb_pending = ACTION_ON_START
    self.set_role(self.get_postgres_role_from_data_directory())
    self.set_state('starting')
    self._pending_restart = False
    self._write_postgresql_conf()
    self.resolve_connection_addresses()
    # CMDLINE_OPTIONS parameters are always passed on the command line so that
    # they cannot be overridden via ALTER SYSTEM
    opts = {p: self._server_parameters[p] for p in self.CMDLINE_OPTIONS if p in self._server_parameters}
    options = ['--{0}={1}'.format(p, v) for p, v in opts.items()]
    # NOTE(review): this assignment is redundant -- start_initiated is
    # unconditionally re-assigned inside the `with` block below
    start_initiated = time.time()
    # Unfortunately `pg_ctl start` does not return postmaster pid to us. Without this information
    # it is hard to know the current state of postgres startup, so we had to reimplement pg_ctl start
    # in python. It will start postgres, wait for port to be open and wait until postgres will start
    # accepting connections.
    # Important!!! We can't just start postgres using subprocess.Popen, because in this case it
    # will be our child for the rest of our live and we will have to take care of it (`waitpid`).
    # So we will use the same approach as pg_ctl uses: start a new process, which will start postgres.
    # This process will write postmaster pid to stdout and exit immediately. Now it's responsibility
    # of init process to take care about postmaster.
    # In order to make everything portable we can't use fork&exec approach here, so we will call
    # ourselves and pass list of arguments which must be used to start postgres.
    with task or null_context():
        if task and task.is_cancelled:
            logger.info("PostgreSQL start cancelled.")
            return False
        start_initiated = time.time()
        proc = call_self(['pg_ctl_start', self._pgcommand('postgres'), '-D', self._data_dir] + options,
                         close_fds=True, preexec_fn=os.setsid, stdout=subprocess.PIPE,
                         env={p: os.environ[p] for p in ('PATH', 'LC_ALL', 'LANG') if p in os.environ})
        pid = int(proc.stdout.readline().strip())
        proc.wait()
        logger.info('postmaster pid=%s', pid)
        if task:
            task.complete(pid)
    start_timeout = timeout
    if not start_timeout:
        try:
            start_timeout = float(self.config.get('pg_ctl_timeout', 60))
        except ValueError:
            start_timeout = 60
    # We want postmaster to open ports before we continue
    if not self.wait_for_port_open(pid, start_initiated, start_timeout):
        return False
    ret = self.wait_for_startup(start_timeout)
    if ret is not None:
        return ret
    elif timeout is not None:
        return False
    else:
        return None
def checkpoint(self, connect_kwargs=None):
    """Execute CHECKPOINT on the PostgreSQL instance.

    :param connect_kwargs: optional libpq connection parameters. When given, the server
        is first checked for recovery mode (a replica can't run a useful CHECKPOINT
        here); when omitted, the local connection parameters are used and the
        recovery check is skipped.
    :returns: ``None`` on success, otherwise a short human-readable error string.
    """
    check_not_is_in_recovery = connect_kwargs is not None
    # Work on a copy so we never mutate the caller's dict (or our own cached
    # local connection parameters) when stripping unwanted keys below.
    connect_kwargs = dict(connect_kwargs or self._local_connect_kwargs)
    for p in ['connect_timeout', 'options']:
        connect_kwargs.pop(p, None)
    try:
        with self._get_connection_cursor(**connect_kwargs) as cur:
            # CHECKPOINT can take a while; don't let a statement_timeout kill it.
            cur.execute("SET statement_timeout = 0")
            if check_not_is_in_recovery:
                cur.execute('SELECT pg_is_in_recovery()')
                if cur.fetchone()[0]:
                    return 'is_in_recovery=true'
            # cursor.execute() returns None, which is our success value
            return cur.execute('CHECKPOINT')
    except psycopg2.Error:
        # Use the module-level logger (the root logger was used here by mistake).
        logger.exception('Exception during CHECKPOINT')
        return 'not accessible or not healthy'
def stop(self, mode='fast', block_callbacks=False, checkpoint=True):
    """Stop PostgreSQL, updating state and firing the on_stop callback.

    :param mode: shutdown mode passed down to the signalling code ('fast', ...)
    :param block_callbacks: suppress state changes/callbacks (used during restart
        to avoid running start/stop callbacks in addition to restart ones)
    :param checkpoint: whether a CHECKPOINT should be attempted before stopping
    :returns: True if the server was stopped (or was not running), False otherwise
    """
    stopped, signaled = self._do_stop(mode, block_callbacks, checkpoint)
    if not stopped:
        logger.warning('pg_ctl stop failed')
        self.set_state('stop failed')
        return stopped
    # In case we exited early. Setting twice is not a problem.
    self.stop_safepoint_reached.set()
    if not block_callbacks:
        self.set_state('stopped')
        if signaled:
            self.call_nowait(ACTION_ON_STOP)
    return stopped
def _do_stop(self, mode, block_callbacks, checkpoint):
    """Perform the actual shutdown sequence.

    :returns: a ``(success, pg_signaled)`` tuple — whether the stop succeeded,
        and whether a signal was actually delivered to the postmaster.
    """
    if not self.is_running():
        return True, False

    if checkpoint and not self.is_starting():
        self.checkpoint()

    if not block_callbacks:
        self.set_state('stopping')

    # Ask the postmaster to shut down; a non-None outcome means there is
    # nothing (more) to wait for.
    postmaster_pid, outcome = self._signal_postmaster_stop(mode)
    if outcome is not None:
        return outcome, True

    # We can skip safepoint detection if nobody is waiting for it.
    if not self.stop_safepoint_reached.is_set():
        # Wait for our connection to terminate so we can be sure that no new
        # connections are being initiated
        self._wait_for_connection_close(postmaster_pid)
        self._wait_for_user_backends_to_close(postmaster_pid)
        self.stop_safepoint_reached.set()

    self._wait_for_postmaster_stop(postmaster_pid)
    return True, True
def _wait_for_postmaster_stop(self, pid):
    """Block until the postmaster identified by *pid* has exited.

    This wait loop differs subtly from pg_ctl: we check both that the pid file
    still refers to *pid* and that the process is alive. This seems safer.
    """
    while True:
        if pid != self.get_pid() or not self.is_pid_running(pid):
            return
        time.sleep(STOP_POLLING_INTERVAL)
def _signal_postmaster_stop(self, mode):
    """Send the shutdown signal corresponding to *mode* to the postmaster.

    :returns: ``(pid, result)`` where *result* is True when nothing needs to be
        stopped, False when stopping is impossible/failed, and None when the
        signal was delivered and the caller should wait for pid to exit.
    """
    pid = self.get_pid()
    if pid == 0:
        # No pid file / no postmaster: nothing to stop.
        return None, True
    if pid < 0:
        # A negative value in the pid file marks a single-user backend.
        logger.warning("Cannot stop server; single-user server is running (PID: {0})".format(-pid))
        return None, False
    try:
        os.kill(pid, STOP_SIGNALS[mode])
    except OSError as exc:
        if exc.errno != errno.ESRCH:
            logger.warning("Could not send stop signal to PostgreSQL (error: {0})".format(exc.errno))
            return None, False
        # ESRCH: the process disappeared before we signaled — already stopped.
        return None, True
    return pid, None
def terminate_starting_postmaster(self, pid):
    """Terminates a postmaster that has not yet opened ports or possibly even written a pid file. Blocks
    until the process goes away."""
    try:
        os.kill(pid, STOP_SIGNALS['immediate'])
    except OSError as exc:
        if exc.errno == errno.ESRCH:
            # Process is already gone; nothing to wait for.
            return
        # Best effort: log the failure and still poll for the process to exit.
        logger.warning("Could not send stop signal to PostgreSQL (error: {0})".format(exc.errno))
    while self.is_pid_running(pid):
        time.sleep(STOP_POLLING_INTERVAL)
def _wait_for_connection_close(self, pid):
    """Poll our own connection until the postmaster stops serving it.

    Issues a trivial query in a loop; returns once the pid file no longer
    matches *pid*, the process is dead, or the query fails because the
    server dropped our connection.
    """
    try:
        with self.connection().cursor() as cur:
            # Need a timeout here?
            while pid == self.get_pid() and self.is_pid_running(pid):
                cur.execute("SELECT 1")
                time.sleep(STOP_POLLING_INTERVAL)
    except psycopg2.Error:
        # Connection dropped by the shutting-down server — that's our exit signal.
        pass
@staticmethod
def _wait_for_user_backends_to_close(postmaster_pid):
    """Block until all user (non-auxiliary) backends of the postmaster exit.

    Auxiliary processes are recognized by their command line; the regexp is
    cross checked against versions PostgreSQL 9.1 .. 9.6.
    """
    aux_proc_re = re.compile("(?:postgres:)( .*:)? (?:(?:startup|logger|checkpointer|writer|wal writer|"
                             "autovacuum launcher|autovacuum worker|stats collector|wal receiver|archiver|"
                             "wal sender) process|bgworker: )")
    try:
        user_backends = []
        user_backend_cmdlines = []
        for child in psutil.Process(postmaster_pid).children():
            cmdline = child.cmdline()[0]
            if not aux_proc_re.match(cmdline):
                user_backends.append(child)
                user_backend_cmdlines.append(cmdline)
        logger.debug("Waiting for user backends {0} to close".format(",".join(user_backend_cmdlines)))
        psutil.wait_procs(user_backends)
        logger.debug("Backends closed")
    except psutil.NoSuchProcess:
        # Postmaster (or a child) vanished while we were looking — done.
        return
def reload(self):
    """Ask PostgreSQL to reload its configuration via ``pg_ctl reload``.

    Fires the on_reload callback only when the reload succeeded.
    :returns: the pg_ctl success flag
    """
    reloaded = self.pg_ctl('reload')
    if reloaded:
        self.call_nowait(ACTION_ON_RELOAD)
    return reloaded
def check_for_startup(self):
    """Checks PostgreSQL status and returns if PostgreSQL is in the middle of startup."""
    if not self.is_starting():
        return False
    return not self.check_startup_state_changed()
def check_startup_state_changed(self):
    """Checks if PostgreSQL has completed starting up or failed or still starting.

    Should only be called when state == 'starting'

    :returns: True iff state was changed from 'starting'
    """
    ready = self.pg_isready()

    if ready == STATE_REJECT:
        # Still starting up — no state transition yet.
        return False

    if ready == STATE_NO_RESPONSE:
        self.set_state('start failed')
        self._schedule_load_slots = False  # TODO: can remove this?
        self.save_configuration_files()  # TODO: maybe remove this?
        return True

    if ready != STATE_RUNNING:
        # Bad configuration or unexpected OS error. No idea of PostgreSQL status.
        # Let the main loop of run cycle clean up the mess.
        logger.warning("%s status returned from pg_isready",
                       "Unknown" if ready == STATE_UNKNOWN else "Invalid")
    self.set_state('running')
    self._schedule_load_slots = self.use_slots
    self.save_configuration_files()
    # TODO: __cb_pending can be None here after PostgreSQL restarts on its own. Do we want to call the callback?
    # Previously we didn't even notice.
    self.call_nowait(self.__cb_pending or ACTION_ON_START)
    self.__cb_pending = None
    return True
def wait_for_startup(self, timeout=None):
"""Waits for PostgreSQL startup to complete or fail.
:returns: True if start was successful, False otherwise"""
if not self.is_starting():
# Should not happen
logger.warning("wait_for_startup() called when not in starting state")
while not self.check_startup_state_changed():
if timeout and self.time_in_state() > timeout:
return None
time.sleep(1)