Skip to content

Commit

Permalink
Merge 92fd4ef into b23ef50
Browse files Browse the repository at this point in the history
  • Loading branch information
martysp21 committed Dec 8, 2023
2 parents b23ef50 + 92fd4ef commit 7e619e8
Show file tree
Hide file tree
Showing 16 changed files with 270 additions and 2 deletions.
17 changes: 17 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,23 @@ services:
deploy:
replicas: ${GREENWAVE_LISTENERS:-1}

jaeger:
image: jaegertracing/all-in-one
ports:
- 127.0.0.1:6831:6831/udp
- 127.0.0.1:6832:6832/udp
- 127.0.0.1:5778:5778
- 127.0.0.1:16686:16686
- 127.0.0.1:4317:4317
- 127.0.0.1:4318:4318
- 127.0.0.1:14250:14250
- 127.0.0.1:14268:14268
- 127.0.0.1:14269:14269
- 127.0.0.1:9411:9411
environment:
- COLLECTOR_ZIPKIN_HOST_PORT=:9411
- COLLECTOR_OTLP_ENABLED=true

networks:
default:
driver: bridge
4 changes: 4 additions & 0 deletions docker/greenwave-settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,7 @@
'handlers': ['console'],
},
}

OTEL_EXPORTER_OTLP_TRACES_ENDPOINT = 'http://jaeger:4318/v1/traces'
OTEL_EXPORTER_SERVICE_NAME = "greenwave"

20 changes: 20 additions & 0 deletions greenwave/app_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@
from greenwave.subjects.subject_type import load_subject_types

from dogpile.cache import make_region
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource, SERVICE_NAME
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.flask import FlaskInstrumentor
import requests
from werkzeug.exceptions import default_exceptions

Expand All @@ -28,6 +34,8 @@ def create_app(config_obj=None):
if logging_config:
logging.config.dictConfig(logging_config)

init_tracing(app)

policies_dir = app.config['POLICIES_DIR']
log.debug("config: Loading policies from %r", policies_dir)
app.config['policies'] = load_policies(policies_dir)
Expand Down Expand Up @@ -68,3 +76,15 @@ def healthcheck():
Returns a 200 response if the application is alive and able to serve requests.
"""
return 'Health check OK', 200, [('Content-Type', 'text/plain')]


def init_tracing(app):
endpoint = app.config.get("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT")
service_name = app.config.get("OTEL_EXPORTER_SERVICE_NAME")
if not endpoint or not service_name:
return
provider = TracerProvider(resource=Resource.create({SERVICE_NAME: service_name}))
trace.set_tracer_provider(provider)
provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(endpoint=endpoint)))

FlaskInstrumentor().instrument_app(app, tracer_provider=provider)
5 changes: 5 additions & 0 deletions greenwave/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ class Config(object):
"ca_certs": "/etc/pki/umb/umb-ca",
}


OTEL_EXPORTER_OTLP_TRACES_ENDPOINT = None
OTEL_EXPORTER_SERVICE_NAME = "greenwave"


# Secure cookies
PERMANENT_SESSION_LIFETIME = 300
SESSION_COOKIE_NAME = "session"
Expand Down
6 changes: 6 additions & 0 deletions greenwave/consumers/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
from greenwave.policies import applicable_decision_context_product_version_pairs
from greenwave.utils import right_before_this_time


from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

log = logging.getLogger(__name__)


Expand Down Expand Up @@ -61,6 +64,7 @@ class Consumer:
hub_config_prefix = 'greenwave_consumer_'
default_topic = 'item.new'
monitor_labels = {'handler': 'greenwave_consumer'}
context = None

def __init__(self, hub, *args, **kwargs):
"""
Expand Down Expand Up @@ -92,6 +96,7 @@ def consume(self, message):
try:
message = message.get('body', message)
log.debug('Processing message "%s"', message)
self.context = TraceContextTextMapPropagator().extract(message)

with self.flask_app.app_context():
self._consume_message(message)
Expand All @@ -108,6 +113,7 @@ def _inc(self, messaging_counter):

