Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch over to MongoDB #7

Merged
merged 4 commits into from
Oct 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ python:
- 3.6
services:
- docker
# env:
# global:

before_install:
- sudo apt-get -qq update
Expand Down
32 changes: 20 additions & 12 deletions deploy.cfg.example
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
[feeds]
# Redis info
redis-host=localhost
redis-port=6379
redis-user=
redis-pw=

# MongoDB Info
mongo-host=
mongo-db=
mongo-user=
mongo-pw=
# DB info
# db-engine - allowed values = redis, mongodb. Others will raise an error on startup.
db-engine=mongodb

# db-name - name of the database to use. default = "feeds".
db-name=feeds

# Other db info. The usual - host, port, user, and password. You know the drill.
db-host=localhost
db-port=6379
db-user=
db-pw=

auth-url=https://ci.kbase.us/services/auth
admins=wjriehl,scanon,kkeller

# admins are allowed to use their auth tokens to create global notifications.
# examples would be notices about KBase downtime or events.
admins=wjriehl,scanon,kkeller,mmdrake

# fake user name for the global feed. Should be something that's not a valid
# user name.
global-feed=_global_
19 changes: 19 additions & 0 deletions feeds/activity/notification.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,22 @@ def deserialize(cls, serial):
deserial.time = struct['m']
deserial.id = struct['i']
return deserial

@classmethod
def from_dict(cls, serial):
"""
Returns a new Notification from a serialized dictionary (e.g. used in Mongo)
"""
assert serial
deserial = cls(
serial['actor'],
str(serial['verb']),
serial['object'],
serial['source'],
level=str(serial['level']),
target=serial.get('target'),
context=serial.get('context')
)
deserial.time = serial['created']
deserial.id = serial['act_id']
return deserial
55 changes: 29 additions & 26 deletions feeds/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,24 @@
ENV_AUTH_TOKEN = "AUTH_TOKEN"

INI_SECTION = "feeds"
DB_HOST = "redis-host"
DB_HOST_PORT = "redis-port"
DB_USER = "redis-user"
DB_PW = "redis-pw"
DB_DB = "redis-db"
AUTH_URL = "auth-url"
ADMIN_LIST = "admins"
GLOBAL_FEED = "global-feed"

KEY_DB_HOST = "db-host"
KEY_DB_PORT = "db-port"
KEY_DB_USER = "db-user"
KEY_DB_PW = "db-pw"
KEY_DB_NAME = "db-name"
KEY_DB_ENGINE = "db-engine"
KEY_AUTH_URL = "auth-url"
KEY_ADMIN_LIST = "admins"
KEY_GLOBAL_FEED = "global-feed"
KEY_DEBUG = "debug"


class FeedsConfig(object):
"""
Loads a config set from the root deploy.cfg file. This should be in ini format.

Keys of note are:

redis-host
redis-port
redis-user
redis-pw
auth-url
global-feed - name of the feed to represent a global user, or
notifications that everyone should see.
"""

def __init__(self):
Expand All @@ -44,16 +39,24 @@ def __init__(self):
raise ConfigError(
"Error parsing config file: section {} not found!".format(INI_SECTION)
)
self.redis_host = self._get_line(cfg, DB_HOST)
self.redis_port = self._get_line(cfg, DB_HOST_PORT)
self.redis_user = self._get_line(cfg, DB_USER, required=False)
self.redis_pw = self._get_line(cfg, DB_PW, required=False)
self.redis_db = self._get_line(cfg, DB_DB, required=False)
if self.redis_db is None:
self.redis_db = 0
self.global_feed = self._get_line(cfg, GLOBAL_FEED)
self.auth_url = self._get_line(cfg, AUTH_URL)
self.admins = self._get_line(cfg, ADMIN_LIST).split(",")
self.db_engine = self._get_line(cfg, KEY_DB_ENGINE)
self.db_host = self._get_line(cfg, KEY_DB_HOST)
self.db_port = self._get_line(cfg, KEY_DB_PORT)
try:
self.db_port = int(self.db_port)
except ValueError:
raise ConfigError("{} must be an int! Got {}".format(KEY_DB_PORT, self.db_port))
self.db_user = self._get_line(cfg, KEY_DB_USER, required=False)
self.db_pw = self._get_line(cfg, KEY_DB_PW, required=False)
self.db_name = self._get_line(cfg, KEY_DB_NAME, required=False)
self.global_feed = self._get_line(cfg, KEY_GLOBAL_FEED)
self.auth_url = self._get_line(cfg, KEY_AUTH_URL)
self.admins = self._get_line(cfg, KEY_ADMIN_LIST).split(",")
self.debug = self._get_line(cfg, KEY_DEBUG, required=False)
if not self.debug or self.debug.lower() != "true":
self.debug = False
else:
self.debug = True

def _find_config_path(self):
"""
Expand Down
14 changes: 14 additions & 0 deletions feeds/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,17 @@ class MissingLevelError(Exception):
Raised if looking for a Notification Level that doesn't exist.
"""
pass


class ActivityStorageError(Exception):
"""
Raised if an activity is failed to be stored in a database.
"""
pass


class ActivityRetrievalError(Exception):
"""
Raised if the service fails to retrieve an activity from a database.
"""
pass
14 changes: 6 additions & 8 deletions feeds/feeds/notification/notification_feed.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
from ..base import BaseFeed
from feeds.activity.notification import Notification
from feeds.storage.redis.activity_storage import RedisActivityStorage
from feeds.storage.redis.timeline_storage import RedisTimelineStorage
from feeds.storage.mongodb.activity_storage import MongoActivityStorage
from feeds.storage.mongodb.timeline_storage import MongoTimelineStorage
from cachetools import TTLCache
import logging


