Skip to content

Commit

Permalink
Support for SERVER_MORE_RESULTS_EXISTS #547
Browse files Browse the repository at this point in the history
  • Loading branch information
renecannao committed May 4, 2016
1 parent 3899356 commit ffd3970
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 5 deletions.
2 changes: 1 addition & 1 deletion include/MySQL_Session.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class MySQL_Session
bool schema_locked;
bool transaction_persistent;
bool session_fast_forward;

bool started_sending_data_to_client; // this status variable tracks if some result set was sent to the client, of if proysql is still buffering everything
MySQL_Session();
// MySQL_Session(int);
~MySQL_Session();
Expand Down
1 change: 1 addition & 0 deletions include/mysql_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class MySQL_Connection {
bool has_prepared_statement;
bool processing_prepared_statement_prepare;
bool processing_prepared_statement_execute;
bool processing_multi_statement;
MySQL_Connection();
~MySQL_Connection();
// int assign_mshge(unsigned int);
Expand Down
3 changes: 3 additions & 0 deletions include/proxysql_structs.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ enum MDB_ASYNC_ST { // MariaDB Async State Machine
ASYNC_QUERY_START,
ASYNC_QUERY_CONT,
ASYNC_QUERY_END,
ASYNC_NEXT_RESULT_START,
ASYNC_NEXT_RESULT_CONT,
ASYNC_NEXT_RESULT_END,
ASYNC_STORE_RESULT_START,
ASYNC_STORE_RESULT_CONT,
ASYNC_USE_RESULT_START,
Expand Down
5 changes: 5 additions & 0 deletions lib/MySQL_Protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,8 @@ bool MySQL_Protocol::generate_pkt_ERR(bool send, void **ptr, unsigned int *len,
case STATE_QUERY_SENT_NET:
(*myds)->DSS=STATE_ERR;
break;
case STATE_OK:
break;
default:
assert(0);
}
Expand Down Expand Up @@ -564,6 +566,8 @@ bool MySQL_Protocol::generate_pkt_OK(bool send, void **ptr, unsigned int *len, u
case STATE_QUERY_SENT_NET:
(*myds)->DSS=STATE_OK;
break;
case STATE_OK:
break;
default:
assert(0);
}
Expand Down Expand Up @@ -1333,6 +1337,7 @@ MySQL_ResultSet::~MySQL_ResultSet() {
free(buffer);
buffer=NULL;
}
myds->pkt_sid=sid-1;
}

unsigned int MySQL_ResultSet::add_row(MYSQL_ROW row) {
Expand Down
36 changes: 32 additions & 4 deletions lib/MySQL_Session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ MySQL_Session::MySQL_Session() {
default_schema=NULL;
schema_locked=false;
session_fast_forward=false;
started_sending_data_to_client=false;
admin_func=NULL;
//client_fd=0;
//server_fd=0;
Expand Down Expand Up @@ -1145,10 +1146,32 @@ int MySQL_Session::handler() {
}
}
} else {
// rc==1 , query is still running
// start sending to frontend if mysql_thread___threshold_resultset_size is reached
if (myconn->MyRS && myconn->MyRS->result && myconn->MyRS->resultset_size > (unsigned int) mysql_thread___threshold_resultset_size) {
myconn->MyRS->get_resultset(client_myds->PSarrayOUT);
switch (rc) {
// rc==1 , query is still running
// start sending to frontend if mysql_thread___threshold_resultset_size is reached
case 1:
if (myconn->MyRS && myconn->MyRS->result && myconn->MyRS->resultset_size > (unsigned int) mysql_thread___threshold_resultset_size) {
myconn->MyRS->get_resultset(client_myds->PSarrayOUT);
}
break;
// rc==2 : a multi-resultset (or multi statement) was detected, and the current statement is completed
case 2:
MySQL_Result_to_MySQL_wire(myconn->mysql, myconn->MyRS);
if (myconn->MyRS) { // we also need to clear MyRS, so that the next staement will recreate it if needed
delete myconn->MyRS;
myconn->MyRS=NULL;
}
NEXT_IMMEDIATE(PROCESSING_QUERY);
break;
// rc==3 , a multi statement query is still running
// start sending to frontend if mysql_thread___threshold_resultset_size is reached
case 3:
if (myconn->MyRS && myconn->MyRS->result && myconn->MyRS->resultset_size > (unsigned int) mysql_thread___threshold_resultset_size) {
myconn->MyRS->get_resultset(client_myds->PSarrayOUT);
}
break;
default:
break;
}
}
}
Expand Down Expand Up @@ -1999,12 +2022,16 @@ void MySQL_Session::MySQL_Result_to_MySQL_wire(MYSQL *mysql, MySQL_ResultSet *My
unsigned int nTrx=NumActiveTransactions();
uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 );
if (autocommit) setStatus += SERVER_STATUS_AUTOCOMMIT;
if (mysql->server_status & SERVER_MORE_RESULTS_EXIST)
setStatus += SERVER_MORE_RESULTS_EXIST;
client_myds->myprot.generate_pkt_OK(true,NULL,NULL,client_myds->pkt_sid+1,num_rows,mysql->insert_id,mysql->server_status|setStatus,mysql->warning_count,mysql->info);
client_myds->pkt_sid++;
} else {
// error
char sqlstate[10];
sprintf(sqlstate,"#%s",mysql_sqlstate(mysql));
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,mysql_errno(mysql),sqlstate,mysql_error(mysql));
client_myds->pkt_sid++;
}
}
}
Expand Down Expand Up @@ -2132,4 +2159,5 @@ void MySQL_Session::RequestEnd(MySQL_Data_Stream *myds) {
client_myds->DSS=STATE_SLEEP;
// finalize the query
CurrentQuery.end();
started_sending_data_to_client=false;
}
51 changes: 51 additions & 0 deletions lib/mysql_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ MySQL_Connection::MySQL_Connection() {
query.length=0;
largest_query_length=0;
MyRS=NULL;
processing_multi_statement=false;
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "Creating new MySQL_Connection %p\n", this);
};

