-
Notifications
You must be signed in to change notification settings - Fork 131
/
test_xmpp.py
215 lines (174 loc) · 7.88 KB
/
test_xmpp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
from twisted.python import log
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.task import Clock
from twisted.words.xish import domish
from vumi.tests.helpers import VumiTestCase
from vumi.tests.utils import LogCatcher
from vumi.transports.xmpp.xmpp import (
XMPPTransport, XMPPClient, XMPPTransportProtocol)
from vumi.transports.tests.helpers import TransportHelper
class DummyXMLStream(object):
    """Stand-in XML stream that keeps sent messages in a list."""

    def __init__(self):
        # Everything passed to send() is collected here, in call order.
        self.outbox = list()

    def send(self, message):
        """Store ``message`` at the end of ``outbox``."""
        self.outbox.append(message)

    def addObserver(self, event, observerfn, *args, **kwargs):
        """Accept an observer registration and discard it."""
        return None
class DummyXMPPClient(XMPPClient):
    """XMPPClient subclass whose service start/stop hooks do nothing."""

    def __init__(self, *args, **kw):
        XMPPClient.__init__(self, *args, **kw)
        # Assigned after the base initialiser so that None is the value
        # left on the instance.
        self._connection = None

    def startService(self):
        """Do nothing when the service is started."""

    def stopService(self):
        """Do nothing when the service is stopped."""
class DummyXMPPTransportProtocol(XMPPTransportProtocol):
    """XMPPTransportProtocol that sends through a DummyXMLStream."""

    def __init__(self, *args, **kwargs):
        XMPPTransportProtocol.__init__(self, *args, **kwargs)
        # Outgoing stanzas end up in this stream's ``outbox`` list, where
        # tests can inspect them.
        self.xmlstream = DummyXMLStream()
