diff --git a/.gitignore b/.gitignore
index 3802d8f..9335683 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,4 +3,6 @@
 *.pyc
 .eggs/
 bubuku.egg-info/
-.cache/
\ No newline at end of file
+.cache/
+build/
+dist/
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..69f7b98
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,30 @@
+FROM registry.opensource.zalan.do/stups/python:3.5.1-23
+MAINTAINER Team Aruha, team-aruha@zalando.de
+
+ENV KAFKA_VERSION="0.9.0.1" SCALA_VERSION="2.11" JOLOKIA_VERSION="1.3.3"
+ENV KAFKA_DIR="/opt/kafka"
+
+RUN apt-get update && apt-get install wget openjdk-8-jre -y --force-yes && apt-get clean
+ADD download_kafka.sh /tmp/download_kafka.sh
+
+RUN sh /tmp/download_kafka.sh ${SCALA_VERSION} ${KAFKA_VERSION} ${KAFKA_DIR}
+
+ADD server.properties ${KAFKA_DIR}/config/
+
+RUN wget -O /tmp/jolokia-jvm-agent.jar http://search.maven.org/remotecontent?filepath=org/jolokia/jolokia-jvm/$JOLOKIA_VERSION/jolokia-jvm-$JOLOKIA_VERSION-agent.jar
+
+ENV KAFKA_OPTS="-server -Dlog4j.configuration=file:${KAFKA_DIR}/config/log4j.properties -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=32M -javaagent:/tmp/jolokia-jvm-agent.jar=host=0.0.0.0"
+ENV KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
+
+RUN mkdir -p $KAFKA_DIR/logs/
+
+ENV KAFKA_SETTINGS="${KAFKA_DIR}/config/server.properties"
+ADD server.properties ${KAFKA_SETTINGS}
+ADD log4j.properties ${KAFKA_DIR}/config/
+
+RUN mkdir /bubuku/
+WORKDIR /bubuku/
+ADD ./ /bubuku/
+RUN pip install --no-cache-dir -r requirements.txt
+
+CMD python3 setup.py develop && bubuku
\ No newline at end of file
diff --git a/bubuku/cli.py b/bubuku/cli.py
index 6601854..bd401fc 100644
--- a/bubuku/cli.py
+++ b/bubuku/cli.py
@@ -2,21 +2,20 @@
 
 import click
 
-from bubuku.amazon import Amazon
 from bubuku.config import load_config, KafkaProperties, Config
+from bubuku.env_provider import EnvProvider
 from bubuku.features.remote_exec import RemoteCommandExecutorCheck
 from bubuku.id_generator import get_broker_id_policy
 from bubuku.zookeeper import load_exhibitor_proxy, BukuExhibitor
-from bubuku.zookeeper.exhibior import AWSExhibitorAddressProvider
 
 _LOG = logging.getLogger('bubuku.cli')
 
 
-def __get_opt_broker_id(broker_id: str, config: Config, zk: BukuExhibitor, amazon: Amazon) -> str:
+def __get_opt_broker_id(broker_id: str, config: Config, zk: BukuExhibitor, env_provider: EnvProvider) -> str:
    if not broker_id:
        kafka_properties = KafkaProperties(config.kafka_settings_template, '/tmp/tmp.props'.format(config.kafka_dir))
-        broker_id_manager = get_broker_id_policy(config.id_policy, zk, kafka_properties, amazon)
-        broker_id = broker_id_manager.get_broker_id()
+        broker_id_manager = get_broker_id_policy(config.id_policy, zk, kafka_properties, env_provider)
+        broker_id = broker_id_manager.detect_broker_id()
        _LOG.info('Will use broker_id {}'.format(broker_id))
    running_brokers = zk.get_broker_ids()
    if broker_id not in running_brokers:
@@ -27,9 +26,9 @@ def __get_opt_broker_id(broker_id: str, config: Config, zk: BukuExhibitor, amazo
 def __prepare_configs():
     config = load_config()
     _LOG.info('Using config: {}'.format(config))
-    amazon = Amazon()
-    zookeeper = load_exhibitor_proxy(AWSExhibitorAddressProvider(amazon, config.zk_stack_name), config.zk_prefix)
-    return config, amazon, zookeeper
+    env_provider = EnvProvider.create_env_provider(config.development_mode)
+    zookeeper = load_exhibitor_proxy(env_provider.get_address_provider(config), config.zk_prefix)
+    return config, env_provider, zookeeper
 
 
 logging.basicConfig(level=getattr(logging, 'INFO', None))
@@ -44,8 +43,8 @@ def cli():
 @click.option('--broker', type=click.STRING,
               help='Broker id to restart. By default current broker id is restarted')
 def restart_broker(broker: str):
-    config, amazon, zookeeper = __prepare_configs()
-    broker_id = __get_opt_broker_id(broker, config, zookeeper, amazon)
+    config, env_provider, zookeeper = __prepare_configs()
+    broker_id = __get_opt_broker_id(broker, config, zookeeper, env_provider)
 
     RemoteCommandExecutorCheck.register_restart(zookeeper, broker_id)
 
@@ -53,8 +52,8 @@ def restart_broker(broker: str):
 @click.option('--broker', type=click.STRING,
               help="Broker instance on which to perform rebalance. By default, any free broker will start it")
 def rebalance_partitions(broker: str):
-    config, amazon, zookeeper = __prepare_configs()
-    broker_id = __get_opt_broker_id(broker, config, zookeeper, amazon) if broker else None
+    config, env_provider, zookeeper = __prepare_configs()
+    broker_id = __get_opt_broker_id(broker, config, zookeeper, env_provider) if broker else None
 
     RemoteCommandExecutorCheck.register_rebalance(zookeeper, broker_id)
 
diff --git a/bubuku/config.py b/bubuku/config.py
index c974646..1199bfa 100644
--- a/bubuku/config.py
+++ b/bubuku/config.py
@@ -5,7 +5,7 @@
 _LOG = logging.getLogger('bubuku.properties')
 
 Config = namedtuple('Config', ('kafka_dir', 'kafka_settings_template', 'zk_stack_name',
-                               'zk_prefix', 'id_policy', 'features', 'health_port'))
+                               'zk_prefix', 'id_policy', 'features', 'health_port', 'development_mode'))
 
 
 class KafkaProperties(object):
@@ -63,7 +63,8 @@ def load_config() -> Config:
         zk_prefix=zk_prefix if zk_prefix.startswith('/') or not zk_prefix else '/{}'.format(zk_prefix),
         id_policy=os.getenv('BROKER_ID_POLICY', 'ip').lower(),
         features=features,
-        health_port=int(os.getenv('HEALTH_PORT', '8888'))
+        health_port=int(os.getenv('HEALTH_PORT', '8888')),
+        development_mode=os.getenv('DEVELOPMENT_MODE', 'false').lower() == 'true'
     )
 
 
diff --git a/bubuku/controller.py b/bubuku/controller.py
index ef97439..61483e2 100644
--- a/bubuku/controller.py
+++ b/bubuku/controller.py
@@ -3,6 +3,7 @@
 from bubuku.amazon import Amazon
 from bubuku.broker import BrokerManager
+from bubuku.env_provider import EnvProvider
 from bubuku.zookeeper import BukuExhibitor
 
 _LOG = logging.getLogger('bubuku.controller')
@@ -49,10 +50,10 @@ def _exclude_self(ip, name, running_actions):
 
 
 class Controller(object):
-    def __init__(self, broker_manager: BrokerManager, zk: BukuExhibitor, amazon: Amazon):
+    def __init__(self, broker_manager: BrokerManager, zk: BukuExhibitor, env_provider: EnvProvider):
         self.broker_manager = broker_manager
         self.zk = zk
-        self.amazon = amazon
+        self.env_provider = env_provider
         self.checks = []
         self.changes = {}  # Holds mapping from change name to array of pending changes
         self.running = True
@@ -112,7 +113,7 @@ def _release_changes_lock(self, changes_to_remove):
                 self.zk.unregister_change(name)
 
     def loop(self):
-        ip = self.amazon.get_own_ip()
+        ip = self.env_provider.get_own_ip()
 
         while self.running or self.changes:
             self.make_step(ip)
diff --git a/bubuku/daemon.py b/bubuku/daemon.py
index 7eed8b7..0db801d 100644
--- a/bubuku/daemon.py
+++ b/bubuku/daemon.py
@@ -4,7 +4,6 @@
 import logging
 
 from bubuku import health
-from bubuku.amazon import Amazon
 from bubuku.broker import BrokerManager
 from bubuku.config import load_config, KafkaProperties
 from bubuku.controller import Controller
@@ -18,13 +17,12 @@
 from bubuku.id_generator import get_broker_id_policy
 from bubuku.utils import CmdHelper
 from bubuku.zookeeper import BukuExhibitor, load_exhibitor_proxy
-from bubuku.zookeeper.exhibior import AWSExhibitorAddressProvider
-
+from bubuku.env_provider import EnvProvider
 _LOG = logging.getLogger('bubuku.main')
 
 
 def apply_features(features: dict, controller: Controller, buku_proxy: BukuExhibitor, broker: BrokerManager,
-                   kafka_properties: KafkaProperties, amazon: Amazon) -> list:
+                   kafka_properties: KafkaProperties, env_provider: EnvProvider) -> list:
     for feature, config in features.items():
         if feature == 'restart_on_exhibitor':
             controller.add_check(CheckExhibitorAddressChanged(buku_proxy, broker))
@@ -39,7 +37,7 @@ def apply_features(features: dict, controller: Controller, buku_proxy: BukuExhib
         elif feature == 'graceful_terminate':
             register_terminate_on_interrupt(controller, broker)
         elif feature == 'use_ip_address':
-            kafka_properties.set_property('advertised.host.name', amazon.get_own_ip())
+            kafka_properties.set_property('advertised.host.name', env_provider.get_own_ip())
         else:
             _LOG.error('Using of unsupported feature "{}", skipping it'.format(feature))
 
@@ -52,25 +50,24 @@ def main():
     kafka_properties = KafkaProperties(config.kafka_settings_template,
                                        '{}/config/server.properties'.format(config.kafka_dir))
 
-    amazon = Amazon()
-
-    address_provider = AWSExhibitorAddressProvider(amazon, config.zk_stack_name)
+    env_provider = EnvProvider.create_env_provider(config.development_mode)
+    address_provider = env_provider.get_address_provider(config)
 
     _LOG.info("Loading exhibitor configuration")
     buku_proxy = load_exhibitor_proxy(address_provider, config.zk_prefix)
 
     _LOG.info("Loading broker_id policy")
-    broker_id_manager = get_broker_id_policy(config.id_policy, buku_proxy, kafka_properties, amazon)
+    broker_id_manager = get_broker_id_policy(config.id_policy, buku_proxy, kafka_properties, env_provider)
 
     _LOG.info("Building broker manager")
     broker = BrokerManager(config.kafka_dir, buku_proxy, broker_id_manager, kafka_properties)
 
     _LOG.info("Creating controller")
-    controller = Controller(broker, buku_proxy, amazon)
+    controller = Controller(broker, buku_proxy, env_provider)
 
     controller.add_check(CheckBrokerStopped(broker, buku_proxy))
     controller.add_check(RemoteCommandExecutorCheck(buku_proxy, broker))
-    apply_features(config.features, controller, buku_proxy, broker, kafka_properties, amazon)
+    apply_features(config.features, controller, buku_proxy, broker, kafka_properties, env_provider)
 
     _LOG.info('Starting health server')
     health.start_server(config.health_port)
diff --git a/bubuku/env_provider.py b/bubuku/env_provider.py
new file mode 100644
index 0000000..1f31b0f
--- /dev/null
+++ b/bubuku/env_provider.py
@@ -0,0 +1,85 @@
+import json
+import logging
+
+import boto3
+import requests
+
+from bubuku.zookeeper.exhibior import AWSExhibitorAddressProvider
+from bubuku.zookeeper.exhibior import LocalAddressProvider
+from bubuku.config import Config
+
+_LOG = logging.getLogger('bubuku.env_provider')
+
+
+class EnvProvider(object):
+    def get_own_ip(self) -> str:
+        raise NotImplementedError('Not implemented')
+
+    def get_address_provider(self, config: Config):
+        raise NotImplementedError('Not implemented')
+
+    @staticmethod
+    def create_env_provider(dev_mode: bool):
+        return LocalEnvProvider() if dev_mode else AmazonEnvProvider()
+
+
+class AmazonEnvProvider(EnvProvider):
+    NONE = object()
+
+    def __init__(self):
+        self.document = None
+        self.aws_addr = '169.254.169.254'
+
+    def _get_document(self) -> dict:
+        if not self.document:
+            try:
+                self.document = requests.get(
+                    'http://{}/latest/dynamic/instance-identity/document'.format(self.aws_addr),
+                    timeout=5).json()
+                _LOG.info("Amazon specific information loaded from AWS: {}".format(
+                    json.dumps(self.document, indent=2)))
+            except Exception as ex:
+                _LOG.warn('Failed to download AWS document', exc_info=ex)
+                self.document = AmazonEnvProvider.NONE
+        return self.document if self.document != AmazonEnvProvider.NONE else None
+
+    def get_aws_region(self) -> str:
+        doc = self._get_document()
+        return doc['region'] if doc else None
+
+    def get_own_ip(self) -> str:
+        doc = self._get_document()
+        return doc['privateIp'] if doc else '127.0.0.1'
+
+    def get_addresses_by_lb_name(self, lb_name) -> list:
+        region = self.get_aws_region()
+
+        private_ips = []
+
+        if region is not None:
+            elb = boto3.client('elb', region_name=region)
+            ec2 = boto3.client('ec2', region_name=region)
+
+            response = elb.describe_instance_health(LoadBalancerName=lb_name)
+
+            for instance in response['InstanceStates']:
+                if instance['State'] == 'InService':
+                    private_ips.append(ec2.describe_instances(
+                        InstanceIds=[instance['InstanceId']])['Reservations'][0]['Instances'][0]['PrivateIpAddress'])
+
+        else:
+            private_ips = [lb_name]
+        _LOG.info("Ip addresses for {} are: {}".format(lb_name, private_ips))
+        return private_ips
+
+    def get_address_provider(self, config: Config):
+        return AWSExhibitorAddressProvider(self, config.zk_stack_name)
+
+
+class LocalEnvProvider(EnvProvider):
+    def get_own_ip(self) -> str:
+        return '127.0.0.1'
+
+    def get_address_provider(self, config: Config):
+        return LocalAddressProvider()
+
diff --git a/bubuku/features/remote_exec.py b/bubuku/features/remote_exec.py
index 4437706..1b11bc3 100644
--- a/bubuku/features/remote_exec.py
+++ b/bubuku/features/remote_exec.py
@@ -17,7 +17,7 @@ def __init__(self, zk: BukuExhibitor, broker_manager: BrokerManager):
 
     def check(self) -> Change:
         with self.zk.lock():
-            data = self.zk.take_action(self.broker_manager.id_manager.get_broker_id())
+            data = self.zk.take_action(self.broker_manager.id_manager.detect_broker_id())
         if not data:
             return None
         if 'name' not in data:
diff --git a/bubuku/id_generator.py b/bubuku/id_generator.py
index 75e2825..231e83e 100755
--- a/bubuku/id_generator.py
+++ b/bubuku/id_generator.py
@@ -5,8 +5,8 @@
 import re
 from time import sleep, time
 
-from bubuku.amazon import Amazon
 from bubuku.config import KafkaProperties
+from bubuku.env_provider import EnvProvider
 from bubuku.zookeeper import BukuExhibitor
 
 _LOG = logging.getLogger('bubuku.id_generator')
@@ -16,6 +16,9 @@ class BrokerIdGenerator(object):
     def get_broker_id(self) -> str:
         raise NotImplementedError('Not implemented')
 
+    def detect_broker_id(self):
+        raise NotImplementedError('Not implemented')
+
     def wait_for_broker_id_absence(self):
         while self.is_registered():
             sleep(1)
@@ -58,6 +61,9 @@ def __init__(self, zk: BukuExhibitor, ip: str, kafka_props: KafkaProperties):
     def get_broker_id(self):
         return self.broker_id
 
+    def detect_broker_id(self):
+        return self.broker_id
+
     def is_registered(self):
         return self.zk.is_broker_registered(self.broker_id)
 
@@ -72,22 +78,27 @@ def __init__(self, zk: BukuExhibitor, kafka_properties: KafkaProperties):
     def get_broker_id(self):
         return None
 
-    def is_registered(self):
+    def detect_broker_id(self):
         meta_path = '{}/meta.properties'.format(self.kafka_properties.get_property('log.dirs'))
         while not os.path.isfile(meta_path):
-            return False
+            return None
         with open(meta_path) as f:
             lines = f.readlines()
         for line in lines:
             match = re.search('broker\.id=(\d+)', line)
             if match:
-                return self.zk.is_broker_registered(match.group(1))
-        return False
+                return match.group(1)
+        return None
+
+    def is_registered(self):
+        broker_id = self.detect_broker_id()
+        return self.zk.is_broker_registered(broker_id)
 
 
-def get_broker_id_policy(policy: str, zk: BukuExhibitor, kafka_props: KafkaProperties, amazon: Amazon) -> BrokerIdGenerator:
+def get_broker_id_policy(policy: str, zk: BukuExhibitor,
+                         kafka_props: KafkaProperties, env_provider: EnvProvider) -> BrokerIdGenerator:
     if policy == 'ip':
-        return BrokerIDByIp(zk, amazon.get_own_ip(), kafka_props)
+        return BrokerIDByIp(zk, env_provider.get_own_ip(), kafka_props)
     elif policy == 'auto':
         return BrokerIdAutoAssign(zk, kafka_props)
     else:
diff --git a/bubuku/zookeeper/exhibior.py b/bubuku/zookeeper/exhibior.py
index 14e8bb2..dd104f0 100644
--- a/bubuku/zookeeper/exhibior.py
+++ b/bubuku/zookeeper/exhibior.py
@@ -38,4 +38,9 @@ def _query_exhibitors(self, exhibitors):
         return None
 
     def query_from_amazon(self):
-        return self.amazon.get_addresses_by_lb_name(self.zk_stack_name)
\ No newline at end of file
+        return self.amazon.get_addresses_by_lb_name(self.zk_stack_name)
+
+
+class LocalAddressProvider(AddressListProvider):
+    def get_latest_address(self) -> (list, int):
+        return ('zookeeper',), 2181
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..aa4c5b7
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,15 @@
+version: '2'
+services:
+
+  bubuku:
+    build: .
+    depends_on:
+      - zookeeper
+    environment:
+      BROKER_ID_POLICY: "auto"
+      DEVELOPMENT_MODE: "True"
+      HEALTH_PORT: "8080"
+      BUKU_FEATURES: "restart_on_exhibitor,rebalance_on_start,graceful_terminate,use_ip_address"
+
+  zookeeper:
+    image: wurstmeister/zookeeper:3.4.6
diff --git a/download_kafka.sh b/download_kafka.sh
new file mode 100644
index 0000000..07aeaf9
--- /dev/null
+++ b/download_kafka.sh
@@ -0,0 +1,14 @@
+#!/bin/sh
+
+SCALA_VERSION=${1}
+KAFKA_VERSION=${2}
+KAFKA_DIR=${3}
+
+mirror=$(curl --stderr /dev/null https://www.apache.org/dyn/closer.cgi\?as_json\=1 | sed -rn 's/.*"preferred":.*"(.*)"/\1/p')
+url="${mirror}kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz"
+
+
+wget "${url}" -O "/tmp/kafka_release.tgz"
+tar xf /tmp/kafka_release.tgz -C /opt
+rm -rf /tmp/kafka_release.tgz
+mv /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION} $KAFKA_DIR
diff --git a/log4j.properties b/log4j.properties
new file mode 100644
index 0000000..22f5b98
--- /dev/null
+++ b/log4j.properties
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+kafka.logs.dir=logs
+
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%p %m (%c)%n
+
+log4j.appender.kafkaAppender=org.apache.log4j.RollingFileAppender
+log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
+log4j.appender.kafkaAppender.MaxFileSize=64MB
+log4j.appender.kafkaAppender.MaxBackupIndex=10
+log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.kafkaAppender.layout.ConversionPattern=%p %m (%c)%n
+
+log4j.appender.stateChangeAppender=org.apache.log4j.RollingFileAppender
+log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log
+log4j.appender.stateChangeAppender.MaxFileSize=32MB
+log4j.appender.stateChangeAppender.MaxBackupIndex=10
+log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.stateChangeAppender.layout.ConversionPattern=%p %m (%c)%n
+
+log4j.appender.requestAppender=org.apache.log4j.RollingFileAppender
+log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
+log4j.appender.requestAppender.MaxFileSize=32MB
+log4j.appender.requestAppender.MaxBackupIndex=10
+log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.requestAppender.layout.ConversionPattern=%p %m (%c)%n
+
+log4j.appender.cleanerAppender=org.apache.log4j.RollingFileAppender
+log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
+log4j.appender.cleanerAppender.MaxFileSize=32MB
+log4j.appender.cleanerAppender.MaxBackupIndex=20
+log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.cleanerAppender.layout.ConversionPattern=%p %m (%c)%n
+
+log4j.appender.controllerAppender=org.apache.log4j.RollingFileAppender
+log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
+log4j.appender.controllerAppender.MaxFileSize=32MB
+log4j.appender.controllerAppender.MaxBackupIndex=20
+log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.controllerAppender.layout.ConversionPattern=%p %m (%c)%n
+
+# Turn on all our debugging info
+#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
+#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
+#log4j.logger.kafka.perf=DEBUG, kafkaAppender
+#log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender
+#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
+log4j.logger.kafka=INFO, kafkaAppender
+
+log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
+log4j.additivity.kafka.network.RequestChannel$=false
+
+#log4j.logger.kafka.network.Processor=TRACE, requestAppender
+#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
+#log4j.additivity.kafka.server.KafkaApis=false
+log4j.logger.kafka.request.logger=WARN, requestAppender
+log4j.additivity.kafka.request.logger=false
+
+log4j.logger.kafka.controller=TRACE, controllerAppender
+log4j.additivity.kafka.controller=false
+
+log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
+log4j.additivity.kafka.log.LogCleaner=false
+
+log4j.logger.state.change.logger=TRACE, stateChangeAppender
+log4j.additivity.state.change.logger=false
\ No newline at end of file
diff --git a/server.properties b/server.properties
new file mode 100644
index 0000000..517de0f
--- /dev/null
+++ b/server.properties
@@ -0,0 +1,51 @@
+log.dirs=/data/kafka-logs
+port=9092
+auto.create.topics.enable=false
+delete.topic.enable=true
+auto.leader.rebalance.enable=true
+leader.imbalance.check.interval.seconds=100
+unclean.leader.election.enable=true
+min.insync.replicas=2
+reserved.broker.max.id=67108864
+### from http://kafka.apache.org/documentation.html#prodconfig
+
+# Replication configurations
+num.replica.fetchers=4
+replica.fetch.max.bytes=1048576
+replica.fetch.wait.max.ms=500
+replica.high.watermark.checkpoint.interval.ms=5000
+replica.socket.timeout.ms=30000
+replica.socket.receive.buffer.bytes=65536
+replica.lag.time.max.ms=10000
+replica.lag.max.messages=4000
+
+controller.socket.timeout.ms=30000
+controller.message.queue.size=10
+
+# Log configuration
+#num.partitions=8
+#message.max.bytes=1000000
+#auto.create.topics.enable=true
+log.index.interval.bytes=4096
+log.index.size.max.bytes=10485760
+log.retention.hours=168
+log.flush.interval.ms=10000
+log.flush.interval.messages=20000
+log.flush.scheduler.interval.ms=2000
+log.roll.hours=168
+log.retention.check.interval.ms=300000
+log.segment.bytes=1073741824
+
+# ZK configuration
+zookeeper.connection.timeout.ms=6000
+zookeeper.sync.time.ms=2000
+
+# Socket server configuration
+num.io.threads=8
+num.network.threads=8
+socket.request.max.bytes=104857600
+socket.receive.buffer.bytes=1048576
+socket.send.buffer.bytes=1048576
+queued.max.requests=16
+fetch.purgatory.purge.interval.requests=100
+producer.purgatory.purge.interval.requests=100
diff --git a/setup.py b/setup.py
index f5a5abd..1b5cbe2 100755
--- a/setup.py
+++ b/setup.py
@@ -8,6 +8,7 @@
 import setuptools
 from setuptools import setup
 from setuptools.command.test import test
+from distutils.core import Command
 
 if sys.version_info < (3, 4, 0):
     sys.stderr.write('FATAL: Bubuku needs to be run with Python 3.4+\n')
@@ -53,6 +54,36 @@ def read_version(package):
 ]
 
 
+class DockerUpCommand(Command):
+    description = "Start up docker compose with a zookeeper instance and a configurable number of bubuku instances"
+    user_options = [
+        ('bubuku-scale=', None, 'Specify number of bubuku instances')
+    ]
+
+    def initialize_options(self):
+        self.bubuku_scale = 2
+
+    def finalize_options(self):
+        pass
+
+    def run(self):
+        os.system('docker-compose up -d --build && docker-compose scale bubuku=' + str(self.bubuku_scale))
+
+
+class DockerDownCommand(Command):
+    description = "Stop docker compose"
+    user_options = []
+
+    def initialize_options(self):
+        pass
+
+    def finalize_options(self):
+        pass
+
+    def run(self):
+        os.system('docker-compose down')
+
+
 class PyTest(test):
     def run_tests(self):
         try:
@@ -85,12 +116,13 @@ def setup_package():
         test_suite='tests',
         packages=setuptools.find_packages(exclude=['tests', 'tests.*']),
         install_requires=[req for req in read('requirements.txt').split('\n') if req != ''],
-        cmdclass={'test': PyTest},
+        cmdclass={'test': PyTest, 'docker_up': DockerUpCommand, 'docker_down': DockerDownCommand},
         tests_require=['pytest-cov', 'pytest'],
         command_options=command_options,
-        entry_points={'console_scripts': CONSOLE_SCRIPTS},
+        entry_points={
+            'console_scripts': CONSOLE_SCRIPTS,
+        },
     )
-
 
 if __name__ == '__main__':
     setup_package()
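
Review note, not part of the patch: a minimal Python sketch of the seam this change introduces. EnvProvider.create_env_provider() picks the environment implementation from the development_mode flag, and everything downstream (own-IP lookup, exhibitor/zookeeper discovery, broker id detection) goes through that object. The assertions below follow directly from the code added in bubuku/env_provider.py and bubuku/zookeeper/exhibior.py:

    from bubuku.env_provider import EnvProvider, LocalEnvProvider

    # development_mode is parsed in config.py from the DEVELOPMENT_MODE
    # environment variable; True selects the local (non-AWS) implementation.
    env_provider = EnvProvider.create_env_provider(dev_mode=True)
    assert isinstance(env_provider, LocalEnvProvider)
    assert env_provider.get_own_ip() == '127.0.0.1'

    # LocalEnvProvider ignores its config argument and returns the address
    # provider pointing at the docker-compose 'zookeeper' service, port 2181.
    address_provider = env_provider.get_address_provider(config=None)
    assert address_provider.get_latest_address() == (('zookeeper',), 2181)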
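Under the same caveat, the local testing flow wired up in setup.py and docker-compose.yml: "python3 setup.py docker_up" builds the image and starts one zookeeper container plus bubuku containers scaled to --bubuku-scale (default 2; e.g. "python3 setup.py docker_up --bubuku-scale=3" for three), and "python3 setup.py docker_down" tears the compose environment down again.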