Skip to content

Commit

Permalink
JAVA-994, JAVA-996, JAVA-998, JAVA-1000, JAVA-1001: Ported Cluster im…
Browse files Browse the repository at this point in the history
…plementation from 3.0.x branch and replaced ConnectionStatus/ReplicaSetStatus/MongosStatus/DynamicConnectionStatus with it.
  • Loading branch information
jyemin committed Nov 5, 2013
1 parent 9efe02e commit 0375c98
Show file tree
Hide file tree
Showing 63 changed files with 4,568 additions and 2,052 deletions.
68 changes: 68 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2008 - 2013 MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

apply plugin: 'java'
apply plugin: 'groovy'
apply plugin: 'jacoco'
apply plugin: 'idea'

sourceCompatibility = '1.5'
targetCompatibility = '1.5'

repositories {
mavenCentral()
}

dependencies {
testCompile 'junit:junit:4.11'
testCompile 'org.testng:testng:6.3.1'
testCompile 'org.codehaus.groovy:groovy-all:2.0.8'
testCompile ('org.spockframework:spock-core:0.7-groovy-2.0') {
exclude group: 'junit', module: 'junit-dep'
}
}

sourceSets {
main {
java { srcDirs = ['src/main'] }
}
test {
groovy { srcDirs = ['src/test'] }
java { srcDirs = ['src/test'] }
}
}

test {
useTestNG()
jacoco { enabled = false }

beforeTest { descr ->
logger.info("[Test ${descr.className} > ${descr.name}]")
}
}

task testAll(dependsOn: tasks.withType(Test))

gradle.taskGraph.whenReady { taskGraph ->
if (taskGraph.hasTask(check)) {
tasks.withType(Test) { jacoco { enabled = true } }
}
}

task wrapper(type: Wrapper) {
gradleVersion = '1.7'
}

217 changes: 217 additions & 0 deletions src/main/com/mongodb/BaseCluster.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
/*
* Copyright (c) 2008 - 2013 10gen, Inc. <http://10gen.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mongodb;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;

import static java.lang.String.format;
import static org.bson.util.Assertions.isTrue;
import static org.bson.util.Assertions.notNull;

abstract class BaseCluster implements Cluster {

private static final Logger LOGGER = Loggers.getLogger("cluster");

private final AtomicReference<CountDownLatch> phase = new AtomicReference<CountDownLatch>(new CountDownLatch(1));
private final ClusterableServerFactory serverFactory;
private final ThreadLocal<Random> random = new ThreadLocal<Random>() {
@Override
protected Random initialValue() {
return new Random();
}
};
private final String clusterId;
private final ClusterSettings settings;
private final ClusterListener clusterListener;

private volatile boolean isClosed;
private volatile ClusterDescription description;

public BaseCluster(final String clusterId, final ClusterSettings settings, final ClusterableServerFactory serverFactory,
final ClusterListener clusterListener) {
this.clusterId = notNull("clusterId", clusterId);
this.settings = notNull("settings", settings);
this.serverFactory = notNull("serverFactory", serverFactory);
this.clusterListener = notNull("clusterListener", clusterListener);
clusterListener.clusterOpened(new ClusterEvent(clusterId));
}

@Override
public Server getServer(final ServerSelector serverSelector) {
isTrue("open", !isClosed());

try {
CountDownLatch currentPhase = phase.get();
ClusterDescription curDescription = description;
List<ServerDescription> serverDescriptions = serverSelector.choose(curDescription);
final TimeUnit timeUnit = TimeUnit.NANOSECONDS;
final long endTime = System.nanoTime() + timeUnit.convert(20, TimeUnit.SECONDS); // TODO: configurable
while (true) {
if (!serverDescriptions.isEmpty()) {
ClusterableServer server = getRandomServer(new ArrayList<ServerDescription>(serverDescriptions));
if (server != null) {
return new WrappedServer(server);
}
}

if (!curDescription.isConnecting()) {
throw new MongoServerSelectionFailureException(
format("Unable to connect to any server that satisfies the selector " +
"%s", serverSelector));
}

final long timeout = endTime - System.nanoTime();

LOGGER.info(format("No server chosen by %s from cluster description %s. Waiting for %d ms before timing out",
serverSelector, curDescription, TimeUnit.MILLISECONDS.convert(timeout, timeUnit)));

if (!currentPhase.await(timeout, timeUnit)) {
throw new MongoTimeoutException(format("Timed out while waiting for a server that satisfies the selector: %s "
+ "after %d %s", serverSelector, timeout, timeUnit));
}
currentPhase = phase.get();
curDescription = description;
serverDescriptions = serverSelector.choose(curDescription);
}
} catch (InterruptedException e) {
throw new MongoInterruptedException(format("Interrupted while waiting for a server that satisfies server selector %s ",
serverSelector), e);
}
}

@Override
public ClusterDescription getDescription() {
isTrue("open", !isClosed());

try {
CountDownLatch currentPhase = phase.get();
ClusterDescription curDescription = description;
final TimeUnit timeUnit = TimeUnit.NANOSECONDS;
final long endTime = System.nanoTime() + timeUnit.convert(20, TimeUnit.SECONDS); // TODO: configurable
while (curDescription.getType() == ClusterType.Unknown) {

if (!curDescription.isConnecting()) {
throw new MongoServerSelectionFailureException(format("Unable to connect to any servers"));
}

final long timeout = endTime - System.nanoTime();

LOGGER.info(format("Cluster description not yet available. Waiting for %d ms before timing out",
TimeUnit.MILLISECONDS.convert(timeout, timeUnit)));

if (!currentPhase.await(timeout, timeUnit)) {
throw new MongoTimeoutException(format("Timed out while waiting for the cluster description after waiting %d %s",
timeout, timeUnit));
}
currentPhase = phase.get();
curDescription = description;
}
return curDescription;
} catch (InterruptedException e) {
throw new MongoInterruptedException(format("Interrupted while waiting for the cluster description"), e);
}
}

public ClusterSettings getSettings() {
return settings;
}

@Override
public void close() {
if (!isClosed()) {
isClosed = true;
serverFactory.close();
phase.get().countDown();
clusterListener.clusterClosed(new ClusterEvent(clusterId));
}
}

@Override
public boolean isClosed() {
return isClosed;
}

/**
* Return the server at the given address.
*
* @param serverAddress the address
* @return the server, or null if the cluster no longer contains a server at this address.
*/
protected abstract ClusterableServer getServer(final ServerAddress serverAddress);

