Skip to content

Commit

Permalink
JAVA-2389: Make minimum value for max staleness either 90 seconds or …
Browse files Browse the repository at this point in the history
…the heartbeat frequency plus the idle write period, whichever is greatest
  • Loading branch information
jyemin committed Nov 21, 2016
1 parent f7a2779 commit 283ec1d
Show file tree
Hide file tree
Showing 40 changed files with 180 additions and 681 deletions.
13 changes: 7 additions & 6 deletions driver-core/src/main/com/mongodb/ConnectionString.java
Expand Up @@ -158,7 +158,9 @@
* <li>{@code maxStalenessSeconds=seconds}. The maximum staleness in seconds. For use with any non-primary read preference, the driver
* estimates the staleness of each secondary, based on lastWriteDate values provided in server isMaster responses, and selects only those
* secondaries whose staleness is less than or equal to maxStalenessSeconds. Not providing the parameter or explicitly setting it to -1
* indicates that there should be no max staleness check.
* indicates that there should be no max staleness check. The maximum staleness feature is designed to prevent badly-lagging servers from
* being selected. The staleness estimate is imprecise and shouldn't be used to try to select "up-to-date" secondaries. The minimum value
* is either 90 seconds, or the heartbeat frequency plus 10 seconds, whichever is greatest.
* </li>
* </ul>
* <p>Authentication configuration:</p>
Expand Down Expand Up @@ -448,7 +450,7 @@ private WriteConcern createWriteConcern(final Map<String, List<String>> optionsM
private ReadPreference createReadPreference(final Map<String, List<String>> optionsMap) {
String readPreferenceType = null;
List<TagSet> tagSetList = new ArrayList<TagSet>();
double maxStalenessSeconds = -1;
long maxStalenessSeconds = -1;

for (final String key : READ_PREFERENCE_KEYS) {
String value = getLastValue(optionsMap, key);
Expand All @@ -459,7 +461,7 @@ private ReadPreference createReadPreference(final Map<String, List<String>> opti
if (key.equals("readpreference")) {
readPreferenceType = value;
} else if (key.equals("maxstalenessseconds")) {
maxStalenessSeconds = Double.parseDouble(value);
maxStalenessSeconds = parseInteger(value, "maxstalenessseconds");
} else if (key.equals("readpreferencetags")) {
for (final String cur : optionsMap.get(key)) {
TagSet tagSet = getTags(cur.trim());
Expand Down Expand Up @@ -601,15 +603,14 @@ private Map<String, List<String>> parseOptions(final String optionsPart) {
}

private ReadPreference buildReadPreference(final String readPreferenceType,
final List<TagSet> tagSetList, final double maxStalenessSeconds) {
final List<TagSet> tagSetList, final long maxStalenessSeconds) {
if (readPreferenceType != null) {
if (tagSetList.isEmpty() && maxStalenessSeconds == -1) {
return ReadPreference.valueOf(readPreferenceType);
} else if (maxStalenessSeconds == -1) {
return ReadPreference.valueOf(readPreferenceType, tagSetList);
} else {
return ReadPreference.valueOf(readPreferenceType, tagSetList,
Math.round(maxStalenessSeconds * 1000), TimeUnit.MILLISECONDS);
return ReadPreference.valueOf(readPreferenceType, tagSetList, maxStalenessSeconds, TimeUnit.SECONDS);
}
} else if (!(tagSetList.isEmpty() && maxStalenessSeconds == -1)) {
throw new IllegalArgumentException("Read preference mode must be specified if "
Expand Down
52 changes: 39 additions & 13 deletions driver-core/src/main/com/mongodb/ReadPreference.java
Expand Up @@ -138,10 +138,12 @@ public static ReadPreference nearest() {
/**
* Gets a read preference that forces reads to the primary if available, otherwise to a secondary.
*
* @param maxStaleness the max allowable staleness of secondaries.
* @param maxStaleness the max allowable staleness of secondaries. The minimum value is either 90 seconds, or the heartbeat frequency
* plus 10 seconds, whichever is greatest.
* @param timeUnit the time unit of maxStaleness
* @return ReadPreference which reads primary if available.
* @since 3.4
* @see TaggableReadPreference#getMaxStaleness(TimeUnit)
*/
public static ReadPreference primaryPreferred(final long maxStaleness, final TimeUnit timeUnit) {
return new PrimaryPreferredReadPreference(Collections.<TagSet>emptyList(), maxStaleness, timeUnit);
Expand All @@ -155,10 +157,12 @@ public static ReadPreference primaryPreferred(final long maxStaleness, final Tim
* and selects only those secondaries whose staleness is less than or equal to maxStaleness.
* </p>
*
* @param maxStaleness the max allowable staleness of secondaries.
* @param maxStaleness the max allowable staleness of secondaries. The minimum value is either 90 seconds, or the heartbeat frequency
* plus 10 seconds, whichever is greatest.
* @param timeUnit the time unit of maxStaleness
* @return ReadPreference which reads secondary.
* @since 3.4
* @see TaggableReadPreference#getMaxStaleness(TimeUnit)
*/
public static ReadPreference secondary(final long maxStaleness, final TimeUnit timeUnit) {
return new SecondaryReadPreference(Collections.<TagSet>emptyList(), maxStaleness, timeUnit);
Expand All @@ -172,10 +176,12 @@ public static ReadPreference secondary(final long maxStaleness, final TimeUnit t
* The driver estimates the staleness of each secondary, based on lastWriteDate values provided in server isMaster responses,
* and selects only those secondaries whose staleness is less than or equal to maxStaleness.
* </p> *
* @param maxStaleness the max allowable staleness of secondaries.
* @param maxStaleness the max allowable staleness of secondaries. The minimum value is either 90 seconds, or the heartbeat frequency
* plus 10 seconds, whichever is greatest.
* @param timeUnit the time unit of maxStaleness
* @return ReadPreference which reads secondary if available, otherwise from primary.
* @since 3.4
* @see TaggableReadPreference#getMaxStaleness(TimeUnit)
*/
public static ReadPreference secondaryPreferred(final long maxStaleness, final TimeUnit timeUnit) {
return new SecondaryPreferredReadPreference(Collections.<TagSet>emptyList(), maxStaleness, timeUnit);
Expand All @@ -189,10 +195,12 @@ public static ReadPreference secondaryPreferred(final long maxStaleness, final T
* and selects only those secondaries whose staleness is less than or equal to maxStaleness.
* </p>
*
* @param maxStaleness the max allowable staleness of secondaries.
* @param maxStaleness the max allowable staleness of secondaries. The minimum value is either 90 seconds, or the heartbeat frequency
* plus 10 seconds, whichever is greatest.
* @param timeUnit the time unit of maxStaleness
* @return ReadPreference which reads nearest
* @since 3.4
* @see TaggableReadPreference#getMaxStaleness(TimeUnit)
*/
public static ReadPreference nearest(final long maxStaleness, final TimeUnit timeUnit) {
return new NearestReadPreference(Collections.<TagSet>emptyList(), maxStaleness, timeUnit);
Expand Down Expand Up @@ -252,10 +260,12 @@ public static TaggableReadPreference nearest(final TagSet tagSet) {
* </p>
*
* @param tagSet the set of tags to limit the list of secondaries to.
* @param maxStaleness the max allowable staleness of secondaries.
* @param maxStaleness the max allowable staleness of secondaries. The minimum value is either 90 seconds, or the heartbeat frequency
* plus 10 seconds, whichever is greatest.
* @param timeUnit the time unit of maxStaleness
* @return ReadPreference which reads primary if available, otherwise a secondary respective of tags.\
* @since 3.4
* @see TaggableReadPreference#getMaxStaleness(TimeUnit)
*/
public static TaggableReadPreference primaryPreferred(final TagSet tagSet,
final long maxStaleness, final TimeUnit timeUnit) {
Expand All @@ -271,10 +281,12 @@ public static TaggableReadPreference primaryPreferred(final TagSet tagSet,
* </p>
*
* @param tagSet the set of tags to limit the list of secondaries to
* @param maxStaleness the max allowable staleness of secondaries.
* @param maxStaleness the max allowable staleness of secondaries. The minimum value is either 90 seconds, or the heartbeat frequency
* plus 10 seconds, whichever is greatest.
* @param timeUnit the time unit of maxStaleness
* @return ReadPreference which reads secondary respective of tags.
* @since 3.4
* @see TaggableReadPreference#getMaxStaleness(TimeUnit)
*/
public static TaggableReadPreference secondary(final TagSet tagSet,
final long maxStaleness, final TimeUnit timeUnit) {
Expand All @@ -290,10 +302,12 @@ public static TaggableReadPreference secondary(final TagSet tagSet,
* and selects only those secondaries whose staleness is less than or equal to maxStaleness.
* </p> *
* @param tagSet the set of tags to limit the list of secondaries to
* @param maxStaleness the max allowable staleness of secondaries.
* @param maxStaleness the max allowable staleness of secondaries. The minimum value is either 90 seconds, or the heartbeat frequency
* plus 10 seconds, whichever is greatest.
* @param timeUnit the time unit of maxStaleness
* @return ReadPreference which reads secondary if available respective of tags, otherwise from primary irrespective of tags.
* @since 3.4
* @see TaggableReadPreference#getMaxStaleness(TimeUnit)
*/
public static TaggableReadPreference secondaryPreferred(final TagSet tagSet,
final long maxStaleness, final TimeUnit timeUnit) {
Expand All @@ -310,10 +324,12 @@ public static TaggableReadPreference secondaryPreferred(final TagSet tagSet,
* </p>
*
* @param tagSet the set of tags to limit the list of secondaries to
* @param maxStaleness the max allowable staleness of secondaries.
* @param maxStaleness the max allowable staleness of secondaries. The minimum value is either 90 seconds, or the heartbeat frequency
* plus 10 seconds, whichever is greatest.
* @param timeUnit the time unit of maxStaleness
* @return ReadPreference which reads nearest node respective of tags.
* @since 3.4
* @see TaggableReadPreference#getMaxStaleness(TimeUnit)
*/
public static TaggableReadPreference nearest(final TagSet tagSet,
final long maxStaleness, final TimeUnit timeUnit) {
Expand Down Expand Up @@ -388,10 +404,12 @@ public static TaggableReadPreference nearest(final List<TagSet> tagSetList) {
* </p>
*
* @param tagSetList the list of tag sets to limit the list of secondaries to
* @param maxStaleness the max allowable staleness of secondaries.
* @param maxStaleness the max allowable staleness of secondaries. The minimum value is either 90 seconds, or the heartbeat frequency
* plus 10 seconds, whichever is greatest.
* @param timeUnit the time unit of maxStaleness
* @return ReadPreference which reads primary if available, otherwise a secondary respective of tags.
* @since 3.4
* @see TaggableReadPreference#getMaxStaleness(TimeUnit)
*/
public static TaggableReadPreference primaryPreferred(final List<TagSet> tagSetList,
final long maxStaleness, final TimeUnit timeUnit) {
Expand All @@ -413,10 +431,12 @@ public static TaggableReadPreference primaryPreferred(final List<TagSet> tagSetL
* </p>
*
* @param tagSetList the list of tag sets to limit the list of secondaries to
* @param maxStaleness the max allowable staleness of secondaries.
* @param maxStaleness the max allowable staleness of secondaries. The minimum value is either 90 seconds, or the heartbeat frequency
* plus 10 seconds, whichever is greatest.
* @param timeUnit the time unit of maxStaleness
* @return ReadPreference which reads secondary respective of tags.
* @since 3.4
* @see TaggableReadPreference#getMaxStaleness(TimeUnit)
*/
public static TaggableReadPreference secondary(final List<TagSet> tagSetList,
final long maxStaleness, final TimeUnit timeUnit) {
Expand All @@ -438,10 +458,12 @@ public static TaggableReadPreference secondary(final List<TagSet> tagSetList,
* </p>
*
* @param tagSetList the list of tag sets to limit the list of secondaries to
* @param maxStaleness the max allowable staleness of secondaries.
* @param maxStaleness the max allowable staleness of secondaries. The minimum value is either 90 seconds, or the heartbeat frequency
* plus 10 seconds, whichever is greatest.
* @param timeUnit the time unit of maxStaleness
* @return ReadPreference which reads secondary if available respective of tags, otherwise from primary irrespective of tags.
* @since 3.4
* @see TaggableReadPreference#getMaxStaleness(TimeUnit)
*/
public static TaggableReadPreference secondaryPreferred(final List<TagSet> tagSetList,
final long maxStaleness, final TimeUnit timeUnit) {
Expand All @@ -463,10 +485,12 @@ public static TaggableReadPreference secondaryPreferred(final List<TagSet> tagSe
* </p>
*
* @param tagSetList the list of tag sets to limit the list of secondaries to
* @param maxStaleness the max allowable staleness of secondaries.
* @param maxStaleness the max allowable staleness of secondaries. The minimum value is either 90 seconds, or the heartbeat frequency
* plus 10 seconds, whichever is greatest.
* @param timeUnit the time unit of maxStaleness
* @return ReadPreference which reads nearest node respective of tags.
* @since 3.4
* @see TaggableReadPreference#getMaxStaleness(TimeUnit)
*/
public static TaggableReadPreference nearest(final List<TagSet> tagSetList,
final long maxStaleness, final TimeUnit timeUnit) {
Expand Down Expand Up @@ -527,10 +551,12 @@ public static TaggableReadPreference valueOf(final String name, final List<TagSe
*
* @param name the name of the read preference
* @param tagSetList the list of tag sets
* @param maxStaleness the max allowable staleness of secondaries.
* @param maxStaleness the max allowable staleness of secondaries. The minimum value is either 90 seconds, or the heartbeat frequency
* plus 10 seconds, whichever is greatest.
* @param timeUnit the time unit of maxStaleness
* @return the taggable read preference
* @since 3.4
* @see TaggableReadPreference#getMaxStaleness(TimeUnit)
*/
public static TaggableReadPreference valueOf(final String name, final List<TagSet> tagSetList, final long maxStaleness,
final TimeUnit timeUnit) {
Expand Down
44 changes: 30 additions & 14 deletions driver-core/src/main/com/mongodb/TaggableReadPreference.java
Expand Up @@ -23,7 +23,7 @@
import com.mongodb.connection.ServerType;
import org.bson.BsonArray;
import org.bson.BsonDocument;
import org.bson.BsonDouble;
import org.bson.BsonInt64;
import org.bson.BsonString;

import java.util.ArrayList;
Expand All @@ -36,12 +36,16 @@
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

/**
* Abstract class for all preference which can be combined with tags
*/
@Immutable
public abstract class TaggableReadPreference extends ReadPreference {
private static final int SMALLEST_MAX_STALENESS_MS = 90000;
private static final int IDLE_WRITE_PERIOD_MS = 10000;

private final List<TagSet> tagSetList = new ArrayList<TagSet>();
private final Long maxStalenessMS;

Expand Down Expand Up @@ -73,7 +77,7 @@ public BsonDocument toDocument() {
}

if (maxStalenessMS != null) {
readPrefObject.put("maxStalenessSeconds", new BsonDouble(maxStalenessMS / 1000.0));
readPrefObject.put("maxStalenessSeconds", new BsonInt64(MILLISECONDS.toSeconds(maxStalenessMS)));
}
return readPrefObject;
}
Expand All @@ -90,12 +94,18 @@ public List<TagSet> getTagSetList() {

/**
* Gets the maximum acceptable staleness of a secondary in order to be considered for read operations.
*
* <p>
* The maximum staleness feature is designed to prevent badly-lagging servers from being selected. The staleness estimate is imprecise
* and shouldn't be used to try to select "up-to-date" secondaries.
* </p>
* <p>
* The driver estimates the staleness of each secondary, based on lastWriteDate values provided in server isMaster responses,
* and selects only those secondaries whose staleness is less than or equal to maxStaleness.
* </p>
* @param timeUnit the time unit in which to return the value
* @return the maximum acceptable staleness in the given time unit, or null if the value is not set
*
* @since 3.4
* @mongodb.server.release 3.4
* @since 3.4
*/
public Long getMaxStaleness(final TimeUnit timeUnit) {
notNull("timeUnit", timeUnit);
Expand Down Expand Up @@ -176,16 +186,22 @@ protected List<ServerDescription> selectFreshServers(final ClusterDescription cl
return servers;
}

if (servers.isEmpty()) {
return servers;
}

long heartbeatFrequencyMS = clusterDescription.getServerSettings().getHeartbeatFrequency(MILLISECONDS);

ServerDescription mostUpToDateServerDescription = getMostUpToDateServerDescription(clusterDescription);
if (mostUpToDateServerDescription != null
&& getMaxStaleness(MILLISECONDS) < heartbeatFrequencyMS + mostUpToDateServerDescription.getIdleWritePeriodMillis()) {
throw new MongoConfigurationException(format("Max staleness (%d ms) must be at least the heartbeat period (%d ms) "
+ "plus the idle write period (%d ms)",
getMaxStaleness(MILLISECONDS), heartbeatFrequencyMS, mostUpToDateServerDescription.getIdleWritePeriodMillis()));
if (getMaxStaleness(MILLISECONDS) < Math.max(SMALLEST_MAX_STALENESS_MS, heartbeatFrequencyMS + IDLE_WRITE_PERIOD_MS)) {
if (SMALLEST_MAX_STALENESS_MS > heartbeatFrequencyMS + IDLE_WRITE_PERIOD_MS){
throw new MongoConfigurationException(format("Max staleness (%d sec) must be at least 90 seconds",
getMaxStaleness(SECONDS)));
} else {
throw new MongoConfigurationException(format("Max staleness (%d ms) must be at least the heartbeat period (%d ms) "
+ "plus the idle write period (%d ms)",
getMaxStaleness(MILLISECONDS), heartbeatFrequencyMS, IDLE_WRITE_PERIOD_MS));
}
}

List<ServerDescription> freshServers = new ArrayList<ServerDescription>(servers.size());

ServerDescription primary = findPrimary(clusterDescription);
Expand All @@ -200,7 +216,7 @@ && getMaxStaleness(MILLISECONDS) < heartbeatFrequencyMS + mostUpToDateServerDesc
}
}
}
} else {
} else {
ServerDescription mostUpdateToDateSecondary = findMostUpToDateSecondary(clusterDescription);
for (ServerDescription cur : servers) {
if (mostUpdateToDateSecondary.getLastWriteDate().getTime() - cur.getLastWriteDate().getTime() + heartbeatFrequencyMS
Expand Down Expand Up @@ -363,7 +379,7 @@ public List<ServerDescription> chooseForReplicaSet(final ClusterDescription clus
}
}
return selectedServers;
}
}
}

/**
Expand Down

0 comments on commit 283ec1d

Please sign in to comment.