Skip to content

Commit

Permalink
Add lock renewal for KinesisChannelAdapter
Browse files Browse the repository at this point in the history
Related to spring-cloud/spring-cloud-stream-binder-aws-kinesis#148

The current implementation has a flaw when it uses a distributed lock
for an exclusive access to shard for consuming only once at start up.
Such a behavior cause the problem when we have a network glitch at
runtime, so the lock is broken, but consumer is still active to retry
consumption attempts

* Add `renewLockIfAny()` logic ot the `ShardConsumer`, so we ensure
that we are still a lock holder and don't consume otherwise
* Add `unlockFuture` logic to block the `ShardConsumer.stop()`
until we really got lock unlocked.
Otherwise we end up with the race condition when we are still
stopping, but already ready to start a new consumer for the same
shard
  • Loading branch information
artembilan committed Dec 18, 2020
1 parent c15e345 commit ac74dfd
Showing 1 changed file with 104 additions and 9 deletions.
Expand Up @@ -29,6 +29,7 @@
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
Expand Down Expand Up @@ -915,7 +916,17 @@ void setNotifier(Runnable notifier) {
void stop() {
this.state = ConsumerState.STOP;
if (KinesisMessageDrivenChannelAdapter.this.lockRegistry != null) {
KinesisMessageDrivenChannelAdapter.this.shardConsumerManager.unlock(this.key);
LockCompletableFuture unlockFuture = new LockCompletableFuture(this.key);
KinesisMessageDrivenChannelAdapter.this.shardConsumerManager.unlock(unlockFuture);
try {
unlockFuture.get(1, TimeUnit.SECONDS);
}
catch (Exception ex) {
if (ex instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
logger.info("The lock for key '" + this.key + "' was not unlocked in time", ex);
}
}
if (this.notifier != null) {
this.notifier.run();
Expand All @@ -929,6 +940,10 @@ void close() {

void execute() {
if (this.task == null) {
if (!renewLockIfAny()) {
return;
}

switch (this.state) {
case NEW:
case EXPIRED:
Expand Down Expand Up @@ -1007,6 +1022,36 @@ void execute() {
}
}

private boolean renewLockIfAny() {
if (KinesisMessageDrivenChannelAdapter.this.lockRegistry != null && this.state == ConsumerState.CONSUME) {
LockCompletableFuture renewLockFuture = new LockCompletableFuture(this.key);
KinesisMessageDrivenChannelAdapter.this.shardConsumerManager.renewLock(renewLockFuture);
boolean lockRenewed = false;
try {
lockRenewed = renewLockFuture.get(1, TimeUnit.SECONDS);
}
catch (Exception ex) {
if (ex instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
logger.info("The lock for key '" + this.key + "' was not renewed in time", ex);
}

if (!lockRenewed && this.state == ConsumerState.CONSUME) {
this.state = ConsumerState.STOP;
this.checkpointer.close();
if (this.notifier != null) {
this.notifier.run();
}
if (KinesisMessageDrivenChannelAdapter.this.active) {
KinesisMessageDrivenChannelAdapter.this.shardConsumerManager.addShardToConsume(this.shardOffset);
}
return false;
}
}
return true;
}

private Runnable processTask() {
return () -> {
GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
Expand Down Expand Up @@ -1368,7 +1413,9 @@ private final class ShardConsumerManager implements SchedulingAwareRunnable {

private final Map<String, Lock> locks = new HashMap<>();

private final Queue<String> forUnlocking = new ConcurrentLinkedQueue<>();
private final Queue<LockCompletableFuture> forUnlocking = new ConcurrentLinkedQueue<>();

private final Queue<LockCompletableFuture> forRenewing = new ConcurrentLinkedQueue<>();

ShardConsumerManager() {
}
Expand All @@ -1379,15 +1426,18 @@ void addShardToConsume(KinesisShardOffset kinesisShardOffset) {
this.shardOffsetsToConsumer.put(lockKey, kinesisShardOffset);
}

void unlock(String lockKey) {
this.forUnlocking.add(lockKey);
void unlock(LockCompletableFuture unlockFuture) {
this.forUnlocking.add(unlockFuture);
}

void renewLock(LockCompletableFuture renewLockFuture) {
this.forRenewing.add(renewLockFuture);
}

@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {

this.shardOffsetsToConsumer
.entrySet()
.removeIf(
Expand Down Expand Up @@ -1418,9 +1468,9 @@ public void run() {
});

while (KinesisMessageDrivenChannelAdapter.this.lockRegistry != null) {
String lockKey = this.forUnlocking.poll();
if (lockKey != null) {
Lock lock = this.locks.remove(lockKey);
LockCompletableFuture forUnlocking = this.forUnlocking.poll();
if (forUnlocking != null) {
Lock lock = this.locks.remove(forUnlocking.lockKey);
if (lock != null) {
try {
lock.unlock();
Expand All @@ -1429,13 +1479,48 @@ public void run() {
logger.error("Error during unlocking: " + lock, e);
}
}
forUnlocking.complete(true);
}
else {
break;
}
}

sleep(250, new IllegalStateException("ShardConsumerManager Thread [" + this + "] has been interrupted"), true);
while (KinesisMessageDrivenChannelAdapter.this.lockRegistry != null) {
LockCompletableFuture lockFuture = this.forRenewing.poll();
if (lockFuture != null) {
Lock lock = this.locks.get(lockFuture.lockKey);
if (lock != null) {
try {
if (lock.tryLock()) {
try {
lockFuture.complete(true);
}
finally {
lock.unlock();
}
}
else {
lockFuture.complete(false);
this.locks.remove(lockFuture.lockKey);
}
}
catch (Exception e) {
logger.error("Error during locking: " + lock, e);
}
}
else {
lockFuture.complete(false);
}
}
else {
break;
}
}

sleep(250,
new IllegalStateException("ShardConsumerManager Thread [" + this + "] has been interrupted"),
true);
}
}
finally {
Expand All @@ -1461,4 +1546,14 @@ public boolean isLongLived() {

}

private final static class LockCompletableFuture extends CompletableFuture<Boolean> {

private final String lockKey;

LockCompletableFuture(String lockKey) {
this.lockKey = lockKey;
}

}

}

0 comments on commit ac74dfd

Please sign in to comment.