Skip to content

Commit

Permalink
Refactoring. #680
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Oct 25, 2016
1 parent b58c058 commit 937215e
Showing 1 changed file with 4 additions and 10 deletions.
Expand Up @@ -109,7 +109,8 @@ private void createConnection(final boolean checkFreezed, final AtomicInteger re


@Override @Override
public void run() { public void run() {
RFuture<T> promise = createConnection(entry, null); RPromise<T> promise = connectionManager.newPromise();
createConnection(entry, promise);
promise.addListener(new FutureListener<T>() { promise.addListener(new FutureListener<T>() {
@Override @Override
public void operationComplete(Future<T> future) throws Exception { public void operationComplete(Future<T> future) throws Exception {
Expand Down Expand Up @@ -159,7 +160,7 @@ protected ClientConnectionsEntry getEntry() {


public RFuture<T> get() { public RFuture<T> get() {
for (int j = entries.size() - 1; j >= 0; j--) { for (int j = entries.size() - 1; j >= 0; j--) {
ClientConnectionsEntry entry = getEntry(); final ClientConnectionsEntry entry = getEntry();
if (!entry.isFreezed() if (!entry.isFreezed()
&& tryAcquireConnection(entry)) { && tryAcquireConnection(entry)) {
final RPromise<T> result = connectionManager.newPromise(); final RPromise<T> result = connectionManager.newPromise();
Expand Down Expand Up @@ -238,13 +239,7 @@ private void connectTo(ClientConnectionsEntry entry, RPromise<T> promise) {
createConnection(entry, promise); createConnection(entry, promise);
} }


private RFuture<T> createConnection(final ClientConnectionsEntry entry, RPromise<T> ppromise) { private void createConnection(final ClientConnectionsEntry entry, final RPromise<T> promise) {
final RPromise<T> promise;
if (ppromise != null) {
promise = ppromise;
} else {
promise = connectionManager.newPromise();
}
RFuture<T> connFuture = connect(entry); RFuture<T> connFuture = connect(entry);
connFuture.addListener(new FutureListener<T>() { connFuture.addListener(new FutureListener<T>() {
@Override @Override
Expand All @@ -263,7 +258,6 @@ public void operationComplete(Future<T> future) throws Exception {
connectedSuccessful(entry, promise, conn); connectedSuccessful(entry, promise, conn);
} }
}); });
return promise;
} }


private void connectedSuccessful(ClientConnectionsEntry entry, RPromise<T> promise, T conn) { private void connectedSuccessful(ClientConnectionsEntry entry, RPromise<T> promise, T conn) {
Expand Down

0 comments on commit 937215e

Please sign in to comment.