Skip to content

Commit

Permalink
Merge Feike-s and my work on pypi package alltogether
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberDem0n committed Sep 6, 2015
2 parents 147d7c8 + 1774d6e commit c9577e1
Show file tree
Hide file tree
Showing 26 changed files with 614 additions and 360 deletions.
8 changes: 4 additions & 4 deletions README.rst
Expand Up @@ -77,7 +77,7 @@ settings:
etcd
- *session\_timeout*: the TTL to acquire the leader lock. Think of it
as the length of time before automatic failover process is initiated.
- *reconnects\_timeout*: how long we should try to reconnect to
- *reconnect\_timeout*: how long we should try to reconnect to
ZooKeeper after connection loss. After this timeout we assume that we
don't have lock anymore and will restart in read-only mode.
- *hosts*: list of ZooKeeper cluster members in format: [
Expand All @@ -101,6 +101,7 @@ settings:
accessible from other nodes and applications.
- *data\_dir*: file path to initialize and store Postgres data files
- *maximum\_lag\_on\_failover*: the maximum bytes a follower may lag
- *use\_slots*: whether or not to use replication_slots. Must be False for PostgreSQL 9.3, and you should comment out max_replication_slots.
before it is not eligible become leader
- *pg\_hba*: list of lines which should be added to pg\_hba.conf

Expand Down Expand Up @@ -138,9 +139,8 @@ settings:
- *password*: admin password, user will be created during
initialization.

- *recovery\_conf*: configuration settings written to recovery.conf
when configuring follower
- *parameters*: list of configuration settings for Postgres
- *recovery\_conf*: additional configuration settings written to recovery.conf when configuring follower
- *parameters*: list of configuration settings for Postgres. Many of these are required for replication to work.

Replication choices
-------------------
Expand Down
122 changes: 121 additions & 1 deletion patroni/__init__.py
@@ -1 +1,121 @@
__version__ = '0.22'
import logging
import os
import sys
import time
import yaml

from patroni.api import RestApiServer
from patroni.etcd import Etcd
from patroni.ha import Ha
from patroni.postgresql import Postgresql
from patroni.utils import setup_signal_handlers, sleep, reap_children
from patroni.zookeeper import ZooKeeper

logger = logging.getLogger(__name__)


class Patroni:

def __init__(self, config):
self.nap_time = config['loop_wait']
self.postgresql = Postgresql(config['postgresql'])
self.ha = Ha(self.postgresql, self.get_dcs(self.postgresql.name, config))
host, port = config['restapi']['listen'].split(':')
self.api = RestApiServer(self, config['restapi'])
self.next_run = time.time()
self.shutdown_member_ttl = 300

@staticmethod
def get_dcs(name, config):
if 'etcd' in config:
return Etcd(name, config['etcd'])
if 'zookeeper' in config:
return ZooKeeper(name, config['zookeeper'])
raise Exception('Can not find sutable configuration of distributed configuration store')

def touch_member(self, ttl=None):
connection_string = self.postgresql.connection_string + '?application_name=' + self.api.connection_string
if self.ha.cluster:
for m in self.ha.cluster.members:
# Do not update member TTL when it is far from being expired
if m.name == self.postgresql.name and m.real_ttl() > self.shutdown_member_ttl:
return True
return self.ha.dcs.touch_member(connection_string, ttl)

def initialize(self):
# wait for etcd to be available
while not self.touch_member():
logger.info('waiting on DCS')
sleep(5)

# is data directory empty?
if self.postgresql.data_directory_empty():
# racing to initialize
if self.ha.dcs.race('/initialize'):
self.postgresql.initialize()
self.ha.dcs.take_leader()
self.postgresql.start()
self.postgresql.create_replication_user()
self.postgresql.create_connection_users()
else:
while True:
leader = self.ha.dcs.current_leader()
if leader and self.postgresql.sync_from_leader(leader):
self.postgresql.write_recovery_conf(leader)
self.postgresql.start()
break
sleep(5)
elif self.postgresql.is_running():
self.postgresql.load_replication_slots()

def schedule_next_run(self):
if self.postgresql.is_promoted:
self.next_run = time.time()
self.next_run += self.nap_time
current_time = time.time()
nap_time = self.next_run - current_time
if nap_time <= 0:
self.next_run = current_time
else:
self.ha.dcs.watch(nap_time)

def run(self):
self.api.start()
self.next_run = time.time()

while True:
self.touch_member()
logger.info(self.ha.run_cycle())
try:
if self.ha.state_handler.is_leader():
self.ha.cluster and self.ha.state_handler.create_replication_slots(self.ha.cluster)
else:
self.ha.state_handler.drop_replication_slots()
except:
logger.exception('Exception when changing replication slots')
reap_children()
self.schedule_next_run()


def main():
logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s', level=logging.INFO)
logging.getLogger('requests').setLevel(logging.WARNING)
setup_signal_handlers()

if len(sys.argv) < 2 or not os.path.isfile(sys.argv[1]):
print('Usage: {} config.yml'.format(sys.argv[0]))
return

with open(sys.argv[1], 'r') as f:
config = yaml.load(f)

patroni = Patroni(config)
try:
patroni.initialize()
patroni.run()
except KeyboardInterrupt:
pass
finally:
patroni.touch_member(patroni.shutdown_member_ttl) # schedule member removal
patroni.postgresql.stop()
patroni.ha.dcs.delete_leader()
5 changes: 3 additions & 2 deletions patroni/__main__.py
@@ -1,4 +1,5 @@
import patroni
from patroni import main


if __name__ == '__main__':
patroni.main()
main()
File renamed without changes.
46 changes: 27 additions & 19 deletions patroni/helpers/dcs.py → patroni/dcs.py
@@ -1,7 +1,8 @@
import abc

from collections import namedtuple
from patroni.helpers.utils import calculate_ttl, sleep
from patroni.exceptions import DCSError
from patroni.utils import calculate_ttl, sleep
from six.moves.urllib_parse import urlparse, urlunparse, parse_qsl


Expand All @@ -22,24 +23,11 @@ def parse_connection_string(value):
return conn_url, api_url


class DCSError(Exception):
"""Parent class for all kind of exceptions related to selected distributed configuration store"""

def __init__(self, value):
self.value = value

def __str__(self):
"""
>>> str(DCSError('foo'))
"'foo'"
"""
return repr(self.value)


class Member(namedtuple('Member', 'index,name,conn_url,api_url,expiration,ttl')):

"""Immutable object (namedtuple) which represents single member of PostgreSQL cluster.
Consists of the following fields:
:param index: modification index of a given member key in DCS
:param index: modification index of a given member key in a Configuration Store
:param name: name of PostgreSQL cluster member
:param conn_url: connection string containing host, user and password which could be used to access this member.
:param api_url: REST API url of patroni instance
Expand All @@ -50,11 +38,30 @@ def real_ttl(self):
return calculate_ttl(self.expiration) or -1


class Leader(namedtuple('Leader', 'index,expiration,ttl,member')):

"""Immutable object (namedtuple) which represents leader key.
Consists of the following fields:
:param index: modification index of a leader key in a Configuration Store
:param expiration: expiration time of the leader key
:param ttl: ttl of the leader key
:param member: reference to a `Member` object which represents current leader (see `Cluster.members`)"""

@property
def name(self):
return self.member.name

@property
def conn_url(self):
return self.member.conn_url


class Cluster(namedtuple('Cluster', 'initialize,leader,last_leader_operation,members')):

"""Immutable object (namedtuple) which represents PostgreSQL cluster.
Consists of the following fields:
:param initialize: boolean, shows whether this cluster has initialization key stored in DC or not.
:param leader: `Member` object which represents current leader of the cluster
:param leader: `Leader` object which represents current leader of the cluster
:param last_leader_operation: int or long object containing position of last known leader operation.
This value is stored in `/optime/leader` key
:param members: list of Member object, all PostgreSQL cluster members including leader"""
Expand All @@ -74,7 +81,8 @@ def __init__(self, name, config):
i.e.: `zookeeper` for zookeeper, `etcd` for etcd, etc...
"""
self._name = name
self._base_path = '/service/' + config['scope']
self._scope = config['scope']
self._base_path = '/service/' + self._scope

def client_path(self, path):
return self._base_path + path
Expand Down Expand Up @@ -144,5 +152,5 @@ def delete_leader(self):
"""Voluntarily remove leader key from DCS
This method should remove leader key if current instance is the leader"""

def sleep(self, timeout):
def watch(self, timeout):
sleep(timeout)

0 comments on commit c9577e1

Please sign in to comment.