Skip to content

Commit

Permalink
Merge pull request #7 from briehl/develop
Browse files Browse the repository at this point in the history
Switch over to MongoDB
  • Loading branch information
briehl committed Oct 24, 2018
2 parents f2f793e + 2721833 commit 5559a29
Show file tree
Hide file tree
Showing 17 changed files with 260 additions and 88 deletions.
2 changes: 0 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ python:
- 3.6
services:
- docker
# env:
# global:

before_install:
- sudo apt-get -qq update
Expand Down
32 changes: 20 additions & 12 deletions deploy.cfg.example
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
[feeds]
# Redis info
redis-host=localhost
redis-port=6379
redis-user=
redis-pw=

# MongoDB Info
mongo-host=
mongo-db=
mongo-user=
mongo-pw=
# DB info
# db-engine - allowed values = redis, mongodb. Others will raise an error on startup.
db-engine=mongodb

# db-name - name of the database to use. default = "feeds".
db-name=feeds

# Other db info. The usual - host, port, user, and password. You know the drill.
db-host=localhost
db-port=6379
db-user=
db-pw=

auth-url=https://ci.kbase.us/services/auth
admins=wjriehl,scanon,kkeller

# admins are allowed to use their auth tokens to create global notifications.
# examples would be notices about KBase downtime or events.
admins=wjriehl,scanon,kkeller,mmdrake

# fake user name for the global feed. Should be something that's not a valid
# user name.
global-feed=_global_
19 changes: 19 additions & 0 deletions feeds/activity/notification.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,22 @@ def deserialize(cls, serial):
deserial.time = struct['m']
deserial.id = struct['i']
return deserial

@classmethod
def from_dict(cls, serial):
"""
Returns a new Notification from a serialized dictionary (e.g. used in Mongo)
"""
assert serial
deserial = cls(
serial['actor'],
str(serial['verb']),
serial['object'],
serial['source'],
level=str(serial['level']),
target=serial.get('target'),
context=serial.get('context')
)
deserial.time = serial['created']
deserial.id = serial['act_id']
return deserial
55 changes: 29 additions & 26 deletions feeds/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,24 @@
ENV_AUTH_TOKEN = "AUTH_TOKEN"

INI_SECTION = "feeds"
DB_HOST = "redis-host"
DB_HOST_PORT = "redis-port"
DB_USER = "redis-user"
DB_PW = "redis-pw"
DB_DB = "redis-db"
AUTH_URL = "auth-url"
ADMIN_LIST = "admins"
GLOBAL_FEED = "global-feed"

KEY_DB_HOST = "db-host"
KEY_DB_PORT = "db-port"
KEY_DB_USER = "db-user"
KEY_DB_PW = "db-pw"
KEY_DB_NAME = "db-name"
KEY_DB_ENGINE = "db-engine"
KEY_AUTH_URL = "auth-url"
KEY_ADMIN_LIST = "admins"
KEY_GLOBAL_FEED = "global-feed"
KEY_DEBUG = "debug"


class FeedsConfig(object):
"""
Loads a config set from the root deploy.cfg file. This should be in ini format.
Keys of note are:
redis-host
redis-port
redis-user
redis-pw
auth-url
global-feed - name of the feed to represent a global user, or
notifications that everyone should see.
"""

