ISPN-8069 Clustered Lock first implementation #5513
@belaban your review is more than welcome! 😄
nextRequestor = pendingRequests.poll();

final LockRequestHolder requestor = nextRequestor;
clusteredLockManager.execute(() -> lock(requestor));
The executor can be removed once https://issues.jboss.org/browse/ISPN-8336 is fixed.
There is a race condition here with the timed tryLock(). When a lock is released, the timer can fire and complete the future while the lock request is still on the network or in progress. The lock is then held by this node in the cache, and it can only be released by an unlock invocation from this node.
API suggestion
basic functional tests:
other functional tests
Partition tests
Concurrency and stress test
@pruivo concerning partition tests, I'm using a replicated cache. Are they still necessary in this case (besides the test that if a node leaves and the lock was acquired by this node, then it has to be released)?
 * @param name, the name of the lock
 * @return {@code true} if the lock has been released
 */
CompletableFuture<Boolean> release(String name);
Could we call this break, to make sure people don't use it as unlock?
Break is a reserved keyword
ok
I renamed it to forceRelease
@karesti yes they are. If you have CP enabled (assuming you allow configuring the lock as AP or CP), it would be nice to test that the partition doesn't release the lock (otherwise you will violate consistency :)) edit: if the cache is replicated, only the second point is needed
ac8365f
to
fc64234
Compare
@rvansa what was the alternative to using randomUUID? Another thing that we should consider is releasing "lock" calls after an amount of time, for example 10 minutes, so a lock request won't starve forever. In case of split-brain, all the "lock" requests should be released too (or auto-released by the scheduler after 5 minutes of waiting).
@wburns I asked for a review, if you have time, of course 💃
Tested in vert-x3/vertx-infinispan#39
@karesti Just use ThreadLocalRandom.current().nextLong() to generate 2 random longs and then
Use Util.threadLocalRandomUUID()
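A minimal sketch of how the first suggestion could look, assuming the two random longs are handed to the `java.util.UUID(long, long)` constructor (the comment above is cut off before it says so). The class and method names are hypothetical, and `Util.threadLocalRandomUUID()` may well be implemented differently, for instance by also setting the UUID version and variant bits.

```java
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

public class RequestIds {
    // Hypothetical helper (not from the PR): builds a UUID from two thread-local random longs.
    // Unlike UUID.randomUUID(), this avoids the shared SecureRandom, and it does not
    // set the version/variant bits of a type 4 UUID.
    static UUID newRequestId() {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        return new UUID(random.nextLong(), random.nextLong());
    }

    public static void main(String[] args) {
        UUID first = newRequestId();
        UUID second = newRequestId();
        System.out.println(first + " " + second);
    }
}
```

The trade-off is speed over unpredictability: the ids are fine as request identifiers but should not be treated as secrets.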
 * @see <a href="http://infinispan.org/documentation/">Infinispan documentation</a>
 * @since 9.2
 */
public interface ClusteredLock {
I'd prefer to see this in commons, with potentially an EmbeddedClusteredLock interface in the lock module.
 * @since 9.2
 */
@Experimental
public interface ClusteredLockManager {
Move to commons, with an EmbeddedClusteredLockManager interface here.
Ok, I will move those interfaces to commons!
 * @since 9.2
 */
@Experimental
public class ClusteredLockConfiguration {
Looks like configuration can be common to both embedded and remote implementations
   return;
}

if (request == null) {
bug: should be result == null
Indeed! I could not reproduce this case in tests, haha. Thank you for pointing this out.
There is another concurrency issue with the timed tryLock(). This test fails:
public void testTryLockWithTimeout11() throws Throwable {
   assertTrue(await(lock.tryLock()));
   CompletableFuture<Boolean> tryLock = lock.tryLock(1, TimeUnit.NANOSECONDS);
   System.out.println("try-lock result=" + await(tryLock));
   System.out.println("is-locked result=" + await(lock.isLocked()));
   System.out.println("is-locked-by-me result=" + await(lock.isLockedByMe()));
   assertTrue(await(tryLock));
   assertTrue(await(lock.isLocked()));
   assertTrue(await(lock.isLockedByMe()));
}
The tryLock() reports false but the lock is acquired.
try-lock result=false
is-locked result=true
is-locked-by-me result=true
@Override
protected void handle(Boolean result) {
   if (!wait) {
      log.trace("Wait is " + wait + "on request" + this);
wait is always false. Also, start using log.tracef() when you have arguments.
ok
// The answer has to be returned without holding the CompletableFuture
request.complete(result);
} else if (result) {
   if (request.isDone()) {
You still have concurrency issues here: the scheduler can fire between request.isDone() and request.complete(true). My suggestion:
request.complete(true)
if (!request.getNow(false)) unlock()
The CompletableFuture only registers the first complete(), so getNow() only returns false if the scheduler arrived first.
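The suggestion can be tried out in isolation. The sketch below is a single-JVM stand-in, not the PR's code: `heldByThisNode`, `unlock()` and `onLockAcquired()` are hypothetical names. Following the explanation that `getNow()` returns false only when the scheduler arrived first, the sketch unlocks on that false branch, because in that case the caller was already told the lock was not acquired.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class TimedTryLockRace {
    // Hypothetical stand-in for the lock state this node holds in the cache.
    static final AtomicBoolean heldByThisNode = new AtomicBoolean(false);

    static void unlock() {
        heldByThisNode.set(false);
    }

    // Called when the cache reports that this request acquired the lock.
    static void onLockAcquired(CompletableFuture<Boolean> request) {
        heldByThisNode.set(true);
        // Only the first complete() wins; this is a no-op if the timer got there first.
        request.complete(true);
        // false means the timer already answered "not acquired", so give the lock back.
        if (!request.getNow(false)) {
            unlock();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Boolean> timedOut = new CompletableFuture<>();
        timedOut.complete(false);   // the timer fired first
        onLockAcquired(timedOut);
        System.out.println("timer first, lock still held: " + heldByThisNode.get());

        CompletableFuture<Boolean> inTime = new CompletableFuture<>();
        onLockAcquired(inTime);     // the lock arrived before the timer
        System.out.println("lock first, lock still held: " + heldByThisNode.get());
    }
}
```

Because the check happens after `complete(true)`, there is no window between testing and completing the future: whichever side completed first decides the outcome, and the other side reacts to it.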
@pruivo I did another thing, your comments are isolated in a commit
   return;
}

if (request.isDone()) {
I'm not sure about this. The scheduled executor can complete the request but the unlock never happens.
I have to check the status at the end
and unlock if needed
@Override
public CompletableFuture<Boolean> tryLock(long time, TimeUnit unit) {
   log.trace("tryLock with timeout (" + time + "," + unit + ") called from " + originator);
Shouldn't this invoke tryLock() if time <= 0?
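A rough illustration of the guard being asked for, using a hypothetical single-JVM `SketchLock` rather than the PR's implementation. The timed branch is only a placeholder that counts how often it is taken. The point is the early return that hands non-positive timeouts to the untimed `tryLock()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class TimedTryLockDelegation {
    // Hypothetical in-memory lock used only to illustrate the delegation.
    static final class SketchLock {
        private boolean locked;
        int timersScheduled; // counts how often the timed path was taken

        synchronized CompletableFuture<Boolean> tryLock() {
            boolean acquired = !locked;
            locked = true;
            return CompletableFuture.completedFuture(acquired);
        }

        synchronized CompletableFuture<Boolean> tryLock(long time, TimeUnit unit) {
            if (time <= 0) {
                // Nothing to wait for: behave exactly like the untimed tryLock().
                return tryLock();
            }
            // Placeholder: a real implementation would queue the request and
            // schedule a timeout here. This sketch only records the visit.
            timersScheduled++;
            return tryLock();
        }
    }

    public static void main(String[] args) {
        SketchLock lock = new SketchLock();
        System.out.println(lock.tryLock(0, TimeUnit.SECONDS).join());
        System.out.println(lock.tryLock(-1, TimeUnit.SECONDS).join());
        System.out.println("timers scheduled: " + lock.timersScheduled);
    }
}
```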
yes, indeed.
await(lock0.lock()
      .thenRun(() -> {
         assertTrue("we locked with node A", true);
weird assertTrue(true)
This is just a way to document, but I can change it
I will do something smarter
import org.testng.annotations.Test;

@Test(groups = "functional", testName = "clusteredLock.ClusteredLockSplitBrainTest")
public class ClusteredLockSplitBrainTest extends BasePartitionHandlingTest {
Tests that split the cluster while the lock is acquired are missing. Also, it would be nice to test the tryLock() methods.
I didn't want to add more tests without a check on this one first.
@Override
public void writeObject(ObjectOutput output, ClusteredLockValue object) throws IOException {
   output.writeObject(object.requestId);
MarshallUtil.marshallString(object.requestId, output);
ok
public boolean defineLock(String name, ClusteredLockConfiguration configuration) {
   // TODO: Configuration is not used because we don't support any other mode for now. For that : ISPN-8413
   CacheHolder cacheHolder = extractCacheHolder(cacheHolderFuture);
   cache = cacheHolder.getClusteredLockCache();
can throw NullPointerException
If the cacheHolder is null, you mean?
Ok, I'm throwing an exception if the caches could not be started instead of returning null
@Override
public void writeObject(ObjectOutput output, LockFunction object) throws IOException {
   output.writeObject(object.requestId);
MarshallUtil.marshallString(object.requestId, output);
ok
@pruivo Concerning "There is another concurrency issue with the tryLock() with timer. This test fails: public void testTryLockWithTimeout11() throws Throwable { try-lock result=false": this is working properly. The lock is not reentrant. The lock is locked by the first call, and the second tryLock fails because the lock is already acquired.
@karesti there is only one edit, ah the first was commented. copy-paste error. The test still fails :)
204c58a
to
3a11488
Compare
@danberindei yes, I know, split brain fails in Jenkins. I have to check tests and code ... but I'm in another fight before that!!
<dependency>
   <groupId>org.infinispan</groupId>
   <artifactId>infinispan-clustered-lock</artifactId>
   <version>...</version> <!-- 9.2.0.Final or higher -->
@tristantarrant Maybe we should have a placeholder like {infinispanversion} but with the full version?
Indeed. @karesti please add a <fullinfinispanversion>${project.version}</fullinfinispanversion> to documentation/pom.xml in the <attributes> section so that we can substitute it correctly when docs are generated
But I don't want this value to be replaced each time. This is something from 9.2.0.Final. I did the same thing as in counters where it is said
I will do this in a separate PR if you want
`ClusteredLockManager.get("lockName")` is called.

This means that each time we get a `ClusteredLock` instance with the `ClusteredCacheManager`, this instance will be
a new instance.
Sorry I'm commenting so late, but I find the change in the ClusteredLockManager.get(name) behavior confusing.
Personally I like the vert.x API, where acquiring the lock gives you a Lock instance, and you can only release it through that Lock instance. Being able to release a lock that some other application component has acquired without any communication with that component sounds wrong to me.
Don't worry, input is always welcome Dan! So as I understand, the ownership level "NODE" is not ok and we should support "INSTANCE" level locking (reentrant or not reentrant)?
About this point, @rvansa considered that we might only need "object" level (at least for now). We discussed that briefly during the F2F
First of all, I'm not enunciating rules here, I'm just thinking aloud :) But I think even with the vert.x API, node ownership is still useful if the lock is reentrant: that way multiple logical threads on the same node can acquire the lock concurrently, and only the final release actually allows another node to acquire it.
Thinking about it some more, maybe we should allow the application to use its own lock owner, as in clusteredLock.lock(lockOwner) or clusteredLock.withOwner(lockOwner).lock() (to match the not-yet-public API in Cache). That way, different application components that deal with a single request could call get(name).lock(requestId) without the need to explicitly pass a lock instance between them. Regular lock() would use the local node as the lock owner.
@karesti It's you who came up with the notion of node-level locks; I haven't seen that in Vert.x, only the fact that the lock is not tied to a thread. To me, node-level ownership with non-reentrancy seems the same as instance-level.
@danberindei Sounds good, but the owner should be provided when the lock is retrieved from the manager rather than after that (to enforce a constant owner by API).
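To make the owner discussion concrete, here is a minimal single-JVM sketch of a non-reentrant lock whose owner is fixed when the handle is retrieved. Everything in it is hypothetical (`SketchLockManager`, `SketchLock`, and the `get(name, owner)` signature); it is not the API of this PR, only one way the "owner supplied at retrieval" idea could behave.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class OwnerAwareLockSketch {
    // Hypothetical manager: maps each lock name to the owner currently holding it.
    static final class SketchLockManager {
        private final Map<String, Object> holders = new ConcurrentHashMap<>();

        // The owner is chosen here and cannot change for the returned handle.
        SketchLock get(String name, Object owner) {
            return new SketchLock(holders, name, owner);
        }
    }

    // Non-reentrant lock handle bound to one owner for its whole lifetime.
    static final class SketchLock {
        private final Map<String, Object> holders;
        private final String name;
        private final Object owner;

        SketchLock(Map<String, Object> holders, String name, Object owner) {
            this.holders = holders;
            this.name = name;
            this.owner = owner;
        }

        boolean tryLock() {
            return holders.putIfAbsent(name, owner) == null;
        }

        // Releases only if this handle's owner is the current holder.
        boolean unlock() {
            return holders.remove(name, owner);
        }

        boolean isLockedByMe() {
            return owner.equals(holders.get(name));
        }
    }

    public static void main(String[] args) {
        SketchLockManager manager = new SketchLockManager();
        SketchLock componentA = manager.get("orders", "request-42");
        SketchLock componentB = manager.get("orders", "request-42");
        SketchLock other = manager.get("orders", "request-43");

        System.out.println(componentA.tryLock());      // acquired on behalf of request-42
        System.out.println(componentB.isLockedByMe()); // same owner, different handle
        System.out.println(other.unlock());            // a different owner cannot release
        System.out.println(componentB.unlock());       // the same owner can
    }
}
```

With this shape, two components handling the same request share ownership through the owner value alone, without passing a lock instance between them.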
@rvansa it's not accurate to say that it was me who came up with the idea of ownership or reentrancy levels, but this is not the discussion here.
I think there is a misunderstanding here, probably due to something I did not fully get
@danberindei concerning the "with owner", this is something I was considering too
+1 for withOwner
The `ClusteredLockManager` interface, *marked as experimental*, is the entry point to define, retrieve and remove a lock.
It automatically listen to the creation of `EmbeddedCacheManager` and proceeds with the registration of an
instance of it per `EmbeddedCacheManager`.
Does the interface really listen to events? And should the user care if the lock manager is created at the same time as the cache manager, or only on the first call to EmbeddedClusteredLockManagerFactory.from(EmbeddedCacheManager)?
This is not very clear, indeed, I have to rework this part.
private static CompletableFuture<CacheHolder> startCaches(EmbeddedCacheManager cacheManager) {
   final CompletableFuture<CacheHolder> future = new CompletableFuture<>();
   new Thread(() -> {
Please set a thread name that includes the node name for easier debugging.
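As a small illustration of this request, the `Thread(Runnable, String)` constructor lets the name be set at creation time. The helper name, the name prefix, and the way the node name is obtained are all assumptions made for this sketch.

```java
public class NamedStarterThread {
    // Hypothetical helper: nodeName is assumed to come from the cache manager's configuration.
    static Thread newCacheStarter(String nodeName, Runnable task) {
        return new Thread(task, "clustered-lock-cache-starter-" + nodeName);
    }

    public static void main(String[] args) throws InterruptedException {
        Thread starter = newCacheStarter("NodeA",
              () -> System.out.println("running in " + Thread.currentThread().getName()));
        starter.start();
        starter.join();
    }
}
```

The name then shows up in thread dumps and log patterns, which is what makes multi-node test failures easier to attribute.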
Ok
d3a96d3
to
9123700
Compare
Pretty good job here. A few minor changes are needed
public interface ClusteredLock {

   /**
    * Acquires the lock. If the lock is not available then the {@link CompletableFuture} waits til the lock has been acquired.
s/til/until/
ok
   /**
    * If the lock is available this method returns immediately with the {@link CompletableFuture} holding the value {@code true}.
    * If the lock is not available then the {@link CompletableFuture} waits til :
s/til/until/
ok
== Clustered Lock

Clustered lock is a new building block introduced in Infinispan 9.2
It is a lock distributed and shared among all nodes in the Infinispan cluster and currently provides a way to execute
"It is a lock which is distributed and shared..."
ok
----

* `lock` :
Acquires the lock. If the lock is not available then call blocks til the lock is acquired.
s/til/until/
ok
* `tryLock(long time, TimeUnit unit)`
If the lock is available this method returns immediately with `true`.
If the lock is not available then the call waits til :
s/til/until/
ok
Returns `true` when the lock is locked and `false` when the lock is released.

* `isLockedByMe`
Returns a `true` when the lock is owned by the caller and `false` when the lock is owned by someone else or it's released.
s/Returns a/Returns/
ok
@MetaInfServices(value = ModuleLifecycle.class)
public class ClusteredLockModuleLifecycle implements ModuleLifecycle {

   public static final String CLUSTERED_LOCK_CACHE_NAME = "___clustered_locks";
Please use org.infinispan.LOCKS
Ok
import static org.infinispan.functional.FunctionalTestUtils.await;
import static org.infinispan.test.TestingUtil.killCacheManagers;
import static org.testng.Assert.assertEquals;
We tend to favor the AssertJUnit versions of these
Wrong import by mistake. Let me change and push
* Non reentrant lock
* Lock Owner is the Node
@tristantarrant I carried over your modifications, minus the substitution of the version, at least for now. If Jenkins is ok, I would like to merge
@tristantarrant Jenkins seems ok!
Merged. Thanks!
@tristantarrant thank you 😭
https://issues.jboss.org/browse/ISPN-8069