Skip to content

Commit

Permalink
Merge faf5936 into bb20bbf
Browse files Browse the repository at this point in the history
  • Loading branch information
briehl committed Feb 13, 2019
2 parents bb20bbf + faf5936 commit 52f51a2
Show file tree
Hide file tree
Showing 22 changed files with 738 additions and 87 deletions.
10 changes: 6 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ services:
- docker

env:
# - MONGODB_VER=mongodb-linux-x86_64-2.6.12
# - MONGODB_VER=mongodb-linux-x86_64-3.4.16
- MONGODB_VER=mongodb-linux-x86_64-3.6.2
- MONGODB_VER=mongodb-linux-x86_64-3.6.2 KAFKA_VER=2.1.0 KAFKA_SCALA_VER=2.12 KAFKA_FILE=kafka_$KAFKA_SCALA_VER-$KAFKA_VER

before_install:
- sudo apt-get -qq update
Expand All @@ -24,8 +22,12 @@ script:
- tar xfz $MONGODB_VER.tgz
- export MONGOD=`pwd`/$MONGODB_VER/bin/mongod
- sed -i "s#^mongo-exe.*#mongo-exe=$MONGOD#" test/test.cfg
- wget http://mirrors.ocf.berkeley.edu/apache/kafka/$KAFKA_VER/$KAFKA_FILE.tgz
- tar xfz $KAFKA_FILE.tgz
- export KAFKA_PATH=`pwd`/$KAFKA_FILE
- sed -i "s#^kafka-path.*#kafka-path=$KAFKA_PATH#" test/test.cfg
- cat test/test.cfg
- make test

after_script:
- coveralls
- coveralls
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ test: all
# flake8 test
pytest --verbose test --cov=feeds --cov-report html feeds -s

start_kafka_listener:
python -m feeds.kafka_listener

start: all
gunicorn --worker-class gevent --timeout 300 --workers 5 --bind :5000 feeds.server:app

