Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Corrected version of improved BSONDecoder #10

Closed
wants to merge 11 commits into from
2 changes: 1 addition & 1 deletion eclipse-java-code-formatters.xml
Expand Up @@ -88,7 +88,7 @@
<setting id="org.eclipse.jdt.core.formatter.alignment_for_superclass_in_type_declaration" value="16"/>
<setting id="org.eclipse.jdt.core.formatter.alignment_for_assignment" value="0"/>
<setting id="org.eclipse.jdt.core.compiler.problem.assertIdentifier" value="error"/>
<setting id="org.eclipse.jdt.core.formatter.tabulation.char" value="tab"/>
<setting id="org.eclipse.jdt.core.formatter.tabulation.char" value="space"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_constructor_declaration_parameters" value="insert"/>
<setting id="org.eclipse.jdt.core.formatter.insert_space_before_prefix_operator" value="do not insert"/>
<setting id="org.eclipse.jdt.core.formatter.indent_statements_compare_to_body" value="true"/>
Expand Down
8 changes: 6 additions & 2 deletions src/main/com/mongodb/BasicDBObject.java
Expand Up @@ -20,10 +20,10 @@

import java.util.*;

import com.mongodb.util.*;

import org.bson.*;

import com.mongodb.util.*;

/**
* A simple implementation of <code>DBObject</code>.
* A <code>DBObject</code> can be created as follows, using this class:
Expand All @@ -39,6 +39,10 @@ public class BasicDBObject extends BasicBSONObject implements DBObject {
*/
public BasicDBObject(){
}

public BasicDBObject(int size){
super(size);
}

/**
* Convenience CTOR
Expand Down
6 changes: 4 additions & 2 deletions src/main/com/mongodb/BasicDBObjectBuilder.java
Expand Up @@ -68,8 +68,7 @@ public BasicDBObjectBuilder append( String key , Object val ){
* @return returns itself so you can chain .add( "a" , 1 ).add( "b" , 1 )
*/
public BasicDBObjectBuilder add( String key , Object val ){
_cur().put( key , val );
return this;
return append( key, val );
}

