Skip to content

Commit

Permalink
bubuku-48: add docker-compose support
Browse files Browse the repository at this point in the history
  • Loading branch information
adyach committed Aug 17, 2016
1 parent c51c098 commit 47d483d
Show file tree
Hide file tree
Showing 15 changed files with 367 additions and 41 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Expand Up @@ -3,4 +3,6 @@
*.pyc
.eggs/
bubuku.egg-info/
.cache/
.cache/
build/
dist/
30 changes: 30 additions & 0 deletions Dockerfile
@@ -0,0 +1,30 @@
FROM registry.opensource.zalan.do/stups/python:3.5.1-23
MAINTAINER Team Aruha, team-aruha@zalando.de

ENV KAFKA_VERSION="0.9.0.1" SCALA_VERSION="2.11" JOLOKIA_VERSION="1.3.3"
ENV KAFKA_DIR="/opt/kafka"

RUN apt-get update && apt-get install wget openjdk-8-jre -y --force-yes && apt-get clean
ADD download_kafka.sh /tmp/download_kafka.sh

RUN sh /tmp/download_kafka.sh ${SCALA_VERSION} ${KAFKA_VERSION} ${KAFKA_DIR}

ADD server.properties ${KAFKA_DIR}/config/

RUN wget -O /tmp/jolokia-jvm-agent.jar http://search.maven.org/remotecontent?filepath=org/jolokia/jolokia-jvm/$JOLOKIA_VERSION/jolokia-jvm-$JOLOKIA_VERSION-agent.jar

ENV KAFKA_OPTS="-server -Dlog4j.configuration=file:${KAFKA_DIR}/config/log4j.properties -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=32M -javaagent:/tmp/jolokia-jvm-agent.jar=host=0.0.0.0"
ENV KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"

RUN mkdir -p $KAFKA_DIR/logs/

ENV KAFKA_SETTINGS="${KAFKA_DIR}/config/server.properties"
ADD server.properties ${KAFKA_SETTINGS}
ADD log4j.properties ${KAFKA_DIR}/config/

RUN mkdir /bubuku/
WORKDIR /bubuku/
ADD ./ /bubuku/
RUN pip install --no-cache-dir -r requirements.txt

CMD python3 setup.py develop && bubuku
23 changes: 11 additions & 12 deletions bubuku/cli.py
Expand Up @@ -2,21 +2,20 @@

import click

from bubuku.amazon import Amazon
from bubuku.config import load_config, KafkaProperties, Config
from bubuku.env_provider import EnvProvider
from bubuku.features.remote_exec import RemoteCommandExecutorCheck
from bubuku.id_generator import get_broker_id_policy
from bubuku.zookeeper import load_exhibitor_proxy, BukuExhibitor
from bubuku.zookeeper.exhibior import AWSExhibitorAddressProvider

_LOG = logging.getLogger('bubuku.cli')