class NotificationFeed(BaseFeed):
def __init__(self, user_id):
self.user_id = user_id
self.timeline_storage = RedisTimelineStorage(self.user_id)
self.activity_storage = RedisActivityStorage()
self.timeline_storage = MongoTimelineStorage(self.user_id)
self.activity_storage = MongoActivityStorage()
self.timeline = None
self.cache = TTLCache(1000, 600)

Expand Down Expand Up @@ -41,10 +41,8 @@ def get_activities(self, count=10):
# 4. Return them.
if count < 1 or not isinstance(count, int):
raise ValueError('Count must be an integer > 0')
self._update_timeline()
note_ids = self.timeline
serial_notes = self.activity_storage.get_from_storage(note_ids)
note_list = [Notification.deserialize(note) for note in serial_notes]
serial_notes = self.timeline_storage.get_timeline(count=count)
note_list = [Notification.from_dict(note) for note in serial_notes]
return note_list

def mark_activities(self, activity_ids, seen=False):
Expand Down
13 changes: 13 additions & 0 deletions feeds/logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import logging


def get_log(name):
return logging.getLogger(__name__)


def log(name, msg, *args, level=logging.INFO):
logging.getLogger(__name__).log(level, msg, *args)


def log_error(name, error):
log(name, "Exception: " + str(error) + error, level=logging.ERROR)
11 changes: 3 additions & 8 deletions feeds/managers/notification_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
"""

from .base import BaseManager
from ..storage.redis.activity_storage import RedisActivityStorage
from ..feeds.notification.notification_feed import NotificationFeed
from ..storage.mongodb.activity_storage import MongoActivityStorage
from feeds.config import get_config


Expand All @@ -25,12 +24,8 @@ def add_notification(self, note):
note.validate() # any errors get raised to be caught by the server.
target_users = self.get_target_users(note)
# add the notification to the database.
activity_storage = RedisActivityStorage()
activity_storage.add_to_storage(note)
for user in target_users:
# add the notification to the appropriate users' feeds.
feed = NotificationFeed(user)
feed.add_notification(note)
activity_storage = MongoActivityStorage()
activity_storage.add_to_storage(note, target_users)

def get_target_users(self, note):
"""
Expand Down
20 changes: 8 additions & 12 deletions feeds/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from http.client import responses
from flask.logging import default_handler
from .util import epoch_ms
from .config import FeedsConfig
from .config import get_config
from .auth import (
validate_service_token,
validate_user_token
Expand All @@ -33,12 +33,6 @@ def _initialize_logging():
root.setLevel('INFO')


def _initialize_config():
# TODO - include config for:
# * database access
return FeedsConfig()


def _log(msg, *args, level=logging.INFO):
logging.getLogger(__name__).log(level, msg, *args)

Expand Down Expand Up @@ -96,7 +90,7 @@ def _get_notification_params(params):

def create_app(test_config=None):
_initialize_logging()
_initialize_config()
cfg = get_config()

app = Flask(__name__, instance_relative_config=True)
if test_config is None:
Expand All @@ -106,6 +100,7 @@ def create_app(test_config=None):

@app.before_request
def preprocess_request():
_log('%s %s', request.method, request.path)
pass

@app.after_request
Expand Down Expand Up @@ -187,10 +182,11 @@ def add_notification():
Once validated, will be used as the Source of the notification,
and used in logic to determine which feeds get notified.
"""
token = _get_auth_token(request)
service = validate_service_token(token) # can also be an admin user
if not service:
raise InvalidTokenError("Token must come from a service, not a user!")
if not cfg.debug:
token = _get_auth_token(request)
service = validate_service_token(token) # can also be an admin user
if not service:
raise InvalidTokenError("Token must come from a service, not a user!")
params = _get_notification_params(json.loads(request.get_data()))
# create a Notification from params.
new_note = Notification(
Expand Down
7 changes: 4 additions & 3 deletions feeds/storage/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ def remove_from_storage(self, activity_ids):


class TimelineStorage(BaseStorage):
def __init__(self):
pass
def __init__(self, user_id):
assert user_id
self.user_id = user_id

def add_to_timeline(self, activity):
raise NotImplementedError()

def get_timeline(self):
def get_timeline(self, count=10):
raise NotImplementedError()

def remove_from_timeline(self, activity_ids):
Expand Down
Empty file.
48 changes: 48 additions & 0 deletions feeds/storage/mongodb/activity_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from typing import List
from ..base import ActivityStorage
from .connection import get_feeds_collection
from feeds.exceptions import (
ActivityStorageError
)
from pymongo import PyMongoError


class MongoActivityStorage(ActivityStorage):
def add_to_storage(self, activity, target_users: List[str]):
"""
Adds a single activity to the MongoDB.
Returns None if successful.
Raises an ActivityStorageError if it fails.
"""
coll = get_feeds_collection()
act_doc = {
"act_id": activity.id,
"actor": activity.actor,
"verb": activity.verb.id,
"object": activity.object,
"target": activity.target,
"source": activity.source,
"level": activity.level.id,
"users": target_users,
"unseen": target_users,
"created": activity.time,
"context": activity.context
}
try:
coll.insert_one(act_doc)
except PyMongoError as e:
raise ActivityStorageError("Failed to store activity: " + str(e))

def get_from_storage(self, activity_ids):
pass

def remove_from_storage(self, activity_ids):
raise NotImplementedError()

def change_seen_mark(self, act_id: str, user: str, seen: bool):
"""
:param act_id: activity id
:user: user id
:seen: whether or not it's been seen. Boolean.
"""
raise NotImplementedError()
Loading