Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

overcome a merge conflict which wasn't one

  • Loading branch information...
commit e4bb045695d34bdb83a68923d5ab1f65487ba858 2 parents 497be9c + 79f7429
@rdp rdp authored
View
2  ext/extconf.rb
@@ -87,4 +87,4 @@
end
end
-create_makefile("mysql")
+create_makefile("mysql")
View
325 ext/mysql.c
@@ -61,7 +61,10 @@ struct mysql {
MYSQL handler;
char connection;
char query_with_result;
+ char gc_disabled;
char blocking;
+ int async_in_progress;
+ char busy;
};
struct mysql_res {
@@ -181,7 +184,7 @@ static void mysql_raise(MYSQL* m)
rb_exc_raise(e);
}
-static VALUE mysqlres2obj(MYSQL_RES* res)
+static VALUE mysqlres2obj(MYSQL_RES* res, VALUE gc_disabled)
{
VALUE obj;
struct mysql_res* resp;
@@ -191,10 +194,16 @@ static VALUE mysqlres2obj(MYSQL_RES* res)
resp->res = res;
resp->freed = Qfalse;
rb_obj_call_init(obj, 0, NULL);
+<<<<<<< HEAD:ext/mysql.c
/* disabled until it can be reviewed further--rely on the normal GC for now.
if (++store_result_count > GC_STORE_RESULT_LIMIT)
rb_gc();
*/
+=======
+ if (++store_result_count > GC_STORE_RESULT_LIMIT && gc_disabled == Qfalse){
+ rb_gc();
+ }
+>>>>>>> with_async_validation:ext/mysql.c
return obj;
}
@@ -230,10 +239,12 @@ static VALUE init(VALUE klass)
mysql_init(&myp->handler);
myp->connection = Qfalse;
myp->query_with_result = Qtrue;
+ myp->gc_disabled = Qfalse;
rb_obj_call_init(obj, 0, NULL);
return obj;
}
+<<<<<<< HEAD:ext/mysql.c
#ifdef HAVE_TBR
typedef struct
@@ -297,6 +308,58 @@ static void call_single_function_rb_thread_blocking_region(void *arg_holder_in)
#endif
+=======
+static VALUE connection_identifier( VALUE obj )
+{
+ MYSQL* m = GetHandler(obj);
+ return mysql_thread_id( m );
+}
+
+static VALUE async_in_progress( VALUE obj )
+{
+ struct mysql* m = GetMysqlStruct(obj);
+ return ( m->async_in_progress == connection_identifier(obj) ) ? Qtrue : Qfalse;
+}
+
+static VALUE async_in_progress_set( VALUE obj, VALUE flag )
+{
+ struct mysql* m = GetMysqlStruct(obj);
+ m->async_in_progress = (flag == Qnil || flag == Qfalse) ? 0 : connection_identifier(obj);
+ return flag;
+}
+
+static void optimize_for_async( VALUE obj )
+{
+ struct mysql* m = GetMysqlStruct(obj);
+ my_bool was_blocking;
+ vio_blocking(m->handler.net.vio, 0, &was_blocking);
+ m->blocking = vio_is_blocking( m->handler.net.vio );
+
+ vio_fastsend( m->handler.net.vio );
+ async_in_progress_set( obj, Qfalse );
+}
+
+static void schedule_connect(VALUE obj )
+{
+ MYSQL* m = GetHandler(obj);
+ fd_set read;
+
+ struct timeval tv = { tv_sec: m->options.connect_timeout, tv_usec: 0 };
+
+ if (rb_thread_select(0, NULL, NULL, NULL, &tv) < 0) {
+ rb_raise(eMysql, "connect: timeout");
+ }
+/*
+ FD_ZERO(&read);
+ FD_SET(m->net.fd, &read);
+
+ if (rb_thread_select(m->net.fd + 1, &read, NULL, NULL, &tv) < 0) {
+ rb_raise(eMysql, "connect: timeout");
+ }
+*/
+}
+
+>>>>>>> with_async_validation:ext/mysql.c
/* real_connect(host=nil, user=nil, passwd=nil, db=nil, port=nil, sock=nil, flag=nil) */
static VALUE real_connect(int argc, VALUE* argv, VALUE klass) /* actually gets run */
{
@@ -336,19 +399,16 @@ static VALUE real_connect(int argc, VALUE* argv, VALUE klass) /* actually gets r
if (mysql_real_connect(&myp->handler, h, u, p, pp, s) == NULL)
#endif
mysql_raise(&myp->handler);
-
+
myp->handler.reconnect = 0;
myp->connection = Qtrue;
- my_bool was_blocking;
-
- vio_blocking(myp->handler.net.vio, 0, &was_blocking);
- myp->blocking = vio_is_blocking( myp->handler.net.vio );
-
- vio_fastsend( myp->handler.net.vio );
+ optimize_for_async(obj);
myp->query_with_result = Qtrue;
rb_obj_call_init(obj, argc, argv);
+
+ schedule_connect(obj);
return obj;
}
@@ -411,6 +471,9 @@ static VALUE real_connect2(int argc, VALUE* argv, VALUE obj)
mysql_raise(m);
m->reconnect = 0;
GetMysqlStruct(obj)->connection = Qtrue;
+
+ optimize_for_async(obj);
+ schedule_connect(obj);
return obj;
}
@@ -665,7 +728,7 @@ static VALUE list_fields(int argc, VALUE* argv, VALUE obj)
res = mysql_list_fields(m, StringValuePtr(table), NILorSTRING(field));
if (res == NULL)
mysql_raise(m);
- return mysqlres2obj(res);
+ return mysqlres2obj(res, GetMysqlStruct(obj)->gc_disabled);
}
/* list_processes() */
@@ -675,7 +738,7 @@ static VALUE list_processes(VALUE obj)
MYSQL_RES* res = mysql_list_processes(m);
if (res == NULL)
mysql_raise(m);
- return mysqlres2obj(res);
+ return mysqlres2obj(res, GetMysqlStruct(obj)->gc_disabled);
}
/* list_tables(table=nil) */
@@ -793,8 +856,12 @@ static VALUE store_result(VALUE obj)
if (res == NULL)
mysql_raise(m);
+<<<<<<< HEAD:ext/mysql.c
return mysqlres2obj(res);
+=======
+ return mysqlres2obj(res, GetMysqlStruct(obj)->gc_disabled);
+>>>>>>> with_async_validation:ext/mysql.c
}
/* thread_id() */
@@ -810,7 +877,7 @@ static VALUE use_result(VALUE obj)
MYSQL_RES* res = mysql_use_result(m);
if (res == NULL)
mysql_raise(m);
- return mysqlres2obj(res);
+ return mysqlres2obj(res, GetMysqlStruct(obj)->gc_disabled);
}
static VALUE res_free(VALUE);
@@ -857,7 +924,7 @@ static VALUE query(VALUE obj, VALUE sql)
if (mysql_field_count(m) != 0)
mysql_raise(m);
} else {
- VALUE robj = mysqlres2obj(res);
+ VALUE robj = mysqlres2obj(res, GetMysqlStruct(obj)->gc_disabled);
rb_ensure(rb_yield, robj, res_free, robj);
}
#if MYSQL_VERSION_ID >= 40101
@@ -893,6 +960,7 @@ static VALUE socket(VALUE obj)
MYSQL* m = GetHandler(obj);
return INT2NUM(m->net.fd);
}
+<<<<<<< HEAD:ext/mysql.c
/* socket_type --currently returns true or false, needs some work */
static VALUE socket_type(VALUE obj)
{
@@ -905,10 +973,38 @@ static VALUE socket_type(VALUE obj)
else
return Qnil;
}
+=======
+>>>>>>> with_async_validation:ext/mysql.c
/* blocking */
static VALUE blocking(VALUE obj){
- return ( GetMysqlStruct(obj)->blocking ? Qtrue : Qfalse );
+ return ( GetMysqlStruct(obj)->blocking ? Qtrue : Qfalse );
+}
+
+/* is_busy */
+static VALUE is_busy(VALUE obj){
+ return ( GetMysqlStruct(obj)->busy ? Qtrue : Qfalse );
+}
+
+static VALUE is_idle(VALUE obj){
+ return ( is_busy(obj) == Qtrue ) ? Qfalse : Qtrue;
+}
+
+/* busy(true|false) */
+static VALUE busy_set(VALUE obj, VALUE flag)
+{
+ if (TYPE(flag) != T_TRUE && TYPE(flag) != T_FALSE)
+ rb_raise(rb_eTypeError, "invalid type, required true or false.");
+ GetMysqlStruct(obj)->busy = flag;
+ return flag;
+}
+
+static void busy( VALUE obj ){
+ busy_set( obj, Qtrue );
+}
+
+static void idle( VALUE obj ){
+ busy_set( obj, Qfalse );
}
/* readable(timeout=nil) */
@@ -927,65 +1023,214 @@ static VALUE readable( int argc, VALUE* argv, VALUE obj )
return ( vio_poll_read( m->net.vio, INT2NUM(timeout) ) == 0 ? Qtrue : Qfalse );
}
+/* retry */
+static VALUE retry( VALUE obj )
+{
+ MYSQL* m = GetHandler(obj);
+ return ( vio_should_retry( m->net.vio ) == 1 ? Qtrue : Qfalse );
+}
+
+/* interrupted */
+static VALUE interrupted( VALUE obj )
+{
+ MYSQL* m = GetHandler(obj);
+ return ( vio_was_interrupted( m->net.vio ) == 1 ? Qtrue : Qfalse );
+}
+
+/* reconnected */
+static VALUE reconnected( VALUE obj ){
+ MYSQL* m = GetHandler(obj);
+ int current_connection_id = mysql_thread_id( m );
+ mysql_ping(m);
+ return ( current_connection_id == mysql_thread_id( m ) ) ? Qfalse : Qtrue;
+}
+
+/* disable_gc(true|false) */
+static VALUE disable_gc_set(VALUE obj, VALUE flag)
+{
+ if (TYPE(flag) != T_TRUE && TYPE(flag) != T_FALSE)
+ rb_raise(rb_eTypeError, "invalid type, required true or false.");
+ GetMysqlStruct(obj)->gc_disabled = flag;
+ return flag;
+}
+
+/* gc_disabled */
+static VALUE gc_disabled( VALUE obj ){
+ return GetMysqlStruct(obj)->gc_disabled ? Qtrue: Qfalse;
+}
+
+static void validate_async_query( VALUE obj )
+{
+ MYSQL* m = GetHandler(obj);
+
+ if( async_in_progress(obj) == Qtrue ){
+ async_in_progress_set(obj, Qfalse);
+ rb_raise(eMysql, "Query out of sequence: Each call to Mysql#send_query requires a successive Mysql#get_result.");
+ }
+}
+
+static void simulate_disconnect( VALUE obj )
+{
+ MYSQL* m = GetHandler(obj);
+ mysql_library_end();
+}
+
+static int begins_with_insensitive(char *candidate, char *check_for_in_upper_case)
+{
+ /* skip opening whitespace --tab is 11, newline is 12, cr is 15, space 32 */
+ char *where_at = candidate;
+ while( ((*where_at >= 11 && *where_at <= 15) || (*where_at == 32)) && (where_at != 0))
+ where_at++;
+
+ char *where_at_in_test = check_for_in_upper_case;
+ while(*where_at_in_test)
+ {
+ int candidate_char = *where_at;
+ if(candidate_char == 0)
+ return 0; /* end of line */
+ if(candidate_char >= 97 && candidate_char < 122) /* then it's upper case --lower case ify it */
+ candidate_char -= 32;
+ if(candidate_char != *where_at_in_test)
+ return 0;
+ where_at_in_test++;
+ where_at++;
+ }
+ return 1;
+}
+
/* send_query(sql) */
static VALUE send_query(VALUE obj, VALUE sql)
{
MYSQL* m = GetHandler(obj);
-
+
Check_Type(sql, T_STRING);
- if (GetMysqlStruct(obj)->connection == Qfalse) {
- rb_raise(eMysql, "query: not connected");
+
+ if (GetMysqlStruct(obj)->connection == Qfalse && async_in_progress(obj) == Qtrue ) {
+ idle( obj );
+ rb_raise(eMysql, "query: not connected");
}
- if (mysql_send_query(m, RSTRING_PTR(sql), RSTRING_LEN(sql)) != 0)
- mysql_raise(m);
- return Qnil;
+
+ validate_async_query(obj);
+
+ if (mysql_send_query(m, RSTRING_PTR(sql), RSTRING_LEN(sql)) != 0){
+ idle( obj );
+ mysql_raise(m);
+ }
+
+ /* what about http://dev.mysql.com/doc/refman/5.0/en/implicit-commit.html and more? */
+ if(
+ begins_with_insensitive(RSTRING_PTR(sql), "SET ") ||
+ begins_with_insensitive(RSTRING_PTR(sql), "BEGIN") ||
+ begins_with_insensitive(RSTRING_PTR(sql), "START TRANSACTION") ||
+ begins_with_insensitive(RSTRING_PTR(sql), "ROLLBACK") ||
+ begins_with_insensitive(RSTRING_PTR(sql), "LOCK ") ||
+ begins_with_insensitive(RSTRING_PTR(sql), "UNLOCK ") ||
+ begins_with_insensitive(RSTRING_PTR(sql), "USE ") ||
+ begins_with_insensitive(RSTRING_PTR(sql), "COMMIT") )
+ {
+ /* do not mark an async in progress --they used send_query for something that doesn't necessarily have a result--is this allowable? */
+ async_in_progress_set( obj, Qfalse );
+ } else {
+ async_in_progress_set( obj, Qtrue );
+ }
+
+ return Qnil;
}
/* get_result */
static VALUE get_result(VALUE obj)
{
MYSQL* m = GetHandler(obj);
+
+ async_in_progress_set( obj, Qfalse );
+
if (GetMysqlStruct(obj)->connection == Qfalse) {
- rb_raise(eMysql, "query: not connected");
+ idle( obj );
+ rb_raise(eMysql, "query: not connected");
}
- if (mysql_read_query_result(m) != 0)
- mysql_raise(m);
+ if (mysql_read_query_result(m) != 0){
+ idle( obj );
+ mysql_raise(m);
+ }
+
if (GetMysqlStruct(obj)->query_with_result == Qfalse)
return obj;
if (mysql_field_count(m) == 0)
- return Qnil;
- return store_result(obj);
+ return Qnil;
+ return store_result(obj);
}
+<<<<<<< HEAD:ext/mysql.c
static void schedule(VALUE obj, VALUE timeout)
+=======
+static void schedule_query(VALUE obj, VALUE timeout)
+>>>>>>> with_async_validation:ext/mysql.c
{
MYSQL* m = GetHandler(obj);
fd_set read;
-
+ int ret;
+
timeout = ( NIL_P(timeout) ? m->net.read_timeout : INT2NUM(timeout) );
struct timeval tv = { tv_sec: timeout, tv_usec: 0 };
- FD_ZERO(&read);
- FD_SET(m->net.fd, &read);
+ for(;;){
+ FD_ZERO(&read);
+ FD_SET(m->net.fd, &read);
+
+ ret = rb_thread_select(m->net.fd + 1, &read, NULL, NULL, &tv);
+ if (ret < 0) {
+ idle( obj );
+ rb_raise(eMysql, "query: timeout");
+ }
+
+ if (ret == 0) {
+ continue;
+ }
- if (rb_thread_select(m->net.fd + 1, &read, NULL, NULL, &tv) < 0) {
- rb_raise(eMysql, "query: timeout");
+ if (m->status == MYSQL_STATUS_READY){
+ break;
+ }
}
+<<<<<<< HEAD:ext/mysql.c
+=======
+}
+
+static int should_schedule_query(){
+ return rb_thread_alone() != 1;
+>>>>>>> with_async_validation:ext/mysql.c
}
/* async_query(sql,timeout=nil) */
static VALUE async_query(int argc, VALUE* argv, VALUE obj)
{
+<<<<<<< HEAD:ext/mysql.c
VALUE sql, timeout;
+=======
+ MYSQL* m = GetHandler(obj);
+ VALUE sql, timeout;
+>>>>>>> with_async_validation:ext/mysql.c
+
+ rb_scan_args(argc, argv, "11", &sql, &timeout);
- rb_scan_args(argc, argv, "11", &sql, &timeout);
+ async_in_progress_set( obj, Qfalse );
- send_query(obj,sql);
+ busy(obj);
- schedule(obj, timeout);
+ send_query( obj, sql );
- return get_result(obj);
+ if ( should_schedule_query() ){
+ schedule_query(obj, timeout);
+ }
+
+ if (rb_block_given_p()) {
+ rb_yield( get_result(obj) );
+ idle( obj );
+ return obj;
+ }else{
+ idle( obj );
+ return get_result(obj);
+ }
}
#if MYSQL_VERSION_ID >= 40100
@@ -2020,7 +2265,7 @@ static VALUE stmt_result_metadata(VALUE obj)
mysql_stmt_raise(s->stmt);
return Qnil;
}
- return mysqlres2obj(res);
+ return mysqlres2obj(res, Qfalse);
}
/* row_seek(offset) */
@@ -2278,12 +2523,22 @@ void Init_mysql(void)
rb_define_method(cMysql, "query", query, 1);
rb_define_method(cMysql, "real_query", query, 1);
rb_define_method(cMysql, "c_async_query", async_query, -1);
+ rb_define_method(cMysql, "async_in_progress?", async_in_progress, 0);
+ rb_define_method(cMysql, "async_in_progress=", async_in_progress_set, 1);
rb_define_method(cMysql, "send_query", send_query, 1);
+ rb_define_method(cMysql, "simulate_disconnect", simulate_disconnect, 0);
+ rb_define_method(cMysql, "reconnected?", reconnected, 0);
rb_define_method(cMysql, "get_result", get_result, 0);
rb_define_method(cMysql, "readable?", readable, -1);
+ rb_define_method(cMysql, "retry?", retry, 0);
+ rb_define_method(cMysql, "interrupted?", interrupted, 0);
rb_define_method(cMysql, "blocking?", blocking, 0);
+ rb_define_method(cMysql, "gc_disabled?", gc_disabled, 0);
+ rb_define_method(cMysql, "disable_gc=", disable_gc_set, 1);
+ rb_define_method(cMysql, "busy?", is_busy, 0);
+ rb_define_method(cMysql, "idle?", is_idle, 0);
+ rb_define_method(cMysql, "busy=", busy_set, 1);
rb_define_method(cMysql, "socket", socket, 0);
- rb_define_method(cMysql, "socket_type", socket_type, 0);
rb_define_method(cMysql, "refresh", refresh, 1);
rb_define_method(cMysql, "reload", reload, 0);
rb_define_method(cMysql, "select_db", select_db, 1);
View
7 test/async_query_with_block_test.rb
@@ -0,0 +1,7 @@
+require File.dirname(__FILE__) + '/test_helper'
+
+m = Mysql.real_connect('localhost','root','','mysql')
+
+m.c_async_query( 'SELECT * FROM user' ) do |result|
+ puts result.inspect
+end
View
41 test/gc_benchmark.rb
@@ -0,0 +1,41 @@
+require 'rubygems'
+require 'mysqlplus'
+require 'benchmark'
+
+with_gc = Mysql.real_connect('localhost','root','','mysql')
+without_gc = Mysql.real_connect('localhost','root','','mysql')
+without_gc.disable_gc = true
+
+$gc_stats = []
+
+def countable_gc?
+ GC.respond_to? :count
+end
+
+def gc_counts( label, scope )
+ $gc_stats << "Objects #{scope} ( #{label} ) #{GC.count}"
+end
+
+def with_gc_counts( label )
+ gc_counts( label, 'before' ) if countable_gc?
+ yield
+ gc_counts( label, 'after' ) if countable_gc?
+end
+
+n = 1000
+
+Benchmark.bmbm do |x|
+ x.report( 'With GC' ) do
+ with_gc_counts( 'With GC' ) do
+ n.times{ with_gc.c_async_query( 'SELECT * FROM user' ) }
+ end
+ end
+ GC.start
+ x.report( 'Without GC' ) do
+ with_gc_counts( 'Without GC' ) do
+ n.times{ without_gc.c_async_query( 'SELECT * FROM user' ) }
+ end
+ end
+end
+
+puts $gc_stats.join( ' | ' )
View
31 test/out_of_sync_test.rb
@@ -0,0 +1,31 @@
+require File.dirname(__FILE__) + '/test_helper'
+
+m = Mysql.real_connect('localhost','root')
+m.reconnect = true
+
+class << m
+
+ def safe_query( query )
+ begin
+ send_query( query )
+ rescue => e
+ puts e.message
+ end
+ end
+
+end
+
+m.safe_query( 'select sleep(1)' )
+m.safe_query( 'select sleep(1)' )#raises
+m.simulate_disconnect #fires mysql_library_end
+m.safe_query( 'select sleep(1)' )
+m.safe_query( 'select sleep(1)' )#raises
+m.close
+m.connect('localhost','root')
+m.safe_query( 'select sleep(1)' )
+m.safe_query( 'select sleep(1)' )#raises
+m.simulate_disconnect
+m.safe_query( 'BEGIN' )
+m.safe_query( 'select sleep(1)' )
+m.get_result()
+m.safe_query( 'COMMIT' )
View
18 test/reconnected_test.rb
@@ -0,0 +1,18 @@
+require File.dirname(__FILE__) + '/test_helper'
+
+$m = Mysql.real_connect('localhost','root')
+#$m.reconnect = true
+
+def assert_reconnected
+ puts $m.reconnected?().inspect
+ sleep 1
+ yield
+ puts $m.reconnected?().inspect
+end
+
+assert_reconnected do
+ $m.simulate_disconnect
+end
+assert_reconnected do
+ $m.close
+end
Please sign in to comment.
Something went wrong with that request. Please try again.