-
Notifications
You must be signed in to change notification settings - Fork 0
/
FsQueue.py
171 lines (134 loc) · 3.86 KB
/
FsQueue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
""" FsQueue
An elastic queue backed by the filesystem.
Its interface is almost the same as Python's Queue.Queue.
"""
__author__ = 'FUKUDA Masahiro'
__author_email__ = 'masahiro@fukuda.org'
__version__ = '1.2'
__all__ = ['Empty', 'Full', 'Queue']
import tempfile, os
import pickle
from time import sleep
import Queue
import time
try:
import threading
except ImportError:
import dummy_threading as threading
# Re-export the standard library's queue exceptions under this module's
# namespace (both names are listed in __all__).  These lookups must run
# before the ``class Queue`` statement further down rebinds the name
# ``Queue`` from the imported stdlib module to this module's own class.
Empty = Queue.Empty
Full = Queue.Full
_count_mutex = threading.Lock()
_count = 0
def _counter():
global _count_mutex, _count
try:
_count_mutex.acquire()
return _count
finally:
_count += 1
_count_mutex.release()
class Queue(object):
    """Filesystem-backed FIFO queue with an interface modelled on Queue.Queue.

    Layout under the base directory ``dir``:

    * ``queue/`` -- one pickled file per pending item.  Names are
      ``<mtime>.<sequence>.<random>`` so that a plain string sort yields
      insertion order.
    * ``tmp/``   -- staging area: files being written by put() and files
      claimed by get_nowait() while they are being read.
    * ``count/`` -- one empty marker directory per put(); task_done()
      removes one marker per call.

    Differences from the standard library queue that are visible in this
    class: ``maxsize`` is accepted but never used, full() always returns
    False, put() never blocks, and join() only waits for ``queue/`` to
    become empty (it does not look at the task_done() markers).
    """

    def __init__(self, maxsize=0, dir='_tmp_fsqueue', init=False):
        """Open (creating if necessary) the queue rooted at ``dir``.

        maxsize -- accepted for signature compatibility only; not used.
        dir     -- base directory of the queue.
        init    -- when true, delete any existing queue at ``dir`` first.

        Raises OSError if one of the directories can neither be created nor
        found afterwards.
        """
        self.base_dir = dir
        self.queue_dir = os.path.join(dir, 'queue')
        self.tmp_dir = os.path.join(dir, 'tmp')
        self.count_dir = os.path.join(dir, 'count')
        if init:
            self.init_queue()
        self._make_dirs()

    def _make_dirs(self):
        """Create the base directory and its three sub-directories if missing."""
        for path in (self.base_dir, self.queue_dir,
                     self.tmp_dir, self.count_dir):
            try:
                os.mkdir(path, 0o700)
            except OSError:
                # "Already exists" is the normal case when re-opening a
                # queue.  Anything that leaves us without the directory
                # (permissions, missing parent, ...) is a real failure and
                # is reported now rather than on the first put()/get().
                if not os.path.isdir(path):
                    raise

    def task_done(self):
        """Remove one put() marker from the count directory.

        Raises ValueError when no marker could be removed, i.e. task_done()
        was called more often than put().
        """
        for name in os.listdir(self.count_dir):
            try:
                os.rmdir(os.path.join(self.count_dir, name))
            except OSError:
                # Most likely another caller removed this marker between
                # our listdir() and rmdir(); try the next one.
                continue
            return
        raise ValueError('task_done() called too many times')

    def join(self):
        """Block until the queue directory holds no pending items.

        NOTE: this polls empty() every 0.1 s and does not consult the
        task_done() markers, so it returns as soon as every item has been
        fetched, whether or not task_done() was called for it.
        """
        while not self.empty():
            time.sleep(0.1)

    def qsize(self):
        """Return the number of entries currently in the queue directory."""
        return len(os.listdir(self.queue_dir))

    def empty(self):
        """Return True when qsize() is zero."""
        return self.qsize() == 0

    def full(self):
        """Always return False; a size limit is not supported."""
        return False

    def put(self, item, block=True, timeout=None):
        """Append ``item`` (any picklable object) to the queue.

        ``block`` and ``timeout`` are accepted for signature compatibility
        and ignored: the queue has no size limit, so put() never waits.

        The item is pickled into a temporary file in the staging directory
        and then renamed into the queue directory, so a consumer listing
        the queue never sees a partially written entry name.
        """
        fd, tmp_path = tempfile.mkstemp('', '', self.tmp_dir)
        basename = os.path.basename(tmp_path)
        published = False
        try:
            # Binary mode: HIGHEST_PROTOCOL is a binary pickle format, which
            # text mode would corrupt (newline translation) or reject.
            with os.fdopen(fd, 'wb') as tmp_file:
                pickle.dump(item, tmp_file, pickle.HIGHEST_PROTOCOL)
            # Fixed-width fields keep the string sort in get_nowait()
            # consistent with numeric order; an unpadded sequence number
            # would sort "10" before "9" whenever the mtimes are equal.
            entry_name = "%.6f.%010d.%s" % (
                os.stat(tmp_path).st_mtime, _counter(), basename)
            os.rename(tmp_path, os.path.join(self.queue_dir, entry_name))
            published = True
        finally:
            # Do not leave a stray staging file behind when pickling or the
            # rename failed; the original exception keeps propagating.
            if not published and os.path.exists(tmp_path):
                os.unlink(tmp_path)
        # Marker consumed by task_done().
        os.mkdir(os.path.join(self.count_dir, basename), 0o700)

    def put_nowait(self, item):
        """Equivalent to put(item, False)."""
        return self.put(item, False)

    def get_nowait(self):
        """Remove and return the oldest item, or raise Empty if there is none.

        An entry is claimed by renaming it from the queue directory into
        the staging directory; if the rename fails the entry is skipped and
        the next one is tried.  If unpickling fails, the claimed file is
        left in the staging directory and the exception propagates.

        NOTE(security): entries are loaded with pickle, which can execute
        arbitrary code.  Only use queue directories that untrusted users
        cannot write to (the directories are created with mode 0700).
        """
        try:
            names = sorted(os.listdir(self.queue_dir))
        except OSError:
            # Queue directory is missing (e.g. after init_queue()).
            raise Empty
        claimed = None
        for name in names:
            candidate = os.path.join(self.tmp_dir, name)
            try:
                os.rename(os.path.join(self.queue_dir, name), candidate)
            except OSError:
                # Most likely another consumer claimed this entry first.
                continue
            claimed = candidate
            break
        if claimed is None:
            raise Empty
        with open(claimed, 'rb') as entry:
            data = pickle.load(entry)
        os.unlink(claimed)
        return data

    def get(self, block=True, timeout=None):
        """Remove and return the oldest item.

        block   -- when false, behave exactly like get_nowait().
        timeout -- with ``block`` true: None waits indefinitely; a number
                   waits at most that many seconds (zero or a negative
                   value means a single attempt) and then raises Empty.

        The queue directory is polled roughly every 0.01 s while waiting.
        """
        if not block:
            return self.get_nowait()
        # ``is None`` rather than truthiness, so that timeout=0 is honoured
        # instead of being mistaken for "no timeout".
        deadline = None if timeout is None else time.time() + timeout
        while True:
            try:
                return self.get_nowait()
            except Empty:
                pass
            if deadline is None:
                time.sleep(0.01)
                continue
            remaining = deadline - time.time()
            if remaining <= 0.0:
                raise Empty
            # Never sleep past the deadline.
            time.sleep(min(0.01, remaining))

    def init_queue(self):
        """Delete the queue's base directory and everything below it.

        A base directory that does not exist is not an error.  The
        directories are NOT recreated here; __init__ does that after
        calling this method when ``init`` is true.
        """
        import errno
        import shutil
        try:
            shutil.rmtree(self.base_dir)
        except OSError as exc:
            # Nothing to delete on first use; anything else is a real error.
            if exc.errno != errno.ENOENT:
                raise
if __name__ == "__main__":
    # Smoke test: enqueue ten strings of growing length, print them back in
    # order until the queue reports Empty, then wait on join().
    demo_queue = Queue()
    for repeat in range(10):
        demo_queue.put('ikatako' * repeat)
    drained = False
    while not drained:
        try:
            print(demo_queue.get_nowait())
        except Empty:
            drained = True
    print("wait")
    demo_queue.join()
    print("end")