forked from bsharper/sbremote
-
Notifications
You must be signed in to change notification settings - Fork 0
/
singleton.py
147 lines (120 loc) · 4.87 KB
/
singleton.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
#! /usr/bin/env python
import logging
from multiprocessing import Process
import os
import sys
import tempfile
import unittest
if sys.platform != "win32":
import fcntl
class SingleInstanceException(BaseException):
pass
class SingleInstance(object):
"""Class that can be instantiated only once per machine.
If you want to prevent your script from running in parallel just instantiate SingleInstance() class. If is there another instance already running it will throw a `SingleInstanceException`.
>>> import tendo
... me = SingleInstance()
This option is very useful if you have scripts executed by crontab at small amounts of time.
Remember that this works by creating a lock file with a filename based on the full path to the script file.
Providing a flavor_id will augment the filename with the provided flavor_id, allowing you to create multiple singleton instances from the same file. This is particularly useful if you want specific functions to have their own singleton instances.
"""
def __init__(self, flavor_id="", lockfile=""):
self.initialized = False
if lockfile:
self.lockfile = lockfile
else:
basename = os.path.splitext(os.path.abspath(sys.argv[0]))[0].replace(
"/", "-").replace(":", "").replace("\\", "-") + '-%s' % flavor_id + '.lock'
self.lockfile = os.path.normpath(
tempfile.gettempdir() + '/' + basename)
logger.debug("SingleInstance lockfile: " + self.lockfile)
if sys.platform == 'win32':
try:
# file already exists, we try to remove (in case previous
# execution was interrupted)
if os.path.exists(self.lockfile):
os.unlink(self.lockfile)
self.fd = os.open(
self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
except OSError:
type, e, tb = sys.exc_info()
if e.errno == 13:
logger.error(
"Another instance is already running, quitting.")
raise SingleInstanceException()
print(e.errno)
raise
else: # non Windows
self.fp = open(self.lockfile, 'w')
self.fp.flush()
try:
fcntl.lockf(self.fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
logger.warning(
"Another instance is already running, quitting.")
raise SingleInstanceException()
self.initialized = True
def __del__(self):
if not self.initialized:
return
try:
if sys.platform == 'win32':
if hasattr(self, 'fd'):
os.close(self.fd)
os.unlink(self.lockfile)
else:
fcntl.lockf(self.fp, fcntl.LOCK_UN)
# os.close(self.fp)
if os.path.isfile(self.lockfile):
os.unlink(self.lockfile)
except Exception as e:
if logger:
logger.warning(e)
else:
print("Unloggable error: %s" % e)
sys.exit(-1)
def f(name):
tmp = logger.level
logger.setLevel(logging.CRITICAL) # we do not want to see the warning
try:
me2 = SingleInstance(flavor_id=name) # noqa
except SingleInstanceException:
sys.exit(-1)
logger.setLevel(tmp)
pass
class testSingleton(unittest.TestCase):
def test_1(self):
me = SingleInstance(flavor_id="test-1")
del me # now the lock should be removed
assert True
def test_2(self):
p = Process(target=f, args=("test-2",))
p.start()
p.join()
# the called function should succeed
assert p.exitcode == 0, "%s != 0" % p.exitcode
def test_3(self):
me = SingleInstance(flavor_id="test-3") # noqa -- me should still kept
p = Process(target=f, args=("test-3",))
p.start()
p.join()
# the called function should fail because we already have another
# instance running
assert p.exitcode != 0, "%s != 0 (2nd execution)" % p.exitcode
# note, we return -1 but this translates to 255 meanwhile we'll
# consider that anything different from 0 is good
p = Process(target=f, args=("test-3",))
p.start()
p.join()
# the called function should fail because we already have another
# instance running
assert p.exitcode != 0, "%s != 0 (3rd execution)" % p.exitcode
def test_4(self):
lockfile = '/tmp/foo.lock'
me = SingleInstance(lockfile=lockfile)
assert me.lockfile == lockfile
logger = logging.getLogger("tendo.singleton")
if __name__ == "__main__":
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)
unittest.main()