forked from celery/celery
/
test_tyrant.py
109 lines (84 loc) · 2.94 KB
/
test_tyrant.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import sys
import socket
import unittest2 as unittest
from celery.exceptions import ImproperlyConfigured
from celery import states
from celery.utils import gen_unique_id
from celery.backends import tyrant
from celery.backends.tyrant import TyrantBackend
_no_tyrant_msg = "* Tokyo Tyrant %s. Will not execute related tests."
_no_tyrant_msg_emitted = False
class SomeClass(object):
def __init__(self, data):
self.data = data
def get_tyrant_or_None():
def emit_no_tyrant_msg(reason):
global _no_tyrant_msg_emitted
if not _no_tyrant_msg_emitted:
sys.stderr.write("\n" + _no_tyrant_msg % reason + "\n")
_no_tyrant_msg_emitted = True
if tyrant.pytyrant is None:
return emit_no_tyrant_msg("not installed")
try:
tb = TyrantBackend()
try:
tb.open()
except socket.error:
return emit_no_tyrant_msg("not running")
return tb
except ImproperlyConfigured, exc:
if "need to install" in str(exc):
return emit_no_tyrant_msg("not installed")
return emit_no_tyrant_msg("not configured")
class TestTyrantBackend(unittest.TestCase):
def test_cached_connection(self):
tb = get_tyrant_or_None()
if not tb:
return # Skip test
self.assertIsNotNone(tb._connection)
tb.close()
self.assertIsNone(tb._connection)
tb.open()
self.assertIsNone(tb._connection)
def test_mark_as_done(self):
tb = get_tyrant_or_None()
if not tb:
return
tid = gen_unique_id()
self.assertFalse(tb.is_successful(tid))
self.assertEqual(tb.get_status(tid), states.PENDING)
self.assertIsNone(tb.get_result(tid), None)
tb.mark_as_done(tid, 42)
self.assertTrue(tb.is_successful(tid))
self.assertEqual(tb.get_status(tid), states.SUCCESS)
self.assertEqual(tb.get_result(tid), 42)
def test_is_pickled(self):
tb = get_tyrant_or_None()
if not tb:
return
tid2 = gen_unique_id()
result = {"foo": "baz", "bar": SomeClass(12345)}
tb.mark_as_done(tid2, result)
# is serialized properly.
rindb = tb.get_result(tid2)
self.assertEqual(rindb.get("foo"), "baz")
self.assertEqual(rindb.get("bar").data, 12345)
def test_mark_as_failure(self):
tb = get_tyrant_or_None()
if not tb:
return
tid3 = gen_unique_id()
try:
raise KeyError("foo")
except KeyError, exception:
pass
tb.mark_as_failure(tid3, exception)
self.assertFalse(tb.is_successful(tid3))
self.assertEqual(tb.get_status(tid3), states.FAILURE)
self.assertIsInstance(tb.get_result(tid3), KeyError)
def test_process_cleanup(self):
tb = get_tyrant_or_None()
if not tb:
return
tb.process_cleanup()
self.assertIsNone(tb._connection)