Skip to content

Commit

Permalink
Validate async queries
Browse files Browse the repository at this point in the history
  • Loading branch information
methodmissing committed Oct 2, 2008
1 parent c186184 commit 85141ab
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 29 deletions.
2 changes: 1 addition & 1 deletion ext/extconf.rb
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -71,4 +71,4 @@
end end
end end


create_makefile("mysql") create_makefile("mysql")
171 changes: 144 additions & 27 deletions ext/mysql.c
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ struct mysql {
char connection; char connection;
char query_with_result; char query_with_result;
char blocking; char blocking;
int async_in_progress;
}; };


struct mysql_res { struct mysql_res {
Expand Down Expand Up @@ -231,6 +232,57 @@ static VALUE init(VALUE klass)
return obj; return obj;
} }


static VALUE connection_identifier( VALUE obj )
{
MYSQL* m = GetHandler(obj);
return mysql_thread_id( m );
}

static VALUE async_in_progress( VALUE obj )
{
struct mysql* m = GetMysqlStruct(obj);
return ( m->async_in_progress == connection_identifier(obj) ) ? Qtrue : Qfalse;
}

static VALUE async_in_progress_set( VALUE obj, VALUE flag )
{
struct mysql* m = GetMysqlStruct(obj);
m->async_in_progress = (flag == Qnil || flag == Qfalse) ? 0 : connection_identifier(obj);
return flag;
}

static void optimize_for_async( VALUE obj )
{
struct mysql* m = GetMysqlStruct(obj);
my_bool was_blocking;
vio_blocking(m->handler.net.vio, 0, &was_blocking);
m->blocking = vio_is_blocking( m->handler.net.vio );

vio_fastsend( m->handler.net.vio );
async_in_progress_set( obj, Qfalse );
/*last_connection_identifier(obj);*/
}

static void schedule_connect(VALUE obj )
{
MYSQL* m = GetHandler(obj);
fd_set read;

struct timeval tv = { tv_sec: m->options.connect_timeout, tv_usec: 0 };

if (rb_thread_select(0, NULL, NULL, NULL, &tv) < 0) {
rb_raise(eMysql, "connect: timeout");
}
/*
FD_ZERO(&read);
FD_SET(m->net.fd, &read);
if (rb_thread_select(m->net.fd + 1, &read, NULL, NULL, &tv) < 0) {
rb_raise(eMysql, "connect: timeout");
}
*/
}

/* real_connect(host=nil, user=nil, passwd=nil, db=nil, port=nil, sock=nil, flag=nil) */ /* real_connect(host=nil, user=nil, passwd=nil, db=nil, port=nil, sock=nil, flag=nil) */
static VALUE real_connect(int argc, VALUE* argv, VALUE klass) static VALUE real_connect(int argc, VALUE* argv, VALUE klass)
{ {
Expand Down Expand Up @@ -266,19 +318,16 @@ static VALUE real_connect(int argc, VALUE* argv, VALUE klass)
if (mysql_real_connect(&myp->handler, h, u, p, pp, s) == NULL) if (mysql_real_connect(&myp->handler, h, u, p, pp, s) == NULL)
#endif #endif
mysql_raise(&myp->handler); mysql_raise(&myp->handler);

myp->handler.reconnect = 0; myp->handler.reconnect = 0;
myp->connection = Qtrue; myp->connection = Qtrue;


my_bool was_blocking; optimize_for_async(obj);

vio_blocking(myp->handler.net.vio, 0, &was_blocking);
myp->blocking = vio_is_blocking( myp->handler.net.vio );

vio_fastsend( myp->handler.net.vio );


myp->query_with_result = Qtrue; myp->query_with_result = Qtrue;
rb_obj_call_init(obj, argc, argv); rb_obj_call_init(obj, argc, argv);

schedule_connect(obj);


return obj; return obj;
} }
Expand Down Expand Up @@ -341,6 +390,9 @@ static VALUE real_connect2(int argc, VALUE* argv, VALUE obj)
mysql_raise(m); mysql_raise(m);
m->reconnect = 0; m->reconnect = 0;
GetMysqlStruct(obj)->connection = Qtrue; GetMysqlStruct(obj)->connection = Qtrue;

optimize_for_async(obj);
schedule_connect(obj);