def __init__(self):
Expand All @@ -44,16 +39,24 @@ def __init__(self):
raise ConfigError(
"Error parsing config file: section {} not found!".format(INI_SECTION)
)
self.redis_host = self._get_line(cfg, DB_HOST)
self.redis_port = self._get_line(cfg, DB_HOST_PORT)
self.redis_user = self._get_line(cfg, DB_USER, required=False)
self.redis_pw = self._get_line(cfg, DB_PW, required=False)
self.redis_db = self._get_line(cfg, DB_DB, required=False)
if self.redis_db is None:
self.redis_db = 0
self.global_feed = self._get_line(cfg, GLOBAL_FEED)
self.auth_url = self._get_line(cfg, AUTH_URL)
self.admins = self._get_line(cfg, ADMIN_LIST).split(",")
self.db_engine = self._get_line(cfg, KEY_DB_ENGINE)
self.db_host = self._get_line(cfg, KEY_DB_HOST)
self.db_port = self._get_line(cfg, KEY_DB_PORT)
try:
self.db_port = int(self.db_port)
except ValueError:
raise ConfigError("{} must be an int! Got {}".format(KEY_DB_PORT, self.db_port))
self.db_user = self._get_line(cfg, KEY_DB_USER, required=False)
self.db_pw = self._get_line(cfg, KEY_DB_PW, required=False)
self.db_name = self._get_line(cfg, KEY_DB_NAME, required=False)
self.global_feed = self._get_line(cfg, KEY_GLOBAL_FEED)
self.auth_url = self._get_line(cfg, KEY_AUTH_URL)
self.admins = self._get_line(cfg, KEY_ADMIN_LIST).split(",")
self.debug = self._get_line(cfg, KEY_DEBUG, required=False)
if not self.debug or self.debug.lower() != "true":
self.debug = False
else:
self.debug = True

def _find_config_path(self):
"""
Expand Down
14 changes: 14 additions & 0 deletions feeds/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,17 @@ class MissingLevelError(Exception):
Raised if looking for a Notification Level that doesn't exist.
"""
pass


class ActivityStorageError(Exception):
"""
Raised if an activity is failed to be stored in a database.
"""
pass


class ActivityRetrievalError(Exception):
"""
Raised if the service fails to retrieve an activity from a database.
"""
pass
14 changes: 6 additions & 8 deletions feeds/feeds/notification/notification_feed.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
from ..base import BaseFeed
from feeds.activity.notification import Notification
from feeds.storage.redis.activity_storage import RedisActivityStorage
from feeds.storage.redis.timeline_storage import RedisTimelineStorage
from feeds.storage.mongodb.activity_storage import MongoActivityStorage
from feeds.storage.mongodb.timeline_storage import MongoTimelineStorage
from cachetools import TTLCache
import logging


class NotificationFeed(BaseFeed):
def __init__(self, user_id):
self.user_id = user_id
self.timeline_storage = RedisTimelineStorage(self.user_id)
self.activity_storage = RedisActivityStorage()
self.timeline_storage = MongoTimelineStorage(self.user_id)
self.activity_storage = MongoActivityStorage()
self.timeline = None
self.cache = TTLCache(1000, 600)

Expand Down Expand Up @@ -41,10 +41,8 @@ def get_activities(self, count=10):
# 4. Return them.
if count < 1 or not isinstance(count, int):
raise ValueError('Count must be an integer > 0')
self._update_timeline()
note_ids = self.timeline
serial_notes = self.activity_storage.get_from_storage(note_ids)
note_list = [Notification.deserialize(note) for note in serial_notes]
serial_notes = self.timeline_storage.get_timeline(count=count)
note_list = [Notification.from_dict(note) for note in serial_notes]
return note_list

def mark_activities(self, activity_ids, seen=False):
Expand Down
13 changes: 13 additions & 0 deletions feeds/logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import logging


def get_log(name):
return logging.getLogger(__name__)


def log(name, msg, *args, level=logging.INFO):
logging.getLogger(__name__).log(level, msg, *args)