Expand Down Expand Up @@ -311,6 +312,7 @@ void MySQL_Connection::connect_start() {
client_flags += CLIENT_FOUND_ROWS;
if (parent->compression)
client_flags += CLIENT_COMPRESS;
client_flags += CLIENT_MULTI_STATEMENTS; // FIXME: add global variable
if (parent->port) {
async_exit_status=mysql_real_connect_start(&ret_mysql, mysql, parent->address, userinfo->username, userinfo->password, userinfo->schemaname, parent->port, NULL, client_flags);
} else {
Expand Down Expand Up @@ -560,6 +562,36 @@ MDB_ASYNC_ST MySQL_Connection::handler(short event) {
#endif
}
break;

case ASYNC_NEXT_RESULT_START:
async_exit_status = mysql_next_result_start(&interr, mysql);
if (async_exit_status) {
next_event(ASYNC_NEXT_RESULT_CONT);
} else {
#ifdef PROXYSQL_USE_RESULT
NEXT_IMMEDIATE(ASYNC_USE_RESULT_START);
#else
NEXT_IMMEDIATE(ASYNC_STORE_RESULT_START);
#endif
}
break;

case ASYNC_NEXT_RESULT_CONT:
async_exit_status = mysql_next_result_cont(&interr, mysql, mysql_status(event, true));
if (async_exit_status) {
next_event(ASYNC_NEXT_RESULT_CONT);
} else {
#ifdef PROXYSQL_USE_RESULT
NEXT_IMMEDIATE(ASYNC_USE_RESULT_START);
#else
NEXT_IMMEDIATE(ASYNC_STORE_RESULT_START);
#endif
}
break;

case ASYNC_NEXT_RESULT_END:
break;

case ASYNC_STORE_RESULT_START:
if (mysql_errno(mysql)) {
NEXT_IMMEDIATE(ASYNC_QUERY_END);
Expand Down Expand Up @@ -619,6 +651,14 @@ MDB_ASYNC_ST MySQL_Connection::handler(short event) {
}
break;
case ASYNC_QUERY_END:
if (mysql_result) {
mysql_free_result(mysql_result);
mysql_result=NULL;
}
//if (mysql_next_result(mysql)==0) {
if (mysql->server_status & SERVER_MORE_RESULTS_EXIST) {
async_state_machine=ASYNC_NEXT_RESULT_START;
}
break;
case ASYNC_SET_AUTOCOMMIT_START:
set_autocommit_start();
Expand Down Expand Up @@ -805,6 +845,7 @@ int MySQL_Connection::async_query(short event, char *stmt, unsigned long length)
}
switch (async_state_machine) {
case ASYNC_QUERY_END:
processing_multi_statement=false; // no matter if we are processing a multi statement or not, we reached the end
return 0;
break;
case ASYNC_IDLE:
Expand All @@ -822,6 +863,16 @@ int MySQL_Connection::async_query(short event, char *stmt, unsigned long length)
return 0;
}
}
if (async_state_machine==ASYNC_NEXT_RESULT_START) {
// if we reached this point it measn we are processing a multi-statement
// and we need to exit to give control to MySQL_Session
processing_multi_statement=true;
return 2;
}
if (processing_multi_statement==true) {
// we are in the middle of processing a multi-statement
return 3;
}
return 1;
}

Expand Down

0 comments on commit ffd3970

Please sign in to comment.