/
test_apm_plugin.py
169 lines (131 loc) · 5.44 KB
/
test_apm_plugin.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
from unittest.mock import patch, ANY, Mock
import pytest
from elasticapm.traces import Span
from opentracing import SpanContext
from rele import Callback
from rele.contrib.apm_middleware import ELASTIC_APM_TRACE_PARENT
from rele.middleware import register_middleware
from tests.test_worker import sub_stub
@pytest.fixture()
def apm_middleware(config):
    """Activate the APM middleware for the requesting test.

    Sets ``config.middleware`` to a list holding only the APM middleware
    path and hands the config to ``register_middleware``. Nothing is
    returned; tests request this fixture purely for that side effect.
    """
    middleware_paths = ['rele.contrib.APMMiddleware']
    config.middleware = middleware_paths
    register_middleware(config)
@pytest.fixture
def start_active_span_mock():
    """Yield a mock that replaces ``Tracer.start_active_span``.

    The patch is active for the duration of the requesting test and is
    undone when the fixture is torn down.
    """
    target = 'rele.contrib.apm_middleware.Tracer.start_active_span'
    with patch(target) as patched:
        yield patched
@pytest.fixture
def active_span_mock():
    """Yield a mock that replaces ``Tracer.active_span``.

    The mock's ``return_value`` is a ``Mock`` constrained to the ``Span``
    interface. The patch is removed on fixture teardown.
    """
    target = 'rele.contrib.apm_middleware.Tracer.active_span'
    with patch(target) as patched:
        patched.return_value = Mock(spec=Span)
        yield patched
@pytest.fixture
def extract_mock():
    """Yield a mock that replaces ``Tracer.extract`` while the test runs."""
    target = 'rele.contrib.apm_middleware.Tracer.extract'
    with patch(target) as patched:
        yield patched
@pytest.fixture
def instrument_mock():
    """Yield a mock that replaces ``elasticapm.instrument``.

    The name is patched where the APM middleware module looks it up
    (``rele.contrib.apm_middleware.elasticapm``).
    """
    target = 'rele.contrib.apm_middleware.elasticapm.instrument'
    with patch(target) as patched:
        yield patched
@pytest.fixture
def carrier_get_trace_parent_mock():
    """Yield a mock of ``Carrier.get_trace_parent`` that returns ``'1234'``."""
    target = 'rele.contrib.apm_middleware.Carrier.get_trace_parent'
    with patch(target) as patched:
        patched.return_value = '1234'
        yield patched
@pytest.fixture
def message_with_trace_parent(message_wrapper):
    """Return ``message_wrapper`` with a trace-parent attribute set.

    The attribute is stored under the ``ELASTIC_APM_TRACE_PARENT`` key of
    the message's ``attributes`` mapping; the same object that was passed
    in is returned.
    """
    attributes = message_wrapper.attributes
    attributes[ELASTIC_APM_TRACE_PARENT] = "trace-example-foo"
    return message_wrapper
