Skip to content

Commit

Permalink
CSHARP-315: Changed WrapQuery in MongoCursorEnumerator to send ReadPr…
Browse files Browse the repository at this point in the history
…eference over the wire when sending query to a mongos.
  • Loading branch information
rstam committed Jul 14, 2012
1 parent 359f498 commit f4ec867
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 27 deletions.
35 changes: 33 additions & 2 deletions Driver/Core/MongoCursorEnumerator.cs
Expand Up @@ -330,14 +330,45 @@ private void KillCursor()

private IMongoQuery WrapQuery()
{
if (_cursor.Options == null)
BsonDocument formattedReadPreference = null;
if (_serverInstance.InstanceType == MongoServerInstanceType.ShardRouter)
{
var readPreference = _cursor.ReadPreference;

BsonArray tagSetsArray = null;
if (readPreference.ReadPreferenceMode != ReadPreferenceMode.Primary && readPreference.TagSets != null)
{
tagSetsArray = new BsonArray();
foreach (var tagSet in readPreference.TagSets)
{
var tagSetDocument = new BsonDocument();
foreach (var tag in tagSet)
{
tagSetDocument.Add(tag.Name, tag.Value);
}
tagSetsArray.Add(tagSetDocument);
}
}

formattedReadPreference = new BsonDocument
{
{ "mode", MongoUtils.ToCamelCase(readPreference.ReadPreferenceMode.ToString()) },
{ "tags", tagSetsArray, tagSetsArray != null } // optional
};
}

if (_cursor.Options == null && formattedReadPreference == null)
{
return _cursor.Query;
}
else
{
var query = (_cursor.Query == null) ? (BsonValue)new BsonDocument() : BsonDocumentWrapper.Create(_cursor.Query);
var wrappedQuery = new QueryDocument("$query", query);
var wrappedQuery = new QueryDocument
{
{ "$readPreference", formattedReadPreference, formattedReadPreference != null }, // only if sending query to a mongos
{ "$query", query }
};
wrappedQuery.Merge(_cursor.Options);
return wrappedQuery;
}
Expand Down
28 changes: 14 additions & 14 deletions Driver/Core/MongoServerInstance.cs
Expand Up @@ -83,7 +83,7 @@ public sealed class MongoServerInstance
private ReplicaSetInformation _replicaSetInformation;
private int _sequentialId;
private MongoServerState _state;
private MongoServerInstanceType _type;
private MongoServerInstanceType _instanceType;

// constructors
/// <summary>
Expand All @@ -99,7 +99,7 @@ internal MongoServerInstance(MongoServer server, MongoServerAddress address)
_maxDocumentSize = MongoDefaults.MaxDocumentSize;
_maxMessageLength = MongoDefaults.MaxMessageLength;
_state = MongoServerState.Disconnected;
_type = MongoServerInstanceType.Unknown;
_instanceType = MongoServerInstanceType.Unknown;
_connectionPool = new MongoConnectionPool(this);
_pingTimeAggregator = new PingTimeAggregator(5);
_stateVerificationTimer = new Timer(o => StateVerificationTimerCallback(), null, TimeSpan.Zero, TimeSpan.FromSeconds(10));
Expand Down Expand Up @@ -127,15 +127,15 @@ internal ReplicaSetInformation ReplicaSetInformation
}

/// <summary>
/// Gets the type.
/// Gets the instance type.
/// </summary>
internal MongoServerInstanceType Type
internal MongoServerInstanceType InstanceType
{
get
{
lock (_serverInstanceLock)
{
return _type;
return _instanceType;
}
}
}
Expand Down Expand Up @@ -508,7 +508,7 @@ internal void SetState(MongoServerState state)
{
lock(_serverInstanceLock)
{
SetState(state, _type, _isPrimary, _isSecondary, _isPassive, _isArbiter, _replicaSetInformation);
SetState(state, _instanceType, _isPrimary, _isSecondary, _isPassive, _isArbiter, _replicaSetInformation);
}
}

Expand Down Expand Up @@ -546,17 +546,17 @@ private void LookupServerInformation(MongoConnection connection)
}

