forked from fedora-infra/fedmsg
/
test_consumers.py
138 lines (109 loc) · 5.36 KB
/
test_consumers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# -*- coding: utf-8 -*-
#
# This file is part of fedmsg.
# Copyright (C) 2017 Red Hat, Inc.
#
# fedmsg is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# fedmsg is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with fedmsg; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
#
# Authors: Jeremy Cline <jcline@redhat.com>
"""Tests for the :mod:`fedmsg.consumers` module."""
import json
import os
import unittest
from moksha.hub.zeromq.zeromq import ZMQMessage
import mock
from fedmsg import crypto
from fedmsg.consumers import FedmsgConsumer
from fedmsg.tests.crypto.test_x509 import SSLDIR
FIXTURES_DIR = os.path.abspath(
os.path.join(os.path.dirname(__file__), '../fixtures/'))
class DummyConsumer(FedmsgConsumer):
"""Set attributes necessary to instantiate a consumer."""
config_key = 'dummy'
validate_signatures = True
class FedmsgConsumerReplayTests(unittest.TestCase):
"""Tests for the replay functionality of fedmsg consumers method."""
def setUp(self):
self.config = {
'dummy': True,
'ssldir': SSLDIR,
'ca_cert_cache': os.path.join(SSLDIR, 'fedora_ca.crt'),
'ca_cert_cache_expiry': 1497618475, # Stop fedmsg overwriting my CA, See Issue 420
'crypto_validate_backends': ['x509'],
}
self.hub = mock.Mock(config=self.config)
self.consumer = DummyConsumer(self.hub)
def test_backlog_message_validation(self):
with open(os.path.join(FIXTURES_DIR, 'sample_datanommer_response.json')) as fd:
replay_messages = json.load(fd)
self.consumer.get_datagrepper_results = mock.Mock(
return_value=replay_messages['raw_messages'])
last_message = json.dumps({'message': {'body': {'msg_id': 'myid', 'timestamp': 0}}})
# This places all the messages from a call to "get_datagrepper_results" in the
# "incoming" queue.Queue
self.consumer._backlog(last_message)
while not self.consumer.incoming.empty():
self.consumer.validate(self.consumer.incoming.get())
class FedmsgConsumerValidateTests(unittest.TestCase):
"""Tests for the :meth:`FedmsgConsumer.validate` method."""
def setUp(self):
self.config = {
'dummy': True,
'ssldir': SSLDIR,
'certname': 'shell-app01.phx2.fedoraproject.org',
'ca_cert_cache': os.path.join(SSLDIR, 'ca.crt'),
'ca_cert_cache_expiry': 1497618475, # Stop fedmsg overwriting my CA, See Issue 420
'crl_location': "http://threebean.org/fedmsg-tests/crl.pem",
'crl_cache': os.path.join(SSLDIR, 'crl.pem'),
'crl_cache_expiry': 1497618475,
'crypto_validate_backends': ['x509'],
}
self.hub = mock.Mock(config=self.config)
self.consumer = DummyConsumer(self.hub)
def test_topic_mismatch(self):
"""Assert a RuntimeWarning is raised for topic mismatches."""
message = {'topic': 't1', 'body': {'topic': 't2'}}
self.assertRaises(RuntimeWarning, self.consumer.validate, message)
def test_valid_signature(self):
"""Assert the API accepts and validates dictionary messages."""
message = {'topic': 't1', 'body': crypto.sign({'topic': 't1'}, **self.config)}
self.consumer.validate(message)
def test_invalid_signature(self):
"""Assert a RuntimeWarning is raised for topic mismatches."""
message = {'topic': 't1', 'body': crypto.sign({'topic': 't1'}, **self.config)}
message['body']['signature'] = 'thisisnotmysignature'
self.assertRaises(RuntimeWarning, self.consumer.validate, message)
def test_no_topic_in_body(self):
"""Assert an empty topic is placed in the message if the key is missing."""
self.consumer.validate_signatures = False
message = {'body': {'some': 'stuff'}}
self.consumer.validate(message)
self.assertEqual({'body': {'topic': None, 'msg': {'some': 'stuff'}}}, message)
@mock.patch('fedmsg.consumers.fedmsg.crypto.validate')
def test_zmqmessage_text_body(self, mock_crypto_validate):
self.consumer.validate_signatures = True
self.consumer.hub.config = {}
message = ZMQMessage(u't1', u'{"some": "stuff"}')
self.consumer.validate(message)
mock_crypto_validate.assert_called_once_with({'topic': u't1', 'msg': {'some': 'stuff'}})
@mock.patch('fedmsg.consumers.warnings.warn')
@mock.patch('fedmsg.consumers.fedmsg.crypto.validate')
def test_zmqmessage_binary_body(self, mock_crypto_validate, mock_warn):
self.consumer.validate_signatures = True
self.consumer.hub.config = {}
message = ZMQMessage(u't1', b'{"some": "stuff"}')
self.consumer.validate(message)
mock_crypto_validate.assert_called_once_with({'topic': u't1', 'msg': {'some': 'stuff'}})
mock_warn.assert_any_call('Message body is not unicode', DeprecationWarning)