Navigation Menu

Skip to content

Commit

Permalink
JAVA-404: slaveOk support for inline mapreduce (routes to secondaries…
Browse files Browse the repository at this point in the history
…); changed CommandResult to include serverUsed, made readPref-Secondary set slaveOk query flag, toString/toJSON on Node/ReplicaSetStatus.
  • Loading branch information
scotthernandez committed Oct 22, 2011
1 parent a1692e0 commit 9573f64
Show file tree
Hide file tree
Showing 12 changed files with 222 additions and 82 deletions.
12 changes: 11 additions & 1 deletion src/main/com/mongodb/CommandResult.java
Expand Up @@ -18,12 +18,18 @@

package com.mongodb;


/**
* A simple wrapper for the result of getLastError() calls and other commands
*/
public class CommandResult extends BasicDBObject {

CommandResult() { }
CommandResult(ServerAddress srv) {
super();
_host = srv;
//so it is shown in toString/debug
put("serverUsed", srv.toString());
}

/**
* gets the "ok" field which is the result of the command
Expand Down Expand Up @@ -116,8 +122,12 @@ public void throwOnError() throws MongoException {
}
}

public ServerAddress getServerUsed() {
return _host;
}

DBObject _cmd;
ServerAddress _host = null;
private static final long serialVersionUID = 1L;

static class CommandFailure extends MongoException {
Expand Down
34 changes: 25 additions & 9 deletions src/main/com/mongodb/DB.java
Expand Up @@ -28,6 +28,7 @@
import java.util.List;
import java.util.Set;

import com.mongodb.DBApiLayer.Result;
import com.mongodb.util.Util;

/**
Expand Down Expand Up @@ -139,33 +140,48 @@ public DBCollection getCollectionFromString( String s ){
* @throws MongoException
* @dochub commands
*/
public CommandResult command( DBObject cmd )
throws MongoException {
return command( cmd , 0 );
public CommandResult command( DBObject cmd ) throws MongoException{
return command( cmd, 0 );
}

/**
* Executes a database command.
* @see <a href="http://mongodb.onconfluence.com/display/DOCS/List+of+Database+Commands">List of Commands</a>
* @param cmd dbobject representing the command to execute
* @param options query options to use
* @param readPrefs ReadPreferences for this command (nodes selection is the biggest part of this)
* @return result of command from the database
* @dochub commands
* @throws MongoException
*/
public CommandResult command( DBObject cmd , int options )
public CommandResult command( DBObject cmd , int options, ReadPreference readPrefs )
throws MongoException {

// TODO - Is ReadPreference OK for commands?
Iterator<DBObject> i = getCollection("$cmd").__find(cmd, new BasicDBObject(), 0, -1, 0, options, null, DefaultDBDecoder.FACTORY.create() );
Iterator<DBObject> i = getCollection("$cmd").__find(cmd, new BasicDBObject(), 0, -1, 0, options, readPrefs , DefaultDBDecoder.FACTORY.create());
if ( i == null || ! i.hasNext() )
return null;

CommandResult res = (CommandResult)i.next();
res._cmd = cmd;
return res;
DBObject res = i.next();
ServerAddress sa = (i instanceof Result) ? ((Result) i).getServerAddress() : null;
CommandResult cr = new CommandResult(sa);
cr.putAll( res );
cr._cmd = cmd;
return cr;
}

/**
* Executes a database command.
* @see <a href="http://mongodb.onconfluence.com/display/DOCS/List+of+Database+Commands">List of Commands</a>
* @param cmd dbobject representing the command to execute
* @param options query options to use
* @return result of command from the database
* @dochub commands
* @throws MongoException
*/
public CommandResult command( DBObject cmd , int options )
throws MongoException {
return command(cmd, options, null);
}
/**
* Executes a database command.
* This method constructs a simple dbobject and calls {@link DB#command(com.mongodb.DBObject) }
Expand Down
2 changes: 1 addition & 1 deletion src/main/com/mongodb/DBCollection.java
Expand Up @@ -1049,7 +1049,7 @@ public MapReduceOutput mapReduce( MapReduceCommand command ) throws MongoExcepti
// if type in inline, then query options like slaveOk is fine
CommandResult res = null;
if (command.getOutputType() == MapReduceCommand.OutputType.INLINE)
res = _db.command( cmd, getOptions() );
res = _db.command( cmd, getOptions(), command.getReadPreference() != null ? command.getReadPreference() : getReadPreference() );
else
res = _db.command( cmd );
res.throwOnError();
Expand Down
77 changes: 41 additions & 36 deletions src/main/com/mongodb/DBPort.java
Expand Up @@ -67,16 +67,16 @@ public DBPort( ServerAddress addr ){
_decoder = _options.dbDecoderFactory.create();
}

Response call( OutMessage msg , DBCollection coll ) throws IOException {
return go( msg , coll );
Response call( OutMessage msg , DBCollection coll ) throws IOException{
return go( msg, coll );
}

Response call( OutMessage msg , DBCollection coll , DBDecoder decoder ) throws IOException {
return go( msg , coll , false , null , decoder );
Response call( OutMessage msg , DBCollection coll , DBDecoder decoder) throws IOException{
return go( msg, coll, false, null, decoder);
}

Response call( OutMessage msg , DBCollection coll , ReadPreference readPref, DBDecoder decoder ) throws IOException {
return go( msg , coll , false , readPref , decoder );
Response call( OutMessage msg , DBCollection coll , ReadPreference readPref , DBDecoder decoder) throws IOException{
return go( msg, coll, false, readPref, decoder);
}

void say( OutMessage msg )
Expand All @@ -86,15 +86,14 @@ void say( OutMessage msg )

private synchronized Response go( OutMessage msg , DBCollection coll )
throws IOException {
return go( msg , coll , false , null , null );
return go( msg , coll , false, null, null );
}

private synchronized Response go( OutMessage msg , DBCollection coll , DBDecoder decoder )
throws IOException {
return go( msg , coll , false , null , decoder );
private synchronized Response go( OutMessage msg , DBCollection coll , DBDecoder decoder ) throws IOException{
return go( msg, coll, false, null, decoder );
}

private synchronized Response go( OutMessage msg , DBCollection coll , boolean forceReponse , ReadPreference readPref , DBDecoder decoder )
private synchronized Response go( OutMessage msg , DBCollection coll , boolean forceReponse , ReadPreference readPref, DBDecoder decoder)
throws IOException {

if ( _processingResponse ){
Expand Down Expand Up @@ -137,49 +136,48 @@ private synchronized Response go( OutMessage msg , DBCollection coll , boolean f
}
}

synchronized CommandResult getLastError( DB db , WriteConcern concern) throws IOException {
DBApiLayer dbAL = (DBApiLayer) db;
return runCommand( dbAL , concern.getCommand() );
synchronized CommandResult getLastError( DB db , WriteConcern concern ) throws IOException{
DBApiLayer dbAL = (DBApiLayer) db;
return runCommand( dbAL, concern.getCommand() );
}

synchronized DBObject findOne( DB db , String coll , DBObject q ) throws IOException {
synchronized private Response findOne( DB db , String coll , DBObject q ) throws IOException {
OutMessage msg = OutMessage.query( db._mongo , 0 , db.getName() + "." + coll , 0 , -1 , q , null );

Response res = go( msg , db.getCollection( coll ) , DefaultDBDecoder.FACTORY.create() );
if ( res.size() == 0 )
return null;
if ( res.size() > 1 )
throw new MongoInternalException( "something is wrong. size:" + res.size() );
return res.get(0);
return res;
}

synchronized private Response findOne( String ns , DBObject q ) throws IOException{
OutMessage msg = OutMessage.query( null , 0 , ns , 0 , -1 , q , null );
Response res = go( msg , null , true, null, DefaultDBDecoder.FACTORY.create() );
return res;
}

synchronized CommandResult runCommand( DB db , DBObject cmd ) throws IOException {
DBObject res = findOne( db , "$cmd" , cmd );
if ( res == null )
throw new MongoInternalException( "something is wrong, no command result" );
return (CommandResult)res;
Response res = findOne( db , "$cmd" , cmd );
return convertToCR( res );
}

synchronized DBObject findOne( String ns , DBObject q ) throws IOException{
OutMessage msg = OutMessage.query( null , 0 , ns , 0 , -1 , q , null );
Response res = go( msg , null , true , null, DefaultDBDecoder.FACTORY.create() );
synchronized CommandResult runCommand( String db , DBObject cmd ) throws IOException {
Response res = findOne( db + ".$cmd" , cmd );
return convertToCR( res );
}

private CommandResult convertToCR(Response res) {
if ( res.size() == 0 )
return null;
if ( res.size() > 1 )
throw new MongoInternalException( "something is wrong. size:" + res.size() );
return res.get(0);
}

synchronized CommandResult runCommand( String db , DBObject cmd ) throws IOException {
DBObject res = findOne( db + ".$cmd" , cmd );
if ( res == null )
DBObject data = res.get(0);
if ( data == null )
throw new MongoInternalException( "something is wrong, no command result" );
CommandResult cr = new CommandResult();
cr.putAll( res );

CommandResult cr = new CommandResult(res.serverUsed());
cr.putAll( data );
return cr;
}


synchronized CommandResult tryGetLastError( DB db , long last, WriteConcern concern) throws IOException {
if ( last != _calls )
return null;
Expand Down Expand Up @@ -263,6 +261,13 @@ public String host(){
return _addr.toString();
}

/**
* @return the server address for this port
*/
public ServerAddress serverAddress() {
return _sa;
}

@Override
public String toString(){
return "{DBPort " + host() + "}";
Expand Down
6 changes: 3 additions & 3 deletions src/main/com/mongodb/DBTCPConnector.java
Expand Up @@ -18,8 +18,6 @@

package com.mongodb;

import com.mongodb.ReadPreference.*;

import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
Expand All @@ -28,6 +26,8 @@
import java.util.logging.Level;
import java.util.logging.Logger;

import com.mongodb.ReadPreference.TaggedReadPreference;

public class DBTCPConnector implements DBConnector {

static Logger _logger = Logger.getLogger( Bytes.LOGGER.getName() + ".tcp" );
Expand Down Expand Up @@ -165,7 +165,7 @@ public WriteResult say( DB db , OutMessage m , WriteConcern concern , ServerAddr
if ( concern.raiseNetworkErrors() )
throw new MongoException.Network( "can't say something" , ioe );

CommandResult res = new CommandResult();
CommandResult res = new CommandResult(port.serverAddress());
res.put( "ok" , false );
res.put( "$err" , "NETWORK ERROR" );
return new WriteResult( res , concern );
Expand Down
2 changes: 0 additions & 2 deletions src/main/com/mongodb/DefaultDBCallback.java
Expand Up @@ -121,8 +121,6 @@ private DBObject _create( List<String> path ){
}
}

if ( _collection != null && _collection._name.equals( "$cmd" ) )
return new CommandResult();
return new BasicDBObject();
}

Expand Down
19 changes: 14 additions & 5 deletions src/main/com/mongodb/MapReduceOutput.java
Expand Up @@ -25,8 +25,8 @@
public class MapReduceOutput {

@SuppressWarnings("unchecked")
public MapReduceOutput( DBCollection from , DBObject cmd, BasicDBObject raw ){
_raw = raw;
public MapReduceOutput( DBCollection from , DBObject cmd, CommandResult raw ){
_commandResult = raw;
_cmd = cmd;

if ( raw.containsField( "results" ) ) {
Expand Down Expand Up @@ -80,19 +80,28 @@ public DBCollection getOutputCollection(){
return _coll;
}

@Deprecated
public BasicDBObject getRaw(){
return _raw;
return _commandResult;
}

public CommandResult getCommandResult(){
return _commandResult;
}

public DBObject getCommand() {
return _cmd;
}

public ServerAddress getServerUsed() {
return _commandResult.getServerUsed();
}

public String toString(){
return _raw.toString();
return _commandResult.toString();
}

final BasicDBObject _raw;
final CommandResult _commandResult;

final String _collname;
String _dbname = null;
Expand Down
25 changes: 11 additions & 14 deletions src/main/com/mongodb/OutMessage.java
Expand Up @@ -18,17 +18,13 @@

package com.mongodb;

import static org.bson.BSON.EOO;
import static org.bson.BSON.OBJECT;
import static org.bson.BSON.REF;

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicInteger;

import org.bson.*;
import org.bson.BSONObject;
import org.bson.BasicBSONEncoder;
import org.bson.io.PoolOutputBuffer;
import org.bson.types.ObjectId;

class OutMessage extends BasicBSONEncoder {

Expand All @@ -44,8 +40,7 @@ static OutMessage query( Mongo m , int options , String ns , int numToSkip , int

static OutMessage query( Mongo m , int options , String ns , int numToSkip , int batchSize , DBObject query , DBObject fields, ReadPreference readPref, DBEncoder enc ){
OutMessage out = new OutMessage( m , 2004, enc );
out._appendQuery( options , ns , numToSkip , batchSize , query , fields );
out.setReadPreference( readPref );
out._appendQuery( options , ns , numToSkip , batchSize , query , fields, readPref);
return out;
}

Expand All @@ -69,9 +64,15 @@ static OutMessage query( Mongo m , int options , String ns , int numToSkip , int
this( m , enc );
reset( op );
}
private void _appendQuery( int options , String ns , int numToSkip , int batchSize , DBObject query , DBObject fields ){
private void _appendQuery( int options , String ns , int numToSkip , int batchSize , DBObject query , DBObject fields, ReadPreference readPref){
_queryOptions = options;
writeInt( options );
_readPref = readPref;

//If the readPrefs are non-null and non-primary, set slaveOk query option
if (!(_readPref instanceof ReadPreference.PrimaryReadPreference))
_queryOptions |= Bytes.QUERYOPTION_SLAVEOK;

writeInt( _queryOptions );
writeCString( ns );

writeInt( numToSkip );
Expand Down Expand Up @@ -149,10 +150,6 @@ public ReadPreference getReadPreference(){
return _readPref;
}

public void setReadPreference( ReadPreference readPref ){
_readPref = readPref;
}

private Mongo _mongo;
private PoolOutputBuffer _buffer;
private int _id;
Expand Down
2 changes: 1 addition & 1 deletion src/main/com/mongodb/ReadPreference.java
Expand Up @@ -13,7 +13,7 @@

package com.mongodb;

import java.util.*;
import java.util.Map;

public class ReadPreference {
public static class PrimaryReadPreference extends ReadPreference {
Expand Down

0 comments on commit 9573f64

Please sign in to comment.