Skip to content

Commit

Permalink
Refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Oct 27, 2017
1 parent 1a11c30 commit 89eff38
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 46 deletions.
59 changes: 25 additions & 34 deletions redisson/src/main/java/org/redisson/client/RedisClient.java
Expand Up @@ -20,7 +20,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.redisson.api.RFuture;
import org.redisson.client.handler.RedisChannelInitializer;
Expand Down Expand Up @@ -318,7 +317,6 @@ public RFuture<Void> shutdownAsync() {
}
ChannelGroupFuture channelsFuture = channels.close();

AtomicInteger counter = new AtomicInteger(2);
RPromise<Void> result = new RedissonPromise<Void>();
channelsFuture.addListener(new FutureListener<Void>() {
@Override
Expand All @@ -328,40 +326,33 @@ public void operationComplete(Future<Void> future) throws Exception {
return;
}

if (counter.decrementAndGet() == 0) {
result.trySuccess(null);
}
}
});

Thread t = new Thread() {
@Override
public void run() {
try {
if (hasOwnTimer) {
timer.stop();
}

if (hasOwnExecutor) {
executor.shutdown();
executor.awaitTermination(15, TimeUnit.SECONDS);
}

if (hasOwnGroup) {
bootstrap.config().group().shutdownGracefully();
Thread t = new Thread() {
@Override
public void run() {
try {
if (hasOwnTimer) {
timer.stop();
}

if (hasOwnExecutor) {
executor.shutdown();
executor.awaitTermination(15, TimeUnit.SECONDS);
}

if (hasOwnGroup) {
bootstrap.config().group().shutdownGracefully();
}
} catch (Exception e) {
result.tryFailure(e);
return;
}

result.trySuccess(null);
}
} catch (Exception e) {
result.tryFailure(e);
return;
}

if (counter.decrementAndGet() == 0) {
result.trySuccess(null);
}
};
t.start();
}
};

t.start();
});

return result;
}
Expand Down
Expand Up @@ -723,11 +723,17 @@ private ClusterPartition getPartition(Map<String, ClusterPartition> partitions,
@Override
public void shutdown() {
monitorFuture.cancel(true);
super.shutdown();


List<RFuture<Void>> futures = new ArrayList<RFuture<Void>>();
for (RedisConnection connection : nodeConnections.values()) {
connection.getRedisClient().shutdown();
RFuture<Void> future = connection.getRedisClient().shutdownAsync();
futures.add(future);
}

for (RFuture<Void> future : futures) {
future.syncUninterruptibly();
}
super.shutdown();
}

private HashSet<ClusterPartition> getLastPartitions() {
Expand Down
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceArray;

import org.redisson.Version;
import org.redisson.api.NodeType;
Expand Down Expand Up @@ -128,7 +129,7 @@ public boolean cancel() {

protected MasterSlaveServersConfig config;

private final Map<Integer, MasterSlaveEntry> slot2entry = PlatformDependent.newConcurrentHashMap();
private final AtomicReferenceArray<MasterSlaveEntry> slot2entry = new AtomicReferenceArray<MasterSlaveEntry>(MAX_SLOT);
private final Map<InetSocketAddress, MasterSlaveEntry> addr2entry = PlatformDependent.newConcurrentHashMap();

private final RPromise<Boolean> shutdownPromise;
Expand Down Expand Up @@ -691,13 +692,13 @@ protected final void changeMaster(int slot, URI address) {
}

protected final void addEntry(Integer slot, MasterSlaveEntry entry) {
slot2entry.put(slot, entry);
slot2entry.set(slot, entry);
entry.addSlotRange(slot);
addr2entry.put(entry.getClient().getAddr(), entry);
}

protected MasterSlaveEntry removeMaster(Integer slot) {
MasterSlaveEntry entry = slot2entry.remove(slot);
MasterSlaveEntry entry = slot2entry.getAndSet(slot, null);
entry.removeSlotRange(slot);
if (entry.getSlotRanges().isEmpty()) {
addr2entry.remove(entry.getClient().getAddr());
Expand Down
Expand Up @@ -16,7 +16,9 @@
package org.redisson.connection;

import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -211,11 +213,17 @@ public void operationComplete(Future<Map<String, String>> future)
@Override
public void shutdown() {
monitorFuture.cancel(true);
super.shutdown();


List<RFuture<Void>> futures = new ArrayList<RFuture<Void>>();
for (RedisConnection connection : nodeConnections.values()) {
connection.getRedisClient().shutdown();
RFuture<Void> future = connection.getRedisClient().shutdownAsync();
futures.add(future);
}

for (RFuture<Void> future : futures) {
future.syncUninterruptibly();
}
super.shutdown();
}
}

Expand Up @@ -411,11 +411,17 @@ protected MasterSlaveServersConfig create(BaseMasterSlaveServersConfig<?> cfg) {

@Override
public void shutdown() {
super.shutdown();

List<RFuture<Void>> futures = new ArrayList<RFuture<Void>>();
for (RedisClient sentinel : sentinels.values()) {
sentinel.shutdown();
RFuture<Void> future = sentinel.shutdownAsync();
futures.add(future);
}

for (RFuture<Void> future : futures) {
future.syncUninterruptibly();
}

super.shutdown();
}
}

0 comments on commit 89eff38

Please sign in to comment.