diff --git a/requirements.txt b/requirements.txt index f5eaa4610..352c78932 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,6 +7,7 @@ psycopg2==2.5.4 pillow==2.5.3 pytz==2014.4 six==1.8.0 +amqp==1.4.6 djmail==0.9 django-pgjson==0.2.0 djorm-pgarray==1.0.4 diff --git a/settings/common.py b/settings/common.py index d3ac205e4..e90ca7745 100644 --- a/settings/common.py +++ b/settings/common.py @@ -87,7 +87,9 @@ DJMAIL_TEMPLATE_EXTENSION = "jinja" # Events backend -EVENTS_PUSH_BACKEND = "taiga.events.backends.postgresql.EventsPushBackend" +# EVENTS_PUSH_BACKEND = "taiga.events.backends.postgresql.EventsPushBackend" +EVENTS_PUSH_BACKEND = "taiga.events.backends.rabbitmq.EventsPushBackend" +EVENTS_PUSH_BACKEND_OPTIONS = {"url": "//guest:guest@127.0.0.1/"} # Message System MESSAGE_STORAGE = "django.contrib.messages.storage.session.SessionStorage" diff --git a/taiga/events/__init__.py b/taiga/events/__init__.py index e69de29bb..bc6d8fa28 100644 --- a/taiga/events/__init__.py +++ b/taiga/events/__init__.py @@ -0,0 +1,17 @@ +# Copyright (C) 2014 Andrey Antukh +# Copyright (C) 2014 Jesús Espino +# Copyright (C) 2014 David Barragán +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +default_app_config = "taiga.events.apps.EventsAppConfig" diff --git a/taiga/events/apps.py b/taiga/events/apps.py new file mode 100644 index 000000000..40b518349 --- /dev/null +++ b/taiga/events/apps.py @@ -0,0 +1,39 @@ +# Copyright (C) 2014 Andrey Antukh +# Copyright (C) 2014 Jesús Espino +# Copyright (C) 2014 David Barragán +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import sys +from django.apps import AppConfig +from django.db.models import signals + +from . import signal_handlers as handlers + + +def connect_events_signals(): + signals.post_save.connect(handlers.on_save_any_model, dispatch_uid="events_change") + signals.post_delete.connect(handlers.on_delete_any_model, dispatch_uid="events_delete") + + +def disconnect_events_signals(): + signals.post_save.disconnect(dispatch_uid="events_change") + signals.post_delete.disconnect(dispatch_uid="events_delete") + + +class EventsAppConfig(AppConfig): + name = "taiga.events" + verbose_name = "Events App Config" + + def ready(self): + connect_events_signals() diff --git a/taiga/events/backends/base.py b/taiga/events/backends/base.py index 58f7a1a77..4eefcb555 100644 --- a/taiga/events/backends/base.py +++ b/taiga/events/backends/base.py @@ -21,7 +21,7 @@ class BaseEventsPushBackend(object, metaclass=abc.ABCMeta): @abc.abstractmethod - def emit_event(self, message:str, *, channel:str="events"): + def emit_event(self, message:str, *, routing_key:str, channel:str="events"): pass diff --git a/taiga/events/backends/postgresql.py b/taiga/events/backends/postgresql.py index 907504659..696a0813d 100644 --- a/taiga/events/backends/postgresql.py +++ b/taiga/events/backends/postgresql.py @@ -20,7 +20,10 @@ class EventsPushBackend(base.BaseEventsPushBackend): @transaction.atomic - def emit_event(self, message:str, *, channel:str="events"): + def emit_event(self, message:str, *, routing_key:str, channel:str="events"): + routing_key = routing_key.replace(".", "__") + channel = "{channel}_{routing_key}".format(channel=channel, + routing_key=routing_key) sql = "NOTIFY {channel}, %s".format(channel=channel) cursor = connection.cursor() cursor.execute(sql, [message]) diff --git a/taiga/events/backends/rabbitmq.py b/taiga/events/backends/rabbitmq.py new file mode 100644 index 000000000..a745a1961 --- /dev/null +++ b/taiga/events/backends/rabbitmq.py @@ -0,0 +1,65 @@ +# Copyright (C) 2014 Andrey Antukh +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import json +import logging + +from amqp import Connection as AmqpConnection +from amqp.basic_message import Message as AmqpMessage +from urllib.parse import urlparse + +from . import base + +log = logging.getLogger("tagia.events") + + +def _make_rabbitmq_connection(url): + parse_result = urlparse(url) + + # Parse host & user/password + try: + (authdata, host) = parse_result.netloc.split("@") + except Exception as e: + raise RuntimeError("Invalid url") from e + + try: + (user, password) = authdata.split(":") + except Exception: + (user, password) = ("guest", "guest") + + vhost = parse_result.path + return AmqpConnection(host=host, userid=user, + password=password, virtual_host=vhost) + + +class EventsPushBackend(base.BaseEventsPushBackend): + def __init__(self, url): + self.url = url + + def emit_event(self, message:str, *, routing_key:str, channel:str="events"): + connection = _make_rabbitmq_connection(self.url) + + try: + rchannel = connection.channel() + message = AmqpMessage(message) + + rchannel.exchange_declare(exchange=channel, type="topic", auto_delete=True) + rchannel.basic_publish(message, routing_key=routing_key, exchange=channel) + rchannel.close() + + except Exception: + log.error("Unhandled exception", exc_info=True) + + finally: + connection.close() diff --git a/taiga/events/changes.py b/taiga/events/changes.py deleted file mode 100644 index fc15c8d32..000000000 --- a/taiga/events/changes.py +++ /dev/null @@ -1,61 +0,0 @@ -# Copyright (C) 2014 Andrey Antukh -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . - -import json - -from django.contrib.contenttypes.models import ContentType -from . import backends - -# The complete list of content types -# of allowed models for change events -watched_types = ( - ("userstories", "userstory"), - ("issues", "issue"), -) - - -def _get_type_for_model(model_instance): - """ - Get content type tuple from model instance. - """ - ct = ContentType.objects.get_for_model(model_instance) - return (ct.app_label, ct.model) - - -def emit_change_event_for_model(model_instance, sessionid:str, *, - type:str="change", channel:str="events"): - """ - Emit change event for notify of model change to - all connected frontends. - """ - content_type = _get_type_for_model(model_instance) - - assert hasattr(model_instance, "project_id") - assert content_type in watched_types - assert type in ("create", "change", "delete") - - project_id = model_instance.project_id - routing_key = "project.{0}".format(project_id) - - data = {"type": "model-changes", - "routing_key": routing_key, - "session_id": sessionid, - "data": { - "type": type, - "matches": ".".join(content_type), - "pk": model_instance.pk}} - - backend = backends.get_events_backend() - return backend.emit_event(json.dumps(data), channel="events") - diff --git a/taiga/events/events.py b/taiga/events/events.py new file mode 100644 index 000000000..f1d053af3 --- /dev/null +++ b/taiga/events/events.py @@ -0,0 +1,101 @@ +# Copyright (C) 2014 Andrey Antukh +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import json +import collections + +from django.contrib.contenttypes.models import ContentType + +from taiga.base.utils import json +from . import middleware as mw +from . import backends + +# The complete list of content types +# of allowed models for change events +watched_types = set([ + "userstories.userstory", + "issues.issue", + "tasks.task", + "wiki.wiki_page", + "milestones.milestone", +]) + + +def _get_type_for_model(model_instance): + """ + Get content type tuple from model instance. + """ + ct = ContentType.objects.get_for_model(model_instance) + return ".".join([ct.app_label, ct.model]) + + +def emit_event(data:dict, routing_key:str, *, + sessionid:str=None, channel:str="events"): + if not sessionid: + sessionid = mw.get_current_session_id() + + data = {"session_id": sessionid, + "data": data} + + backend = backends.get_events_backend() + return backend.emit_event(message=json.dumps(data), + routing_key=routing_key, + channel=channel) + + +def emit_event_for_model(obj, *, type:str="change", channel:str="events", + content_type:str=None, sessionid:str=None): + """ + Sends a model change event. + """ + + assert type in set(["create", "change", "delete"]) + assert hasattr(obj, "project_id") + + if not content_type: + content_type = _get_type_for_model(obj) + + projectid = getattr(obj, "project_id") + pk = getattr(obj, "pk", None) + + app_name, model_name = content_type.split(".", 1) + routing_key = "changes.project.{0}.{1}".format(projectid, app_name) + + data = {"type": type, + "matches": content_type, + "pk": pk} + + return emit_event(routing_key=routing_key, + channel=channel, + sessionid=sessionid, + data=data) + + +def emit_event_for_ids(ids, content_type:str, projectid:int, *, + type:str="change", channel:str="events", sessionid:str=None): + assert type in set(["create", "change", "delete"]) + assert isinstance(ids, collections.Iterable) + assert content_type, "content_type parameter is mandatory" + + app_name, model_name = content_type.split(".", 1) + routing_key = "changes.project.{0}.{1}".format(projectid, app_name) + + data = {"type": type, + "matches": content_type, + "pk": ids} + + return emit_event(routing_key=routing_key, + channel=channel, + sessionid=sessionid, + data=data) diff --git a/taiga/events/models.py b/taiga/events/models.py deleted file mode 100644 index 6958276f4..000000000 --- a/taiga/events/models.py +++ /dev/null @@ -1,53 +0,0 @@ -# Copyright (C) 2014 Andrey Antukh -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . - -from django.db.models import signals -from django.dispatch import receiver - -from . import middleware as mw -from . import changes - - -@receiver(signals.post_save, dispatch_uid="events_dispatcher_on_change") -def on_save_any_model(sender, instance, created, **kwargs): - # Ignore any object that can not have project_id - content_type = changes._get_type_for_model(instance) - - # Ignore changes on import - if getattr(instance, '_importing', False): - return - - # Ignore any other changes - if content_type not in changes.watched_types: - return - - sesionid = mw.get_current_session_id() - - if created: - changes.emit_change_event_for_model(instance, sesionid, type="create") - else: - changes.emit_change_event_for_model(instance, sesionid, type="change") - - -@receiver(signals.post_delete, dispatch_uid="events_dispatcher_on_delete") -def on_delete_any_model(sender, instance, **kwargs): - # Ignore any object that can not have project_id - content_type = changes._get_type_for_model(instance) - - # Ignore any other changes - if content_type not in changes.watched_types: - return - - sesionid = mw.get_current_session_id() - changes.emit_change_event_for_model(instance, sesionid, type="delete") diff --git a/taiga/events/signal_handlers.py b/taiga/events/signal_handlers.py new file mode 100644 index 000000000..c2841a76b --- /dev/null +++ b/taiga/events/signal_handlers.py @@ -0,0 +1,34 @@ + +from django.db.models import signals +from django.dispatch import receiver + +from . import middleware as mw +from . import events + + +def on_save_any_model(sender, instance, created, **kwargs): + # Ignore any object that can not have project_id + content_type = events._get_type_for_model(instance) + + # Ignore any other events + if content_type not in events.watched_types: + return + + sesionid = mw.get_current_session_id() + + if created: + events.emit_event_for_model(instance, sessionid=sesionid, type="create") + else: + events.emit_event_for_model(instance, sessionid=sesionid, type="change") + + +def on_delete_any_model(sender, instance, **kwargs): + # Ignore any object that can not have project_id + content_type = events._get_type_for_model(instance) + + # Ignore any other changes + if content_type not in events.watched_types: + return + + sesionid = mw.get_current_session_id() + events.emit_event_for_model(instance, sessionid=sesionid, type="delete") diff --git a/taiga/projects/management/commands/sample_data.py b/taiga/projects/management/commands/sample_data.py index b9288034c..febdb6686 100644 --- a/taiga/projects/management/commands/sample_data.py +++ b/taiga/projects/management/commands/sample_data.py @@ -14,11 +14,13 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . +import random +import datetime + from django.core.management.base import BaseCommand from django.db import transaction from django.utils.timezone import now from django.conf import settings - from django.contrib.webdesign import lorem_ipsum from django.contrib.contenttypes.models import ContentType @@ -34,9 +36,8 @@ from taiga.projects.attachments.models import * from taiga.projects.history.services import take_snapshot +from taiga.events.apps import disconnect_events_signals -import random -import datetime ATTACHMENT_SAMPLE_DATA = [ "taiga/projects/management/commands/sample_data", @@ -102,6 +103,9 @@ class Command(BaseCommand): @transaction.atomic def handle(self, *args, **options): + # Prevent events emission when sample data is running + disconnect_events_signals() + self.users = [User.objects.get(is_superuser=True)] # create users diff --git a/taiga/projects/userstories/api.py b/taiga/projects/userstories/api.py index 9999fa52c..f4b604ed9 100644 --- a/taiga/projects/userstories/api.py +++ b/taiga/projects/userstories/api.py @@ -87,7 +87,9 @@ def bulk_update_backlog_order(self, request, **kwargs): project = get_object_or_404(Project, pk=data["project_id"]) self.check_permissions(request, "bulk_update_order", project) - services.update_userstories_order_in_bulk(data["bulk_stories"], field="backlog_order") + services.update_userstories_order_in_bulk(data["bulk_stories"], + project=project, + field="backlog_order") services.snapshot_userstories_in_bulk(data["bulk_stories"], request.user) return response.NoContent() @@ -102,7 +104,9 @@ def bulk_update_sprint_order(self, request, **kwargs): project = get_object_or_404(Project, pk=data["project_id"]) self.check_permissions(request, "bulk_update_order", project) - services.update_userstories_order_in_bulk(data["bulk_stories"], field="sprint_order") + services.update_userstories_order_in_bulk(data["bulk_stories"], + project=project, + field="sprint_order") services.snapshot_userstories_in_bulk(data["bulk_stories"], request.user) return response.NoContent() @@ -116,7 +120,9 @@ def bulk_update_kanban_order(self, request, **kwargs): project = get_object_or_404(Project, pk=data["project_id"]) self.check_permissions(request, "bulk_update_order", project) - services.update_userstories_order_in_bulk(data["bulk_stories"], field="kanban_order") + services.update_userstories_order_in_bulk(data["bulk_stories"], + project=project, + field="kanban_order") services.snapshot_userstories_in_bulk(data["bulk_stories"], request.user) return response.NoContent() diff --git a/taiga/projects/userstories/services.py b/taiga/projects/userstories/services.py index 54fb3810e..0d70cb1eb 100644 --- a/taiga/projects/userstories/services.py +++ b/taiga/projects/userstories/services.py @@ -18,6 +18,7 @@ from taiga.base.utils import db, text from taiga.projects.history.services import take_snapshot +from taiga.events import events from . import models @@ -48,7 +49,7 @@ def create_userstories_in_bulk(bulk_data, callback=None, precall=None, **additio return userstories -def update_userstories_order_in_bulk(bulk_data:list, field:str): +def update_userstories_order_in_bulk(bulk_data:list, field:str, project:object): """ Update the order of some user stories. `bulk_data` should be a list of tuples with the following format: @@ -61,6 +62,10 @@ def update_userstories_order_in_bulk(bulk_data:list, field:str): user_story_ids.append(us_data["us_id"]) new_order_values.append({field: us_data["order"]}) + events.emit_event_for_ids(ids=user_story_ids, + content_type="userstories.userstory", + projectid=project.pk) + db.update_in_bulk_with_ids(user_story_ids, new_order_values, model=models.UserStory) diff --git a/tests/integration/test_userstories.py b/tests/integration/test_userstories.py index c58e78909..63721ecaa 100644 --- a/tests/integration/test_userstories.py +++ b/tests/integration/test_userstories.py @@ -36,8 +36,11 @@ def test_create_userstories_in_bulk(): def test_update_userstories_order_in_bulk(): data = [{"us_id": 1, "order": 1}, {"us_id": 2, "order": 2}] + project = mock.Mock() + project.pk = 1 + with mock.patch("taiga.projects.userstories.services.db") as db: - services.update_userstories_order_in_bulk(data, "backlog_order") + services.update_userstories_order_in_bulk(data, "backlog_order", project) db.update_in_bulk_with_ids.assert_called_once_with([1, 2], [{"backlog_order": 1}, {"backlog_order": 2}], model=models.UserStory)