def __get_opt_broker_id(broker_id: str, config: Config, zk: BukuExhibitor, amazon: Amazon) -> str:
def __get_opt_broker_id(broker_id: str, config: Config, zk: BukuExhibitor, env_provider: EnvProvider) -> str:
if not broker_id:
kafka_properties = KafkaProperties(config.kafka_settings_template, '/tmp/tmp.props'.format(config.kafka_dir))
broker_id_manager = get_broker_id_policy(config.id_policy, zk, kafka_properties, amazon)
broker_id = broker_id_manager.get_broker_id()
broker_id_manager = get_broker_id_policy(config.id_policy, zk, kafka_properties, env_provider)
broker_id = broker_id_manager.detect_broker_id()
_LOG.info('Will use broker_id {}'.format(broker_id))
running_brokers = zk.get_broker_ids()
if broker_id not in running_brokers:
Expand All @@ -27,9 +26,9 @@ def __get_opt_broker_id(broker_id: str, config: Config, zk: BukuExhibitor, amazo
def __prepare_configs():
config = load_config()
_LOG.info('Using config: {}'.format(config))
amazon = Amazon()
zookeeper = load_exhibitor_proxy(AWSExhibitorAddressProvider(amazon, config.zk_stack_name), config.zk_prefix)
return config, amazon, zookeeper
env_provider = EnvProvider.create_env_provider(config.development_mode)
zookeeper = load_exhibitor_proxy(env_provider.get_address_provider(config), config.zk_prefix)
return config, env_provider, zookeeper


logging.basicConfig(level=getattr(logging, 'INFO', None))
Expand All @@ -44,17 +43,17 @@ def cli():
@click.option('--broker', type=click.STRING,
help='Broker id to restart. By default current broker id is restarted')
def restart_broker(broker: str):
config, amazon, zookeeper = __prepare_configs()
broker_id = __get_opt_broker_id(broker, config, zookeeper, amazon)
config, env_provider, zookeeper = __prepare_configs()
broker_id = __get_opt_broker_id(broker, config, zookeeper, env_provider)
RemoteCommandExecutorCheck.register_restart(zookeeper, broker_id)


@cli.command('rebalance')
@click.option('--broker', type=click.STRING,
help="Broker instance on which to perform rebalance. By default, any free broker will start it")
def rebalance_partitions(broker: str):
config, amazon, zookeeper = __prepare_configs()
broker_id = __get_opt_broker_id(broker, config, zookeeper, amazon) if broker else None
config, env_provider, zookeeper = __prepare_configs()
broker_id = __get_opt_broker_id(broker, config, zookeeper, env_provider) if broker else None
RemoteCommandExecutorCheck.register_rebalance(zookeeper, broker_id)


Expand Down
5 changes: 3 additions & 2 deletions bubuku/config.py
Expand Up @@ -5,7 +5,7 @@
_LOG = logging.getLogger('bubuku.properties')

Config = namedtuple('Config', ('kafka_dir', 'kafka_settings_template', 'zk_stack_name',
'zk_prefix', 'id_policy', 'features', 'health_port'))
'zk_prefix', 'id_policy', 'features', 'health_port', 'development_mode'))


class KafkaProperties(object):
Expand Down Expand Up @@ -63,7 +63,8 @@ def load_config() -> Config:
zk_prefix=zk_prefix if zk_prefix.startswith('/') or not zk_prefix else '/{}'.format(zk_prefix),
id_policy=os.getenv('BROKER_ID_POLICY', 'ip').lower(),
features=features,
health_port=int(os.getenv('HEALTH_PORT', '8888'))
health_port=int(os.getenv('HEALTH_PORT', '8888')),
development_mode=bool(os.getenv('DEVELOPMENT_MODE', 'False'))
)


Expand Down
7 changes: 4 additions & 3 deletions bubuku/controller.py
Expand Up @@ -3,6 +3,7 @@

from bubuku.amazon import Amazon
from bubuku.broker import BrokerManager
from bubuku.env_provider import EnvProvider
from bubuku.zookeeper import BukuExhibitor

_LOG = logging.getLogger('bubuku.controller')
Expand Down Expand Up @@ -49,10 +50,10 @@ def _exclude_self(ip, name, running_actions):


class Controller(object):
def __init__(self, broker_manager: BrokerManager, zk: BukuExhibitor, amazon: Amazon):
def __init__(self, broker_manager: BrokerManager, zk: BukuExhibitor, env_provider: EnvProvider):
self.broker_manager = broker_manager
self.zk = zk
self.amazon = amazon
self.env_provider = env_provider
self.checks = []
self.changes = {} # Holds mapping from change name to array of pending changes
self.running = True
Expand Down Expand Up @@ -112,7 +113,7 @@ def _release_changes_lock(self, changes_to_remove):
self.zk.unregister_change(name)

def loop(self):
ip = self.amazon.get_own_ip()
ip = self.env_provider.get_own_ip()

while self.running or self.changes:
self.make_step(ip)
Expand Down
19 changes: 8 additions & 11 deletions bubuku/daemon.py
Expand Up @@ -4,7 +4,6 @@
import logging

from bubuku import health
from bubuku.amazon import Amazon
from bubuku.broker import BrokerManager
from bubuku.config import load_config, KafkaProperties
from bubuku.controller import Controller
Expand All @@ -18,13 +17,12 @@
from bubuku.id_generator import get_broker_id_policy
from bubuku.utils import CmdHelper
from bubuku.zookeeper import BukuExhibitor, load_exhibitor_proxy
from bubuku.zookeeper.exhibior import AWSExhibitorAddressProvider

from bubuku.env_provider import EnvProvider
_LOG = logging.getLogger('bubuku.main')


