From c5928d7a68a4198b6fada51b43a6d274b6b48ccf Mon Sep 17 00:00:00 2001 From: Lukas Holecek Date: Fri, 8 Oct 2021 10:03:40 +0200 Subject: [PATCH] Add stomp consumers Starts message consumers in WSGI HTTP server. This allows scaling the consumers easily with `gunicorn --workers=X` and enables access to `api/v1.0/metrics` endpoint containing message metrics. Example to start the consumers: gunicorn-3 --workers=4 greenwave.resultsdb_listener:app --- Dockerfile | 1 + docker-compose.yml | 82 +++++++-- docker/dev.env | 1 - docker/fedmsg.d/base.py | 48 ----- docker/fedmsg.d/logging.py | 38 ---- docker/greenwave-settings.py | 21 ++- docker/resultsdb-settings.py | 19 +- docker/umb.yml | 7 +- docker/waiverdb-settings.py | 11 +- greenwave/app_factory.py | 1 + greenwave/base_listener.py | 303 ++++++++++++++++++++++++++++++++ greenwave/config.py | 25 +++ greenwave/resultsdb_listener.py | 120 +++++++++++++ greenwave/waiverdb_listener.py | 32 ++++ requirements.txt | 1 + 15 files changed, 590 insertions(+), 120 deletions(-) delete mode 100644 docker/fedmsg.d/base.py delete mode 100644 docker/fedmsg.d/logging.py create mode 100644 greenwave/base_listener.py create mode 100644 greenwave/resultsdb_listener.py create mode 100644 greenwave/waiverdb_listener.py diff --git a/Dockerfile b/Dockerfile index 7545f086..2751fcbb 100644 --- a/Dockerfile +++ b/Dockerfile @@ -17,6 +17,7 @@ RUN dnf -y install \ python3-prometheus_client \ python3-PyYAML \ python3-requests \ + python3-stomppy \ && dnf -y clean all \ && rm -rf /tmp/* diff --git a/docker-compose.yml b/docker-compose.yml index 8e92c443..73c66c4e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,6 +3,7 @@ version: '2.1' services: dev: build: . + image: greenwave_dev user: ${DEV_USER_ID:-1000} working_dir: /src env_file: ["docker/dev.env"] @@ -20,23 +21,6 @@ services: - resultsdb - waiverdb - resultsdb-consumer: - build: . - user: ${DEV_USER_ID:-1000} - working_dir: / - env_file: docker/dev.env - command: ["/usr/bin/fedmsg-hub-3"] - volumes: - - ./:/src:ro,z - - ./docker/home:/home/dev:rw,z - - ./docker/greenwave-settings.py:/etc/greenwave/settings.py:ro,z - - ./conf/policies/:/etc/greenwave/policies/:ro,z - - ./conf/subject_types/:/etc/greenwave/subject_types/:ro,z - - ./docker/fedmsg.d:/etc/fedmsg.d:ro,z - depends_on: - - memcached - - umb - resultsdb-db: image: postgres:9.5.2 restart: always @@ -52,6 +36,8 @@ services: context: ../resultsdb dockerfile: Dockerfile working_dir: /code + environment: + - GREENWAVE_LISTENERS=${GREENWAVE_LISTENERS:-1} command: ["bash", "-c", "/start.sh"] volumes: - ../resultsdb:/code:ro,z @@ -76,6 +62,8 @@ services: dockerfile: openshift/containers/waiverdb/Dockerfile working_dir: /code env_file: ["docker/waiverdb.env"] + environment: + - GREENWAVE_LISTENERS=${GREENWAVE_LISTENERS:-1} command: ["bash", "-c", "/start.sh"] volumes: - ../waiverdb:/code:ro,z @@ -92,8 +80,64 @@ services: memcached: image: "quay.io/factory2/memcached" + resultsdb-listener: + build: . + image: greenwave_dev + user: ${DEV_USER_ID:-1000} + working_dir: / + env_file: ["docker/dev.env"] + command: + - "/usr/bin/gunicorn-3" + - "--reload" + - "--bind=0.0.0.0:8082" + - "--access-logfile=-" + - "--enable-stdio-inheritance" + - "--workers=2" + - "greenwave.resultsdb_listener:app" + ports: + - 8082:8082 + volumes: + - ./:/src:ro,z + - ./docker/home:/home/dev:rw,z + - ./docker/greenwave-settings.py:/etc/greenwave/settings.py:ro,z + - ./conf/policies/:/etc/greenwave/policies/:ro,z + - ./conf/subject_types/:/etc/greenwave/subject_types/:ro,z + depends_on: + - memcached + - umb + deploy: + replicas: ${GREENWAVE_LISTENERS:-1} + + waiverdb-listener: + build: . + image: greenwave_dev + user: ${DEV_USER_ID:-1000} + working_dir: / + env_file: ["docker/dev.env"] + command: + - "/usr/bin/gunicorn-3" + - "--reload" + - "--bind=0.0.0.0:8083" + - "--access-logfile=-" + - "--enable-stdio-inheritance" + - "--workers=2" + - "greenwave.waiverdb_listener:app" + ports: + - 8083:8083 + volumes: + - ./:/src:ro,z + - ./docker/home:/home/dev:rw,z + - ./docker/greenwave-settings.py:/etc/greenwave/settings.py:ro,z + - ./conf/policies/:/etc/greenwave/policies/:ro,z + - ./conf/subject_types/:/etc/greenwave/subject_types/:ro,z + depends_on: + - memcached + - umb + deploy: + replicas: ${GREENWAVE_LISTENERS:-1} + umb: - image: docker-registry.upshift.redhat.com/c3i/umb:latest + image: images.paas.redhat.com/exd-sp-guild-c3i/umb:latest restart: unless-stopped command: - "java" @@ -105,6 +149,8 @@ services: ports: - 5671:5671 # amqp - 61612:61612 # stomp + deploy: + replicas: ${GREENWAVE_LISTENERS:-1} networks: default: diff --git a/docker/dev.env b/docker/dev.env index dfc020bb..d727ddd4 100644 --- a/docker/dev.env +++ b/docker/dev.env @@ -5,4 +5,3 @@ WAIVERDB_TEST_URL=http://waiverdb:5004/ RESULTSDB_TEST_URL=http://resultsdb:5001/ PYTEST_ADDOPTS=-o cache_dir=/home/dev/.pytest_cache PYTHONPATH=/src -RESULTSDB_HANDLER=please diff --git a/docker/fedmsg.d/base.py b/docker/fedmsg.d/base.py deleted file mode 100644 index c4df0713..00000000 --- a/docker/fedmsg.d/base.py +++ /dev/null @@ -1,48 +0,0 @@ -import os - -config = { - # fedmsg boilerplate - 'endpoints': {}, - 'sign_messages': False, - 'validate_signatures': False, - - # STOMP settings - 'zmq_enabled': False, - 'stomp_uri': 'umb:61612', - 'stomp_heartbeat': 900000, - 'stomp_ack_mode': 'client-individual', - - # Hacks to make us publish to - # /topic/VirtualTopic.eng.greenwave.decision.update - 'topic_prefix': '/topic/VirtualTopic', - 'environment': 'eng', - 'resultsdb_topic_suffix': 'resultsdb.result.new', - 'waiverdb_topic_suffix': 'waiverdb.waiver.new', - - # Workaround for moksha memory leak and mitigate message loss. - # Memory leak is fixed in python-moksha-hub-1.5.7 (https://github.com/mokshaproject/moksha/pull/57). - 'moksha.blocking_mode': True, - - # moksha-monitor-exporter's point of contact - 'moksha.monitoring.socket': 'tcp://0.0.0.0:10030', -} - -# Enable one consumer or the other in different deployments. -if os.environ.get("RESULTSDB_HANDLER") and os.environ.get("WAIVERDB_HANDLER"): - raise ValueError("Both RESULTSDB_HANDLER and WAIVERDB_HANDLER may not" - "be specified. Only one.") - -if os.environ.get("RESULTSDB_HANDLER"): - config.update({ - 'resultsdb_handler': True, - 'waiverdb_handler': False, - 'stomp_queue': '/queue/Consumer.client-greenwave.resultsdb.VirtualTopic.eng.resultsdb.result.new', - }) -elif os.environ.get("WAIVERDB_HANDLER"): - config.update({ - 'resultsdb_handler': False, - 'waiverdb_handler': True, - 'stomp_queue': '/queue/Consumer.client-greenwave.waiverdb.VirtualTopic.eng.waiverdb.waiver.new', - }) - - diff --git a/docker/fedmsg.d/logging.py b/docker/fedmsg.d/logging.py deleted file mode 100644 index 12b821b2..00000000 --- a/docker/fedmsg.d/logging.py +++ /dev/null @@ -1,38 +0,0 @@ -config = dict(logging={ - "version": 1, - "disable_existing_loggers": False, - "formatters": { - "bare": { - "datefmt": "%Y-%m-%d %H:%M:%S", - "format": "[%(asctime)s][%(name)10s %(levelname)7s] %(message)s" - } - }, - "loggers": { - "greenwave": { - "handlers": ["console"], "propagate": True, "level": "DEBUG" - }, - "moksha": { - "handlers": ["console"], "propagate": False, "level": "DEBUG" - }, - "requests": { - "handlers": ["console"], "propagate": False, "level": "DEBUG" - }, - "resultsdb_handler": { - "handlers": ["console"], "propagate": False, "level": "DEBUG" - }, - "waiverdb_handler": { - "handlers": ["console"], "propagate": False, "level": "DEBUG" - }, - "dogpile": { - "handlers": ["console"], "propagate": False, "level": "DEBUG" - }, - }, - "handlers": { - "console": { - "formatter": "bare", - "class": "logging.StreamHandler", - "stream": "ext://sys.stdout", - "level": "DEBUG" - } - }, -}) diff --git a/docker/greenwave-settings.py b/docker/greenwave-settings.py index 30a5f8f5..5fbaeb68 100644 --- a/docker/greenwave-settings.py +++ b/docker/greenwave-settings.py @@ -2,10 +2,20 @@ HOST = '127.0.0.1' PORT = 8080 DEBUG = True -POLICIES_DIR = '/etc/greenwave/policies/' -WAIVERDB_API_URL = 'http://waiverdb:5004/api/v1.0' -RESULTSDB_API_URL = 'http://resultsdb:5001/api/v2.0' -GREENWAVE_API_URL = 'http://dev:8080/api/v1.0' +POLICIES_DIR = "/etc/greenwave/policies/" +WAIVERDB_API_URL = "http://waiverdb:5004/api/v1.0" +RESULTSDB_API_URL = "http://resultsdb:5001/api/v2.0" +LISTENER_HOSTS = "umb:61612" +LISTENER_CONNECTION_SSL = None +LISTENER_CONNECTION = { + "heartbeats": (10000, 20000), + "keepalive": True, + "timeout": 5000, + "reconnect_sleep_initial": 1.0, + "reconnect_sleep_increase": 1.0, + "reconnect_sleep_max": 10.0, + "reconnect_attempts_max": 5, +} CACHE = { # 'backend': "dogpile.cache.null", 'backend': "dogpile.cache.memcached", @@ -26,6 +36,9 @@ 'dogpile.cache': { 'level': 'DEBUG', }, + "stomp.py": { + "level": "DEBUG", + }, }, 'handlers': { 'console': { diff --git a/docker/resultsdb-settings.py b/docker/resultsdb-settings.py index 3506522d..c89c0918 100644 --- a/docker/resultsdb-settings.py +++ b/docker/resultsdb-settings.py @@ -1,3 +1,5 @@ +import os + SECRET_KEY = 'resultsdb' SQLALCHEMY_DATABASE_URI = 'postgresql+psycopg2://resultsdb:resultsdb@resultsdb-db:5432/resultsdb' FILE_LOGGING = False @@ -6,14 +8,15 @@ STREAM_LOGGING = True RUN_HOST = '0.0.0.0' RUN_PORT = 5001 -MESSAGE_BUS_PUBLISH = True -MESSAGE_BUS_PLUGIN = 'stomp' +ADDITIONAL_RESULT_OUTCOMES = ("RUNNING", "QUEUED", "ERROR") + +MESSAGE_BUS_PUBLISH = os.environ.get("GREENWAVE_LISTENERS", "") not in ("", "0") +MESSAGE_BUS_PLUGIN = "stomp" MESSAGE_BUS_KWARGS = { - 'modname': 'resultsdb', - 'destination': '/topic/VirtualTopic.eng.resultsdb.result.new', - 'connection': { - 'host_and_ports': [('localhost', 61612)], - 'use_ssl': False, + "modname": "resultsdb", + "destination": "/topic/VirtualTopic.eng.resultsdb.result.new", + "connection": { + "host_and_ports": [("umb", 61612)], + "use_ssl": False, }, } -ADDITIONAL_RESULT_OUTCOMES = ('RUNNING', 'QUEUED', 'ERROR') diff --git a/docker/umb.yml b/docker/umb.yml index 2fa333bf..3fef39d6 100644 --- a/docker/umb.yml +++ b/docker/umb.yml @@ -22,9 +22,12 @@ umb: plugins: enableLdapBackedAuthentication: false enableLdapBackedAuthorization: false + enablePlatformSecurityAuthorizationBroker: false + enablePlatformLibraryAuthentication: false + enablePlatformLibraryAuthorization: false transportConnectors: - - amqp://0.0.0.0:5671?transport.maximumConnections=2500&transport.needClientAuth=true&transport.transformer=jms&wireFormat.allowNonSaslConnections=true&transport.closeAsync=false&transport.daemon=true&wireFormat.maxInactivityDurationInitalDelay=60000&wireFormat.maxInactivityDuration=60000 - - stomp://0.0.0.0:61612?transport.maximumConnections=2500&transport.needClientAuth=true&transport.closeAsync=false&transport.daemon=true&wireFormat.maxInactivityDurationInitalDelay=60000&wireFormat.maxInactivityDuration=60000 + - amqp://0.0.0.0:5671?transport.maximumConnections=2500&transport.needClientAuth=false&transport.transformer=jms&wireFormat.allowNonSaslConnections=true&transport.closeAsync=false&transport.daemon=true&wireFormat.maxInactivityDurationInitalDelay=60000&wireFormat.maxInactivityDuration=60000 + - stomp://0.0.0.0:61612?transport.maximumConnections=2500&transport.needClientAuth=false&transport.closeAsync=false&transport.daemon=true&wireFormat.maxInactivityDurationInitalDelay=60000&wireFormat.maxInactivityDuration=60000 logging: pattern: dateformat: "yyyy-MM-dd'T'HH:mm:ss.SSSZ" diff --git a/docker/waiverdb-settings.py b/docker/waiverdb-settings.py index 04e3fff8..1f5c63b6 100644 --- a/docker/waiverdb-settings.py +++ b/docker/waiverdb-settings.py @@ -9,7 +9,16 @@ PORT = 5004 #AUTH_METHOD = 'OIDC' AUTH_METHOD = 'dummy' -MESSAGE_BUS_PUBLISH = False SUPERUSERS = ['dummy'] #OIDC_CLIENT_SECRETS = '/etc/secret/client_secrets.json' RESULTSDB_API_URL = 'http://resultsdb:5001/api/v2.0' + +MESSAGE_BUS_PUBLISH = os.environ.get("GREENWAVE_LISTENERS", "") not in ("", "0") +MESSAGE_PUBLISHER = "stomp" +STOMP_CONFIGS = { + "destination": "/topic/VirtualTopic.eng.waiverdb.waiver.new", + "connection": { + "host_and_ports": [("umb", 61612)], + "use_ssl": False, + }, +} diff --git a/greenwave/app_factory.py b/greenwave/app_factory.py index 1a8a98c2..5a562d02 100644 --- a/greenwave/app_factory.py +++ b/greenwave/app_factory.py @@ -1,6 +1,7 @@ # SPDX-License-Identifier: GPL-2.0+ import logging +import logging.config from flask import Flask from greenwave.api_v1 import api diff --git a/greenwave/base_listener.py b/greenwave/base_listener.py new file mode 100644 index 00000000..2c175f74 --- /dev/null +++ b/greenwave/base_listener.py @@ -0,0 +1,303 @@ +# SPDX-License-Identifier: GPL-2.0+ +import json +import logging +import os +import signal +import threading +import uuid + +import stomp +from requests.exceptions import HTTPError + +import greenwave.app_factory +from greenwave.logger import init_logging, log_to_stdout +from greenwave.monitor import ( + messaging_rx_counter, + messaging_rx_failed_counter, + messaging_rx_ignored_counter, + messaging_rx_processed_ok_counter, + messaging_tx_failed_counter, + messaging_tx_sent_ok_counter, + messaging_tx_stopped_counter, + messaging_tx_to_send_counter, + publish_decision_exceptions_result_counter, +) +from greenwave.policies import applicable_decision_context_product_version_pairs +from greenwave.utils import right_before_this_time + +GREENWAVE_LISTENER_PREFIX = "greenwave" + + +def _equals_except_keys(lhs, rhs, except_keys): + keys = lhs.keys() - except_keys + return lhs.keys() == rhs.keys() and all(lhs[key] == rhs[key] for key in keys) + + +def _is_decision_unchanged(old_decision, decision): + """ + Returns true only if new decision is same as old one + (ignores result_id values). + """ + if old_decision is None or decision is None: + return old_decision == decision + + requirements_keys = ("satisfied_requirements", "unsatisfied_requirements") + if not _equals_except_keys(old_decision, decision, requirements_keys): + return False + + ignore_keys = ("result_id",) + for key in requirements_keys: + old_requirements = old_decision[key] + requirements = decision[key] + if len(old_requirements) != len(requirements): + return False + + for old_requirement, requirement in zip(old_requirements, requirements): + if not _equals_except_keys(old_requirement, requirement, ignore_keys): + return False + + return True + + +def _send_ack(listener, headers): + listener.connection.ack(headers["message-id"], listener.uid) + + +def _send_nack(listener, headers): + listener.connection.nack(headers["message-id"], listener.uid) + + +class BaseListener(stomp.ConnectionListener): + monitor_labels = {"handler": "greenwave_consumer"} + + def __init__(self, uid_suffix): + super().__init__() + self.connection = None + self.topic = None + + self.connection_condition = threading.Condition() + self.connecting = False + self.stop = False + + self.uid = f"{GREENWAVE_LISTENER_PREFIX}-{uid_suffix}-{uuid.uuid1().hex}" + + init_logging() + log_to_stdout(logging.DEBUG) + self.app = greenwave.app_factory.create_app() + + self.destination = self.app.config["LISTENER_DECISION_UPDATE_DESTINATION"] + + def on_error(self, frame): + self.app.logger.warning("Received an error: %s", frame.body) + + def on_message(self, frame): + with self.connection_condition: + if self.stop: + _send_nack(self, frame.headers) + return + + self.app.logger.debug("Received a message: %s", frame.body) + _send_ack(self, frame.headers) + self._inc(messaging_rx_counter) + + try: + data = json.loads(frame.body) + except json.JSONDecodeError as e: + self.app.logger.debug("Failed to decode JSON message: %s", e) + self._inc(messaging_rx_ignored_counter) + return + + try: + with self.app.app_context(): + processed = self._consume_message(data) + except BaseException: + self._inc(messaging_rx_failed_counter) + raise + + if processed: + self._inc(messaging_rx_processed_ok_counter) + else: + self._inc(messaging_rx_ignored_counter) + + def connect(self): + with self.connection_condition: + if self.connecting or self.connection.is_connected(): + return + + self.app.logger.debug("Connecting listener") + self.connecting = True + try: + self.connection.connect(wait=True) + except BaseException: + self.app.logger.exception("Failed to connect") + self._terminate() + finally: + self.connecting = False + + def subscribe(self): + self.connection.subscribe( + destination=self.topic, id=self.uid, ack="client-individual" + ) + self.app.logger.debug("Subscribed %s to %s", self.uid, self.topic) + + def on_connected(self, frame): + self.app.logger.debug("Listener: on_connected") + self.subscribe() + + def on_disconnected(self): + self.app.logger.debug("Listener: on_disconnected") + self._terminate() + + def on_receiver_loop_completed(self, frame): + """This is also called on heartbeat timeout.""" + self.app.logger.debug("Listener: on_receiver_loop_completed") + self._terminate() + + def listen(self): + if self.connection is not None: + self.app.logger.warning("Already connected") + return + + def handler(signum, frame): + self.app.logger.warning("Stopping listener on signal %s", signum) + self.disconnect() + + signal.signal(signal.SIGTERM, handler) + + hosts = self.app.config["LISTENER_HOSTS"] + hosts_and_ports = [tuple(url.split(":")) for url in hosts.split(",")] + connection_args = self.app.config["LISTENER_CONNECTION"] + if not connection_args.get("host_and_ports"): + connection_args["host_and_ports"] = hosts_and_ports + + connection = stomp.connect.StompConnection11(**connection_args) + + ssl_args = self.app.config["LISTENER_CONNECTION_SSL"] + if ssl_args: + if not ssl_args.get("for_hosts"): + ssl_args["for_hosts"] = hosts_and_ports + connection.set_ssl(**ssl_args) + + self.connection = connection + connection.set_listener("", self) + self.connect() + + self.app.logger.info("Listening on %s", self.topic) + + def disconnect(self): + self.app.logger.debug("Disconnecting listener") + with self.connection_condition: + self.stop = True + self.connection.disconnect() + + def _terminate(self): + self.disconnect() + os.kill(os.getpid(), signal.SIGQUIT) + + def _consume_message(self, message): + """ + Processes messages and either calls + self._publish_decision_change() and returns True or just + returns False. + """ + raise NotImplementedError() + + def _inc(self, messaging_counter): + """Helper method to increase monitoring counter.""" + messaging_counter.labels(**self.monitor_labels).inc() + + def _publish_decision_update(self, decision): + message = {"msg": decision} + body = json.dumps(message) + while True: + try: + self.connection.send( + body=body, headers={}, destination=self.destination + ) + break + except stomp.exception.NotConnectedException: + self.app.logger.warning("Reconnecting to send message") + self.connect() + except Exception: + self.app.logger.exception("Error sending decision update message") + self._inc(messaging_tx_failed_counter) + raise + + self._inc(messaging_tx_sent_ok_counter) + + def _old_and_new_decisions(self, submit_time, **request_data): + """Returns decision before and after submit time.""" + try: + decision = greenwave.decision.make_decision(request_data, self.app.config) + + request_data["when"] = right_before_this_time(submit_time) + old_decision = greenwave.decision.make_decision( + request_data, self.app.config + ) + self.app.logger.debug("old decision: %s", old_decision) + except HTTPError as e: + self.app.logger.exception( + "Failed to retrieve decision for data=%s, error: %s", request_data, e + ) + self._inc(messaging_tx_stopped_counter) + return None, None + + return old_decision, decision + + @publish_decision_exceptions_result_counter.count_exceptions() + def _publish_decision_change( + self, submit_time, subject, testcase, product_version, publish_testcase + ): + + policy_attributes = dict( + subject=subject, + testcase=testcase, + ) + + if product_version: + policy_attributes["product_version"] = product_version + + policies = self.app.config["policies"] + contexts_product_versions = applicable_decision_context_product_version_pairs( + policies, **policy_attributes + ) + + self.app.logger.info("Getting greenwave info") + + for decision_context, product_version in sorted(contexts_product_versions): + self._inc(messaging_tx_to_send_counter) + + old_decision, decision = self._old_and_new_decisions( + submit_time, + decision_context=decision_context, + product_version=product_version, + subject_type=subject.type, + subject_identifier=subject.identifier, + ) + if decision is None: + continue + + if _is_decision_unchanged(old_decision, decision): + self.app.logger.debug( + "Skipped emitting fedmsg, decision did not change: %s", decision + ) + self._inc(messaging_tx_stopped_counter) + continue + + decision.update( + { + "subject_type": subject.type, + "subject_identifier": subject.identifier, + # subject is for backwards compatibility only: + "subject": [subject.to_dict()], + "decision_context": decision_context, + "product_version": product_version, + "previous": old_decision, + } + ) + if publish_testcase: + decision["testcase"] = testcase + + self.app.logger.info("Publising decision change message: %r", decision) + self._publish_decision_update(decision) + self._inc(messaging_tx_stopped_counter) diff --git a/greenwave/config.py b/greenwave/config.py index 8de01406..6cef10ba 100644 --- a/greenwave/config.py +++ b/greenwave/config.py @@ -56,6 +56,31 @@ class Config(object): OUTCOMES_ERROR = ('ERROR',) OUTCOMES_INCOMPLETE = ('QUEUED', 'RUNNING') + LISTENER_HOSTS = "" + LISTENER_RESULTSDB_QUEUE = ( + "/queue/Consumer.client-greenwave.dev.VirtualTopic.eng.resultsdb.result.new" + ) + LISTENER_WAIVERDB_QUEUE = ( + "/queue/Consumer.client-greenwave.dev.VirtualTopic.eng.waiverdb.waiver.new" + ) + LISTENER_DECISION_UPDATE_DESTINATION = ( + "/topic/VirtualTopic.eng.greenwave.decision.update" + ) + LISTENER_CONNECTION = { + "heartbeats": (10000, 20000), + "keepalive": True, + "timeout": 5000, + "reconnect_sleep_initial": 1.0, + "reconnect_sleep_increase": 1.0, + "reconnect_sleep_max": 60.0, + "reconnect_attempts_max": 10, + } + LISTENER_CONNECTION_SSL = { + "key_file": "/etc/pki/umb/umb-key", + "cert_file": "/etc/pki/umb/umb-crt", + "ca_certs": "/etc/pki/umb/umb-ca", + } + class ProductionConfig(Config): DEBUG = False diff --git a/greenwave/resultsdb_listener.py b/greenwave/resultsdb_listener.py new file mode 100644 index 00000000..38a4c277 --- /dev/null +++ b/greenwave/resultsdb_listener.py @@ -0,0 +1,120 @@ +# SPDX-License-Identifier: GPL-2.0+ +from greenwave.base_listener import BaseListener +from greenwave.product_versions import subject_product_version +from greenwave.subjects.factory import ( + create_subject_from_data, + UnknownSubjectDataError, +) + + +def _unpack_value(value): + """ + If value is list with single element, returns the element, otherwise + returns the value. + """ + if isinstance(value, list) and len(value) == 1: + value = value[0] + return value + + +def _get_brew_task_id(msg): + data = msg.get("data") + if not data: + return None + + task_id = _unpack_value(data.get("brew_task_id")) + try: + return int(task_id) + except (ValueError, TypeError): + return None + + +class ResultsDBListener(BaseListener): + monitor_labels = {"handler": "resultsdb"} + + def __init__(self): + super().__init__(uid_suffix="resultsdb") + self.topic = self.app.config["LISTENER_RESULTSDB_QUEUE"] + self.koji_base_url = self.app.config["KOJI_BASE_URL"] + + @staticmethod + def announcement_subject(msg): + """ + Returns pairs of (subject type, subject identifier) for announcement + consideration from the message. + + Args: + message (munch.Munch): A fedmsg about a new result. + """ + + try: + data = msg["data"] # New format + except KeyError: + data = msg["task"] # Old format + + unpacked = {k: _unpack_value(v) for k, v in data.items()} + + try: + subject = create_subject_from_data(unpacked) + except UnknownSubjectDataError: + return None + + # note: it is *intentional* that we do not handle old format + # compose-type messages, because it is impossible to reliably + # produce a decision from these. compose decisions can only be + # reliably made from new format messages, where we can rely on + # productmd.compose.id being available. See: + # https://pagure.io/greenwave/issue/122 + # https://pagure.io/taskotron/resultsdb/issue/92 + # https://pagure.io/taskotron/resultsdb/pull-request/101 + # https://pagure.io/greenwave/pull-request/262#comment-70350 + if subject.type == "compose" and "productmd.compose.id" not in data: + return None + + return subject + + def _consume_message(self, msg): + try: + testcase = msg["testcase"]["name"] + except KeyError: + testcase = msg["task"]["name"] + + try: + submit_time = msg["submit_time"] + except KeyError: + submit_time = msg["result"]["submit_time"] + + outcome = msg.get("outcome") + if outcome in self.app.config["OUTCOMES_INCOMPLETE"]: + self.app.logger.debug("Assuming no decision change on outcome %r", outcome) + return False + + brew_task_id = _get_brew_task_id(msg) + + subject = self.announcement_subject(msg) + if subject is None: + return False + + self.app.logger.debug("Considering subject: %r", subject) + + product_version = subject_product_version( + subject, + self.koji_base_url, + brew_task_id, + ) + + self.app.logger.debug("Guessed product version: %r", product_version) + + self._publish_decision_change( + submit_time=submit_time, + subject=subject, + testcase=testcase, + product_version=product_version, + publish_testcase=False, + ) + return True + + +listener = ResultsDBListener() +listener.listen() +app = listener.app diff --git a/greenwave/waiverdb_listener.py b/greenwave/waiverdb_listener.py new file mode 100644 index 00000000..37a94df1 --- /dev/null +++ b/greenwave/waiverdb_listener.py @@ -0,0 +1,32 @@ +# SPDX-License-Identifier: GPL-2.0+ +from greenwave.base_listener import BaseListener +from greenwave.subjects.factory import create_subject + + +class WaiverDBListener(BaseListener): + monitor_labels = {"handler": "waiverdb"} + + def __init__(self): + super().__init__(uid_suffix="waiverdb") + self.topic = self.app.config["LISTENER_WAIVERDB_QUEUE"] + self.koji_base_url = self.app.config["KOJI_BASE_URL"] + + def _consume_message(self, msg): + product_version = msg["product_version"] + testcase = msg["testcase"] + subject = create_subject(msg["subject_type"], msg["subject_identifier"]) + submit_time = msg["timestamp"] + + self._publish_decision_change( + submit_time=submit_time, + subject=subject, + testcase=testcase, + product_version=product_version, + publish_testcase=True, + ) + return True + + +listener = WaiverDBListener() +listener.listen() +app = listener.app diff --git a/requirements.txt b/requirements.txt index 04b994e8..195f443b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,7 @@ flask python-dateutil requests +stomp.py PyYAML dogpile.cache fedmsg[consumers]