Skip to content

Commit da0e013

Browse files
committed
make obj replicator locking more optimistic
Basically, do all hashing in the replicator without a lock, then lock briefly to rewrite the hashes file. Retry if someone else has modified the hashes file in the meantime (which should be rare). Also, a little refactoring. Change-Id: I6257a53808d14b567bde70d2d18a9c58cb1e415a
1 parent 82f1d55 commit da0e013

File tree

5 files changed

+117
-69
lines changed

5 files changed

+117
-69
lines changed

swift/obj/replicator.py

Lines changed: 48 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -166,50 +166,59 @@ def get_hashes(partition_dir, recalculate=[], do_listdir=False,
166166

167167
hashed = 0
168168
hashes_file = join(partition_dir, HASH_FILE)
169-
with lock_path(partition_dir):
170-
modified = False
171-
hashes = {}
172-
try:
173-
with open(hashes_file, 'rb') as fp:
174-
hashes = pickle.load(fp)
175-
except Exception:
176-
do_listdir = True
177-
if do_listdir:
178-
hashes = dict(((suff, hashes.get(suff, None))
179-
for suff in os.listdir(partition_dir)
180-
if len(suff) == 3 and isdir(join(partition_dir, suff))))
169+
modified = False
170+
hashes = {}
171+
mtime = -1
172+
try:
173+
with open(hashes_file, 'rb') as fp:
174+
hashes = pickle.load(fp)
175+
mtime = os.path.getmtime(hashes_file)
176+
except Exception:
177+
do_listdir = True
178+
if do_listdir:
179+
for suff in os.listdir(partition_dir):
180+
if len(suff) == 3 and isdir(join(partition_dir, suff)):
181+
hashes.setdefault(suff, None)
182+
modified = True
183+
hashes.update((hash_, None) for hash_ in recalculate)
184+
for suffix, hash_ in hashes.items():
185+
if not hash_:
186+
suffix_dir = join(partition_dir, suffix)
187+
if isdir(suffix_dir):
188+
try:
189+
hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
190+
hashed += 1
191+
except OSError:
192+
logging.exception(_('Error hashing suffix'))
193+
else:
194+
del hashes[suffix]
181195
modified = True
182-
for hash_ in recalculate:
183-
hashes[hash_] = None
184-
for suffix, hash_ in hashes.items():
185-
if not hash_:
186-
suffix_dir = join(partition_dir, suffix)
187-
if os.path.exists(suffix_dir):
188-
try:
189-
hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
190-
hashed += 1
191-
except OSError:
192-
logging.exception(_('Error hashing suffix'))
193-
hashes[suffix] = None
194-
else:
195-
del hashes[suffix]
196-
modified = True
197-
sleep()
198-
if modified:
199-
write_pickle(hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
196+
if modified:
197+
with lock_path(partition_dir):
198+
if not os.path.exists(hashes_file) or \
199+
os.path.getmtime(hashes_file) == mtime:
200+
write_pickle(
201+
hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
202+
return hashed, hashes
203+
return get_hashes(partition_dir, recalculate, do_listdir,
204+
reclaim_age)
205+
else:
200206
return hashed, hashes
201207

202208

203-
def tpooled_get_hashes(*args, **kwargs):
209+
def tpool_reraise(func, *args, **kwargs):
204210
"""
205211
Hack to work around Eventlet's tpool not catching and reraising Timeouts.
206-
We return the Timeout, Timeout if it's raised, the caller looks for it
207-
and reraises it if found.
208212
"""
209-
try:
210-
return get_hashes(*args, **kwargs)
211-
except Timeout, err:
212-
return err, err
213+
def inner():
214+
try:
215+
return func(*args, **kwargs)
216+
except BaseException, err:
217+
return err
218+
resp = tpool.execute(inner)
219+
if isinstance(resp, BaseException):
220+
raise resp
221+
return resp
213222

214223

215224
class ObjectReplicator(Daemon):
@@ -392,12 +401,9 @@ def update(self, job):
392401
self.logger.increment('partition.update.count.%s' % (job['device'],))
393402
begin = time.time()
394403
try:
395-
hashed, local_hash = tpool.execute(tpooled_get_hashes, job['path'],
404+
hashed, local_hash = tpool_reraise(get_hashes, job['path'],
396405
do_listdir=(self.replication_count % 10) == 0,
397406
reclaim_age=self.reclaim_age)
398-
# See tpooled_get_hashes "Hack".
399-
if isinstance(hashed, BaseException):
400-
raise hashed
401407
self.suffix_hash += hashed
402408
self.logger.update_stats('suffix.hashes', hashed)
403409
attempts_left = len(job['nodes'])
@@ -428,12 +434,9 @@ def update(self, job):
428434
local_hash[suffix] != remote_hash.get(suffix, -1)]
429435
if not suffixes:
430436
continue
431-
hashed, recalc_hash = tpool.execute(tpooled_get_hashes,
437+
hashed, recalc_hash = tpool_reraise(get_hashes,
432438
job['path'], recalculate=suffixes,
433439
reclaim_age=self.reclaim_age)
434-
# See tpooled_get_hashes "Hack".
435-
if isinstance(hashed, BaseException):
436-
raise hashed
437440
self.logger.update_stats('suffix.hashes', hashed)
438441
local_hash = recalc_hash
439442
suffixes = [suffix for suffix in local_hash if

swift/obj/server.py

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@
4444
check_float, check_utf8
4545
from swift.common.exceptions import ConnectionTimeout, DiskFileError, \
4646
DiskFileNotExist
47-
from swift.obj.replicator import tpooled_get_hashes, invalidate_hash, \
48-
quarantine_renamer
47+
from swift.obj.replicator import tpool_reraise, invalidate_hash, \
48+
quarantine_renamer, get_hashes
4949
from swift.common.http import is_success, HTTPInsufficientStorage, \
5050
HTTPClientDisconnect
5151

@@ -865,12 +865,7 @@ def REPLICATE(self, request):
865865
if not os.path.exists(path):
866866
mkdirs(path)
867867
suffixes = suffix.split('-') if suffix else []
868-
_junk, hashes = tpool.execute(tpooled_get_hashes, path,
869-
recalculate=suffixes)
870-
# See tpooled_get_hashes "Hack".
871-
if isinstance(hashes, BaseException):
872-
self.logger.increment('REPLICATE.errors')
873-
raise hashes
868+
_junk, hashes = tpool_reraise(get_hashes, path, recalculate=suffixes)
874869
self.logger.timing_since('REPLICATE.timing', start_time)
875870
return Response(body=pickle.dumps(hashes))
876871

test/unit/__init__.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,3 +235,25 @@ def __eq__(self, other):
235235

236236
def __ne__(self, other):
237237
return other is not True
238+
239+
240+
@contextmanager
241+
def mock(update):
242+
returns = []
243+
deletes = []
244+
for key, value in update.items():
245+
imports = key.split('.')
246+
attr = imports.pop(-1)
247+
module = __import__(imports[0], fromlist=imports[1:])
248+
for modname in imports[1:]:
249+
module = getattr(module, modname)
250+
if hasattr(module, attr):
251+
returns.append((module, attr, getattr(module, attr)))
252+
else:
253+
deletes.append((module, attr))
254+
setattr(module, attr, value)
255+
yield True
256+
for module, attr, value in returns:
257+
setattr(module, attr, value)
258+
for module, attr in deletes:
259+
delattr(module, attr)

test/unit/obj/test_replicator.py

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from contextlib import contextmanager
2828
from eventlet.green import subprocess
2929
from eventlet import Timeout, tpool
30-
from test.unit import FakeLogger
30+
from test.unit import FakeLogger, mock
3131
from swift.common import utils
3232
from swift.common.utils import hash_path, mkdirs, normalize_timestamp
3333
from swift.common import ring
@@ -209,6 +209,41 @@ def test_get_hashes(self):
209209
self.assertEquals(hashed, 1)
210210
self.assert_('a83' in hashes)
211211

212+
def test_get_hashes_unmodified(self):
213+
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger())
214+
mkdirs(df.datadir)
215+
with open(os.path.join(df.datadir, normalize_timestamp(
216+
time.time()) + '.ts'), 'wb') as f:
217+
f.write('1234567890')
218+
part = os.path.join(self.objects, '0')
219+
hashed, hashes = object_replicator.get_hashes(part)
220+
i = [0]
221+
def getmtime(filename):
222+
i[0] += 1
223+
return 1
224+
with mock({'os.path.getmtime': getmtime}):
225+
hashed, hashes = object_replicator.get_hashes(
226+
part, recalculate=['a83'])
227+
self.assertEquals(i[0], 2)
228+
229+
def test_get_hashes_modified(self):
230+
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger())
231+
mkdirs(df.datadir)
232+
with open(os.path.join(df.datadir, normalize_timestamp(
233+
time.time()) + '.ts'), 'wb') as f:
234+
f.write('1234567890')
235+
part = os.path.join(self.objects, '0')
236+
hashed, hashes = object_replicator.get_hashes(part)
237+
i = [0]
238+
def getmtime(filename):
239+
if i[0] < 3:
240+
i[0] += 1
241+
return i[0]
242+
with mock({'os.path.getmtime': getmtime}):
243+
hashed, hashes = object_replicator.get_hashes(
244+
part, recalculate=['a83'])
245+
self.assertEquals(i[0], 3)
246+
212247
def test_hash_suffix_hash_dir_is_file_quarantine(self):
213248
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger())
214249
mkdirs(os.path.dirname(df.datadir))

test/unit/obj/test_server.py

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
import cPickle as pickle
1919
import os
20-
import sys
21-
import shutil
2220
import unittest
2321
from shutil import rmtree
2422
from StringIO import StringIO
@@ -37,7 +35,6 @@
3735
from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
3836
NullLogger, storage_directory
3937
from swift.common.exceptions import DiskFileNotExist
40-
from swift.obj import replicator
4138
from eventlet import tpool
4239

4340

@@ -2124,13 +2121,11 @@ def test_REPLICATE_works(self):
21242121
def fake_get_hashes(*args, **kwargs):
21252122
return 0, {1: 2}
21262123

2127-
def my_tpool_execute(*args, **kwargs):
2128-
func = args[0]
2129-
args = args[1:]
2124+
def my_tpool_execute(func, *args, **kwargs):
21302125
return func(*args, **kwargs)
21312126

2132-
was_get_hashes = replicator.get_hashes
2133-
replicator.get_hashes = fake_get_hashes
2127+
was_get_hashes = object_server.get_hashes
2128+
object_server.get_hashes = fake_get_hashes
21342129
was_tpool_exe = tpool.execute
21352130
tpool.execute = my_tpool_execute
21362131
try:
@@ -2143,20 +2138,18 @@ def my_tpool_execute(*args, **kwargs):
21432138
self.assertEquals(p_data, {1: 2})
21442139
finally:
21452140
tpool.execute = was_tpool_exe
2146-
replicator.get_hashes = was_get_hashes
2141+
object_server.get_hashes = was_get_hashes
21472142

21482143
def test_REPLICATE_timeout(self):
21492144

21502145
def fake_get_hashes(*args, **kwargs):
21512146
raise Timeout()
21522147

2153-
def my_tpool_execute(*args, **kwargs):
2154-
func = args[0]
2155-
args = args[1:]
2148+
def my_tpool_execute(func, *args, **kwargs):
21562149
return func(*args, **kwargs)
21572150

2158-
was_get_hashes = replicator.get_hashes
2159-
replicator.get_hashes = fake_get_hashes
2151+
was_get_hashes = object_server.get_hashes
2152+
object_server.get_hashes = fake_get_hashes
21602153
was_tpool_exe = tpool.execute
21612154
tpool.execute = my_tpool_execute
21622155
try:
@@ -2166,7 +2159,7 @@ def my_tpool_execute(*args, **kwargs):
21662159
self.assertRaises(Timeout, self.object_controller.REPLICATE, req)
21672160
finally:
21682161
tpool.execute = was_tpool_exe
2169-
replicator.get_hashes = was_get_hashes
2162+
object_server.get_hashes = was_get_hashes
21702163

21712164
if __name__ == '__main__':
21722165
unittest.main()

0 commit comments

Comments
 (0)