ReplicaSetInformation replicaSetInformation = null;
MongoServerInstanceType type = MongoServerInstanceType.StandAlone;
MongoServerInstanceType instanceType = MongoServerInstanceType.StandAlone;
if (isMasterResult.ReplicaSetName != null)
{
var tagSet = new ReplicaSetTagSet();
var peers = isMasterResult.Hosts.Concat(isMasterResult.Passives).Concat(isMasterResult.Arbiters).ToList();
replicaSetInformation = new ReplicaSetInformation(isMasterResult.ReplicaSetName, isMasterResult.Primary, peers, tagSet);
type = MongoServerInstanceType.ReplicaSetMember;
instanceType = MongoServerInstanceType.ReplicaSetMember;
}
else if (isMasterResult.Message != null && isMasterResult.Message == "isdbgrid")
{
type = MongoServerInstanceType.ShardRouter;
instanceType = MongoServerInstanceType.ShardRouter;
}

lock (_serverInstanceLock)
Expand All @@ -566,7 +566,7 @@ private void LookupServerInformation(MongoConnection connection)
_maxMessageLength = isMasterResult.MaxMessageLength;
_buildInfo = buildInfo;
this.SetState(MongoServerState.Connected,
type,
instanceType,
isMasterResult.IsPrimary,
isMasterResult.IsSecondary,
isMasterResult.IsPassive,
Expand All @@ -585,7 +585,7 @@ private void LookupServerInformation(MongoConnection connection)
_maxDocumentSize = MongoDefaults.MaxDocumentSize;
_maxMessageLength = MongoDefaults.MaxMessageLength;
_buildInfo = null;
this.SetState(MongoServerState.Disconnected, _type, false, false, false, false, null);
this.SetState(MongoServerState.Disconnected, _instanceType, false, false, false, false, null);
}
}
}
Expand Down Expand Up @@ -663,7 +663,7 @@ internal void StateVerificationTimerCallback()