def _publish_decision_update_fedora_messaging(self, decision):
try:
TraceContextTextMapPropagator().inject(decision, self.context)
msg = fedora_messaging.api.Message(
topic='greenwave.decision.update',
body=decision
Expand Down
3 changes: 3 additions & 0 deletions greenwave/decision.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import datetime

from opentelemetry import trace
from werkzeug.exceptions import (
BadRequest,
NotFound,
Expand All @@ -21,6 +22,7 @@
from greenwave.waivers import waive_answers

log = logging.getLogger(__name__)
tracer = trace.get_tracer(__name__)


class RuleContext:
Expand Down Expand Up @@ -174,6 +176,7 @@ def _decision_subjects_for_request(data):
yield create_subject(data['subject_type'], data['subject_identifier'])


@tracer.start_as_current_span("make_decision")
def make_decision(data, config):
if not data:
raise UnsupportedMediaType('No JSON payload in request')
Expand Down
14 changes: 13 additions & 1 deletion greenwave/listeners/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import uuid

import stomp
from opentelemetry.context import Context
from requests.exceptions import HTTPError

import greenwave.app_factory
Expand All @@ -25,6 +26,9 @@
from greenwave.policies import applicable_decision_context_product_version_pairs
from greenwave.utils import right_before_this_time

from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator


GREENWAVE_LISTENER_PREFIX = "greenwave"


Expand Down Expand Up @@ -84,6 +88,8 @@ def __init__(self, uid_suffix, config_obj=None):

self.destination = self.app.config["LISTENER_DECISION_UPDATE_DESTINATION"]

self.context = None

def on_error(self, frame):
self.app.logger.warning("Received an error: %s", frame.body)

Expand Down Expand Up @@ -198,13 +204,15 @@ def _consume_message(self, message):
self._publish_decision_change() and returns True or just
returns False.
"""
raise NotImplementedError()
self.context: Context = TraceContextTextMapPropagator().extract(message)

def _inc(self, messaging_counter):
"""Helper method to increase monitoring counter."""
messaging_counter.labels(**self.monitor_labels).inc()

def _publish_decision_update(self, decision):
self.app.logger.debug(f"Trace Context: {self.context}")
TraceContextTextMapPropagator().inject(decision, self.context)
message = {"msg": decision, "topic": self.destination}
body = json.dumps(message)
while True:
Expand All @@ -216,7 +224,11 @@ def _publish_decision_update(self, decision):
"decision_context": decision["decision_context"],
"policies_satisfied": str(decision["policies_satisfied"]).lower(),
"summary": decision["summary"],

}
if decision.get("traceparent", None):
headers["traceparent"] = decision["traceparent"]

self.connection.send(
body=body, headers=headers, destination=self.destination
)
Expand Down
1 change: 1 addition & 0 deletions greenwave/listeners/resultsdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def announcement_subject(msg):
return subject

def _consume_message(self, msg):
super()._consume_message(message=msg)
try:
testcase = msg["testcase"]["name"]
except KeyError:
Expand Down
1 change: 1 addition & 0 deletions greenwave/listeners/waiverdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ def __init__(self, config_obj=None):
self.koji_base_url = self.app.config["KOJI_BASE_URL"]

def _consume_message(self, msg):
super()._consume_message(message=msg)
product_version = msg["product_version"]
testcase = msg["testcase"]
subject = create_subject(msg["subject_type"], msg["subject_identifier"])
Expand Down
3 changes: 3 additions & 0 deletions greenwave/request_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import requests

from json import dumps
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from requests.adapters import HTTPAdapter
from requests.exceptions import ConnectionError, ConnectTimeout, RetryError
from urllib3.util.retry import Retry
Expand All @@ -14,6 +15,8 @@

log = logging.getLogger(__name__)

RequestsInstrumentor().instrument()


class ErrorResponse(requests.Response):
def __init__(self, status_code, error_message, url):
Expand Down
3 changes: 3 additions & 0 deletions greenwave/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
from defusedxml.xmlrpc import xmlrpc_client
from urllib.parse import urlparse
from flask import current_app
from opentelemetry import trace
from werkzeug.exceptions import BadGateway, NotFound

from greenwave.cache import cached
from greenwave.request_session import get_requests_session
from greenwave.xmlrpc_server_proxy import get_server_proxy

log = logging.getLogger(__name__)
tracer = trace.get_tracer(__name__)

requests_session = threading.local().requests_session = get_requests_session()

Expand Down Expand Up @@ -63,6 +65,7 @@ def __init__(self, ignore_ids: List[int], when: str, url: str):
else:
self.since = None

@tracer.start_as_current_span("retrieve")
def retrieve(self, *args, **kwargs):
items = self._retrieve_all(*args, **kwargs)
return [item for item in items if item['id'] not in self.ignore_ids]
Expand Down
41 changes: 41 additions & 0 deletions manual-tests/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# How to manual test UMB messaging

In `greenwave/listeners/base.py` comment out lines 299 - 304. Remember to reactivate them
once you are finished.

```
# if _is_decision_unchanged(old_decision, decision):
# self.app.logger.debug(
# "Skipped emitting fedora message, decision did not change: %s", decision
# )
# self._inc(decision_unchanged_counter.labels(decision_context=decision_context))
# continue
```

Run podman/docker compose.

```
podman compose up -d
```

Send testing request to UMB so resultdb can acquire it and send it to greenwave.

```
python3 send_umb_message.py --host 127.0.0.1 --port 61612 --topic VirtualTopic.eng.resultsdb.result.new -p resultdb_json.json
```

Check the logs of UMB where you should see whole message that is being sent out.

```
podman compose logs umb
```

Get logs of resultsdb listener

```
podman compose logs resultsdb-listener | tail -n 20
```

For changig code there is no need of rebuilding the container as source code is mounted
into container. If you are experiencing old code rerun the containers
by `podman compose up -d`.
64 changes: 64 additions & 0 deletions manual-tests/resultdb_json.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
{
"data": {
"brew_task_id": [
"57212843"
],
"ci_docs": [
"https://docs.engineering.redhat.com/display/CVP/Container+Verification+Pipeline+E2E+Documentation"
],
"ci_email": [
"cvp@redhat.com"
],
"ci_name": [
"Container Verification Pipeline"
],
"ci_team": [
"CVP Development Team"
],
"ci_url": [
"https://jenkins-cvp-5c79a5a8d70cc51dd4c37835.apps.ocp-c1.prod.psi.redhat.com/"
],
"full_names": [
"registry-proxy.engineering.redhat.com/rh-osbs/openshift-ose-must-gather:v4.15.0-202311271051.p0.gc7f5e3f.assembly.stream"
],
"id": [
"sha256:a7fc01280c6b8173611c75a2cbd5a19f5d2ce42d9578d4efcc944e4bc80b09a0"
],
"issuer": [
"exd-ocp-buildvm-bot-prod"
],
"item": [
"avahi"
],
"log": [
"https://jenkins-cvp-5c79a5a8d70cc51dd4c37835.apps.ocp-c1.prod.psi.redhat.com/job/cvp-product-test/2555/console"
],
"msg_id": [
"ID:jenkins-2-8dcwr-46389-1700226798425-136563:1:1:1:1"
],
"scratch": [
"false"
],
"system_architecture": [
"x86_64"
],
"system_provider": [
"openshift"
],
"type": [
"redhat-module"
]
},
"groups": [],
"href": "https://resultsdb-api.engineering.redhat.com/api/v2.0/results/23659469",
"id": 23659469,
"note": "Result status PASSED",
"outcome": "PASSED",
"ref_url": "http://external-ci-coldstorage.datahub.redhat.com/cvp/cvp-product-test/ose-must-gather-container-v4.15.0-202311271051.p0.gc7f5e3f.assembly.stream/1e4142dc-18cf-409d-b401-57e555156ab6/",
"submit_time": "2023-11-27T11:42:24.538119",
"testcase": {
"href": "https://resultsdb-api.engineering.redhat.com/api/v2.0/testcases/cvp.rhproduct.default.source-container-compliance",
"name": "baseos-ci.redhat-module.tier0.functional",
"ref_url": null
}
}
Loading

0 comments on commit 7e619e8

Please sign in to comment.