Skip to content

Commit

Permalink
Create or update SRV record for etcd cluster inside zone specified via
Browse files Browse the repository at this point in the history
environment variable HOSTED_ZONE
  • Loading branch information
Alexander Kukushkin committed Apr 28, 2015
1 parent af0985a commit 095831a
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 28 deletions.
2 changes: 1 addition & 1 deletion etcd-cluster-appliance/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ FROM zalando/ubuntu:14.04.1-1
RUN apt-get update && apt-get -y install python python-boto

## Install etcd
ENV ETCDVERSION 2.0.9
ENV ETCDVERSION 2.0.10
RUN mkdir -m 777 /etcd && curl -L https://github.com/coreos/etcd/releases/download/v${ETCDVERSION}/etcd-v${ETCDVERSION}-linux-amd64.tar.gz | tar xz -C /etcd --strip=1 --wildcards --no-anchored etcd etcdctl
EXPOSE 2379 2380

Expand Down
84 changes: 57 additions & 27 deletions etcd-cluster-appliance/etcd.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
from __future__ import print_function

import boto.ec2
import boto.route53
import json
import logging
import os
import requests
import shutil
import subprocess
import time

from boto.ec2.instance import Instance
Expand All @@ -22,6 +24,7 @@ class EtcdMember:
API_VERSION = '/v2/'
DEFAULT_CLIENT_PORT = 2379
DEFAULT_PEER_PORT = 2380
AG_TAG = 'aws:autoscaling:groupName'

def __init__(self, arg):
self.is_accessible = False
Expand Down Expand Up @@ -56,7 +59,7 @@ def set_info_from_ec2_instance(self, instance):
self.addr = instance.private_ip_address
self.dns = instance.private_dns_name
self.cluster_token = instance.tags['aws:cloudformation:stack-name']
self.autoscaling_group = instance.tags['aws:autoscaling:groupName']
self.autoscaling_group = instance.tags[self.AG_TAG]

@staticmethod
def get_addr_from_urls(urls):
Expand Down Expand Up @@ -253,7 +256,6 @@ class EtcdManager:

ETCD_BINARY = './etcd'
DATA_DIR = 'data'
AG_TAG = 'aws:autoscaling:groupName'
NAPTIME = 5

def __init__(self):
Expand All @@ -276,10 +278,10 @@ def find_my_instace(self):
self.load_my_identities()

conn = boto.ec2.connect_to_region(self.region)
for r in conn.get_all_reservations():
for r in conn.get_all_reservations(filters={'instance_id': self.instance_id}):
for i in r.instances:
if i.id == self.instance_id:
return (EtcdMember(i) if self.AG_TAG in i.tags else None)
return (EtcdMember(i) if EtcdMember.AG_TAG in i.tags else None)
return None

def get_my_instace(self):
Expand All @@ -291,9 +293,9 @@ def get_autoscaling_members(self):
me = self.get_my_instace()

conn = boto.ec2.connect_to_region(self.region)
res = conn.get_all_reservations(filters={'tag:{}'.format(self.AG_TAG): me.autoscaling_group})
res = conn.get_all_reservations(filters={'tag:{}'.format(EtcdMember.AG_TAG): me.autoscaling_group})

return [i for r in res for i in r.instances if i.state != 'terminated' and i.tags.get(self.AG_TAG, '')
return [i for r in res for i in r.instances if i.state != 'terminated' and i.tags.get(EtcdMember.AG_TAG, '')
== me.autoscaling_group]

def clean_data_dir(self):
Expand Down Expand Up @@ -351,23 +353,14 @@ def __init__(self, manager, hosted_zone):
'clientURLs': [],
})
self.hosted_zone = hosted_zone
self._srv = '_etcd-server._tcp.' + hosted_zone
self.members = {}

def get_srv_record(self):
self.conn = boto.route53.connect_to_region(self.manager.region)
zone = self.conn.get_zone(self.hosted_zone)
if not zone:
return
for r in zone.get_records():
if r.type.upper() == 'SRV' and r.name.lower().startswith(self._srv):
print(r)
self.unhealthy_members = {}

def is_leader(self):
return self.me.is_leader()

def acquire_lock(self):
data = data = {'value': self.manager.instance_id, 'ttl': self.NAPTIME, 'prevExists': False}
data = data = {'value': self.manager.instance_id, 'ttl': self.NAPTIME, 'prevExist': False}
return not self.me.api_put('keys/_self_maintenance_lock', data=data) is None

def members_changed(self):
Expand All @@ -384,7 +377,10 @@ def members_changed(self):
return changed

def cluster_unhealthy(self):
return True
process = subprocess.Popen([self.manager.ETCD_BINARY + 'ctl'], stdout=subprocess.PIPE)
ret = any([True for line in process.stdout if 'is unhealthy' in line])
process.wait()
return ret

def remove_unhealthy_members(self, autoscaling_members):
members = {}
Expand All @@ -398,27 +394,61 @@ def remove_unhealthy_members(self, autoscaling_members):
for m in members.values():
self.me.delete_member(m)

def update_srv_record(self):
pass
def update_srv_record(self, autoscaling_members):
stack_version = self.manager.me.cluster_token.split('-')[-1]
record_name = '.'.join(['_etcd-server._tcp', stack_version, self.hosted_zone])
record_type = 'SRV'

conn = boto.route53.connect_to_region('universal')
zone = conn.get_zone(self.hosted_zone)
if not zone:
return

old_record = None
for r in zone.get_records():
if r.type.upper() == record_type and r.name.lower().startswith(record_name):
old_record = r
break

members = {}
for m in self.members.values():
m = EtcdMember(m)
members[m.addr] = m

new_value = [' '.join(map(str, [1, 1, members[i.private_ip_address].peer_port, i.private_dns_name])) for i in
autoscaling_members if i.private_ip_address in members]

if old_record:
if set(old_record.resource_records) != set(new_value):
zone.update_record(old_record, new_value)
else:
zone.add_record(record_type, record_name, new_value)

def run(self):
update_required = False
while True:
try:
if self.manager.etcd_pid != 0 and self.is_leader() and (self.members_changed()
or self.cluster_unhealthy()) and self.acquire_lock():
autoscaling_members = self.manager.get_autoscaling_members()
if autoscaling_members:
self.remove_unhealthy_members(autoscaling_members)
self.update_srv_record()
if self.manager.etcd_pid != 0 and self.is_leader():
if (update_required or self.members_changed() or self.cluster_unhealthy()) and self.acquire_lock():
update_required = True
autoscaling_members = self.manager.get_autoscaling_members()
if autoscaling_members:
self.remove_unhealthy_members(autoscaling_members)
self.update_srv_record(autoscaling_members)
update_required = False
else:
self.members = {}
update_required = False
except:
logging.exception('Exception in HouseKeeper main loop')
logging.debug('Sleeping %s seconds...', self.NAPTIME)
time.sleep(self.NAPTIME)


def main():
hosted_zone = os.environ.get('HOSTED_ZONE', None)
manager = EtcdManager()
house_keeper = HouseKeeper(manager, 'test.')
house_keeper = HouseKeeper(manager, hosted_zone)
house_keeper.start()
manager.run()

Expand Down

0 comments on commit 095831a

Please sign in to comment.