diff --git a/.travis.yml b/.travis.yml index 28dabe3..743b88c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,9 +7,7 @@ services: - docker env: - # - MONGODB_VER=mongodb-linux-x86_64-2.6.12 - # - MONGODB_VER=mongodb-linux-x86_64-3.4.16 - - MONGODB_VER=mongodb-linux-x86_64-3.6.2 + - MONGODB_VER=mongodb-linux-x86_64-3.6.2 KAFKA_VER=2.1.0 KAFKA_SCALA_VER=2.12 KAFKA_FILE=kafka_$KAFKA_SCALA_VER-$KAFKA_VER before_install: - sudo apt-get -qq update @@ -24,8 +22,12 @@ script: - tar xfz $MONGODB_VER.tgz - export MONGOD=`pwd`/$MONGODB_VER/bin/mongod - sed -i "s#^mongo-exe.*#mongo-exe=$MONGOD#" test/test.cfg + - wget http://mirrors.ocf.berkeley.edu/apache/kafka/$KAFKA_VER/$KAFKA_FILE.tgz + - tar xfz $KAFKA_FILE.tgz + - export KAFKA_PATH=`pwd`/$KAFKA_FILE + - sed -i "s#^kafka-path.*#kafka-path=$KAFKA_PATH#" test/test.cfg - cat test/test.cfg - make test after_script: - - coveralls \ No newline at end of file + - coveralls
diff --git a/Makefile b/Makefile index ad30f89..203553e 100644 --- a/Makefile +++ b/Makefile @@ -19,6 +19,9 @@ test: all # flake8 test pytest --verbose test --cov=feeds --cov-report html feeds -s +start_kafka_listener: + python -m feeds.kafka_listener + start: all gunicorn --worker-class gevent --timeout 300 --workers 5 --bind :5000 feeds.server:app
diff --git a/deployment/conf/.templates/deploy.cfg.templ b/deployment/conf/.templates/deploy.cfg.templ index 8923c68..540ef08 100644 --- a/deployment/conf/.templates/deploy.cfg.templ +++ b/deployment/conf/.templates/deploy.cfg.templ @@ -38,3 +38,10 @@ workspace-url = {{ default .Env.workspace_url "https://ci.kbase.us/services/ws" groups-url = {{ default .Env.groups_url "https://ci.kbase.us/services/groups" }} njs-url = {{ default .Env.njs_url "https://ci.kbase.us/services/njs_wrapper" }} nms-url = {{ default .Env.nms_url "https://ci.kbase.us/services/narrative_method_store/rpc" }} + +[kafka] +host = {{ default .Env.kafka_host "localhost:9092" }} + +# Topics is a comma-separated list. Probably just one to start with, though. +topics = {{ default .Env.kafka_topics "feeds" }} +group-id = {{ default .Env.kafka_group_id "feeds_group" }}
diff --git a/deployment/deploy.cfg.example b/deployment/deploy.cfg.example index e5a8697..7608d48 100644 --- a/deployment/deploy.cfg.example +++ b/deployment/deploy.cfg.example @@ -39,3 +39,8 @@ default-note-count=100 # Useful for testing, etc. # SET TO FALSE IN PRODUCTION!
debug=False + +[kafka] +host=localhost:9092 +topics=feeds +group-id=feeds_group diff --git a/dev-requirements.txt b/dev-requirements.txt index 1940a0a..1f8025c 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -4,4 +4,5 @@ flake8==3.5.0 pytest==3.8.2 coveralls==1.5.1 requests-mock==1.5.2 -semver==2.8.1 \ No newline at end of file +semver==2.8.1 +jinja2==2.10 diff --git a/feeds/config.py b/feeds/config.py index f2c28f6..539f686 100644 --- a/feeds/config.py +++ b/feeds/config.py @@ -7,7 +7,7 @@ ENV_CONFIG_BACKUP = "KB_DEPLOYMENT_CONFIG" ENV_AUTH_TOKEN = "AUTH_TOKEN" -INI_SECTION = "feeds" +FEEDS_SECTION = "feeds" KEY_DB_HOST = "db-host" KEY_DB_PORT = "db-port" @@ -24,6 +24,68 @@ KEY_WS_URL = "workspace-url" KEY_NMS_URL = "nms-url" KEY_DEFAULT_COUNT = "default-note-count" +KEY_USE_KAFKA = "use-kafka" + +KAFKA_SECTION = "kafka" +KEY_KAFKA_HOST = "host" +KEY_KAFKA_TOPICS = "topics" +KEY_KAFKA_GROUP_ID = "group-id" + + +class ConfigFileLoader(object): + def __init__(self): + config_file = self._find_config_path() + self.cfg = self._load_config(config_file) + + def has_section(self, section: str) -> bool: + return self.cfg.has_section(section) + + def get_line(self, section: str, key: str, required: bool=True) -> str: + """ + A little wrapper that raises a ConfigError if a required key isn't present. + """ + val = None + try: + val = self.cfg.get(section, key) + except configparser.NoOptionError: + if required: + raise ConfigError("Required option {} not found in config section " + "{}".format(key, section)) + if not val and required: + raise ConfigError("Required option {} has no value!".format(key)) + return val + + def _find_config_path(self): + """ + A little helper to test whether a given file path, or one given by an + environment variable, exists. + """ + for env in [ENV_CONFIG_PATH, ENV_CONFIG_BACKUP]: + env_path = os.environ.get(env) + if env_path: + if not os.path.isfile(env_path): + raise ConfigError( + "Environment variable {} is set to {}, " + "which is not a config file.".format(ENV_CONFIG_PATH, env_path) + ) + else: + return env_path + if not os.path.isfile(DEFAULT_CONFIG_PATH): + raise ConfigError( + "Unable to find config file - can't start server. 
Either set the {} or {} " + "environment variable to a path, or copy 'deploy.cfg.example' to " + "'deploy.cfg'".format(ENV_CONFIG_PATH, ENV_CONFIG_BACKUP) + ) + return DEFAULT_CONFIG_PATH + + def _load_config(self, cfg_file): + config = configparser.ConfigParser() + with open(cfg_file, "r") as cfg: + try: + config.read_file(cfg) + except configparser.Error as e: + raise ConfigError("Error parsing config file {}: {}".format(cfg_file, e)) + return config class FeedsConfig(object): @@ -38,44 +100,43 @@ def __init__(self): self.auth_token = os.environ.get(ENV_AUTH_TOKEN) if self.auth_token is None: raise RuntimeError("The AUTH_TOKEN environment variable must be set!") - config_file = self._find_config_path() - cfg = self._load_config(config_file) - if not cfg.has_section(INI_SECTION): + cfg = ConfigFileLoader() + if not cfg.has_section(FEEDS_SECTION): raise ConfigError( - "Error parsing config file: section {} not found!".format(INI_SECTION) + "Error parsing config file: section {} not found!".format(FEEDS_SECTION) ) - self.db_engine = self._get_line(cfg, KEY_DB_ENGINE) - self.db_host = self._get_line(cfg, KEY_DB_HOST) + self.db_engine = cfg.get_line(FEEDS_SECTION, KEY_DB_ENGINE) + self.db_host = cfg.get_line(FEEDS_SECTION, KEY_DB_HOST) try: - self.db_port = self._get_line(cfg, KEY_DB_PORT) + self.db_port = cfg.get_line(FEEDS_SECTION, KEY_DB_PORT) self.db_port = int(self.db_port) assert self.db_port > 0 except (ValueError, AssertionError): raise ConfigError("{} must be an int > 0! Got {}".format(KEY_DB_PORT, self.db_port)) - self.db_user = self._get_line(cfg, KEY_DB_USER, required=False) - self.db_pw = self._get_line(cfg, KEY_DB_PW, required=False) - self.db_name = self._get_line(cfg, KEY_DB_NAME, required=False) - self.global_feed = self._get_line(cfg, KEY_GLOBAL_FEED) + self.db_user = cfg.get_line(FEEDS_SECTION, KEY_DB_USER, required=False) + self.db_pw = cfg.get_line(FEEDS_SECTION, KEY_DB_PW, required=False) + self.db_name = cfg.get_line(FEEDS_SECTION, KEY_DB_NAME, required=False) + self.global_feed = cfg.get_line(FEEDS_SECTION, KEY_GLOBAL_FEED) self.global_feed_type = "user" # doesn't matter, need a valid Entity type... try: - self.lifespan = self._get_line(cfg, KEY_LIFESPAN) + self.lifespan = cfg.get_line(FEEDS_SECTION, KEY_LIFESPAN) self.lifespan = int(self.lifespan) assert self.lifespan > 0 except (ValueError, AssertionError): raise ConfigError("{} must be an int > 0! 
Got {}".format(KEY_LIFESPAN, self.lifespan)) - self.debug = self._get_line(cfg, KEY_DEBUG, required=False) + self.debug = cfg.get_line(FEEDS_SECTION, KEY_DEBUG, required=False) if not self.debug or self.debug.lower() != "true": self.debug = False else: self.debug = True - self.auth_url = self._get_line(cfg, KEY_AUTH_URL) - self.njs_url = self._get_line(cfg, KEY_NJS_URL) - self.ws_url = self._get_line(cfg, KEY_WS_URL) - self.groups_url = self._get_line(cfg, KEY_GROUPS_URL) - self.nms_url = self._get_line(cfg, KEY_NMS_URL) - self.default_max_notes = self._get_line(cfg, KEY_DEFAULT_COUNT) + self.auth_url = cfg.get_line(FEEDS_SECTION, KEY_AUTH_URL) + self.njs_url = cfg.get_line(FEEDS_SECTION, KEY_NJS_URL) + self.ws_url = cfg.get_line(FEEDS_SECTION, KEY_WS_URL) + self.groups_url = cfg.get_line(FEEDS_SECTION, KEY_GROUPS_URL) + self.nms_url = cfg.get_line(FEEDS_SECTION, KEY_NMS_URL) + self.default_max_notes = cfg.get_line(FEEDS_SECTION, KEY_DEFAULT_COUNT) try: - self.default_max_notes = self._get_line(cfg, KEY_DEFAULT_COUNT) + self.default_max_notes = cfg.get_line(FEEDS_SECTION, KEY_DEFAULT_COUNT) self.default_max_notes = int(self.default_max_notes) assert self.default_max_notes > 0 except (ValueError, AssertionError): @@ -83,58 +144,32 @@ def __init__(self): "{} must be an int > 0! Got {}".format(KEY_DEFAULT_COUNT, self.default_max_notes) ) - def _find_config_path(self): - """ - A little helper to test whether a given file path, or one given by an - environment variable, exists. - """ - for env in [ENV_CONFIG_PATH, ENV_CONFIG_BACKUP]: - env_path = os.environ.get(env) - if env_path: - if not os.path.isfile(env_path): - raise ConfigError( - "Environment variable {} is set to {}, " - "which is not a config file.".format(ENV_CONFIG_PATH, env_path) - ) - else: - return env_path - if not os.path.isfile(DEFAULT_CONFIG_PATH): + +class KafkaConfig(object): + def __init__(self): + cfg = ConfigFileLoader() + if not cfg.has_section(KAFKA_SECTION): raise ConfigError( - "Unable to find config file - can't start server. Either set the {} or {} " - "environment variable to a path, or copy 'deploy.cfg.example' to " - "'deploy.cfg'".format(ENV_CONFIG_PATH, ENV_CONFIG_BACKUP) + "Error parsing config file: section {} not found!".format(KAFKA_SECTION) ) - return DEFAULT_CONFIG_PATH - - def _load_config(self, cfg_file): - config = configparser.ConfigParser() - with open(cfg_file, "r") as cfg: - try: - config.read_file(cfg) - except configparser.Error as e: - raise ConfigError("Error parsing config file {}: {}".format(cfg_file, e)) - return config - - def _get_line(self, config, key, required=True): - """ - A little wrapper that raises a ConfigError if a required key isn't present. 
- """ - val = None - try: - val = config.get(INI_SECTION, key) - except configparser.NoOptionError: - if required: - raise ConfigError("Required option {} not found in config".format(key)) - if not val and required: - raise ConfigError("Required option {} has no value!".format(key)) - return val + self.kafka_host = cfg.get_line(KAFKA_SECTION, KEY_KAFKA_HOST) + self.kafka_group_id = cfg.get_line(KAFKA_SECTION, KEY_KAFKA_GROUP_ID) + self.kafka_topics = cfg.get_line(KAFKA_SECTION, KEY_KAFKA_TOPICS).split(",") __config = None +__kafka_config = None -def get_config(from_disk=False): +def get_config(): global __config if not __config: __config = FeedsConfig() return __config + + +def get_kafka_config(): + global __kafka_config + if not __kafka_config: + __kafka_config = KafkaConfig() + return __kafka_config diff --git a/feeds/kafka/__init__.py b/feeds/kafka/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/feeds/kafka/consumer.py b/feeds/kafka/consumer.py new file mode 100644 index 0000000..dacc892 --- /dev/null +++ b/feeds/kafka/consumer.py @@ -0,0 +1,58 @@ +from kafka import KafkaConsumer +from kafka.consumer.fetcher import ConsumerRecord +from typing import List +import json +from feeds.api.util import parse_notification_params +from feeds.logger import ( + log, + log_error +) +from feeds.activity.notification import Notification +from feeds.managers.notification_manager import NotificationManager + + +class KafkaNotificationConsumer(object): + def __init__(self, server: str, topics: List[str], group_id: str) -> None: + self.server = server + self.topics = topics + self.group_id = group_id + self.consumer = KafkaConsumer(topics[0], + client_id="feeds-kakfa-consumer", + bootstrap_servers=[server], + consumer_timeout_ms=1000, + group_id=group_id, + enable_auto_commit=True, + auto_commit_interval_ms=1000, + auto_offset_reset="earliest") + + def poll(self) -> None: + for msg in self.consumer: + print(msg) + self._process_message(msg) + + def _process_message(self, message: ConsumerRecord) -> None: + try: + note_params = parse_notification_params(json.loads(message.value)) + # create a Notification from params. + new_note = Notification( + note_params.get('actor'), + note_params.get('verb'), + note_params.get('object'), + note_params.get('source'), + level=note_params.get('level'), + target=note_params.get('target', []), + context=note_params.get('context'), + expires=note_params.get('expires'), + external_key=note_params.get('external_key'), + users=note_params.get('users', []) + ) + # pass it to the NotificationManager to dole out to its audience feeds. + manager = NotificationManager() + manager.add_notification(new_note) + log(__name__, "Created notification from Kafka with id {}".format(new_note.id)) + except Exception as e: + log_error(__name__, e) + + def __str__(self): + return ("KafkaNotificationConsumer: host:{},group_id:{}," + "topics:{}".format(self.server, self.group_id, self.topics)) diff --git a/feeds/kafka/listener.py b/feeds/kafka/listener.py new file mode 100644 index 0000000..2d5b6ad --- /dev/null +++ b/feeds/kafka/listener.py @@ -0,0 +1,24 @@ +""" +Main controller class for the "app" that listens to Kafka. +In __init__, this sets up the environment and build the KafkaConsumer. +Activating it with start_listening() will run an endless loop that will +listen for notifications coming in from Kafka, and push them into +the Mongo feeds db by the same mechanism as the main server. 
+""" + +from feeds.config import get_kafka_config +from .consumer import KafkaNotificationConsumer +import time + + +class KafkaListener(object): + def __init__(self): + kafka_cfg = get_kafka_config() + self.consumer = KafkaNotificationConsumer(kafka_cfg.kafka_host, + kafka_cfg.kafka_topics, + kafka_cfg.kafka_group_id) + + def start_listening(self): + while True: + self.consumer.poll() + time.sleep(1)
diff --git a/feeds/kafka_listener.py b/feeds/kafka_listener.py new file mode 100644 index 0000000..2086adc --- /dev/null +++ b/feeds/kafka_listener.py @@ -0,0 +1,9 @@ +from .kafka.listener import KafkaListener +from .logger import log + +if __name__ == "__main__": + name = 'kafka_listener' + log(name, "Initializing Kafka listener.") + listener = KafkaListener() + log(name, "Starting Kafka listener loop.") + listener.start_listening()
diff --git a/requirements.txt b/requirements.txt index f223674..d447a58 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,4 @@ cachetools==2.1.0 pymongo==3.7.2 redis==2.10.6 flask-cors==3.0.6 +kafka-python==1.4.4
diff --git a/test/_data/kafka/kafka.props.tmpl b/test/_data/kafka/kafka.props.tmpl new file mode 100644 index 0000000..883ea55 --- /dev/null +++ b/test/_data/kafka/kafka.props.tmpl @@ -0,0 +1,136 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# see kafka.server.KafkaConfig for additional details and defaults + +############################# Server Basics ############################# + +# The id of the broker. This must be set to a unique integer for each broker. +broker.id=0 + +############################# Socket Server Settings ############################# + +# The address the socket server listens on. It will get the value returned from +# java.net.InetAddress.getCanonicalHostName() if not configured. +# FORMAT: +# listeners = listener_name://host_name:port +# EXAMPLE: +# listeners = PLAINTEXT://your.host.name:9092 +listeners=PLAINTEXT://:{{ kafka_port }} +# Hostname and port the broker will advertise to producers and consumers. If not set, +# it uses the value for "listeners" if configured. Otherwise, it will use the value +# returned from java.net.InetAddress.getCanonicalHostName(). +#advertised.listeners=PLAINTEXT://your.host.name:9092 + +# Maps listener names to security protocols, the default is for them to be the same.
See the config documentation for more details +#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL + +# The number of threads that the server uses for receiving requests from the network and sending responses to the network +num.network.threads=3 + +# The number of threads that the server uses for processing requests, which may include disk I/O +num.io.threads=8 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer.bytes=102400 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer.bytes=102400 + +# The maximum size of a request that the socket server will accept (protection against OOM) +socket.request.max.bytes=104857600 + + +############################# Log Basics ############################# + +# A comma separated list of directories under which to store log files +log.dirs={{ kafka_log_dir }} + +# The default number of log partitions per topic. More partitions allow greater +# parallelism for consumption, but this will also result in more files across +# the brokers. +num.partitions=1 + +# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. +# This value is recommended to be increased for installations with data dirs located in RAID array. +num.recovery.threads.per.data.dir=1 + +############################# Internal Topic Settings ############################# +# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state" +# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3. +offsets.topic.replication.factor=1 +transaction.state.log.replication.factor=1 +transaction.state.log.min.isr=1 + +############################# Log Flush Policy ############################# + +# Messages are immediately written to the filesystem but by default we only fsync() to sync +# the OS cache lazily. The following configurations control the flush of data to disk. +# There are a few important trade-offs here: +# 1. Durability: Unflushed data may be lost if you are not using replication. +# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. +# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. + +# The number of messages to accept before forcing a flush of data to disk +#log.flush.interval.messages=10000 + +# The maximum amount of time a message can sit in a log before we force a flush +#log.flush.interval.ms=1000 + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion due to age +log.retention.hours=168 + +# A size-based retention policy for logs. Segments are pruned from the log unless the remaining +# segments drop below log.retention.bytes. Functions independently of log.retention.hours. 
+#log.retention.bytes=1073741824 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +log.segment.bytes=1073741824 + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.retention.check.interval.ms=300000 + +############################# Zookeeper ############################# + +# Zookeeper connection string (see zookeeper docs for details). +# This is a comma separated host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". +# You can also append an optional chroot string to the urls to specify the +# root directory for all kafka znodes. +zookeeper.connect=localhost:{{ zookeeper_port }} + +# Timeout in ms for connecting to zookeeper +zookeeper.connection.timeout.ms=6000 + + +############################# Group Coordinator Settings ############################# + +# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance. +# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms. +# The default value for this is 3 seconds. +# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing. +# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup. +group.initial.rebalance.delay.ms=0 diff --git a/test/_data/kafka/zookeeper.props.tmpl b/test/_data/kafka/zookeeper.props.tmpl new file mode 100644 index 0000000..19712b8 --- /dev/null +++ b/test/_data/kafka/zookeeper.props.tmpl @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# the directory where the snapshot is stored. 
+dataDir={{ zookeeper_data_dir }} +# the port at which the clients will connect +clientPort={{ zookeeper_port }} +# disable the per-ip limit on the number of connections since this is a non-production config +maxClientCnxns=0 diff --git a/test/conftest.py b/test/conftest.py index 7d031cf..d47fcc6 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -7,6 +7,7 @@ import test.util as test_util from .util import test_config from .mongo_controller import MongoController +from .kafka_controller import KafkaController import shutil import time import re @@ -34,7 +35,7 @@ def mongo_notes(mongo): @pytest.fixture(scope="module") def mongo(): mongoexe = test_util.get_mongo_exe() - tempdir = test_util.get_temp_dir() + tempdir = test_util.get_mongo_temp_dir() mongo = MongoController(mongoexe, tempdir) print("running MongoDB {} on port {} in dir {}".format( mongo.db_version, mongo.port, mongo.temp_dir @@ -43,13 +44,35 @@ def mongo(): feeds.storage.mongodb.connection._connection = None yield mongo - del_temp = test_util.get_delete_temp_files() + del_temp = test_util.get_delete_mongo_files() print("Shutting down MongoDB,{} deleting temp files".format(" not" if not del_temp else "")) mongo.destroy(del_temp) if del_temp: - shutil.rmtree(test_util.get_temp_dir()) + shutil.rmtree(test_util.get_mongo_temp_dir()) # time.sleep(5) # wait for Mongo to go away +@pytest.fixture() #scope="module") +def kafka(): + tempdir = test_util.get_kafka_temp_dir() + kafka = KafkaController( + test_util.get_kafka_root(), + test_util.get_kafka_config(), + test_util.get_zookeeper_config(), + tempdir + ) + print("running Kafka on port {} with zookeeper on port {} and data dir {}".format( + kafka.kafka_port, kafka.zookeeper_port, kafka.temp_dir + )) + # a little hack to make the kafka host work right + feeds.config.get_kafka_config() + feeds.config.__kafka_config.kafka_host = "localhost:{}".format(kafka.kafka_port) + yield kafka + del_temp = test_util.get_delete_kafka_files() + print("Shutting down Kafka,{} deleting temp files".format(" not" if not del_temp else "")) + kafka.destroy(del_temp) + if del_temp: + shutil.rmtree(test_util.get_kafka_temp_dir()) + @pytest.fixture(scope="module") def app(): from feeds.server import create_app diff --git a/test/kafka/__init__.py b/test/kafka/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/kafka/test_consumer.py b/test/kafka/test_consumer.py new file mode 100644 index 0000000..aa0af8d --- /dev/null +++ b/test/kafka/test_consumer.py @@ -0,0 +1,75 @@ +import pytest +import json +from kafka import (KafkaProducer, TopicPartition) +from feeds.kafka.consumer import KafkaNotificationConsumer +import test.util as test_util +from feeds.config import get_kafka_config +import time +from uuid import uuid4 + + +def test_kafka_consumer(client, kafka, mongo, mock_valid_users, mock_valid_user_token): + user = "test_consumer" + cfg = get_kafka_config() + # set up consumer + consumer = KafkaNotificationConsumer(cfg.kafka_host, cfg.kafka_topics, cfg.kafka_group_id) + # set up producer + producer = KafkaProducer(bootstrap_servers=cfg.kafka_host) + topic = cfg.kafka_topics[0] + # make a notification post + mock_valid_users({"kafkatest": "KBase Test", user: "Test Consumer"}) + mock_valid_user_token(user, "Test Consumer") + + note = { + "actor": { "type": "user", "id": "kafkatest" }, + "verb": "invite", + "object": { "type": "user", "id": user }, + "target": [{ "type": "user", "id": user }], + "users": [{ "type": "user", "id": user }], + "level": "alert", + "source": "a_service", + 
"context": { + "foo": "bar" + } + } + producer.send(topic, bytes(json.dumps(note), "utf-8")) + producer.flush() + consumer.poll() + + # fetch notifications and expect that our new one is in there + response = client.get("/api/V1/notifications", headers={"Authorization": "token-"+str(uuid4())}) + data = json.loads(response.data) + assert len(data["user"]["feed"]) == 1 + note = data["user"]["feed"][0] + assert note['actor'] == {"type": "user", "id": "kafkatest", "name": "KBase Test"} + assert note['context'] == {"foo": "bar"} + # consumer.consumer.close() + + +def test_bad_input(kafka): + """ + Expect to log and continue. + """ + user = "test_consumer" + cfg = get_kafka_config() + # set up consumer + consumer = KafkaNotificationConsumer(cfg.kafka_host, cfg.kafka_topics, cfg.kafka_group_id) + # set up producer + producer = KafkaProducer(bootstrap_servers=cfg.kafka_host) + topic = cfg.kafka_topics[0] + # no object or actor! + note = { + "verb": "invite", + "target": [{ "type": "user", "id": user }], + "users": [{ "type": "user", "id": user }], + "level": "alert", + "source": "a_service", + "context": { + "foo": "bar" + } + } + producer.send(topic, bytes(json.dumps(note), "utf-8")) + producer.flush() + print("COMMITTED: {}".format(consumer.consumer.committed(TopicPartition("feeds", 0)))) + consumer.poll() + # consumer.consumer.close() diff --git a/test/kafka/test_listener.py b/test/kafka/test_listener.py new file mode 100644 index 0000000..00d979b --- /dev/null +++ b/test/kafka/test_listener.py @@ -0,0 +1,7 @@ +import pytest +from feeds.kafka.listener import KafkaListener + + +def test_kafka_listener(kafka): + listener = KafkaListener() + assert listener.consumer is not None diff --git a/test/kafka_controller.py b/test/kafka_controller.py new file mode 100644 index 0000000..b992412 --- /dev/null +++ b/test/kafka_controller.py @@ -0,0 +1,164 @@ +import os +from pathlib import Path +from jinja2 import Template +import tempfile +import subprocess +import time +import shutil +import test.util as test_util + +KAFKA_SCRIPT = "bin/kafka-server-start.sh" +ZOOKEEPER_SCRIPT = "bin/zookeeper-server-start.sh" +KAFKA_TOPIC_SCRIPT = "bin/kafka-topics.sh" +KAFKA_CONFIG_OUT = "kafka.properties" +ZOOKEEPER_CONFIG_OUT = "zookeeper.properties" +ZOOKEEPER_DATA_DIR = "zookeeper" +KAFKA_LOG_DIR = "kafka-logs" +KAFKA_STDOUT = "kafka_stdout" +ZOOKEEPER_STDOUT = "zookeeper_stdout" + + +class KafkaController(object): + def __init__(self, kafka_root: Path, kafka_config_template: Path, + zookeeper_config_template: Path, root_temp_dir: Path) -> None: + self.kafka_root = kafka_root + kafka_cmd = os.path.join(kafka_root, KAFKA_SCRIPT) + zookeeper_cmd = os.path.join(kafka_root, ZOOKEEPER_SCRIPT) + + if not kafka_cmd or not os.access(kafka_cmd, os.X_OK): + raise test_util.TestException("Kafka startup script {} doesn't exist or isn't executable".format(kafka_cmd)) + + if not zookeeper_cmd or not os.access(zookeeper_cmd, os.X_OK): + raise test_util.TestException("Zookeeper startup script {} doesn't exist or isn't executable".format(zookeeper_cmd)) + + if not root_temp_dir: + raise test_util.TestException("Kafka temp_dir is None") + + # make temp dirs + self._init_temp_dir(root_temp_dir) + self._init_kafka_configs(kafka_config_template, zookeeper_config_template) + self._start_process(kafka_cmd, zookeeper_cmd) + + def _init_temp_dir(self, root_temp_dir: Path) -> None: + """ + Sets up internal property temp_dir + """ + root_temp_dir = root_temp_dir.absolute() + os.makedirs(root_temp_dir, exist_ok=True) + self.temp_dir = 
Path(tempfile.mkdtemp(prefix='KafkaController-', dir=str(root_temp_dir))) + data_dir = self.temp_dir.joinpath('data') + os.makedirs(data_dir) + + def _init_kafka_configs(self, kafka_config: Path, zookeeper_config: Path) -> None: + """ + Sets up internal properties kafka_port and zookeeper_port, + kafka_config_path, zookeeper_config_path + """ + self.kafka_port = test_util.find_free_port() + self.zookeeper_port = test_util.find_free_port() + self.zookeeper_data_dir = os.path.join(self.temp_dir, ZOOKEEPER_DATA_DIR) + self.kafka_log_dir = os.path.join(self.temp_dir, KAFKA_LOG_DIR) + + template_data = { + "zookeeper_port": self.zookeeper_port, + "kafka_port": self.kafka_port, + "zookeeper_data_dir": str(self.zookeeper_data_dir), + "kafka_log_dir": str(self.kafka_log_dir) + } + + kafka_config = kafka_config.absolute() + with open(kafka_config, "r") as f: + k_file = f.read() + tmpl = Template(k_file) + self.kafka_config_path = os.path.join(self.temp_dir, KAFKA_CONFIG_OUT) + with open(self.kafka_config_path, "w") as f: + f.write(tmpl.render(template_data)) + + zookeeper_config = zookeeper_config.absolute() + with open(zookeeper_config, "r") as f: + z_file = f.read() + tmpl = Template(z_file) + self.zookeeper_config_path = os.path.join(self.temp_dir, ZOOKEEPER_CONFIG_OUT) + with open(self.zookeeper_config_path, "w") as f: + f.write(tmpl.render(template_data)) + + def _start_process(self, kafka_cmd: Path, zookeeper_cmd: Path) -> None: + self._zookeeper_out = open(os.path.join(self.temp_dir, ZOOKEEPER_STDOUT), "w") + zookeeper_run_cmd = [zookeeper_cmd, self.zookeeper_config_path] + self._zookeeper_proc = subprocess.Popen(zookeeper_run_cmd, stdout=self._zookeeper_out, stderr=subprocess.STDOUT) + + time.sleep(5) + + self._kafka_out = open(os.path.join(self.temp_dir, KAFKA_STDOUT), "w") + kafka_run_cmd = [kafka_cmd, self.kafka_config_path] + self._kafka_proc = subprocess.Popen(kafka_run_cmd, stdout=self._kafka_out, stderr=subprocess.STDOUT) + + time.sleep(5) + + cfg = test_util.test_config() + topics = cfg.get("kafka", "topics").split(",") + for topic in topics: + create_topic_cmd = [os.path.join(self.kafka_root, KAFKA_TOPIC_SCRIPT), "--create", "--zookeeper", + "localhost:{}".format(self.zookeeper_port), "--replication-factor", "1", + "--partitions", "1", "--topic", topic] + print("Running create topic script for {}".format(topic)) + print(" ".join(create_topic_cmd)) + subprocess.run(create_topic_cmd) + + + def destroy(self, delete_temp_files: bool) -> None: + if self._kafka_out: + self._kafka_out.close() + if self._kafka_proc: + self._kafka_proc.terminate() + if self._zookeeper_out: + self._zookeeper_out.close() + if self._zookeeper_proc: + self._zookeeper_proc.terminate() + if delete_temp_files and self.temp_dir: + shutil.rmtree(self.temp_dir) + +def main(): + from . 
import conftest + from kafka import ( + KafkaProducer, + KafkaConsumer + ) + conftest.pytest_sessionstart(None) + + tempdir = test_util.get_kafka_temp_dir() + kc = KafkaController( + test_util.get_kafka_root(), + test_util.get_kafka_config(), + test_util.get_zookeeper_config(), + tempdir + ) + print("Running Kafka and Zookeeper") + print("Zookeeper port: {}".format(kc.zookeeper_port)) + print("Kafka port: {}".format(kc.kafka_port)) + print("Temp dir: {}".format(kc.temp_dir)) + host = "localhost:{}".format(kc.kafka_port) + print("connecting to Kafka host: {}".format(host)) + time.sleep(3) + print("posting test message") + msg = b"Simple Test Message" + topic = "test" + p = KafkaProducer(bootstrap_servers=host) + + p.send(topic, msg) + print("done. waiting before consuming it.") + time.sleep(3) + print("consuming test message") + c = KafkaConsumer(bootstrap_servers=host, + group_id=None, + consumer_timeout_ms=1000, + enable_auto_commit=True, + auto_offset_reset="earliest") + c.subscribe([topic]) + for msg in c: + print("got message: {}".format(msg.value)) + input("Done. Press enter to shutdown and destroy.") + kc.destroy(True) + +if __name__ == '__main__': + main() diff --git a/test/mongo_controller.py b/test/mongo_controller.py index f44f4ab..88ad65e 100644 --- a/test/mongo_controller.py +++ b/test/mongo_controller.py @@ -108,7 +108,7 @@ def main(): import conftest conftest.pytest_sessionstart(None) mongoexe = test_util.get_mongo_exe() - root_temp_dir = test_util.get_temp_dir() + root_temp_dir = test_util.get_mongo_temp_dir() mc = MongoController(mongoexe, root_temp_dir, False) print('port: ' + str(mc.port)) diff --git a/test/test.cfg b/test/test.cfg index 7664c94..9360656 100644 --- a/test/test.cfg +++ b/test/test.cfg @@ -15,7 +15,18 @@ debug=False lifespan=30 default-note-count=100 +[kafka] +host=localhost:9092 +group-id=feeds_group +topics=feeds + [test] mongo-exe=/usr/local/bin/mongod -test-temp-dir=./test-temp-dir -delete-temp-files=true +mongo-temp-dir=./mongo-temp-dir +delete-mongo-files=true + +kafka-path=/Users/wjriehl/Projects/kafka/kafka-2.1.0-src +kafka-temp-dir=./kafka-temp-dir +kafka-config=./test/_data/kafka/kafka.props.tmpl +zookeeper-config=./test/_data/kafka/zookeeper.props.tmpl +delete-kafka-files=true diff --git a/test/test_config.py b/test/test_config.py index 2ed99e5..3723ac7 100644 --- a/test/test_config.py +++ b/test/test_config.py @@ -22,7 +22,11 @@ 'nms-url=nms', 'global-feed=global', 'lifespan=30', - 'default-note-count=100' + 'default-note-count=100', + '[kafka]', + 'host=localhost:9092', + 'topics=foo,bar', + 'group-id=feeds_group' ] @pytest.fixture(scope="function") @@ -93,7 +97,7 @@ def test_config_bad_note_count(dummy_config, dummy_auth_token, bad_val): def test_config_check_debug(dummy_config, dummy_auth_token): cfg_text = GOOD_CONFIG.copy() - cfg_text.append("debug=true") + cfg_text.insert(1, "debug=true") cfg_path = dummy_config(cfg_text) feeds_config_backup = os.environ.get('FEEDS_CONFIG') os.environ['FEEDS_CONFIG'] = cfg_path @@ -218,20 +222,26 @@ def test_get_config(dummy_config, dummy_auth_token): assert cfg.db_port == 5 assert cfg.auth_url == 'baz' assert cfg.auth_token == FAKE_AUTH_TOKEN + + cfg = config.get_kafka_config() + assert cfg.kafka_group_id == 'feeds_group' + assert cfg.kafka_host == 'localhost:9092' + assert cfg.kafka_topics == ['foo', 'bar'] + del os.environ['FEEDS_CONFIG'] if path_backup is not None: os.environ['FEEDS_CONFIG'] = path_backup config.__config = None -def test_config_bad_parsing(dummy_config, dummy_auth_token): - cfg_path = 
dummy_config(["fleeble"]) +def test_config_missing_section(dummy_config, dummy_auth_token): + cfg_path = dummy_config(["[fleeble]"]) path_backup = os.environ.get('FEEDS_CONFIG') os.environ['FEEDS_CONFIG'] = cfg_path config.__config = None with pytest.raises(ConfigError) as e: config.FeedsConfig() - assert "Error parsing config file" in str(e.value) + assert "Error parsing config file: section feeds not found" in str(e.value) if path_backup is not None: os.environ['FEEDS_CONFIG'] = path_backup @@ -261,3 +271,27 @@ def test_config_missing_reqs(dummy_config, dummy_auth_token): if path_backup is not None: os.environ['FEEDS_CONFIG'] = path_backup + + +def test_kafka_config_missing_section(dummy_config): + cfg_path = dummy_config(["[fleeble]"]) + path_backup = os.environ.get('FEEDS_CONFIG') + os.environ['FEEDS_CONFIG'] = cfg_path + config.__kafka_config = None + with pytest.raises(ConfigError) as e: + config.KafkaConfig() + assert "Error parsing config file: section kafka not found" in str(e.value) + if path_backup is not None: + os.environ['FEEDS_CONFIG'] = path_backup + + +def test_config_bad_parsing(dummy_config, dummy_auth_token): + cfg_path = dummy_config(["nope"]) + path_backup = os.environ.get('FEEDS_CONFIG') + os.environ['FEEDS_CONFIG'] = cfg_path + config.__config = None + with pytest.raises(ConfigError) as e: + config.FeedsConfig() + assert "Error parsing config file {}:".format(cfg_path) in str(e.value) + if path_backup is not None: + os.environ['FEEDS_CONFIG'] = path_backup diff --git a/test/util.py b/test/util.py index d8e12a7..1aa907e 100644 --- a/test/util.py +++ b/test/util.py @@ -7,33 +7,54 @@ import configparser MONGO_EXE = "mongo-exe" -TEMP_DIR = "test-temp-dir" -DELETE_TEMP_FILES = "delete-temp-files" +MONGO_TEMP_DIR = "mongo-temp-dir" +DELETE_MONGO_FILES = "delete-mongo-files" +KAFKA_PATH = "kafka-path" +KAFKA_CONFIG = "kafka-config" +ZOOKEEPER_CONFIG = "zookeeper-config" +KAFKA_TEMP_DIR = "kafka-temp-dir" +DELETE_KAFKA_FILES = "delete-kafka-files" def assert_is_uuid(s): # raises a ValueError if not. Good enough for testing. 
uuid.UUID(s) + class TestException(Exception): pass + def get_mongo_exe() -> Path: cfg = test_config() return Path(os.path.abspath(cfg.get('test', MONGO_EXE))) + def find_free_port() -> int: with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: s.bind(('', 0)) return s.getsockname()[1] -def get_temp_dir() -> Path: + +def get_mongo_temp_dir() -> Path: cfg = test_config() - return Path(os.path.abspath(cfg.get('test', TEMP_DIR))) + return Path(os.path.abspath(cfg.get('test', MONGO_TEMP_DIR))) + + +def get_kafka_temp_dir() -> Path: + cfg = test_config() + return Path(os.path.abspath(cfg.get('test', KAFKA_TEMP_DIR))) + + +def get_delete_mongo_files() -> bool: + cfg = test_config() + return cfg.get('test', DELETE_MONGO_FILES).lower() == "true" + -def get_delete_temp_files() -> bool: +def get_delete_kafka_files() -> bool: cfg = test_config() - return cfg.get('test', DELETE_TEMP_FILES).lower() == "true" + return cfg.get('test', DELETE_KAFKA_FILES).lower() == "true" + def test_config(): """ @@ -44,3 +65,18 @@ def test_config(): with open(os.environ['FEEDS_CONFIG'], 'r') as f: cfg.read_file(f) return cfg + + +def get_kafka_root() -> Path: + cfg = test_config() + return Path(os.path.abspath(cfg.get('test', KAFKA_PATH))) + + +def get_kafka_config() -> Path: + cfg = test_config() + return Path(os.path.abspath(cfg.get('test', KAFKA_CONFIG))) + + +def get_zookeeper_config() -> Path: + cfg = test_config() + return Path(os.path.abspath(cfg.get('test', ZOOKEEPER_CONFIG)))