-
Notifications
You must be signed in to change notification settings - Fork 0
/
example.py
executable file
·137 lines (101 loc) · 4.11 KB
/
example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import sys, threading, time, logging
#################################################
## an example of a retryable decorator
## see README for details.
#################################################
# some exceptions
class ConnectionError(Exception):
    """Simulated connection failure; one of the errors the example retries on.

    NOTE: inside this module the name shadows Python 3's builtin
    ``ConnectionError``.
    """
class SomeOtherError(Exception):
    """Second simulated failure type that the example treats as retryable."""
# retryable decorator. it either expects a stoppable object or will look in the func parameters
def retryable(retry_frequency_seconds, retryable_exceptions, max_tries=0, stop_object=None):
    """Build a decorator that retries a call when it raises a retryable exception.

    Between attempts the wrapper waits on the stoppable object's condition
    rather than sleeping, so a stop request can wake it early.

    Args:
        retry_frequency_seconds: seconds to wait between attempts.
        retryable_exceptions: exception class (or tuple of classes) that
            triggers a retry; any other exception propagates immediately.
        max_tries: maximum number of attempts; zero or negative means
            retry forever.
        stop_object: object exposing a ``stopped`` flag and a
            ``stop_condition`` condition variable. When None, the first
            positional argument of the decorated callable is used
            (``self`` for methods).

    Returns:
        A decorator that wraps the target callable with the retry loop.

    Raises (from the wrapped call):
        The last retryable exception, once the attempts are exhausted or a
        stop has been requested.
        IndexError: if no ``stop_object`` was given and the call has no
            positional arguments.
    """
    import functools  # local import so the block does not rely on a new file-level import

    def wrap(f):
        @functools.wraps(f)
        def func(*args, **kwargs):
            retry_forever = max_tries <= 0
            # Identity test, not truthiness: a stop object that happens to be
            # falsy must still be honoured.
            stoppable_object = stop_object if stop_object is not None else args[0]
            # TODO: probably should verify stoppable_object is really stoppable.
            count = 0
            while True:
                try:
                    return f(*args, **kwargs)
                except retryable_exceptions:
                    logging.exception("caught retryable exception")
                    count += 1
                    out_of_tries = (not retry_forever) and count >= max_tries
                    if stoppable_object.stopped or out_of_tries:
                        # Re-raise the real error (with its traceback) and do
                        # not waste a sleep after the final attempt.
                        raise
                    logging.debug(
                        "sleeping %d seconds before retrying (attempt=%d)...",
                        retry_frequency_seconds,
                        count,
                    )
                    condition = stoppable_object.stop_condition
                    with condition:
                        # Re-check under the lock so a stop that lands between
                        # the check above and the wait is not slept through.
                        if not stoppable_object.stopped:
                            condition.wait(retry_frequency_seconds)
                    if stoppable_object.stopped:
                        raise

        return func

    return wrap
# object with a stop condition.
class Stoppable(object):
    """Mixin that adds a ``stopped`` flag and a condition used to wake waiters.

    Code that wants an interruptible sleep waits on ``stop_condition`` with a
    timeout; ``stop()`` sets the flag and notifies every waiter.
    """

    # False until stop() is called on the instance.
    stopped = False
    # Class-level fallback, kept for code that reads the attribute off the
    # class. Every instance gets its own condition in __new__ so that stopping
    # one object does not wake (or contend with) unrelated ones.
    stop_condition = threading.Condition()

    def __new__(cls, *args, **kwargs):
        """Create the instance and give it a private ``stop_condition``.

        This is done in ``__new__`` rather than ``__init__`` so that
        subclasses which never call ``Stoppable.__init__`` still get their
        own condition.
        """
        parent_new = super(Stoppable, cls).__new__
        if parent_new is object.__new__:
            # object.__new__ rejects extra arguments when __new__ is overridden.
            instance = parent_new(cls)
        else:
            instance = parent_new(cls, *args, **kwargs)
        instance.stop_condition = threading.Condition()
        return instance

    def stop(self):
        """Mark this object as stopped and wake everything waiting on it."""
        logging.debug("Stop requested")
        # Set the flag while holding the lock so a waiter that checks the flag
        # under the same lock cannot miss the notification.
        with self.stop_condition:
            self.stopped = True
            self.stop_condition.notify_all()
# this would represent your core code.
class MainThread(threading.Thread, Stoppable):
    """Worker thread that polls for jobs until it is asked to stop.

    Each loop iteration fetches a job through the retrying
    ``get_job_from_database`` and then waits on ``stop_condition`` so that a
    stop request ends the wait early.
    """

    # Number of calls made to get_job_from_database; drives the simulated outcomes.
    i = 0
    # Seconds to wait between polls; override on a subclass or instance.
    poll_interval_seconds = 10

    def __init__(self):
        threading.Thread.__init__(self)

    @retryable(1, (ConnectionError, SomeOtherError))
    def get_job_from_database(self):
        """Pretend this represents retrieval of job data from a database.

        A ConnectionError or SomeOtherError is a valid retryable exception.
        Calls 1-2 raise ConnectionError, calls 3-5 raise SomeOtherError,
        calls 6-8 return a job name, and later calls raise a plain Exception
        that the decorator does not retry.
        """
        self.i = self.i + 1
        logging.info("i=%d", self.i)
        if self.i < 3:
            raise ConnectionError("I'm a connection exception - please retry on this.")
        if self.i < 6:
            raise SomeOtherError("I'm some other error that is ok to retry.")
        if self.i < 9:
            return "job%d" % self.i
        raise Exception("this is not caught and will explode")

    def run(self):
        """hello, I do stuff in a loop, like look for new jobs to run."""
        try:
            while not self.stopped:
                job = self.get_job_from_database()
                if job:
                    # TODO: now do something with that job. make more db operations, etc.
                    print(job)
                with self.stop_condition:
                    # Re-check under the lock so a stop requested just before
                    # the wait is not slept through.
                    if not self.stopped:
                        try:
                            self.stop_condition.wait(self.poll_interval_seconds)
                        except Exception:
                            # can be interrupted legitimately; the loop
                            # condition re-checks the stop flag.
                            logging.debug("wait between polls was interrupted", exc_info=True)
        except Exception:
            logging.exception("uh oh, there's an uncaught error. I better notify my maintainers...")
        finally:
            print("stopped")
def debug_start():
    """Send DEBUG-level logging to stdout, then create, start and return a MainThread."""
    logging.basicConfig(
        format="%(asctime)-15s %(levelname)s [%(module)s] %(message)s",
        level=logging.DEBUG,
        stream=sys.stdout,
    )
    worker = MainThread()
    worker.start()
    return worker
#start
main = debug_start()
try:
    # join() with a timeout returns as soon as the worker finishes (a plain
    # sleep would keep the process around for up to a minute after the worker
    # died) and still lets Ctrl-C interrupt the main thread.
    while main.is_alive():
        main.join(60)
except KeyboardInterrupt:
    # Ask the worker to stop; the join below waits for it to wind down.
    main.stop()
main.join()