protected synchronized void updateDescription(final ClusterDescription newDescription) {
LOGGER.fine(format("Updating cluster description to %s", newDescription.getShortDescription()));

description = newDescription;
final CountDownLatch current = phase.getAndSet(new CountDownLatch(1));
current.countDown();
}

// this method is necessary so that subclasses can call fireChangeEvent with the old value of description.
protected ClusterDescription getDescriptionNoWaiting() {
return description;
}

protected void fireChangeEvent() {
clusterListener.clusterDescriptionChanged(new ClusterDescriptionChangedEvent(clusterId, description));
}

// gets a random server that still exists in the cluster. Returns null if there are none.
private ClusterableServer getRandomServer(final List<ServerDescription> serverDescriptions) {
while (!serverDescriptions.isEmpty()) {
int serverPos = getRandom().nextInt(serverDescriptions.size());
ClusterableServer server = getServer(serverDescriptions.get(serverPos).getAddress());
if (server != null) {
return server;
}
else {
serverDescriptions.remove(serverPos);
}
}
return null;
}

protected Random getRandom() {
return random.get();
}

protected ClusterableServer createServer(final ServerAddress serverAddress, final ChangeListener<ServerDescription>
serverStateListener) {
final ClusterableServer server = serverFactory.create(serverAddress);
server.addChangeListener(serverStateListener);
return server;
}

private static final class WrappedServer implements Server {
private final ClusterableServer wrapped;

public WrappedServer(final ClusterableServer server) {
wrapped = server;
}

@Override
public ServerDescription getDescription() {
return wrapped.getDescription();
}
}
}
40 changes: 40 additions & 0 deletions src/main/com/mongodb/ChainingServerSelector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2008 - 2013 10gen, Inc. <http://10gen.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mongodb;

import java.util.List;

abstract class ChainingServerSelector implements ServerSelector {
private final ServerSelector chainedSelector;

protected ChainingServerSelector(final ServerSelector chainedSelector) {
this.chainedSelector = chainedSelector;
}

@Override
public final List<ServerDescription> choose(final ClusterDescription clusterDescription) {
List<ServerDescription> serverDescriptions = chooseStep(clusterDescription);
if (chainedSelector == null) {
return serverDescriptions;
}

return chainedSelector.choose(new ClusterDescription(clusterDescription.getConnectionMode(), clusterDescription.getType(),
serverDescriptions));
}

protected abstract List<ServerDescription> chooseStep(final ClusterDescription clusterDescription);
}

0 comments on commit 0375c98

Please sign in to comment.