Skip to content

Commit

Permalink
++
Browse files Browse the repository at this point in the history
  • Loading branch information
ValeriyBushenev committed Aug 19, 2011
1 parent 7e4dd5e commit 53d3e7f
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 130 deletions.
253 changes: 124 additions & 129 deletions src/mongo.d
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -115,22 +115,6 @@ void mongo_init( mongo *conn ) {
conn.op_timeout_ms = 0; conn.op_timeout_ms = 0;
} }


int mongo_connect( mongo *conn , char *host, int port ) {
conn.primary = cast (mongo_host_port *)bson_malloc( mongo_host_port.sizeof );
strncpy( conn.primary.host, host, strlen( host ) + 1 );
conn.primary.port = port;
conn.primary.next = null;

mongo_init( conn );
if( mongo_socket_connect( conn, host, port ) != MONGO_OK )
return MONGO_ERROR;

if( mongo_check_is_master( conn ) != MONGO_OK )
return MONGO_ERROR;
else
return MONGO_OK;
}

void mongo_replset_init( mongo *conn, char *name ) { void mongo_replset_init( mongo *conn, char *name ) {
mongo_init( conn ); mongo_init( conn );


Expand All @@ -144,22 +128,6 @@ void mongo_replset_init( mongo *conn, char *name ) {
conn.primary = cast (mongo_host_port *)bson_malloc( mongo_host_port.sizeof ); conn.primary = cast (mongo_host_port *)bson_malloc( mongo_host_port.sizeof );
} }


static void mongo_replset_add_node( mongo_host_port **list, char *host, int port ) {
mongo_host_port *host_port = cast(mongo_host_port*)bson_malloc( mongo_host_port.sizeof );
host_port.port = port;
host_port.next = null;
strncpy( host_port.host, host, strlen( host ) + 1 );

if( *list == null )
*list = host_port;
else {
mongo_host_port *p = *list;
while( p.next != null )
p = p.next;
p.next = host_port;
}
}

static void mongo_replset_free_list( mongo_host_port **list ) { static void mongo_replset_free_list( mongo_host_port **list ) {
mongo_host_port *node = *list; mongo_host_port *node = *list;
mongo_host_port *prev; mongo_host_port *prev;
Expand All @@ -173,80 +141,6 @@ static void mongo_replset_free_list( mongo_host_port **list ) {
*list = null; *list = null;
} }


void mongo_replset_add_seed( mongo *conn, char *host, int port ) {
mongo_replset_add_node( &conn.replset.seeds, host, port );
}

void mongo_parse_host( char *host_string, mongo_host_port *host_port ) {
int len, idx, split;
len = split = idx = 0;

/* Split the host_port string at the ':' */
while( 1 ) {
if( *( host_string + len ) == '\0' )
break;
if( *( host_string + len ) == ':' )
split = len;

len++;
}

/* If 'split' is set, we know the that port exists;
* Otherwise, we set the default port. */
idx = split ? split : len;
memcpy( host_port.host, host_string, idx );
memcpy( cast(char*)host_port.host + idx, cast(char*)"\0", 1 );
if( split )
host_port.port = atoi( host_string + idx + 1 );
else
host_port.port = MONGO_DEFAULT_PORT;
}

static void mongo_replset_check_seed( mongo *conn ) {
bson _out;
bson hosts;
char *data;
bson_iterator it;
bson_iterator it_sub;
char *host_string;
mongo_host_port *host_port = null;

_out.data = null;

hosts.data = null;

if( mongo_simple_int_command( conn, cast(char*) "admin", cast(char*) "ismaster", 1, &_out ) == MONGO_OK ) {

if( bson_find( &it, &_out, cast(char*) "hosts" ) ) {
data = bson_iterator_value( &it );
bson_iterator_from_buffer( &it_sub, data );

/* Iterate over host list, adding each host to the
* connection's host list. */
while( bson_iterator_next( &it_sub ) ) {
host_string = bson_iterator_string( &it_sub );

host_port = cast(mongo_host_port*)bson_malloc( mongo_host_port.sizeof );
mongo_parse_host( host_string, host_port );

if( host_port ) {
mongo_replset_add_node( &conn.replset.hosts,
host_port.host, host_port.port );

bson_free( host_port );
host_port = null;
}
}
}
}

bson_destroy( &_out );
bson_destroy( &hosts );
mongo_close_socket( conn.sock );
conn.sock = null;
conn.connected = 0;

}


/* Find _out whether the current connected node is master, and /* Find _out whether the current connected node is master, and
* verify that the node's replica set name matched the provided name * verify that the node's replica set name matched the provided name
Expand Down Expand Up @@ -298,7 +192,7 @@ int mongo_replset_connect( mongo *conn ) {
*/ */
node = conn.replset.seeds; node = conn.replset.seeds;
while( node != null ) { while( node != null ) {
res = mongo_socket_connect( conn, cast( char * )&node.host, node.port ); res = mongo_socket_connect( conn, node.host, node.port );
if( res != MONGO_OK ) if( res != MONGO_OK )
return MONGO_ERROR; return MONGO_ERROR;


Expand All @@ -318,7 +212,7 @@ int mongo_replset_connect( mongo *conn ) {
node = conn.replset.hosts; node = conn.replset.hosts;


while( node != null ) { while( node != null ) {
res = mongo_socket_connect( conn, cast( char * )&node.host, node.port ); res = mongo_socket_connect( conn, node.host, node.port );


if( res == MONGO_OK ) { if( res == MONGO_OK ) {
if( mongo_replset_check_host( conn ) != MONGO_OK ) if( mongo_replset_check_host( conn ) != MONGO_OK )
Expand Down Expand Up @@ -1216,8 +1110,8 @@ int mongo_read_response(mongo* conn, mongo_reply** reply, bool retry = false)
{ {
if(res != MONGO_OK) if(res != MONGO_OK)
{ {
bson_free(_out); bson_free(_out);
} }
} }
} }
} }
Expand Down Expand Up @@ -1296,25 +1190,6 @@ void mongo_close_socket(Socket sock)
sock.close(); sock.close();
} }


int mongo_socket_connect(mongo* conn, char* host, int port)
{
conn.sock = new Socket(AddressFamily.INET, SocketType.STREAM, ProtocolType.TCP);
InternetAddress addr = new InternetAddress(cast(string) host[0 .. strlen(host)], cast(ushort) port);
conn.sock.connect(addr);

if(conn.sock.isAlive())
{
conn.sock.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, true);
return MONGO_OK;
} else
{
conn.sock = null;
conn.err = mongo_error_t.MONGO_CONN_FAIL;
printf("mongo_socket_connect is fail\n");
return MONGO_ERROR;
}
}

int mongo_write_socket(mongo* conn, void* buf, int len) int mongo_write_socket(mongo* conn, void* buf, int len)
{ {
char* cbuf = cast(char*) buf; char* cbuf = cast(char*) buf;
Expand Down Expand Up @@ -1378,6 +1253,42 @@ int mongo_set_socket_op_timeout(mongo* conn, int millis)
return MONGO_OK; return MONGO_OK;
} }


int mongo_socket_connect(mongo* conn, string host, int port)
{
conn.sock = new Socket(AddressFamily.INET, SocketType.STREAM, ProtocolType.TCP);
InternetAddress addr = new InternetAddress(host, cast(ushort) port);
conn.sock.connect(addr);

if(conn.sock.isAlive())
{
conn.sock.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, true);
return MONGO_OK;
} else
{
conn.sock = null;
conn.err = mongo_error_t.MONGO_CONN_FAIL;
printf("mongo_socket_connect is fail\n");
return MONGO_ERROR;
}
}

int mongo_connect(mongo* conn, string host, int port)
{
conn.primary = cast(mongo_host_port*) bson_malloc(mongo_host_port.sizeof);
conn.primary.host = host;
conn.primary.port = port;
conn.primary.next = null;

mongo_init(conn);
if(mongo_socket_connect(conn, host, port) != MONGO_OK)
return MONGO_ERROR;

if(mongo_check_is_master(conn) != MONGO_OK)
return MONGO_ERROR;
else
return MONGO_OK;
}

int mongo_reconnect(mongo* conn) int mongo_reconnect(mongo* conn)
{ {
int res; int res;
Expand Down Expand Up @@ -1412,3 +1323,87 @@ void mongo_disconnect(mongo* conn)
conn.connected = 0; conn.connected = 0;
} }


static void mongo_replset_add_node( mongo_host_port **list, string host, int port ) {
mongo_host_port *host_port = cast(mongo_host_port*)bson_malloc( mongo_host_port.sizeof );
host_port.port = port;
host_port.host = host;
host_port.next = null;

if( *list == null )
*list = host_port;
else {
mongo_host_port *p = *list;
while( p.next != null )
p = p.next;
p.next = host_port;
}
}

void mongo_parse_host( string host_string, mongo_host_port *host_port ) {

string[] host_port_s = std.string.split(host_string,":");

if (host_port_s.length == 2)
{
host_port.host = host_port_s[0];
host_port.port = atoi( cast(char*)host_port_s[1]);
}
else if (host_port_s.length == 1)
{
host_port.host = host_string;
host_port.port = MONGO_DEFAULT_PORT;
}

}

void mongo_replset_add_seed( mongo *conn, string host, int port )
{
mongo_replset_add_node( &conn.replset.seeds, host, port );
}

static void mongo_replset_check_seed( mongo *conn ) {
bson _out;
bson hosts;
char *data;
bson_iterator it;
bson_iterator it_sub;
char *host_string;
mongo_host_port *host_port = null;

_out.data = null;

hosts.data = null;

if( mongo_simple_int_command( conn, cast(char*) "admin", cast(char*) "ismaster", 1, &_out ) == MONGO_OK ) {

if( bson_find( &it, &_out, cast(char*) "hosts" ) ) {
data = bson_iterator_value( &it );
bson_iterator_from_buffer( &it_sub, data );

/* Iterate over host list, adding each host to the
* connection's host list. */
while( bson_iterator_next( &it_sub ) ) {
host_string = bson_iterator_string( &it_sub );

host_port = cast(mongo_host_port*)bson_malloc( mongo_host_port.sizeof );
mongo_parse_host( cast(string)host_string[0..strlen (host_string)], host_port );

if( host_port ) {
mongo_replset_add_node( &conn.replset.hosts,
host_port.host, host_port.port );

bson_free( host_port );
host_port = null;
}
}
}
}

bson_destroy( &_out );
bson_destroy( &hosts );
mongo_close_socket( conn.sock );
conn.sock = null;
conn.connected = 0;

}

2 changes: 1 addition & 1 deletion src/mongo_h.d
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ struct mongo_reply{
//#pragma pack() //#pragma pack()


struct mongo_host_port { struct mongo_host_port {
char host[255]; string host;
int port; int port;
mongo_host_port *next; mongo_host_port *next;
} ; } ;
Expand Down

0 comments on commit 53d3e7f

Please sign in to comment.