def log_error(name, error):
log(name, "Exception: " + str(error) + error, level=logging.ERROR)
11 changes: 3 additions & 8 deletions feeds/managers/notification_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
"""

from .base import BaseManager
from ..storage.redis.activity_storage import RedisActivityStorage
from ..feeds.notification.notification_feed import NotificationFeed
from ..storage.mongodb.activity_storage import MongoActivityStorage
from feeds.config import get_config


Expand All @@ -25,12 +24,8 @@ def add_notification(self, note):
note.validate() # any errors get raised to be caught by the server.
target_users = self.get_target_users(note)
# add the notification to the database.
activity_storage = RedisActivityStorage()
activity_storage.add_to_storage(note)
for user in target_users:
# add the notification to the appropriate users' feeds.
feed = NotificationFeed(user)
feed.add_notification(note)
activity_storage = MongoActivityStorage()
activity_storage.add_to_storage(note, target_users)

def get_target_users(self, note):
"""
Expand Down
20 changes: 8 additions & 12 deletions feeds/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from http.client import responses
from flask.logging import default_handler
from .util import epoch_ms
from .config import FeedsConfig
from .config import get_config
from .auth import (
validate_service_token,
validate_user_token
Expand All @@ -33,12 +33,6 @@ def _initialize_logging():
root.setLevel('INFO')


def _initialize_config():
# TODO - include config for:
# * database access
return FeedsConfig()


def _log(msg, *args, level=logging.INFO):
logging.getLogger(__name__).log(level, msg, *args)

Expand Down Expand Up @@ -96,7 +90,7 @@ def _get_notification_params(params):

def create_app(test_config=None):
_initialize_logging()
_initialize_config()
cfg = get_config()

app = Flask(__name__, instance_relative_config=True)
if test_config is None:
Expand All @@ -106,6 +100,7 @@ def create_app(test_config=None):

@app.before_request
def preprocess_request():
_log('%s %s', request.method, request.path)
pass

@app.after_request
Expand Down Expand Up @@ -187,10 +182,11 @@ def add_notification():
Once validated, will be used as the Source of the notification,
and used in logic to determine which feeds get notified.
"""
token = _get_auth_token(request)
service = validate_service_token(token) # can also be an admin user
if not service:
raise InvalidTokenError("Token must come from a service, not a user!")
if not cfg.debug:
token = _get_auth_token(request)
service = validate_service_token(token) # can also be an admin user
if not service:
raise InvalidTokenError("Token must come from a service, not a user!")
params = _get_notification_params(json.loads(request.get_data()))
# create a Notification from params.
new_note = Notification(
Expand Down
7 changes: 4 additions & 3 deletions feeds/storage/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ def remove_from_storage(self, activity_ids):


class TimelineStorage(BaseStorage):
def __init__(self):
pass
def __init__(self, user_id):
assert user_id
self.user_id = user_id

def add_to_timeline(self, activity):
raise NotImplementedError()

def get_timeline(self):
def get_timeline(self, count=10):
raise NotImplementedError()

def remove_from_timeline(self, activity_ids):
Expand Down
Empty file.
48 changes: 48 additions & 0 deletions feeds/storage/mongodb/activity_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from typing import List
from ..base import ActivityStorage
from .connection import get_feeds_collection
from feeds.exceptions import (
ActivityStorageError
)
from pymongo import PyMongoError


class MongoActivityStorage(ActivityStorage):
def add_to_storage(self, activity, target_users: List[str]):
"""
Adds a single activity to the MongoDB.
Returns None if successful.
Raises an ActivityStorageError if it fails.
"""
coll = get_feeds_collection()
act_doc = {
"act_id": activity.id,
"actor": activity.actor,
"verb": activity.verb.id,
"object": activity.object,
"target": activity.target,
"source": activity.source,
"level": activity.level.id,
"users": target_users,
"unseen": target_users,
"created": activity.time,
"context": activity.context
}
try:
coll.insert_one(act_doc)
except PyMongoError as e:
raise ActivityStorageError("Failed to store activity: " + str(e))

def get_from_storage(self, activity_ids):
pass

def remove_from_storage(self, activity_ids):
raise NotImplementedError()

def change_seen_mark(self, act_id: str, user: str, seen: bool):
"""
:param act_id: activity id
:user: user id
:seen: whether or not it's been seen. Boolean.
"""
raise NotImplementedError()
Loading

0 comments on commit 5559a29

Please sign in to comment.