Permalink
Browse files

JAVA-381: Added MongosStatus class as a sibling to ReplicaSetStatus t…

…o handle mongos failover.

Refactored ReplicaSetStatus and MongosStatus to share a common base class ConnectionStatus.
Added DynamicConnectionStatus class to handle dynamic discovery of whether a list of seed nodes represents a bunch of mongos servers or a replicaset.
Is uses ExecutorService to handle multiple threads that communicate with each member of the seed list.
Changed DBTCPConnection to depend on DynamicConnectionStatus insteand of ReplicaSetStatus.
1 parent b857e44 commit d51b3648a8e1bf1a7b7886b7ceb343064c9e2225 @jyemin jyemin committed Jul 5, 2012
View
@@ -50,3 +50,4 @@ src/test/ed/webtests/webtest-local.bash
*.ipr
*.iws
+atlassian-ide-plugin.xml
@@ -0,0 +1,251 @@
+/**
+ * Copyright (c) 2008 - 2012 10gen, Inc. <http://10gen.com>
+ * <p/>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mongodb;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Base class for classes that manage connections to mongo instances as background tasks.
+ */
+abstract class ConnectionStatus {
+
+ ConnectionStatus(List<ServerAddress> mongosAddresses, Mongo mongo) {
+ _mongoOptions = mongoOptionsDefaults.copy();
+ _mongoOptions.socketFactory = mongo._options.socketFactory;
+ this._mongosAddresses = new ArrayList<ServerAddress>(mongosAddresses);
+ this._mongo = mongo;
+ }
+
+ protected BackgroundUpdater _updater;
+ protected final Mongo _mongo;
+ protected final List<ServerAddress> _mongosAddresses;
+ protected volatile boolean _closed;
+ protected final MongoOptions _mongoOptions;
+
+ protected static int updaterIntervalMS;
+ protected static int updaterIntervalNoMasterMS;
+ protected static final MongoOptions mongoOptionsDefaults = new MongoOptions();
+ protected static final float latencySmoothFactor;
+ protected static final DBObject isMasterCmd = new BasicDBObject("ismaster", 1);
+
+ /**
+ * Start the updater if there is one
+ */
+ void start() {
+ if (_updater != null) {
+ _updater.start();
+ }
+ }
+
+ /**
+ * Stop the updater if there is one
+ */
+ void close() {
+ _closed = true;
+ if (_updater != null) {
+ _updater.interrupt();
+ }
+ }
+
+ /**
+ * Gets the list of addresses for this connection.
+ */
+ abstract List<ServerAddress> getServerAddressList();
+
+
+ /**
+ * Whether there is least one server up.
+ */
+ abstract boolean hasServerUp();
+
+ /**
+ * Ensures that we have the current master, if there is one. If the current snapshot of the replica set
+ * has no master, this method waits one cycle to find a new master, and returns it if found, or null if not.
+ *
+ * @return address of the current master, or null if there is none
+ */
+ abstract Node ensureMaster();
+
+ /**
+ * Whether this connection has been closed.
+ */
+ void isClosed() {
+ if (_closed)
+ throw new IllegalStateException("ReplicaSetStatus closed");
+ }
+
+ static {
+ updaterIntervalMS = Integer.parseInt(System.getProperty("com.mongodb.updaterIntervalMS", "5000"));
+ updaterIntervalNoMasterMS = Integer.parseInt(System.getProperty("com.mongodb.updaterIntervalNoMasterMS", "10"));
+ mongoOptionsDefaults.connectTimeout = Integer.parseInt(System.getProperty("com.mongodb.updaterConnectTimeoutMS", "20000"));
+ mongoOptionsDefaults.socketTimeout = Integer.parseInt(System.getProperty("com.mongodb.updaterSocketTimeoutMS", "20000"));
+ latencySmoothFactor = Float.parseFloat(System.getProperty("com.mongodb.latencySmoothFactor", "4"));
+ }
+
+ static class Node {
+
+ Node(float pingTime, ServerAddress addr, int maxBsonObjectSize, boolean ok) {
+ this._pingTime = pingTime;
+ this._addr = addr;
+ this._maxBsonObjectSize = maxBsonObjectSize;
+ this._ok = ok;
+ }
+
+ public boolean isOk() {
+ return _ok;
+ }
+
+ public int getMaxBsonObjectSize() {
+ return _maxBsonObjectSize;
+ }
+
+ public ServerAddress getServerAddress() {
+ return _addr;
+ }
+
+ protected final ServerAddress _addr;
+ protected final float _pingTime;
+ protected final boolean _ok;
+ protected final int _maxBsonObjectSize;
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ final Node node = (Node) o;
+
+ if (_maxBsonObjectSize != node._maxBsonObjectSize) return false;
+ if (_ok != node._ok) return false;
+ if (Float.compare(node._pingTime, _pingTime) != 0) return false;
+ if (!_addr.equals(node._addr)) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = _addr.hashCode();
+ result = 31 * result + (_pingTime != +0.0f ? Float.floatToIntBits(_pingTime) : 0);
+ result = 31 * result + (_ok ? 1 : 0);
+ result = 31 * result + _maxBsonObjectSize;
+ return result;
+ }
+
+ public String toJSON() {
+ StringBuilder buf = new StringBuilder();
+ buf.append("{");
+ buf.append("address:'").append(_addr).append("', ");
+ buf.append("ok:").append(_ok).append(", ");
+ buf.append("ping:").append(_pingTime).append(", ");
+ buf.append("maxBsonObjectSize:").append(_maxBsonObjectSize).append(", ");
+ buf.append("}");
+
+ return buf.toString();
+ }
+ }
+
+ static class BackgroundUpdater extends Thread {
+ public BackgroundUpdater(final String name) {
+ super(name);
+ setDaemon(true);
+ }
+ }
+
+ static abstract class UpdatableNode {
+ UpdatableNode(final ServerAddress addr, Mongo mongo, MongoOptions mongoOptions) {
+ this._addr = addr;
+ this._mongo = mongo;
+ this._mongoOptions = mongoOptions;
+ this._port = new DBPort(addr, null, mongoOptions);
+ }
+
+ public CommandResult update() {
+ CommandResult res = null;
+ try {
+ long start = System.nanoTime();
+ res = _port.runCommand(_mongo.getDB("admin"), isMasterCmd);
+ long end = System.nanoTime();
+ float newPingMS = (end - start) / 1000000F;
+ if (!successfullyContacted)
+ _pingTimeMS = newPingMS;
+ else
+ _pingTimeMS = _pingTimeMS + ((newPingMS - _pingTimeMS) / latencySmoothFactor);
+
+ getLogger().log(Level.FINE, "Latency to " + _addr + " actual=" + newPingMS + " smoothed=" + _pingTimeMS);
+
+ successfullyContacted = true;
+
+ if (res == null) {
+ throw new MongoInternalException("Invalid null value returned from isMaster");
+ }
+
+ if (!_ok) {
+ getLogger().log(Level.INFO, "Server seen up: " + _addr);
+ }
+ _ok = true;
+
+ // max size was added in 1.8
+ if (res.containsField("maxBsonObjectSize")) {
+ _maxBsonObjectSize = (Integer) res.get("maxBsonObjectSize");
+ } else {
+ _maxBsonObjectSize = Bytes.MAX_OBJECT_SIZE;
+ }
+ } catch (Exception e) {
+ if (!((_ok) ? true : (Math.random() > 0.1))) {
@Lacrymology
Lacrymology May 29, 2013

wat?

does somebody care to explain this?

@0xabad1dea
0xabad1dea May 29, 2013

Excuse me but

What the hell

⚠️ ⚠️ ⚠️

@cowtowncoder
cowtowncoder May 29, 2013

It would seem like a poster child for all those "Do NOT drink and commit" campaigns.... unless there is some truly stellar explanation that makes it make sense. A comment would be nice, regardless, unless it would ruin the joke.

@grasGendarme
grasGendarme May 29, 2013

This feel so right

@sartak
sartak May 29, 2013

I traced the provenance of this code to ee7543a. Wow.

@gareth-rees
gareth-rees May 29, 2013

Presumably the idea is to log only about 1/10 of the server failures (and so avoid massively spamming the log), without incurring the cost of maintaining a counter or timer. (But surely maintaining a timer would be affordable?)

@NightKev
NightKev May 29, 2013

Is this GitHub or TDWTF? Am I on the wrong website?

@Lacrymology
Lacrymology May 29, 2013

10% of failures, sounds like line noise emulation

@aloiscochard
aloiscochard May 30, 2013

This code is WebScale (c)

@jtimberman
jtimberman May 31, 2013

Instead of making fun of this code, or otherwise degrading it (or the entire project), why not:

  1. Ask questions about why it was written this way. Perhaps there's a reason you don't know. It might be non-obvious.
  2. Open a ticket in the upstream project bug tracker asking if a different approach is welcome.
  3. Submit a pull request or otherwise provide helpful insight as to why this is possibly less than ideal.

Arguably 3 is related to 2, but you get the idea.

@Lacrymology
Lacrymology May 31, 2013

I quote myself

does somebody care to explain this?

@raganwald
raganwald May 31, 2013

Well, @jtimberman, that is all very nice motherhood and apple pie sentiment, but it isn't an explanation, is it? But since you asked, here's a simple optimization that I recall writing into a Scheme interpreter a few decades ago. The JVM is probably doing this behind the scenes, but in this case we'll do it for readability instead of for performance.

Instead of:

if (!((_ok) ? true : (Math.random() > 0.1))) {

Let's distribute the bang:

if ( _ok ? false : (Math.random() <= 0.1) ) {

The logical ternary is a fine instrument, but if we say what this does aloud, we'd say "If it's not _ok then if Math.random() <= 0.1... So let's write that:

if ( !_ok && (Math.random() <= 0.1) ) {

This seems to my eyes to be simpler and it communicates our intent directly.

We can check our work with a truth table. Given A = _ok and B = (Math.random() > 0.1), the original expression's truth table is:

F F -> T
F T -> F
T F -> F
T T -> F

And our refactored truth table is:

F F -> T
F T -> F
T F -> F
T T -> F

LGTM!

@jgowdy
jgowdy May 31, 2013

Uhh, maybe use an integer with an increment and a mod 10?

@aloiscochard
aloiscochard Jun 1, 2013

IMO, the ideal approach would be to keep state of node and log exception only once. Then obviously reset the state once the node is back.

So instead of flooding log, you just add one entry when the node become unavailable, and an other one when the node is back.

+ return res;
+ }
+
+ final StringBuilder logError = (new StringBuilder("Server seen down: ")).append(_addr);
+
+ if (e instanceof IOException) {
+
+ logError.append(" - ").append(IOException.class.getName());
+
+ if (e.getMessage() != null) {
+ logError.append(" - message: ").append(e.getMessage());
+ }
+
+ getLogger().log(Level.WARNING, logError.toString());
+
+ } else {
+ getLogger().log(Level.WARNING, logError.toString(), e);
+ }
+ _ok = false;
+ }
+
+ return res;
+ }
+
+ protected abstract Logger getLogger();
+
+ final ServerAddress _addr;
+ final MongoOptions _mongoOptions;
+ final Mongo _mongo;
+
+ DBPort _port; // we have our own port so we can set different socket options and don't have to worry about the pool
+
+ boolean successfullyContacted = false;
+ boolean _ok = false;
+ float _pingTimeMS = 0;
+ int _maxBsonObjectSize;
+ }
+
+}
@@ -18,22 +18,17 @@
package com.mongodb;
-import java.lang.management.ManagementFactory;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.Level;
+import com.mongodb.util.SimplePool;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
-
-import com.mongodb.util.SimplePool;
+import java.lang.management.ManagementFactory;
+import java.util.*;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
public class DBPortPool extends SimplePool<DBPort> {
@@ -198,17 +193,18 @@ public DBPort get() {
return port;
}
- void gotError( Exception e ){
+ // return true if the exception is recoverable
+ boolean gotError( Exception e ){
if ( e instanceof java.nio.channels.ClosedByInterruptException ||
e instanceof InterruptedException ){
// this is probably a request that is taking too long
// so usually doesn't mean there is a real db problem
- return;
+ return true;
}
if ( e instanceof java.net.SocketTimeoutException ){
// we don't want to clear the port pool for a connection timing out
- return;
+ return true;
}
Bytes.LOGGER.log( Level.WARNING , "emptying DBPortPool to " + getServerAddress() + " b/c of error" , e );
@@ -227,6 +223,7 @@ void gotError( Exception e ){
done(p);
}
+ return false;
}
void close(){
Oops, something went wrong.

0 comments on commit d51b364

Please sign in to comment.