return obj; return obj;
} }
Expand Down Expand Up @@ -764,13 +816,6 @@ static VALUE socket(VALUE obj)
MYSQL* m = GetHandler(obj); MYSQL* m = GetHandler(obj);
return INT2NUM(m->net.fd); return INT2NUM(m->net.fd);
} }
/* socket_type */
static VALUE socket_type(VALUE obj)
{
MYSQL* m = GetHandler(obj);
VALUE description = vio_description( m->net.vio );
return NILorSTRING( description );
}


/* blocking */ /* blocking */
static VALUE blocking(VALUE obj){ static VALUE blocking(VALUE obj){
Expand All @@ -793,18 +838,81 @@ static VALUE readable( int argc, VALUE* argv, VALUE obj )
return ( vio_poll_read( m->net.vio, INT2NUM(timeout) ) == 0 ? Qtrue : Qfalse ); return ( vio_poll_read( m->net.vio, INT2NUM(timeout) ) == 0 ? Qtrue : Qfalse );
} }


/* ready */
static VALUE ready( VALUE obj )
{
MYSQL* m = GetHandler(obj);
return ( m->status == MYSQL_STATUS_READY ) ? Qtrue : Qfalse;
}

/* retry */
static VALUE retry( VALUE obj )
{
MYSQL* m = GetHandler(obj);
return ( vio_should_retry( m->net.vio ) == 1 ? Qtrue : Qfalse );
}

/* interrupted */
static VALUE interrupted( VALUE obj )
{
MYSQL* m = GetHandler(obj);
return ( vio_was_interrupted( m->net.vio ) == 1 ? Qtrue : Qfalse );
}

static void validate_async_query( VALUE obj )
{
MYSQL* m = GetHandler(obj);

if( async_in_progress(obj) == Qtrue ){
async_in_progress_set(obj, Qfalse);
rb_raise(eMysql, "Query out of sequence: Each call to Mysql#send_query requires a successive Mysql#get_result.");
}
}

/*
static VALUE connection_dropped( VALUE obj )
{
MYSQL* m = GetHandler(obj);
if( (mysql_errno(m) == CR_SERVER_LOST ) || ( mysql_errno(m) == CR_SERVER_GONE_ERROR ) || ( mysql_errno(m) == ER_SERVER_SHUTDOWN) ){
return Qtrue;
}else{
return Qfalse;
}
}
*/

static void async_reconnect( VALUE obj )
{
connect(obj);
}

static VALUE simulate_disconnect( VALUE obj )
{
MYSQL* m = GetHandler(obj);
mysql_library_end();
return Qnil;
}

/* send_query(sql) */ /* send_query(sql) */
static VALUE send_query(VALUE obj, VALUE sql) static VALUE send_query(VALUE obj, VALUE sql)
{ {
MYSQL* m = GetHandler(obj); MYSQL* m = GetHandler(obj);

Check_Type(sql, T_STRING); Check_Type(sql, T_STRING);
if (GetMysqlStruct(obj)->connection == Qfalse) {
rb_raise(eMysql, "query: not connected"); if (GetMysqlStruct(obj)->connection == Qfalse && async_in_progress(obj) == Qtrue ) {
rb_raise(eMysql, "query: not connected");
} }
if (mysql_send_query(m, RSTRING_PTR(sql), RSTRING_LEN(sql)) != 0)
mysql_raise(m); validate_async_query(obj);
return Qnil;
if (mysql_send_query(m, RSTRING_PTR(sql), RSTRING_LEN(sql)) != 0){
mysql_raise(m);
/*( connection_dropped(obj) == Qtrue ) ? async_reconnect(obj) : mysql_raise(m);*/
}

async_in_progress_set( obj, Qtrue );
return Qnil;
} }


/* get_result */ /* get_result */
Expand All @@ -814,16 +922,17 @@ static VALUE get_result(VALUE obj)
if (GetMysqlStruct(obj)->connection == Qfalse) { if (GetMysqlStruct(obj)->connection == Qfalse) {
rb_raise(eMysql, "query: not connected"); rb_raise(eMysql, "query: not connected");
} }
if (mysql_read_query_result(m) != 0) if (mysql_read_query_result(m) != 0)
mysql_raise(m); mysql_raise(m);
if (GetMysqlStruct(obj)->query_with_result == Qfalse) if (GetMysqlStruct(obj)->query_with_result == Qfalse)
return obj; return obj;
if (mysql_field_count(m) == 0) if (mysql_field_count(m) == 0)
return Qnil; return Qnil;
async_in_progress_set( obj, Qfalse );
return store_result(obj); return store_result(obj);
} }