@pytest.mark.usefixtures('instrument_mock', 'active_span_mock')
class TestAPMPlugin:
    """Tests for the APM middleware around publishing and message callbacks.

    Every test runs with ``elasticapm.instrument`` and ``Tracer.active_span``
    patched through the class-level fixtures; individual tests request the
    additional patches and the middleware registration they need.
    """

    @pytest.mark.usefixtures(
        'carrier_get_trace_parent_mock', 'apm_middleware')
    def test_span_is_started_on_pre_publish(
            self, publisher, start_active_span_mock):
        """Publishing starts an active span named after the topic."""
        message = {'foo': 'bar'}
        topic = 'order-cancelled'
        publisher.publish(topic=topic, data=message, myattr='hello')
        start_active_span_mock.assert_called_with(
            topic, finish_on_close=False)

    @pytest.mark.usefixtures('apm_middleware')
    def test_apm_tracer_id_is_injected_in_message_attributes_on_pre_publish(
            self, publisher):
        """The trace-parent key is among the kwargs sent to the client."""
        publisher.publish(
            topic='order-cancelled',
            data={'foo': 'bar'},
            myattr='hello'
        )
        call_args = publisher._client.publish.call_args
        # call_args[1] holds the keyword arguments of the last call.
        assert ELASTIC_APM_TRACE_PARENT in call_args[1].keys()

    @pytest.mark.usefixtures('apm_middleware', 'start_active_span_mock')
    def test_message_is_published_even_when_apm_fails(
            self, instrument_mock, publisher):
        """The client still publishes when ``instrument`` would raise."""
        instrument_mock.side_effect = Exception('Something went wrong on APM')
        publisher.publish(
            topic='order-cancelled',
            data={'foo': 'bar'},
            myattr='hello'
        )
        # ``called_once()`` is not a Mock assertion: on a plain mock it only
        # creates a truthy child mock, so asserting on it can never fail.
        publisher._client.publish.assert_called_once()

    def test_message_is_published_even_when_apm_fails_setting_up_apm(
            self, publisher, config):
        """The client still publishes when building the APM client raises."""
        with patch('rele.contrib.apm_middleware.'
                   'Client.__init__') as client_mock:
            client_mock.side_effect = Exception(
                'Something went wrong initializing APM'
            )
            # Register the middleware while the constructor is patched so
            # its setup goes through the failing ``Client.__init__``.
            config.middleware = ['rele.contrib.APMMiddleware']
            register_middleware(config)
            publisher.publish(
                topic='order-cancelled',
                data={'foo': 'bar'},
                myattr='hello'
            )
        # Real mock assertion; ``called_once()`` would always be truthy.
        publisher._client.publish.assert_called_once()

    @pytest.mark.parametrize('blocking', (False, True))
    @pytest.mark.usefixtures('apm_middleware')
    def test_span_is_finished_after_message_is_published(
            self, blocking, publisher, active_span_mock):
        """``finish`` is called on the active span for both publish modes."""
        publisher.publish(
            topic='order-cancelled',
            data={'foo': 'bar'},
            blocking=blocking,
            myattr='hello'
        )
        active_span_mock.finish.assert_called()

    def test_message_is_published_when_apm_fails_finishing_active_span(
            self, publisher, active_span_mock):
        """The client still publishes when finishing the span raises."""
        # NOTE(review): this test does not request the ``apm_middleware``
        # fixture - confirm the middleware is actually registered here.
        active_span_mock.finish.side_effect = Exception(
            "APM failed finishing the active span"
        )
        publisher.publish(
            topic='order-cancelled',
            data={'foo': 'bar'},
            myattr='hello'
        )
        # Real mock assertion; ``called_once()`` would always be truthy.
        publisher._client.publish.assert_called_once()

    @pytest.mark.usefixtures('apm_middleware', 'start_active_span_mock')
    def test_instrument_is_started_before_processing_message(
            self, message_with_trace_parent, instrument_mock):
        """Running a callback on a message calls ``elasticapm.instrument``."""
        callback = Callback(sub_stub)
        callback(message_with_trace_parent)
        # The original ``instrument_mock.called_once()`` verified nothing:
        # it is neither asserted nor a Mock assertion method.
        instrument_mock.assert_called_once()

    @pytest.mark.usefixtures(
        'apm_middleware', 'instrument_mock', 'start_active_span_mock')
    def test_parent_trace_is_extracted_before_processing_message(
            self, message_with_trace_parent, extract_mock):
        """``Tracer.extract`` receives a carrier holding the trace parent."""
        callback = Callback(sub_stub)
        callback(message_with_trace_parent)
        # call_args[0][1] is the second positional argument of the call.
        assert ELASTIC_APM_TRACE_PARENT in extract_mock.call_args[0][1].keys()

    @pytest.mark.usefixtures('apm_middleware', 'start_active_span_mock')
    def test_span_as_child_of_parent_context_is_started_before_processing_message(
            self, message_with_trace_parent, extract_mock,
            start_active_span_mock):
        """The span is started as a child of the extracted parent context."""
        parent_context_mock = Mock(spec=SpanContext)
        extract_mock.return_value = parent_context_mock
        callback = Callback(sub_stub)
        callback(message_with_trace_parent)
        start_active_span_mock.assert_called_with(
            ANY,
            child_of=parent_context_mock,
            finish_on_close=False
        )