Skip to content

Commit

Permalink
Taiga-events integration (realtime taiga)
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrey Antukh authored and Jesús Espino committed Sep 29, 2014
1 parent b14c8d8 commit 43e16c2
Show file tree
Hide file tree
Showing 15 changed files with 291 additions and 125 deletions.
1 change: 1 addition & 0 deletions requirements.txt
Expand Up @@ -7,6 +7,7 @@ psycopg2==2.5.4
pillow==2.5.3
pytz==2014.4
six==1.8.0
amqp==1.4.6
djmail==0.9
django-pgjson==0.2.0
djorm-pgarray==1.0.4
Expand Down
4 changes: 3 additions & 1 deletion settings/common.py
Expand Up @@ -87,7 +87,9 @@
DJMAIL_TEMPLATE_EXTENSION = "jinja"

# Events backend
EVENTS_PUSH_BACKEND = "taiga.events.backends.postgresql.EventsPushBackend"
# EVENTS_PUSH_BACKEND = "taiga.events.backends.postgresql.EventsPushBackend"
EVENTS_PUSH_BACKEND = "taiga.events.backends.rabbitmq.EventsPushBackend"
EVENTS_PUSH_BACKEND_OPTIONS = {"url": "//guest:guest@127.0.0.1/"}

# Message System
MESSAGE_STORAGE = "django.contrib.messages.storage.session.SessionStorage"
Expand Down
17 changes: 17 additions & 0 deletions taiga/events/__init__.py
@@ -0,0 +1,17 @@
# Copyright (C) 2014 Andrey Antukh <niwi@niwi.be>
# Copyright (C) 2014 Jesús Espino <jespinog@gmail.com>
# Copyright (C) 2014 David Barragán <bameda@dbarragan.com>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

default_app_config = "taiga.events.apps.EventsAppConfig"
39 changes: 39 additions & 0 deletions taiga/events/apps.py
@@ -0,0 +1,39 @@
# Copyright (C) 2014 Andrey Antukh <niwi@niwi.be>
# Copyright (C) 2014 Jesús Espino <jespinog@gmail.com>
# Copyright (C) 2014 David Barragán <bameda@dbarragan.com>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import sys
from django.apps import AppConfig
from django.db.models import signals

from . import signal_handlers as handlers


def connect_events_signals():
signals.post_save.connect(handlers.on_save_any_model, dispatch_uid="events_change")
signals.post_delete.connect(handlers.on_delete_any_model, dispatch_uid="events_delete")


def disconnect_events_signals():
signals.post_save.disconnect(dispatch_uid="events_change")
signals.post_delete.disconnect(dispatch_uid="events_delete")


class EventsAppConfig(AppConfig):
name = "taiga.events"
verbose_name = "Events App Config"

def ready(self):
connect_events_signals()
2 changes: 1 addition & 1 deletion taiga/events/backends/base.py
Expand Up @@ -21,7 +21,7 @@

class BaseEventsPushBackend(object, metaclass=abc.ABCMeta):
@abc.abstractmethod
def emit_event(self, message:str, *, channel:str="events"):
def emit_event(self, message:str, *, routing_key:str, channel:str="events"):
pass


Expand Down
5 changes: 4 additions & 1 deletion taiga/events/backends/postgresql.py
Expand Up @@ -20,7 +20,10 @@

class EventsPushBackend(base.BaseEventsPushBackend):
@transaction.atomic
def emit_event(self, message:str, *, channel:str="events"):
def emit_event(self, message:str, *, routing_key:str, channel:str="events"):
routing_key = routing_key.replace(".", "__")
channel = "{channel}_{routing_key}".format(channel=channel,
routing_key=routing_key)
sql = "NOTIFY {channel}, %s".format(channel=channel)
cursor = connection.cursor()
cursor.execute(sql, [message])
Expand Down
65 changes: 65 additions & 0 deletions taiga/events/backends/rabbitmq.py
@@ -0,0 +1,65 @@
# Copyright (C) 2014 Andrey Antukh <niwi@niwi.be>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import json
import logging

