Skip to content

Commit

Permalink
Merge pull request #69 from zalando/feature/clusterid
Browse files Browse the repository at this point in the history
Add clusterid to patroni in order to avoid connections from already existing nodes belonging to another cluster (master - replica system).
  • Loading branch information
Oleksii Kliukin committed Oct 21, 2015
2 parents b922b8a + 0b69ef9 commit 2d7909e
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 19 deletions.
5 changes: 4 additions & 1 deletion patroni/dcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,11 @@ def take_leader(self):
overwriting the key if necessary."""

@abc.abstractmethod
def initialize(self):
def initialize(self, create_new=True, sysid=""):
"""Race for cluster initialization.
:param create_new: False if the key should already exist (in the case we are setting the system_id)
:param sysid: PostgreSQL cluster system identifier, if specified, is written to the key
:returns: `!True` if key has been created successfully.
this method should create atomically initialize key and return `!True`
Expand Down
7 changes: 4 additions & 3 deletions patroni/etcd.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ def _load_cluster(self):
nodes = {os.path.relpath(node.key, result.key): node for node in result.leaves}

# get initialize flag
initialize = bool(nodes.get(self._INITIALIZE, False))
initialize = nodes.get(self._INITIALIZE, None)
initialize = initialize and initialize.value

# get last leader operation
last_leader_operation = nodes.get(self._LEADER_OPTIME, None)
Expand Down Expand Up @@ -235,8 +236,8 @@ def update_leader(self):
return self.retry(self.client.test_and_set, self.leader_path, self._name, self._name, self.ttl)

@catch_etcd_errors
def initialize(self):
return self.retry(self.client.write, self.initialize_path, self._name, prevExist=False)
def initialize(self, create_new=True, sysid=""):
return self.retry(self.client.write, self.initialize_path, sysid, prevExist=(not create_new))

@catch_etcd_errors
def delete_leader(self):
Expand Down
23 changes: 18 additions & 5 deletions patroni/ha.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import psycopg2
import requests
import sys

from patroni.async_executor import AsyncExecutor
from patroni.exceptions import DCSError, PostgresConnectionException
Expand Down Expand Up @@ -73,9 +74,10 @@ def bootstrap(self):
self._async_executor.run_async(self.copy_backup_from_leader, args=(self.cluster.leader, ))
return 'trying to bootstrap from leader'
elif not self.cluster.initialize: # no initialize key
if self.dcs.initialize(): # race for initialization
if self.dcs.initialize(create_new=True): # race for initialization
try:
self.state_handler.bootstrap()
self.dcs.initialize(create_new=False, sysid=self.state_handler.sysid)
except: # initdb or start failed
# remove initialization key and give a chance to other members
logger.info("removing initialize key after failed attempt to initialize the cluster")
Expand Down Expand Up @@ -350,15 +352,20 @@ def handle_long_action_in_progress(self):
else:
return self._async_executor.scheduled_action + ' in progress'

def sysid_valid(self, sysid):
# sysid does tv_sec << 32, where tv_sec is the number of seconds sine 1970,
# so even 1 << 32 would have 10 digits.
return str(sysid) and len(str(sysid)) >= 10 and str(sysid).isdigit()

def _run_cycle(self):
try:
self.load_cluster_from_dcs()

self.touch_member()

# cluster has leader key but not initialize key
if not self.cluster.is_unlocked() and not self.cluster.initialize:
self.dcs.initialize() # fix it
if not self.cluster.is_unlocked() and not self.sysid_valid(self.cluster.initialize) and self.has_lock():
self.dcs.initialize(create_new=(self.cluster.initialize is None), sysid=self.state_handler.sysid)

if self._async_executor.busy:
return self.handle_long_action_in_progress()
Expand All @@ -372,8 +379,14 @@ def _run_cycle(self):
if self.state_handler.data_directory_empty():
return self.bootstrap() # new node
# "bootstrap", but data directory is not empty
elif not self.cluster.initialize and self.cluster.is_unlocked():
self.dcs.initialize()
elif not self.sysid_valid(self.cluster.initialize) and self.cluster.is_unlocked():
self.dcs.initialize(create_new=(self.cluster.initialize is None), sysid=self.state_handler.sysid)
else:
# check if we are allowed to join
if self.sysid_valid(self.cluster.initialize) and self.cluster.initialize != self.state_handler.sysid:
logger.fatal("system ID mismatch, node {0} belongs to a different cluster".
format(self.state_handler.name))
sys.exit(1)