private void SetState(
MongoServerState state,
MongoServerInstanceType type,
MongoServerInstanceType instanceType,
bool isPrimary,
bool isSecondary,
bool isPassive,
Expand All @@ -678,11 +678,11 @@ internal void StateVerificationTimerCallback()
{
replicaSetInformationIsDifferent = true;
}
if (_state != state || _type != type || replicaSetInformationIsDifferent || _isPrimary != isPrimary || _isSecondary != isSecondary || _isPassive != isPassive || _isArbiter != isArbiter)
if (_state != state || _instanceType != instanceType || replicaSetInformationIsDifferent || _isPrimary != isPrimary || _isSecondary != isSecondary || _isPassive != isPassive || _isArbiter != isArbiter)
{
changed = true;
_state = state;
_type = type;
_instanceType = instanceType;
if (_replicaSetInformation != replicaSetInformation)
{
_replicaSetInformation = replicaSetInformation;
Expand Down
5 changes: 3 additions & 2 deletions Driver/Core/ReadPreference.cs
Expand Up @@ -354,7 +354,7 @@ public bool MatchesInstance(MongoServerInstance instance)
throw new MongoInternalException("Invalid ReadPreferenceMode");
}

if (_tagSets != null && instance.Type == MongoServerInstanceType.ReplicaSetMember)
if (_tagSets != null && instance.InstanceType == MongoServerInstanceType.ReplicaSetMember)
{
var someSetMatches = false;
foreach (var tagSet in _tagSets)
Expand Down Expand Up @@ -387,7 +387,8 @@ public override string ToString()
}
else
{
return string.Format("{0}(tags: {1})}", _readPreferenceMode, _tagSets);
var tagSets = string.Format("[{0}]", string.Join(", ", _tagSets.Select(ts => ts.ToString()).ToArray()));
return string.Format("{0} (tags = {1})", _readPreferenceMode, tagSets);
}
}

Expand Down
2 changes: 1 addition & 1 deletion Driver/Core/ReplicaSetTag.cs
Expand Up @@ -134,7 +134,7 @@ public override int GetHashCode()
/// <returns>A string representation of the user.</returns>
public override string ToString()
{
return string.Format("{0}={1}", _name, _value);
return string.Format("{0}:{1}", _name, _value);
}

// private methods
Expand Down
4 changes: 2 additions & 2 deletions Driver/Core/ReplicaSetTagSet.cs
Expand Up @@ -223,7 +223,7 @@ public bool IsFrozen
public bool MatchesInstance(MongoServerInstance instance)
{
// an empty tag set matches anything
if (instance.Type != MongoServerInstanceType.ReplicaSetMember || _tags.Count == 0)
if (instance.InstanceType != MongoServerInstanceType.ReplicaSetMember || _tags.Count == 0)
{
return true;
}
Expand All @@ -246,7 +246,7 @@ public bool MatchesInstance(MongoServerInstance instance)
/// <returns>A string representation of the user.</returns>
public override string ToString()
{
return string.Join(", ", _tags.Select(t => t.ToString()).ToArray());
return string.Format("{{{0}}}", string.Join(", ", _tags.Select(t => t.ToString()).ToArray()));
}

// private methods
Expand Down
2 changes: 1 addition & 1 deletion Driver/Internal/DirectMongoServerProxy.cs
Expand Up @@ -160,7 +160,7 @@ public void Connect(TimeSpan timeout, ReadPreference readPreference)
}

if (_server.Settings.ReplicaSetName != null &&
(_instance.Type != MongoServerInstanceType.ReplicaSetMember || _instance.ReplicaSetInformation.Name != _server.Settings.ReplicaSetName))
(_instance.InstanceType != MongoServerInstanceType.ReplicaSetMember || _instance.ReplicaSetInformation.Name != _server.Settings.ReplicaSetName))
{
exceptions.Add(new MongoConnectionException(string.Format("The server '{0}' is not a member of replica set '{1}'.", address, _server.Settings.ReplicaSetName)));
continue;
Expand Down
6 changes: 3 additions & 3 deletions Driver/Internal/DiscoveringMongoServerProxy.cs
Expand Up @@ -308,15 +308,15 @@ private void Discover(TimeSpan timeout)
private void CreateActualProxy(MongoServerInstance instance, BlockingQueue<MongoServerInstance> connectionQueue)
{
// we are already in a write lock here...
if (instance.Type == MongoServerInstanceType.ReplicaSetMember)
if (instance.InstanceType == MongoServerInstanceType.ReplicaSetMember)
{
_serverProxy = new ReplicaSetMongoServerProxy(_server, _instances, connectionQueue, _connectionAttempt);
}
else if (instance.Type == MongoServerInstanceType.ShardRouter)
else if (instance.InstanceType == MongoServerInstanceType.ShardRouter)
{
_serverProxy = new ShardedMongoServerProxy(_server, _instances, connectionQueue, _connectionAttempt);
}
else if (instance.Type == MongoServerInstanceType.StandAlone)
else if (instance.InstanceType == MongoServerInstanceType.StandAlone)
{
var otherInstances = _instances.Where(x => x != instance).ToList();
foreach (var otherInstance in otherInstances)
Expand Down
2 changes: 1 addition & 1 deletion Driver/Internal/ReplicaSetMongoServerProxy.cs
Expand Up @@ -133,7 +133,7 @@ protected override MongoServerState DetermineServerState(MongoServerState curren
/// </returns>
protected override bool IsValidInstance(MongoServerInstance instance)
{
if (instance.Type != MongoServerInstanceType.ReplicaSetMember)
if (instance.InstanceType != MongoServerInstanceType.ReplicaSetMember)
{
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion Driver/Internal/ShardedMongoServerProxy.cs
Expand Up @@ -114,7 +114,7 @@ protected override MongoServerState DetermineServerState(MongoServerState curren
/// </returns>
protected override bool IsValidInstance(MongoServerInstance instance)
{
return instance.Type == MongoServerInstanceType.ShardRouter;
return instance.InstanceType == MongoServerInstanceType.ShardRouter;
}
}
}

0 comments on commit f4ec867

Please sign in to comment.