This repository has been archived by the owner on Jun 12, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 20
/
test_message_store.py
117 lines (92 loc) · 4.12 KB
/
test_message_store.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# -*- coding: utf-8 -*-
from twisted.internet.defer import inlineCallbacks
from vumi.application.tests.test_sandbox import (
ResourceTestCaseBase, DummyAppWorker)
from go.apps.jsbox.message_store import MessageStoreResource
from go.vumitools.tests.helpers import GoMessageHelper, VumiApiHelper
class StubbedAppWorker(DummyAppWorker):
def __init__(self):
super(StubbedAppWorker, self).__init__()
self.user_api = None
def user_api_for_api(self, api):
return self.user_api
class TestMessageStoreResource(ResourceTestCaseBase):
app_worker_cls = StubbedAppWorker
resource_cls = MessageStoreResource
@inlineCallbacks
def setUp(self):
super(TestMessageStoreResource, self).setUp()
self.vumi_helper = yield self.add_helper(VumiApiHelper())
self.msg_helper = self.add_helper(GoMessageHelper())
self.user_helper = yield self.vumi_helper.make_user(u"user")
self.app_worker.user_api = self.user_helper.user_api
self.message_store = self.vumi_helper.get_vumi_api().mdb
self.conversation = yield self.user_helper.create_conversation(
u'jsbox', started=True)
# store inbound
yield self.message_store.add_inbound_message(
self.msg_helper.make_inbound('hello'),
batch_id=self.conversation.batch.key)
# store outbound
outbound_msg = self.msg_helper.make_outbound('hello')
yield self.message_store.add_outbound_message(
outbound_msg, batch_id=self.conversation.batch.key)
# ack outbound
event = self.msg_helper.make_ack(outbound_msg)
yield self.message_store.add_event(event)
# monkey patch for when no conversation_key is provided
self.app_worker.conversation_for_api = lambda *a: self.conversation
yield self.create_resource({})
@inlineCallbacks
def test_handle_progress_status(self):
reply = yield self.dispatch_command('progress_status')
self.assertTrue(reply['success'])
self.assertEqual(reply['progress_status'], {
'ack': 1,
'delivery_report': 0,
'delivery_report_delivered': 0,
'delivery_report_failed': 0,
'delivery_report_pending': 0,
'nack': 0,
'sent': 1,
})
@inlineCallbacks
def test_handle_count_replies(self):
reply = yield self.dispatch_command('count_replies')
self.assertTrue(reply['success'])
self.assertEqual(reply['count'], 1)
@inlineCallbacks
def test_handle_count_sent_messages(self):
reply = yield self.dispatch_command('count_sent_messages')
self.assertTrue(reply['success'])
self.assertEqual(reply['count'], 1)
@inlineCallbacks
def test_handle_count_inbound_uniques(self):
reply = yield self.dispatch_command('count_inbound_uniques')
self.assertTrue(reply['success'])
# TODO fix once we support uniques properly again
self.assertEqual(reply['count'], 0)
@inlineCallbacks
def test_handle_count_outbound_uniques(self):
reply = yield self.dispatch_command('count_outbound_uniques')
self.assertTrue(reply['success'])
# TODO fix once we support uniques properly again
self.assertEqual(reply['count'], 0)
@inlineCallbacks
def test_handle_inbound_throughput(self):
reply = yield self.dispatch_command('inbound_throughput',
sample_time=60)
self.assertTrue(reply['success'])
self.assertEqual(reply['throughput'], 1)
@inlineCallbacks
def test_handle_outbound_throughput(self):
reply = yield self.dispatch_command('outbound_throughput',
sample_time=60)
self.assertTrue(reply['success'])
self.assertEqual(reply['throughput'], 1)
@inlineCallbacks
def test_invalid_conversation_key(self):
reply = yield self.dispatch_command('progress_status',
conversation_key='foo')
self.assertFalse(reply['success'])
self.assertEqual(reply['reason'], 'Invalid conversation_key')