# try to start dead postgres
if not self.state_handler.is_healthy():
Expand Down
15 changes: 10 additions & 5 deletions patroni/postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def __init__(self, config):
self._connection = None
self._cursor_holder = None
self._need_rewind = False
self._sysid = None
self.replication_slots = [] # list of already existing replication slots
self.retry = Retry(max_tries=-1, deadline=5, max_delay=1, retry_exceptions=PostgresConnectionException)

Expand Down Expand Up @@ -100,10 +101,14 @@ def can_rewind(self):
return False
# check if the cluster's configuration permits pg_rewind
data = self.controldata()
if data:
return data.get('wal_log_hints setting', 'off') == 'on' or\
data.get('Data page checksum version', '0') != '0'
return False
return data.get('wal_log_hints setting', 'off') == 'on' or data.get('Data page checksum version', '0') != '0'

@property
def sysid(self):
if not self._sysid:
data = self.controldata()
self._sysid = data.get('Database system identifier', "")
return self._sysid

def require_rewind(self):
self._need_rewind = True
Expand Down Expand Up @@ -391,7 +396,7 @@ def controldata(self):
try:
data = subprocess.check_output(['pg_controldata', self.data_dir])
if data:
data = data.splitlines()
data = data.decode().splitlines()
result = {l.split(':')[0].replace('Current ', '', 1): l.split(':')[1].strip() for l in data if l}
except subprocess.CalledProcessError:
logger.exception("Error when calling pg_controldata")
Expand Down
7 changes: 4 additions & 3 deletions patroni/zookeeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def _inner_load_cluster(self):
self.fetch_cluster = True

# get initialize flag
initialize = self._INITIALIZE in nodes
initialize = (self.get_node(self.initialize_path) or [None])[0] if self._INITIALIZE in nodes else None

# get list of members
members = self.load_members() if self._MEMBERS[:-1] in nodes else []
Expand Down Expand Up @@ -203,8 +203,9 @@ def set_failover_value(self, value, index=None):
logging.exception('set_failover_value')
return False

def initialize(self):
return self._create(self.initialize_path, self._name, makepath=True)
def initialize(self, create_new=True, sysid=""):
return self._create(self.initialize_path, sysid, makepath=True) if create_new \
else self.client.retry(self.client.set, self.initialize_path, sysid.encode("utf-8"))

def touch_member(self, data, ttl=None):
cluster = self.cluster
Expand Down
8 changes: 7 additions & 1 deletion tests/test_ha.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import etcd
import unittest

from mock import Mock, patch
from mock import Mock, MagicMock, patch
from patroni.dcs import Cluster, Failover, Leader, Member
from patroni.etcd import Client, Etcd
from patroni.exceptions import DCSError, PostgresException
Expand Down Expand Up @@ -130,6 +130,12 @@ def test_recover_master_failed(self):
self.ha.has_lock = true
self.assertEquals(self.ha.run_cycle(), 'removed leader key after trying and failing to start postgres')

@patch('sys.exit', return_value=1)
@patch('patroni.ha.Ha.sysid_valid', MagicMock(return_value=True))
def test_sysid_no_match(self, exit_mock):
self.ha.run_cycle()
exit_mock.assert_called_once_with(1)

@patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
def test_start_as_readonly(self):
self.p.is_leader = self.p.is_healthy = false
Expand Down
6 changes: 5 additions & 1 deletion tests/test_postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def __exit__(self, *args):


def pg_controldata_string(*args, **kwargs):
return """
return b"""
pg_control version number: 942
Catalog version number: 201509161
Database system identifier: 6200971513092291716
Expand Down Expand Up @@ -435,6 +435,10 @@ def test_cleanup_archive_status(self, mock_file, mock_link, mock_remove, mock_un
mock_unlink.assert_not_called()
mock_remove.assert_not_called()

@patch('subprocess.check_output', MagicMock(return_value=0, side_effect=pg_controldata_string))
def test_sysid(self):
self.assertEqual(self.p.sysid, "6200971513092291716")

@patch('os.path.isfile', MagicMock(return_value=True))
@patch('shutil.copy', side_effect=Exception)
def test_save_configuration_files(self, mock_copy):
Expand Down

0 comments on commit 2d7909e

Please sign in to comment.