static VALUE schedule(VALUE obj, VALUE timeout) static void schedule_query(VALUE obj, VALUE timeout)
{ {
MYSQL* m = GetHandler(obj); MYSQL* m = GetHandler(obj);
fd_set read; fd_set read;
Expand All @@ -838,7 +947,6 @@ static VALUE schedule(VALUE obj, VALUE timeout)
if (rb_thread_select(m->net.fd + 1, &read, NULL, NULL, &tv) < 0) { if (rb_thread_select(m->net.fd + 1, &read, NULL, NULL, &tv) < 0) {
rb_raise(eMysql, "query: timeout"); rb_raise(eMysql, "query: timeout");
} }

} }


/* async_query(sql,timeout=nil) */ /* async_query(sql,timeout=nil) */
Expand All @@ -849,9 +957,13 @@ static VALUE async_query(int argc, VALUE* argv, VALUE obj)


rb_scan_args(argc, argv, "11", &sql, &timeout); rb_scan_args(argc, argv, "11", &sql, &timeout);


/*last_connection_identifier( obj );*/

async_in_progress_set( obj, Qfalse );

send_query(obj,sql); send_query(obj,sql);


schedule(obj, timeout); schedule_query(obj, timeout);


return get_result(obj); return get_result(obj);
} }
Expand Down Expand Up @@ -2144,12 +2256,17 @@ void Init_mysql(void)
rb_define_method(cMysql, "query", query, 1); rb_define_method(cMysql, "query", query, 1);
rb_define_method(cMysql, "real_query", query, 1); rb_define_method(cMysql, "real_query", query, 1);
rb_define_method(cMysql, "c_async_query", async_query, -1); rb_define_method(cMysql, "c_async_query", async_query, -1);
rb_define_method(cMysql, "async_in_progress?", async_in_progress, 0);
rb_define_method(cMysql, "async_in_progress=", async_in_progress_set, 1);
rb_define_method(cMysql, "send_query", send_query, 1); rb_define_method(cMysql, "send_query", send_query, 1);
rb_define_method(cMysql, "simulate_disconnect", simulate_disconnect, 0);
rb_define_method(cMysql, "get_result", get_result, 0); rb_define_method(cMysql, "get_result", get_result, 0);
rb_define_method(cMysql, "readable?", readable, -1); rb_define_method(cMysql, "readable?", readable, -1);
rb_define_method(cMysql, "retry?", retry, 0);
rb_define_method(cMysql, "ready?", ready, 0);
rb_define_method(cMysql, "interrupted?", interrupted, 0);
rb_define_method(cMysql, "blocking?", blocking, 0); rb_define_method(cMysql, "blocking?", blocking, 0);
rb_define_method(cMysql, "socket", socket, 0); rb_define_method(cMysql, "socket", socket, 0);
rb_define_method(cMysql, "socket_type", socket_type, 0);
rb_define_method(cMysql, "refresh", refresh, 1); rb_define_method(cMysql, "refresh", refresh, 1);
rb_define_method(cMysql, "reload", reload, 0); rb_define_method(cMysql, "reload", reload, 0);
rb_define_method(cMysql, "select_db", select_db, 1); rb_define_method(cMysql, "select_db", select_db, 1);
Expand Down
2 changes: 1 addition & 1 deletion lib/mysqlplus.rb
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ def all_hashes
each_hash { |row| rows << row } each_hash { |row| rows << row }
rows rows
end end
end end
26 changes: 26 additions & 0 deletions test/out_of_sync_test.rb
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,26 @@
require File.dirname(__FILE__) + '/test_helper'

m = Mysql.real_connect('localhost','root')
m.reconnect = true

class << m

def safe_query( query )
begin
send_query( query )
rescue => e
puts e.message
end
end

end

m.safe_query( 'select sleep(1)' )
m.safe_query( 'select sleep(1)' )#raises
m.simulate_disconnect #fires mysql_library_end
m.safe_query( 'select sleep(1)' )
m.safe_query( 'select sleep(1)' )#raises
m.close
m.connect('localhost','root')
m.safe_query( 'select sleep(1)' )
m.safe_query( 'select sleep(1)' )#raises

1 comment on commit 85141ab

@rdp
Copy link
Collaborator

@rdp rdp commented on 85141ab Mar 7, 2009

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this one. Preserve sanity for end users by disallowing more than one query :P

Please sign in to comment.