Expand Down
7 changes: 7 additions & 0 deletions deployment/conf/.templates/deploy.cfg.templ
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,10 @@ workspace-url = {{ default .Env.workspace_url "https://ci.kbase.us/services/ws"
groups-url = {{ default .Env.groups_url "https://ci.kbase.us/services/groups" }}
njs-url = {{ default .Env.njs_url "https://ci.kbase.us/services/njs_wrapper" }}
nms-url = {{ default .Env.nms_url "https://ci.kbase.us/services/narrative_method_store/rpc" }}

[kafka]
host = {{ default .Env.kafka_host "localhost:9092" }}

# Topics is a comma-separated list. Probably just one to start with, though.
topics = {{ default .Env.kafka_topics "feeds" }}
group-id = {{ default .Env.kafka_group_id "feeds_group" }}
5 changes: 5 additions & 0 deletions deployment/deploy.cfg.example
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,8 @@ default-note-count = 100
# Useful for testing, etc.
# SET TO FALSE IN PRODUCTION!
debug=False

[kafka]
host=localhost:9092
topics=feeds
group-id=feeds_group
3 changes: 2 additions & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ flake8==3.5.0
pytest==3.8.2
coveralls==1.5.1
requests-mock==1.5.2
semver==2.8.1
semver==2.8.1
jinja2==2.10
165 changes: 100 additions & 65 deletions feeds/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
ENV_CONFIG_BACKUP = "KB_DEPLOYMENT_CONFIG"
ENV_AUTH_TOKEN = "AUTH_TOKEN"

INI_SECTION = "feeds"
FEEDS_SECTION = "feeds"

KEY_DB_HOST = "db-host"
KEY_DB_PORT = "db-port"
Expand All @@ -24,6 +24,68 @@
KEY_WS_URL = "workspace-url"
KEY_NMS_URL = "nms-url"
KEY_DEFAULT_COUNT = "default-note-count"
KEY_USE_KAFKA = "use-kafka"

KAFKA_SECTION = "kafka"
KEY_KAFKA_HOST = "host"
KEY_KAFKA_TOPICS = "topics"
KEY_KAFKA_GROUP_ID = "group-id"


class ConfigFileLoader(object):
    """
    Locates and parses the feeds deploy config file.

    The file path is taken from the ENV_CONFIG_PATH or ENV_CONFIG_BACKUP
    environment variables (checked in that order), falling back to
    DEFAULT_CONFIG_PATH. Raises a ConfigError if no usable file is found
    or if it fails to parse.
    """
    def __init__(self):
        config_file = self._find_config_path()
        self.cfg = self._load_config(config_file)

    def has_section(self, section: str) -> bool:
        """Return True if the loaded config contains the given section."""
        return self.cfg.has_section(section)

    def get_line(self, section: str, key: str, required: bool = True) -> str:
        """
        A little wrapper that raises a ConfigError if a required key isn't
        present or has an empty value. Returns None (or the empty value) for
        a missing optional key.
        """
        val = None
        try:
            val = self.cfg.get(section, key)
        except configparser.NoOptionError:
            if required:
                raise ConfigError("Required option {} not found in config section "
                                  "{}".format(key, section))
        if not val and required:
            raise ConfigError("Required option {} has no value!".format(key))
        return val

    def _find_config_path(self):
        """
        A little helper to test whether a given file path, or one given by an
        environment variable, exists.
        """
        for env in [ENV_CONFIG_PATH, ENV_CONFIG_BACKUP]:
            env_path = os.environ.get(env)
            if env_path:
                if not os.path.isfile(env_path):
                    # Bug fix: report the variable actually being inspected
                    # (env), not unconditionally ENV_CONFIG_PATH.
                    raise ConfigError(
                        "Environment variable {} is set to {}, "
                        "which is not a config file.".format(env, env_path)
                    )
                return env_path
        if not os.path.isfile(DEFAULT_CONFIG_PATH):
            raise ConfigError(
                "Unable to find config file - can't start server. Either set the {} or {} "
                "environment variable to a path, or copy 'deploy.cfg.example' to "
                "'deploy.cfg'".format(ENV_CONFIG_PATH, ENV_CONFIG_BACKUP)
            )
        return DEFAULT_CONFIG_PATH

    def _load_config(self, cfg_file):
        """Parse cfg_file with configparser, wrapping parse errors in ConfigError."""
        config = configparser.ConfigParser()
        with open(cfg_file, "r") as cfg:
            try:
                config.read_file(cfg)
            except configparser.Error as e:
                raise ConfigError("Error parsing config file {}: {}".format(cfg_file, e))
        return config


class FeedsConfig(object):
Expand All @@ -38,103 +100,76 @@ def __init__(self):
self.auth_token = os.environ.get(ENV_AUTH_TOKEN)
if self.auth_token is None:
raise RuntimeError("The AUTH_TOKEN environment variable must be set!")
config_file = self._find_config_path()
cfg = self._load_config(config_file)
if not cfg.has_section(INI_SECTION):
cfg = ConfigFileLoader()
if not cfg.has_section(FEEDS_SECTION):
raise ConfigError(
"Error parsing config file: section {} not found!".format(INI_SECTION)
"Error parsing config file: section {} not found!".format(FEEDS_SECTION)
)
self.db_engine = self._get_line(cfg, KEY_DB_ENGINE)
self.db_host = self._get_line(cfg, KEY_DB_HOST)
self.db_engine = cfg.get_line(FEEDS_SECTION, KEY_DB_ENGINE)
self.db_host = cfg.get_line(FEEDS_SECTION, KEY_DB_HOST)
try:
self.db_port = self._get_line(cfg, KEY_DB_PORT)
self.db_port = cfg.get_line(FEEDS_SECTION, KEY_DB_PORT)
self.db_port = int(self.db_port)
assert self.db_port > 0
except (ValueError, AssertionError):
raise ConfigError("{} must be an int > 0! Got {}".format(KEY_DB_PORT, self.db_port))
self.db_user = self._get_line(cfg, KEY_DB_USER, required=False)
self.db_pw = self._get_line(cfg, KEY_DB_PW, required=False)
self.db_name = self._get_line(cfg, KEY_DB_NAME, required=False)
self.global_feed = self._get_line(cfg, KEY_GLOBAL_FEED)
self.db_user = cfg.get_line(FEEDS_SECTION, KEY_DB_USER, required=False)
self.db_pw = cfg.get_line(FEEDS_SECTION, KEY_DB_PW, required=False)
self.db_name = cfg.get_line(FEEDS_SECTION, KEY_DB_NAME, required=False)
self.global_feed = cfg.get_line(FEEDS_SECTION, KEY_GLOBAL_FEED)
self.global_feed_type = "user" # doesn't matter, need a valid Entity type...
try:
self.lifespan = self._get_line(cfg, KEY_LIFESPAN)
self.lifespan = cfg.get_line(FEEDS_SECTION, KEY_LIFESPAN)
self.lifespan = int(self.lifespan)
assert self.lifespan > 0
except (ValueError, AssertionError):
raise ConfigError("{} must be an int > 0! Got {}".format(KEY_LIFESPAN, self.lifespan))
self.debug = self._get_line(cfg, KEY_DEBUG, required=False)
self.debug = cfg.get_line(FEEDS_SECTION, KEY_DEBUG, required=False)
if not self.debug or self.debug.lower() != "true":
self.debug = False
else:
self.debug = True
self.auth_url = self._get_line(cfg, KEY_AUTH_URL)
self.njs_url = self._get_line(cfg, KEY_NJS_URL)
self.ws_url = self._get_line(cfg, KEY_WS_URL)
self.groups_url = self._get_line(cfg, KEY_GROUPS_URL)
self.nms_url = self._get_line(cfg, KEY_NMS_URL)
self.default_max_notes = self._get_line(cfg, KEY_DEFAULT_COUNT)
self.auth_url = cfg.get_line(FEEDS_SECTION, KEY_AUTH_URL)
self.njs_url = cfg.get_line(FEEDS_SECTION, KEY_NJS_URL)
self.ws_url = cfg.get_line(FEEDS_SECTION, KEY_WS_URL)
self.groups_url = cfg.get_line(FEEDS_SECTION, KEY_GROUPS_URL)
self.nms_url = cfg.get_line(FEEDS_SECTION, KEY_NMS_URL)
self.default_max_notes = cfg.get_line(FEEDS_SECTION, KEY_DEFAULT_COUNT)
try:
self.default_max_notes = self._get_line(cfg, KEY_DEFAULT_COUNT)
self.default_max_notes = cfg.get_line(FEEDS_SECTION, KEY_DEFAULT_COUNT)
self.default_max_notes = int(self.default_max_notes)
assert self.default_max_notes > 0
except (ValueError, AssertionError):
raise ConfigError(
"{} must be an int > 0! Got {}".format(KEY_DEFAULT_COUNT, self.default_max_notes)
)

def _find_config_path(self):
"""
A little helper to test whether a given file path, or one given by an
environment variable, exists.
"""
for env in [ENV_CONFIG_PATH, ENV_CONFIG_BACKUP]:
env_path = os.environ.get(env)
if env_path:
if not os.path.isfile(env_path):
raise ConfigError(
"Environment variable {} is set to {}, "
"which is not a config file.".format(ENV_CONFIG_PATH, env_path)
)
else:
return env_path
if not os.path.isfile(DEFAULT_CONFIG_PATH):

class KafkaConfig(object):
def __init__(self):
cfg = ConfigFileLoader()
if not cfg.has_section(KAFKA_SECTION):
raise ConfigError(
"Unable to find config file - can't start server. Either set the {} or {} "
"environment variable to a path, or copy 'deploy.cfg.example' to "
"'deploy.cfg'".format(ENV_CONFIG_PATH, ENV_CONFIG_BACKUP)
"Error parsing config file: section {} not found!".format(KAFKA_SECTION)
)
return DEFAULT_CONFIG_PATH

def _load_config(self, cfg_file):
config = configparser.ConfigParser()
with open(cfg_file, "r") as cfg:
try:
config.read_file(cfg)
except configparser.Error as e:
raise ConfigError("Error parsing config file {}: {}".format(cfg_file, e))
return config

def _get_line(self, config, key, required=True):
"""
A little wrapper that raises a ConfigError if a required key isn't present.
"""
val = None
try:
val = config.get(INI_SECTION, key)
except configparser.NoOptionError:
if required:
raise ConfigError("Required option {} not found in config".format(key))
if not val and required:
raise ConfigError("Required option {} has no value!".format(key))
return val
self.kafka_host = cfg.get_line(KAFKA_SECTION, KEY_KAFKA_HOST)
self.kafka_group_id = cfg.get_line(KAFKA_SECTION, KEY_KAFKA_GROUP_ID)
self.kafka_topics = cfg.get_line(KAFKA_SECTION, KEY_KAFKA_TOPICS).split(",")


__config = None
__kafka_config = None


def get_config(from_disk=False):
def get_config():
    """Return the process-wide FeedsConfig singleton, creating it on first use."""
    global __config
    if __config is None:
        __config = FeedsConfig()
    return __config


def get_kafka_config():
    """Return the process-wide KafkaConfig singleton, creating it on first use."""
    global __kafka_config
    if __kafka_config is None:
        __kafka_config = KafkaConfig()
    return __kafka_config
Empty file added feeds/kafka/__init__.py
Empty file.
58 changes: 58 additions & 0 deletions feeds/kafka/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from kafka import KafkaConsumer
from kafka.consumer.fetcher import ConsumerRecord
from typing import List
import json
from feeds.api.util import parse_notification_params
from feeds.logger import (
log,
log_error
)
from feeds.activity.notification import Notification
from feeds.managers.notification_manager import NotificationManager


class KafkaNotificationConsumer(object):
    """
    Consumes messages from Kafka topics and stores each one as a feeds
    Notification via the NotificationManager.

    Message values are expected to be JSON objects matching the schema the
    HTTP API accepts (they are run through parse_notification_params).
    """
    def __init__(self, server: str, topics: List[str], group_id: str) -> None:
        """
        :param server: Kafka bootstrap server, e.g. "localhost:9092"
        :param topics: list of topic names to subscribe to
        :param group_id: Kafka consumer group id
        """
        self.server = server
        self.topics = topics
        self.group_id = group_id
        # Subscribe to all configured topics (the deploy config documents
        # "topics" as a comma-separated list), not just the first one.
        # Also fixes the "kakfa" typo in the client id.
        self.consumer = KafkaConsumer(*topics,
                                      client_id="feeds-kafka-consumer",
                                      bootstrap_servers=[server],
                                      consumer_timeout_ms=1000,
                                      group_id=group_id,
                                      enable_auto_commit=True,
                                      auto_commit_interval_ms=1000,
                                      auto_offset_reset="earliest")

    def poll(self) -> None:
        """Drain the currently-available messages, processing each one."""
        # Removed a leftover debug print(msg) here; failures are already
        # logged by _process_message.
        for msg in self.consumer:
            self._process_message(msg)

    def _process_message(self, message: ConsumerRecord) -> None:
        """
        Parse a single Kafka message into a Notification and store it.
        Any failure is logged and swallowed so one malformed message can't
        kill the listener loop.
        """
        try:
            note_params = parse_notification_params(json.loads(message.value))
            # create a Notification from params.
            new_note = Notification(
                note_params.get('actor'),
                note_params.get('verb'),
                note_params.get('object'),
                note_params.get('source'),
                level=note_params.get('level'),
                target=note_params.get('target', []),
                context=note_params.get('context'),
                expires=note_params.get('expires'),
                external_key=note_params.get('external_key'),
                users=note_params.get('users', [])
            )
            # pass it to the NotificationManager to dole out to its audience feeds.
            manager = NotificationManager()
            manager.add_notification(new_note)
            log(__name__, "Created notification from Kafka with id {}".format(new_note.id))
        except Exception as e:
            log_error(__name__, e)

    def __str__(self):
        return ("KafkaNotificationConsumer: host:{},group_id:{},"
                "topics:{}".format(self.server, self.group_id, self.topics))
24 changes: 24 additions & 0 deletions feeds/kafka/listener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""
Main controller class for the "app" that listens to Kafka.
In __init__, this sets up the environment and builds the KafkaConsumer.
Activating it with start_listening() will run an endless loop that will
listen for notifications coming in from Kafka, and push them into
the Mongo feeds db by the same mechanism as the main server.
"""

from feeds.config import get_kafka_config
from .consumer import KafkaNotificationConsumer
import time


class KafkaListener(object):
    """Owns a KafkaNotificationConsumer and drives its polling loop."""

    def __init__(self):
        # Build the consumer from the [kafka] section of the deploy config.
        cfg = get_kafka_config()
        self.consumer = KafkaNotificationConsumer(
            cfg.kafka_host, cfg.kafka_topics, cfg.kafka_group_id
        )

    def start_listening(self):
        """Poll the consumer forever, pausing one second between polls."""
        while True:
            self.consumer.poll()
            time.sleep(1)
9 changes: 9 additions & 0 deletions feeds/kafka_listener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from .kafka.listener import KafkaListener
# Bug fix: there is no top-level "logger" module; the log helper lives in
# feeds/logger.py. A package-relative import works because the Makefile runs
# this as `python -m feeds.kafka_listener`.
from .logger import log

if __name__ == "__main__":
    name = 'kafka_listener'
    log(name, "Initializing Kafka listener.")
    listener = KafkaListener()
    log(name, "Starting Kafka listener loop.")
    listener.start_listening()
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ cachetools==2.1.0
pymongo==3.7.2
redis==2.10.6
flask-cors==3.0.6
kafka-python==1.4.4
Loading

0 comments on commit 52f51a2

Please sign in to comment.