def apply_features(features: dict, controller: Controller, buku_proxy: BukuExhibitor, broker: BrokerManager,
kafka_properties: KafkaProperties, amazon: Amazon) -> list:
kafka_properties: KafkaProperties, env_provider: EnvProvider) -> list:
for feature, config in features.items():
if feature == 'restart_on_exhibitor':
controller.add_check(CheckExhibitorAddressChanged(buku_proxy, broker))
Expand All @@ -39,7 +37,7 @@ def apply_features(features: dict, controller: Controller, buku_proxy: BukuExhib
elif feature == 'graceful_terminate':
register_terminate_on_interrupt(controller, broker)
elif feature == 'use_ip_address':
kafka_properties.set_property('advertised.host.name', amazon.get_own_ip())
kafka_properties.set_property('advertised.host.name', env_provider.get_own_ip())
else:
_LOG.error('Using of unsupported feature "{}", skipping it'.format(feature))

Expand All @@ -52,25 +50,24 @@ def main():
kafka_properties = KafkaProperties(config.kafka_settings_template,
'{}/config/server.properties'.format(config.kafka_dir))

amazon = Amazon()

address_provider = AWSExhibitorAddressProvider(amazon, config.zk_stack_name)
env_provider = EnvProvider.create_env_provider(config.development_mode)
address_provider = env_provider.get_address_provider(config.zk_stack_name)

_LOG.info("Loading exhibitor configuration")
buku_proxy = load_exhibitor_proxy(address_provider, config.zk_prefix)

_LOG.info("Loading broker_id policy")
broker_id_manager = get_broker_id_policy(config.id_policy, buku_proxy, kafka_properties, amazon)
broker_id_manager = get_broker_id_policy(config.id_policy, buku_proxy, kafka_properties, env_provider)

_LOG.info("Building broker manager")
broker = BrokerManager(config.kafka_dir, buku_proxy, broker_id_manager, kafka_properties)

_LOG.info("Creating controller")
controller = Controller(broker, buku_proxy, amazon)
controller = Controller(broker, buku_proxy, env_provider)

controller.add_check(CheckBrokerStopped(broker, buku_proxy))
controller.add_check(RemoteCommandExecutorCheck(buku_proxy, broker))
apply_features(config.features, controller, buku_proxy, broker, kafka_properties, amazon)
apply_features(config.features, controller, buku_proxy, broker, kafka_properties, env_provider)

_LOG.info('Starting health server')
health.start_server(config.health_port)
Expand Down
85 changes: 85 additions & 0 deletions bubuku/env_provider.py
@@ -0,0 +1,85 @@
import json
import logging

import boto3
import requests

from bubuku.zookeeper.exhibior import AWSExhibitorAddressProvider
from bubuku.zookeeper.exhibior import LocalAddressProvider
from bubuku.config import Config

_LOG = logging.getLogger('bubuku.amazon')


class EnvProvider(object):
def get_own_ip(self) -> str:
raise NotImplementedError('Not implemented')

def get_address_provider(self, config: Config):
raise NotImplementedError('Not implemented')

@staticmethod
def create_env_provider(dev_mode: bool):
return LocalEnvProvider() if dev_mode else AmazonEnvProvider()


class AmazonEnvProvider(EnvProvider):
NONE = object()

def __init__(self):
self.document = None
self.aws_addr = '169.254.169.254'

def _get_document(self) -> dict:
if not self.document:
try:
self.document = requests.get(
'http://{}/latest/dynamic/instance-identity/document'.format(self.aws_addr),
timeout=5).json()
_LOG.info("Amazon specific information loaded from AWS: {}".format(
json.dumps(self.document, indent=2)))
except Exception as ex:
_LOG.warn('Failed to download AWS document', exc_info=ex)
self.document = AmazonEnvProvider.NONE
return self.document if self.document != AmazonEnvProvider.NONE else None

def get_aws_region(self) -> str:
doc = self._get_document()
return doc['region'] if doc else None

def get_own_ip(self) -> str:
doc = self._get_document()
return doc['privateIp'] if doc else '127.0.0.1'

def get_addresses_by_lb_name(self, lb_name) -> list:
region = self.get_aws_region()

private_ips = []

if region is not None:
elb = boto3.client('elb', region_name=region)
ec2 = boto3.client('ec2', region_name=region)

response = elb.describe_instance_health(LoadBalancerName=lb_name)

for instance in response['InstanceStates']:
if instance['State'] == 'InService':
private_ips.append(ec2.describe_instances(
InstanceIds=[instance['InstanceId']])['Reservations'][0]['Instances'][0]['PrivateIpAddress'])

else:
private_ips = [lb_name]
_LOG.info("Ip addresses for {} are: {}".format(lb_name, private_ips))
return private_ips

def get_address_provider(self, config: Config):
return AWSExhibitorAddressProvider(self, config.zk_stack_name)


class LocalEnvProvider(EnvProvider):
def get_own_ip(self) -> str:
return '127.0.0.1'

def get_address_provider(self, config: Config):
return LocalAddressProvider()

2 changes: 1 addition & 1 deletion bubuku/features/remote_exec.py
Expand Up @@ -17,7 +17,7 @@ def __init__(self, zk: BukuExhibitor, broker_manager: BrokerManager):

def check(self) -> Change:
with self.zk.lock():
data = self.zk.take_action(self.broker_manager.id_manager.get_broker_id())
data = self.zk.take_action(self.broker_manager.id_manager.detect_broker_id())
if not data:
return None
if 'name' not in data:
Expand Down
25 changes: 18 additions & 7 deletions bubuku/id_generator.py
Expand Up @@ -5,8 +5,8 @@
import re
from time import sleep, time

from bubuku.amazon import Amazon
from bubuku.config import KafkaProperties
from bubuku.env_provider import EnvProvider
from bubuku.zookeeper import BukuExhibitor

_LOG = logging.getLogger('bubuku.id_generator')
Expand All @@ -16,6 +16,9 @@ class BrokerIdGenerator(object):
def get_broker_id(self) -> str:
raise NotImplementedError('Not implemented')

def detect_broker_id(self):
raise NotImplementedError('Not implemented')

def wait_for_broker_id_absence(self):
while self.is_registered():
sleep(1)
Expand Down Expand Up @@ -58,6 +61,9 @@ def __init__(self, zk: BukuExhibitor, ip: str, kafka_props: KafkaProperties):
def get_broker_id(self):
return self.broker_id

def detect_broker_id(self):
return self.broker_id

def is_registered(self):
return self.zk.is_broker_registered(self.broker_id)

Expand All @@ -72,22 +78,27 @@ def __init__(self, zk: BukuExhibitor, kafka_properties: KafkaProperties):
def get_broker_id(self):
return None

def is_registered(self):
def detect_broker_id(self):
meta_path = '{}/meta.properties'.format(self.kafka_properties.get_property('log.dirs'))
while not os.path.isfile(meta_path):
return False
return None
with open(meta_path) as f:
lines = f.readlines()
for line in lines:
match = re.search('broker\.id=(\d+)', line)
if match:
return self.zk.is_broker_registered(match.group(1))
return False
return match.group(1)
return None

def is_registered(self):
broker_id = self.detect_broker_id()
return self.zk.is_broker_registered(broker_id)


def get_broker_id_policy(policy: str, zk: BukuExhibitor, kafka_props: KafkaProperties, amazon: Amazon) -> BrokerIdGenerator:
def get_broker_id_policy(policy: str, zk: BukuExhibitor,
kafka_props: KafkaProperties, env_provider: EnvProvider) -> BrokerIdGenerator:
if policy == 'ip':
return BrokerIDByIp(zk, amazon.get_own_ip(), kafka_props)
return BrokerIDByIp(zk, env_provider.get_own_ip(), kafka_props)
elif policy == 'auto':
return BrokerIdAutoAssign(zk, kafka_props)
else:
Expand Down
7 changes: 6 additions & 1 deletion bubuku/zookeeper/exhibior.py
Expand Up @@ -38,4 +38,9 @@ def _query_exhibitors(self, exhibitors):
return None

def query_from_amazon(self):
return self.amazon.get_addresses_by_lb_name(self.zk_stack_name)
return self.amazon.get_addresses_by_lb_name(self.zk_stack_name)


class LocalAddressProvider(AddressListProvider):
def get_latest_address(self) -> (list, int):
return ('zookeeper',), 2181

0 comments on commit 47d483d

Please sign in to comment.