public BasicDBObjectBuilder push( String key ){
Expand All @@ -90,6 +89,9 @@ public DBObject get(){
return _stack.getFirst();
}

public boolean isEmpty(){
return ((BasicDBObject) _stack.getFirst()).size() == 0;
}
private DBObject _cur(){
return _stack.getLast();
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/com/mongodb/DB.java
Expand Up @@ -480,7 +480,7 @@ public DB getSisterDB( String name ){
}

/**
* makes thisq query ok to run on a slave node
* makes this query ok to run on a slave node
*/
public void slaveOk(){
addOption( Bytes.QUERYOPTION_SLAVEOK );
Expand Down
2 changes: 1 addition & 1 deletion src/main/com/mongodb/DBCollection.java
Expand Up @@ -967,7 +967,7 @@ public WriteConcern getWriteConcern(){
}

/**
* makes thisq query ok to run on a slave node
* makes this query ok to run on a slave node
*/
public void slaveOk(){
addOption( Bytes.QUERYOPTION_SLAVEOK );
Expand Down
2 changes: 1 addition & 1 deletion src/main/com/mongodb/DBCursor.java
Expand Up @@ -230,7 +230,7 @@ public DBCursor skip( int n ){
}

/**
* makes thisq query ok to run on a slave node
* makes this query ok to run on a slave node
*/
public void slaveOk(){
addOption( Bytes.QUERYOPTION_SLAVEOK );
Expand Down
34 changes: 33 additions & 1 deletion src/main/com/mongodb/DBPort.java
Expand Up @@ -62,6 +62,11 @@ void say( OutMessage msg )

private synchronized Response go( OutMessage msg , DBCollection coll )
throws IOException {
return go( msg , coll , false );
}

private synchronized Response go( OutMessage msg , DBCollection coll , boolean forceReponse )
throws IOException {

if ( _processingResponse ){
if ( coll == null ){
Expand All @@ -88,7 +93,7 @@ private synchronized Response go( OutMessage msg , DBCollection coll )
if ( _pool != null )
_pool._everWorked = true;

if ( coll == null )
if ( coll == null && ! forceReponse )
return null;

_processingResponse = true;
Expand Down Expand Up @@ -132,6 +137,33 @@ synchronized CommandResult runCommand( DB db , DBObject cmd ) {
return (CommandResult)res;
}

synchronized DBObject findOne( String ns , DBObject q ){
OutMessage msg = OutMessage.query( null , 0 , ns , 0 , -1 , q , null );

try {
Response res = go( msg , null , true );
if ( res.size() == 0 )
return null;
if ( res.size() > 1 )
throw new MongoInternalException( "something is wrong. size:" + res.size() );
return res.get(0);
}
catch ( IOException ioe ){
throw new MongoInternalException( "DBPort.findOne failed" , ioe );
}

}

synchronized CommandResult runCommand( String db , DBObject cmd ) {
DBObject res = findOne( db + ".$cmd" , cmd );
if ( res == null )
throw new MongoInternalException( "something is wrong, no command result" );
CommandResult cr = new CommandResult();
cr.putAll( res );
return cr;
}


synchronized CommandResult tryGetLastError( DB db , long last, WriteConcern concern){
if ( last != _calls )
return null;
Expand Down
7 changes: 2 additions & 5 deletions src/main/com/mongodb/DBTCPConnector.java
Expand Up @@ -32,15 +32,14 @@ class DBTCPConnector implements DBConnector {

public DBTCPConnector( Mongo m , ServerAddress addr )
throws MongoException {
_mongo = m;
_portHolder = new DBPortPool.Holder( m._options );
_checkAddress( addr );

_createLogger.info( addr.toString() );

if ( addr.isPaired() ){
_allHosts = new ArrayList<ServerAddress>( addr.explode() );
_rsStatus = new ReplicaSetStatus( m , _allHosts , this );
_rsStatus = new ReplicaSetStatus( _allHosts );
_createLogger.info( "switching to replica set mode : " + _allHosts + " -> " + _curMaster );
}
else {
Expand All @@ -58,12 +57,11 @@ public DBTCPConnector( Mongo m , ServerAddress ... all )

public DBTCPConnector( Mongo m , List<ServerAddress> all )
throws MongoException {
_mongo = m;
_portHolder = new DBPortPool.Holder( m._options );
_checkAddress( all );

_allHosts = new ArrayList<ServerAddress>( all ); // make a copy so it can't be modified
_rsStatus = new ReplicaSetStatus( m , _allHosts , this );
_rsStatus = new ReplicaSetStatus( _allHosts );

_createLogger.info( all + " -> " + _curMaster );
}
Expand Down Expand Up @@ -353,7 +351,6 @@ public void close(){
_rsStatus.close();
}

final Mongo _mongo;
private ServerAddress _curMaster;
private DBPortPool _curPortPool;
private DBPortPool.Holder _portHolder;
Expand Down
98 changes: 70 additions & 28 deletions src/main/com/mongodb/Mongo.java
Expand Up @@ -22,7 +22,6 @@
import java.util.*;
import java.util.concurrent.*;

import org.bson.util.*;
import org.bson.io.*;

/**
Expand Down Expand Up @@ -77,31 +76,6 @@ public static DB connect( DBAddress addr ){
return new Mongo( addr ).getDB( addr.getDBName() );
}

public static Mongo getStaticMongo( String host )
throws UnknownHostException , MongoException {
return getStaticMongo( host , null );
}

private static final MongoOptions _defaultOptions = new MongoOptions();

public static Mongo getStaticMongo( String host , MongoOptions options )
throws UnknownHostException , MongoException {

final String key = host + "-" + options;

Mongo m = _mongos.get( key );
if ( m != null )
return m;

m = new Mongo( host , options == null ? _defaultOptions : options );
Mongo temp = _mongos.putIfAbsent( key , m );
if ( temp != null ){
m.close();
return temp;
}
return m;
}

public Mongo()
throws UnknownHostException , MongoException {
this( new ServerAddress() );
Expand Down Expand Up @@ -215,6 +189,30 @@ public Mongo( List<ServerAddress> replicaSetSeeds , MongoOptions options )
_connector.checkMaster();
}

public Mongo( MongoURI uri )
throws MongoException , UnknownHostException {

_options = uri.getOptions();

if ( uri.getHosts().size() == 1 ){
_addr = new ServerAddress( uri.getHosts().get(0) );
_addrs = null;
_connector = new DBTCPConnector( this , _addr );
}
else {
List<ServerAddress> replicaSetSeeds = new ArrayList<ServerAddress>( uri.getHosts().size() );
for ( String host : uri.getHosts() )
replicaSetSeeds.add( new ServerAddress( host ) );
_addr = null;
_addrs = replicaSetSeeds;
_connector = new DBTCPConnector( this , replicaSetSeeds );
}

_connector.checkMaster();


}

public DB getDB( String dbname ){

DB db = _dbs.get( dbname );
Expand Down Expand Up @@ -316,7 +314,7 @@ public WriteConcern getWriteConcern(){
}

/**
* makes thisq query ok to run on a slave node
* makes this query ok to run on a slave node
*/
public void slaveOk(){
addOption( Bytes.QUERYOPTION_SLAVEOK );
Expand Down Expand Up @@ -357,5 +355,49 @@ protected PoolOutputBuffer createNew(){
};


private static final ConcurrentMap<String,Mongo> _mongos = new ConcurrentHashMap<String,Mongo>();
// -------


/**
* Mongo.Holder is if you want to have a static place to hold instances of Mongo
* security is not enforced at this level, so need to do on your side
*/
public static class Holder {

public Mongo connect( MongoURI uri )
throws MongoException , UnknownHostException {

String key = _toKey( uri );

Mongo m = _mongos.get(key);
if ( m != null )
return m;

m = new Mongo( uri );

Mongo temp = _mongos.putIfAbsent( key , m );
if ( temp == null ){
// ours got in
return m;
}

// there was a race and we lost
// close ours and return the other one
m.close();
return temp;
}

String _toKey( MongoURI uri ){
StringBuilder buf = new StringBuilder();
for ( String h : uri.getHosts() )
buf.append( h ).append( "," );
buf.append( uri.getOptions() );
buf.append( uri.getUsername() );
return buf.toString();
}


private static final ConcurrentMap<String,Mongo> _mongos = new ConcurrentHashMap<String,Mongo>();

}
}
14 changes: 8 additions & 6 deletions src/main/com/mongodb/MongoOptions.java
Expand Up @@ -18,6 +18,8 @@

package com.mongodb;

import java.net.*;

/**
* Various settings for the driver
*/
Expand All @@ -37,8 +39,8 @@ public void reset(){
}

/**
number of connections allowed per host
will block if run out
<p>The number of connections allowed per host (the pool size, per host)</p>
<p>Once the pool is exhausted, this will block. See {@linkplain MongoOptions.threadsAllowedToBlockForConnectionMultiplier}</p>
*/
public int connectionsPerHost;

Expand All @@ -51,22 +53,22 @@ public void reset(){
public int threadsAllowedToBlockForConnectionMultiplier;

/**
* max wait time of a blocking thread for a connection
* The max wait time for a blocking thread for a connection from the pool
*/
public int maxWaitTime;

/**
connect timeout in milliseconds. 0 is default and infinite
The connection timeout in milliseconds; this is for establishing the socket connections (open). 0 is default and infinite
*/
public int connectTimeout;

/**
socket timeout. 0 is default and infinite
The socket timeout; this value is passed to {@link Socket.setSoTimeout}. 0 is default and infinite
*/
public int socketTimeout;

/**
this controls whether or not on a connect, the system retries automatically. defaults to false
This controls whether the system retries automatically on connection errors. defaults to false
*/
public boolean autoConnectRetry;

Expand Down