class TestXMPPTransport(VumiTestCase):
    """Tests for XMPPTransport, run against the dummy client and protocol."""

    @inlineCallbacks
    def mk_transport(self):
        """
        Create and start an XMPPTransport that uses the dummy XMPP classes.

        Sets ``self.tx_helper`` and ``self.jid`` as side effects. The ping
        and presence looping calls each get their own ``Clock`` so that
        tests can advance time by hand. Fires with the started transport.
        """
        config = {
            'username': 'user@xmpp.domain.com',
            'password': 'testing password',
            'status': 'chat',
            'status_message': 'XMPP Transport',
            'host': 'xmpp.domain.com',
            'port': 5222,
            'transport_type': 'xmpp',
        }
        self.tx_helper = self.add_helper(TransportHelper(XMPPTransport))
        transport = yield self.tx_helper.get_transport(config, start=False)

        # Swap in the dummies and the manual clocks before the worker starts.
        transport._xmpp_protocol = DummyXMPPTransportProtocol
        transport._xmpp_client = DummyXMPPClient
        transport.ping_call.clock = Clock()
        transport.presence_call.clock = Clock()

        yield transport.startWorker()
        yield transport.xmpp_protocol.connectionMade()
        self.jid = transport.jid
        returnValue(transport)

    def _mk_inbound_element(self, from_addr, unique_id=True):
        """
        Build a "message" element addressed to the transport's own JID.

        The element carries a 'hello world' body; a unique id is added
        unless ``unique_id`` is false. Requires ``self.jid``, which
        ``mk_transport`` sets.
        """
        stanza = domish.Element((None, "message"))
        stanza['to'] = self.jid.userhost()
        stanza['from'] = from_addr
        if unique_id:
            stanza.addUniqueId()
        stanza.addElement((None, 'body'), content='hello world')
        return stanza

    def assert_ack(self, ack, reply):
        """Assert that ``ack`` is an ack event for the message ``reply``."""
        self.assertEqual(ack.payload['event_type'], 'ack')
        for id_field in ('user_message_id', 'sent_message_id'):
            self.assertEqual(ack.payload[id_field], reply['message_id'])

    @inlineCallbacks
    def test_outbound_message(self):
        """An outbound message is sent on the XML stream and then acked."""
        transport = yield self.mk_transport()
        outbound = yield self.tx_helper.make_dispatch_outbound(
            "hi", to_addr='user@xmpp.domain.com', from_addr='test@case.com')

        sent = transport.xmpp_protocol.xmlstream.outbox
        self.assertEqual(len(sent), 1)
        stanza = sent[0]
        self.assertEqual(stanza['to'], 'user@xmpp.domain.com')
        self.assertTrue(stanza['id'])
        self.assertEqual(str(stanza.children[0]), 'hi')

        [ack] = yield self.tx_helper.wait_for_dispatched_events(1)
        self.assert_ack(ack, outbound)

    @inlineCallbacks
    def test_inbound_message(self):
        """An inbound stanza is published as an inbound message."""
        transport = yield self.mk_transport()
        stanza = self._mk_inbound_element('test@case.com')
        transport.xmpp_protocol.onMessage(stanza)

        [inbound] = yield self.tx_helper.wait_for_dispatched_inbound()
        own_addr = self.jid.userhost()
        self.assertEqual(inbound['to_addr'], own_addr)
        self.assertEqual(inbound['from_addr'], 'test@case.com')
        self.assertEqual(
            inbound['transport_name'], self.tx_helper.transport_name)
        # The stanza id is kept in the transport metadata only; it is not
        # reused as the message id.
        self.assertNotEqual(inbound['message_id'], stanza['id'])
        self.assertEqual(
            inbound['transport_metadata']['xmpp_id'], stanza['id'])
        self.assertEqual(inbound['content'], 'hello world')

    @inlineCallbacks
    def test_message_without_id(self):
        """A stanza with no id still gets a message id; xmpp_id is None."""
        transport = yield self.mk_transport()
        stanza = self._mk_inbound_element('test@case.com', unique_id=False)
        self.assertFalse(stanza.hasAttribute('id'))
        transport.xmpp_protocol.onMessage(stanza)

        [inbound] = yield self.tx_helper.wait_for_dispatched_inbound()
        self.assertTrue(inbound['message_id'])
        self.assertEqual(inbound['transport_metadata']['xmpp_id'], None)

    @inlineCallbacks
    def test_pinger(self):
        """
        A ping goes out once the ping interval has passed, and not before.
        """
        transport = yield self.mk_transport()
        ping_call = transport.ping_call
        self.assertEqual(transport.ping_interval, 60)

        # The looping call must target send_ping with no arguments, use a
        # 60 second interval and already be running.
        self.assertEqual(ping_call.f, transport.send_ping)
        self.assertEqual(ping_call.a, ())
        self.assertEqual(ping_call.kw, {})
        self.assertEqual(ping_call.interval, 60)
        self.assertTrue(ping_call.running)

        # Route client and pinger output into a fresh recording stream.
        stream = DummyXMLStream()
        transport.xmpp_client.xmlstream = stream
        transport.pinger.xmlstream = stream

        # Nothing is sent at 59 seconds; one ping has been sent by 61.
        ping_call.clock.advance(59)
        self.assertEqual(stream.outbox, [])
        ping_call.clock.advance(2)
        self.assertEqual(len(stream.outbox), 1, repr(stream.outbox))

        [ping] = stream.outbox
        self.assertEqual(ping['to'], u'user@xmpp.domain.com')
        self.assertEqual(ping['type'], u'get')
        [payload] = ping.children
        self.assertEqual(payload.toXml(), u"<ping xmlns='urn:xmpp:ping'/>")

    @inlineCallbacks
    def test_presence(self):
        """
        Presence is announced by a looping call once the presence handler
        has been initialised.
        """
        transport = yield self.mk_transport()
        presence_call = transport.presence_call
        self.assertEqual(transport.presence_interval, 60)

        # The looping call is not running until connectionInitialized()
        # is called below.
        self.assertFalse(presence_call.running)

        # Route client and presence output into a fresh recording stream.
        stream = DummyXMLStream()
        transport.xmpp_client.xmlstream = stream
        transport.xmpp_client._initialized = True
        transport.presence.xmlstream = stream
        self.assertEqual(stream.outbox, [])

        transport.presence.connectionInitialized()
        presence_call.clock.advance(1)
        self.assertEqual(len(stream.outbox), 1, repr(stream.outbox))

        # Now the looping call targets send_presence with no arguments,
        # uses a 60 second interval and is running.
        self.assertEqual(presence_call.f, transport.send_presence)
        self.assertEqual(presence_call.a, ())
        self.assertEqual(presence_call.kw, {})
        self.assertEqual(presence_call.interval, 60)
        self.assertTrue(presence_call.running)

        [announcement] = stream.outbox
        self.assertEqual(
            announcement.toXml(),
            u"<presence><status>chat</status></presence>")

    @inlineCallbacks
    def test_normalizing_from_addr(self):
        """The part of the sender address after '/' is left out of from_addr."""
        transport = yield self.mk_transport()
        stanza = self._mk_inbound_element('test@case.com/some_xmpp_id')
        transport.xmpp_protocol.onMessage(stanza)

        [inbound] = yield self.tx_helper.wait_for_dispatched_inbound()
        self.assertEqual(inbound['from_addr'], 'test@case.com')
        self.assertEqual(
            inbound['transport_metadata']['xmpp_id'], stanza['id'])

    @inlineCallbacks
    def test_xmpp_connection_lost(self):
        """
        Losing the XMPP connection produces exactly one log entry, which
        says the connection was lost and includes the reason.
        """
        transport = yield self.mk_transport()
        with LogCatcher() as lc:
            yield transport.xmpp_protocol.connectionLost('Test connection')

        [lost_event] = lc.logs
        self.assertEqual(
            log.textFromEventDict(lost_event),
            'XMPP Connection lost. Test connection')