Skip to content

Commit

Permalink
Synchronize access to transport buffer #324
Browse files Browse the repository at this point in the history
  • Loading branch information
mp911de committed Jul 27, 2016
1 parent 207ca0d commit 41c7727
Showing 1 changed file with 12 additions and 3 deletions.
15 changes: 12 additions & 3 deletions src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java
Expand Up @@ -405,7 +405,7 @@ private void writeSingleCommand(ChannelHandlerContext ctx, RedisCommand<K, V, ?>
throws Exception {

if (command.isCancelled()) {
transportBuffer.remove(command);
removeFromTransportBuffer(command);
return;
}

Expand Down Expand Up @@ -434,7 +434,7 @@ private void writeBatch(ChannelHandlerContext ctx, Collection<RedisCommand<K, V,
for (RedisCommand<K, V, ?> command : commands) {

if (command.isCancelled()) {
transportBuffer.remove(command);
removeFromTransportBuffer(command);
continue;
}

Expand Down Expand Up @@ -465,7 +465,7 @@ private void queueCommand(RedisCommand<K, V, ?> command, ChannelPromise promise)
sentTimes.put(command, new SentReceived(nanoTime()));
queue.add(command);
}
transportBuffer.remove(command);
removeFromTransportBuffer(command);
} catch (Exception e) {
command.setException(e);
command.cancel(true);
Expand All @@ -474,6 +474,15 @@ private void queueCommand(RedisCommand<K, V, ?> command, ChannelPromise promise)
}
}

private void removeFromTransportBuffer(RedisCommand<K, V, ?> command) {
try {
writeLock.lock();
transportBuffer.remove(command);
} finally {
writeLock.unlock();
}
}

private long nanoTime() {
return System.nanoTime();
}
Expand Down

0 comments on commit 41c7727

Please sign in to comment.