from amqp import Connection as AmqpConnection
from amqp.basic_message import Message as AmqpMessage
from urllib.parse import urlparse

from . import base

log = logging.getLogger("tagia.events")


def _make_rabbitmq_connection(url):
parse_result = urlparse(url)

# Parse host & user/password
try:
(authdata, host) = parse_result.netloc.split("@")
except Exception as e:
raise RuntimeError("Invalid url") from e

try:
(user, password) = authdata.split(":")
except Exception:
(user, password) = ("guest", "guest")

vhost = parse_result.path
return AmqpConnection(host=host, userid=user,
password=password, virtual_host=vhost)


class EventsPushBackend(base.BaseEventsPushBackend):
def __init__(self, url):
self.url = url

def emit_event(self, message:str, *, routing_key:str, channel:str="events"):
connection = _make_rabbitmq_connection(self.url)

try:
rchannel = connection.channel()
message = AmqpMessage(message)

rchannel.exchange_declare(exchange=channel, type="topic", auto_delete=True)
rchannel.basic_publish(message, routing_key=routing_key, exchange=channel)
rchannel.close()

except Exception:
log.error("Unhandled exception", exc_info=True)

finally:
connection.close()
61 changes: 0 additions & 61 deletions taiga/events/changes.py

This file was deleted.

101 changes: 101 additions & 0 deletions taiga/events/events.py
@@ -0,0 +1,101 @@
# Copyright (C) 2014 Andrey Antukh <niwi@niwi.be>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import json
import collections

from django.contrib.contenttypes.models import ContentType

from taiga.base.utils import json
from . import middleware as mw
from . import backends

# The complete list of content types
# of allowed models for change events
watched_types = set([
"userstories.userstory",
"issues.issue",
"tasks.task",
"wiki.wiki_page",
"milestones.milestone",
])


def _get_type_for_model(model_instance):
"""
Get content type tuple from model instance.
"""
ct = ContentType.objects.get_for_model(model_instance)
return ".".join([ct.app_label, ct.model])


def emit_event(data:dict, routing_key:str, *,
sessionid:str=None, channel:str="events"):
if not sessionid:
sessionid = mw.get_current_session_id()

data = {"session_id": sessionid,
"data": data}

backend = backends.get_events_backend()
return backend.emit_event(message=json.dumps(data),
routing_key=routing_key,
channel=channel)


def emit_event_for_model(obj, *, type:str="change", channel:str="events",
content_type:str=None, sessionid:str=None):
"""
Sends a model change event.
"""

assert type in set(["create", "change", "delete"])
assert hasattr(obj, "project_id")

if not content_type:
content_type = _get_type_for_model(obj)

projectid = getattr(obj, "project_id")
pk = getattr(obj, "pk", None)

app_name, model_name = content_type.split(".", 1)
routing_key = "changes.project.{0}.{1}".format(projectid, app_name)

data = {"type": type,
"matches": content_type,
"pk": pk}

return emit_event(routing_key=routing_key,
channel=channel,
sessionid=sessionid,
data=data)


def emit_event_for_ids(ids, content_type:str, projectid:int, *,
type:str="change", channel:str="events", sessionid:str=None):
assert type in set(["create", "change", "delete"])
assert isinstance(ids, collections.Iterable)
assert content_type, "content_type parameter is mandatory"

app_name, model_name = content_type.split(".", 1)
routing_key = "changes.project.{0}.{1}".format(projectid, app_name)

data = {"type": type,
"matches": content_type,
"pk": ids}

return emit_event(routing_key=routing_key,
channel=channel,
sessionid=sessionid,
data=data)
53 changes: 0 additions & 53 deletions taiga/events/models.py

This file was deleted.

0 comments on commit 43e16c2

Please sign in to comment.