Skip to content

Commit

Permalink
Merge 3cf14fa into a8cbf4c
Browse files Browse the repository at this point in the history
  • Loading branch information
manishtomar committed Aug 28, 2013
2 parents a8cbf4c + 3cf14fa commit 4871142
Showing 1 changed file with 23 additions and 2 deletions.
25 changes: 23 additions & 2 deletions silverberg/lock.py
Expand Up @@ -73,7 +73,7 @@ class BasicLock(object):
"""

def __init__(self, client, lock_table, lock_id, ttl=300, max_retry=0,
retry_wait=10, reactor=None):
retry_wait=10, reactor=None, log=None):
self._client = client
self._lock_table = lock_table
self._lock_id = lock_id
Expand All @@ -84,6 +84,10 @@ def __init__(self, client, lock_table, lock_id, ttl=300, max_retry=0,
if reactor is None:
from twisted.internet import reactor
self._reactor = reactor
if log:
self._log = log.bind(lock_id=self._lock_id, claim_id=self._claim_id)
else:
self._log = None

def _read_lock(self, ignored):
query = 'SELECT * FROM {cf} WHERE "lockId"=:lockId ORDER BY "claimId";'
Expand All @@ -96,6 +100,11 @@ def _verify_lock(self, response):
NoLockClaimsError(self._lock_table, self._lock_id)))

if response[0]['claimId'] == self._claim_id:
if self._log:
self._lock_acquired_seconds = self._reactor.seconds()
seconds = self._lock_acquired_seconds - self._acquire_start_seconds
self._log.msg('Acquired lock in {lock_acquire_time} seconds',
lock_acquire_time=seconds)
return defer.succeed(True)
else:
return self.release().addCallback(lambda _: defer.fail(
Expand Down Expand Up @@ -151,7 +160,14 @@ def release(self):
d = self._client.execute(query.format(cf=self._lock_table),
{'lockId': self._lock_id, 'claimId': self._claim_id},
ConsistencyLevel.QUORUM)
return d

def _log_release_time(result):
seconds = self._reactor.seconds() - self._lock_acquired_seconds
self._log.msg('Released lock. Was held for {lock_held_time} seconds',
lock_held_time=seconds)
return result

return self._log and d.addBoth(_log_release_time) or d

def acquire(self):
"""
Expand All @@ -163,6 +179,7 @@ def acquire(self):
retries = [0]

def acquire_lock():
self._acquire_start_seconds = self._reactor.seconds()
d = self._write_lock()
d.addCallback(self._read_lock)
d.addCallback(self._verify_lock)
Expand All @@ -175,6 +192,10 @@ def lock_not_acquired(failure):
if retries[0] <= self._max_retry:
return task.deferLater(self._reactor, self._retry_wait, acquire_lock)
else:
if self._log:
seconds = self._reactor.seconds() - self._acquire_start_seconds
self._log.msg('Could not acquire lock in {lock_acquire_fail_time} seconds',
lock_acquire_fail_time=seconds)
return failure

return acquire_lock()
Expand Down

0 comments on commit 4871142

Please sign in to comment.