Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Aug 12, 2016
1 parent 72afcc4 commit e4d2a06
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 28 deletions.
27 changes: 13 additions & 14 deletions redisson/src/main/java/org/redisson/RedissonLock.java
Expand Up @@ -41,7 +41,6 @@
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.internal.PlatformDependent;

/**
Expand Down Expand Up @@ -540,12 +539,12 @@ public void operationComplete(Future<Long> future) throws Exception {
if (entry.getLatch().tryAcquire()) {
lockAsync(leaseTime, unit, subscribeFuture, result, currentThreadId);
} else {
final AtomicReference<ScheduledFuture<?>> futureRef = new AtomicReference<ScheduledFuture<?>>();
final AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>();
final Runnable listener = new Runnable() {
@Override
public void run() {
if (futureRef.get() != null) {
futureRef.get().cancel(false);
futureRef.get().cancel();
}
lockAsync(leaseTime, unit, subscribeFuture, result, currentThreadId);
}
Expand All @@ -554,9 +553,9 @@ public void run() {
entry.addListener(listener);

if (ttl >= 0) {
ScheduledFuture<?> scheduledFuture = commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() {
Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run() {
public void run(Timeout timeout) throws Exception {
synchronized (entry) {
if (entry.removeListener(listener)) {
lockAsync(leaseTime, unit, subscribeFuture, result, currentThreadId);
Expand Down Expand Up @@ -615,7 +614,7 @@ public void operationComplete(Future<Long> future) throws Exception {
}

final long current = System.currentTimeMillis();
final AtomicReference<ScheduledFuture<?>> futureRef = new AtomicReference<ScheduledFuture<?>>();
final AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>();
final Future<RedissonLockEntry> subscribeFuture = subscribe(currentThreadId);
subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
@Override
Expand All @@ -626,7 +625,7 @@ public void operationComplete(Future<RedissonLockEntry> future) throws Exception
}

if (futureRef.get() != null) {
futureRef.get().cancel(false);
futureRef.get().cancel();
}

long elapsed = System.currentTimeMillis() - current;
Expand All @@ -642,9 +641,9 @@ public void operationComplete(Future<RedissonLockEntry> future) throws Exception
}
});
if (!subscribeFuture.isDone()) {
ScheduledFuture<?> scheduledFuture = commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() {
Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run() {
public void run(Timeout timeout) throws Exception {
if (!subscribeFuture.isDone()) {
subscribeFuture.cancel(false);
result.trySuccess(false);
Expand Down Expand Up @@ -694,13 +693,13 @@ public void operationComplete(Future<Long> future) throws Exception {
tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId);
} else {
final AtomicBoolean executed = new AtomicBoolean();
final AtomicReference<ScheduledFuture<?>> futureRef = new AtomicReference<ScheduledFuture<?>>();
final AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>();
final Runnable listener = new Runnable() {
@Override
public void run() {
executed.set(true);
if (futureRef.get() != null) {
futureRef.get().cancel(false);
futureRef.get().cancel();
}
long elapsed = System.currentTimeMillis() - current;
time.addAndGet(-elapsed);
Expand All @@ -715,14 +714,14 @@ public void run() {
t = ttl;
}
if (!executed.get()) {
ScheduledFuture<?> scheduledFuture = commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() {
Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run() {
public void run(Timeout timeout) throws Exception {
synchronized (entry) {
if (entry.removeListener(listener)) {
long elapsed = System.currentTimeMillis() - current;
time.addAndGet(-elapsed);

tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId);
}
}
Expand Down
21 changes: 11 additions & 10 deletions redisson/src/main/java/org/redisson/RedissonSemaphore.java
Expand Up @@ -31,10 +31,11 @@
import org.redisson.misc.RPromise;
import org.redisson.pubsub.SemaphorePubSub;

import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.ScheduledFuture;

/**
* Distributed and concurrent implementation of {@link java.util.concurrent.Semaphore}.
Expand Down Expand Up @@ -165,13 +166,13 @@ public void operationComplete(Future<Boolean> future) throws Exception {
tryAcquireAsync(time, permits, subscribeFuture, result);
} else {
final AtomicBoolean executed = new AtomicBoolean();
final AtomicReference<ScheduledFuture<?>> futureRef = new AtomicReference<ScheduledFuture<?>>();
final AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>();
final Runnable listener = new Runnable() {
@Override
public void run() {
executed.set(true);
if (futureRef.get() != null) {
futureRef.get().cancel(false);
futureRef.get().cancel();
}
long elapsed = System.currentTimeMillis() - current;
time.addAndGet(-elapsed);
Expand All @@ -183,14 +184,14 @@ public void run() {

long t = time.get();
if (!executed.get()) {
ScheduledFuture<?> scheduledFuture = commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() {
Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run() {
public void run(Timeout timeout) throws Exception {
synchronized (entry) {
if (entry.removeListener(listener)) {
long elapsed = System.currentTimeMillis() - current;
time.addAndGet(-elapsed);

tryAcquireAsync(time, permits, subscribeFuture, result);
}
}
Expand Down Expand Up @@ -331,7 +332,7 @@ public void operationComplete(Future<Boolean> future) throws Exception {
}

final long current = System.currentTimeMillis();
final AtomicReference<ScheduledFuture<?>> futureRef = new AtomicReference<ScheduledFuture<?>>();
final AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>();
final Future<RedissonLockEntry> subscribeFuture = subscribe();
subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
@Override
Expand All @@ -342,7 +343,7 @@ public void operationComplete(Future<RedissonLockEntry> future) throws Exception
}

if (futureRef.get() != null) {
futureRef.get().cancel(false);
futureRef.get().cancel();
}

long elapsed = System.currentTimeMillis() - current;
Expand All @@ -359,9 +360,9 @@ public void operationComplete(Future<RedissonLockEntry> future) throws Exception
});

if (!subscribeFuture.isDone()) {
ScheduledFuture<?> scheduledFuture = commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() {
Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run() {
public void run(Timeout timeout) throws Exception {
if (!subscribeFuture.isDone()) {
result.trySuccess(false);
}
Expand Down
Expand Up @@ -556,13 +556,13 @@ public void operationComplete(Future<Boolean> future) throws Exception {
};

final AtomicBoolean canceledByScheduler = new AtomicBoolean();
final ScheduledFuture<?> scheduledFuture;
final Timeout scheduledFuture;
if (popTimeout != 0) {
// to handle cases when connection has been lost
final Channel orignalChannel = connection.getChannel();
scheduledFuture = connectionManager.getGroup().schedule(new Runnable() {
scheduledFuture = connectionManager.newTimeout(new TimerTask() {
@Override
public void run() {
public void run(Timeout timeout) throws Exception {
// re-connection wasn't made
// and connection is still active
if (orignalChannel == connection.getChannel()
Expand All @@ -582,7 +582,7 @@ public void run() {
@Override
public void operationComplete(Future<R> future) throws Exception {
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
scheduledFuture.cancel();
}

synchronized (listener) {
Expand Down

0 comments on commit e4d2a06

Please sign in to comment.