Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
MXS-1203: Better handling of batch queries
When batched queries are done through readwritesplit, it will now handle
them one by one. This allows batched queries to be used with
readwritesplit but it does impose a performance penalty when compared to
direct execution on the backend.
  • Loading branch information
markus456 committed Mar 31, 2017
1 parent 51a7642 commit 7eff0f9
Show file tree
Hide file tree
Showing 7 changed files with 239 additions and 68 deletions.
3 changes: 3 additions & 0 deletions include/maxscale/protocol/mysql.h
Expand Up @@ -432,4 +432,7 @@ bool mxs_mysql_is_ok_packet(GWBUF *buffer);
/** Check for result set */
bool mxs_mysql_is_result_set(GWBUF *buffer);

/** Get current command for a session */
mysql_server_cmd_t mxs_mysql_current_command(MXS_SESSION* session);

MXS_END_DECLS
2 changes: 1 addition & 1 deletion server/modules/protocol/MySQL/MySQLBackend/mysql_backend.c
Expand Up @@ -755,7 +755,7 @@ gw_read_and_write(DCB *dcb)
* If protocol has session command set, concatenate whole
* response into one buffer.
*/
if (protocol_get_srv_command((MySQLProtocol *)dcb->protocol, false) != MYSQL_COM_UNDEFINED)
if (protocol_get_srv_command((MySQLProtocol *)dcb->protocol, true) != MYSQL_COM_UNDEFINED)
{
stmt = process_response_data(dcb, &read_buffer, gwbuf_length(read_buffer));
/**
Expand Down
6 changes: 6 additions & 0 deletions server/modules/protocol/MySQL/mysql_common.c
Expand Up @@ -1573,3 +1573,9 @@ bool mxs_mysql_is_result_set(GWBUF *buffer)

return rval;
}

mysql_server_cmd_t mxs_mysql_current_command(MXS_SESSION* session)
{
MySQLProtocol* proto = (MySQLProtocol*)session->client_dcb->protocol;
return proto->current_command;
}
2 changes: 1 addition & 1 deletion server/modules/routing/readwritesplit/CMakeLists.txt
@@ -1,4 +1,4 @@
add_library(readwritesplit SHARED readwritesplit.c rwsplit_mysql.c rwsplit_route_stmt.c rwsplit_select_backends.c rwsplit_session_cmd.c rwsplit_tmp_table_multi.c)
target_link_libraries(readwritesplit maxscale-common)
target_link_libraries(readwritesplit maxscale-common MySQLCommon)
set_target_properties(readwritesplit PROPERTIES VERSION "1.0.2")
install_module(readwritesplit core)
213 changes: 163 additions & 50 deletions server/modules/routing/readwritesplit/readwritesplit.c
Expand Up @@ -331,6 +331,8 @@ static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *router_inst, MXS_SESSION *sess
client_rses->client_dcb = session->client_dcb;
client_rses->have_tmp_tables = false;
client_rses->forced_node = NULL;
client_rses->expected_responses = 0;
client_rses->query_queue = NULL;
memcpy(&client_rses->rses_config, &router->rwsplit_config, sizeof(client_rses->rses_config));

int router_nservers = router->service->n_dbref;
Expand Down Expand Up @@ -529,12 +531,44 @@ void close_failed_bref(backend_ref_t *bref, bool fatal)
{
sescmd_cursor_set_active(&bref->bref_sescmd_cur, false);
}
}

bool route_stored_query(ROUTER_CLIENT_SES *rses)
{
bool rval = true;

if (bref->bref_pending_cmd)
if (rses->query_queue)
{
gwbuf_free(bref->bref_pending_cmd);
bref->bref_pending_cmd = NULL;
GWBUF* query_queue = modutil_get_next_MySQL_packet(&rses->query_queue);
query_queue = gwbuf_make_contiguous(query_queue);

/** Store the query queue locally for the duration of the routeQuery call.
* This prevents recursive calls into this function. */
GWBUF *temp_storage = rses->query_queue;
rses->query_queue = NULL;

if (!routeQuery((MXS_ROUTER*)rses->router, (MXS_ROUTER_SESSION*)rses, query_queue))
{
rval = false;
char* sql = modutil_get_SQL(query_queue);

if (sql)
{
MXS_ERROR("Routing query \"%s\" failed.", sql);
MXS_FREE(sql);
}
else
{
MXS_ERROR("Failed to route query.");
}
gwbuf_free(query_queue);
}

ss_dassert(rses->query_queue == NULL);
rses->query_queue = temp_storage;
}

return rval;
}

/**
Expand Down Expand Up @@ -565,7 +599,21 @@ static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session,
else
{
live_session_reply(&querybuf, rses);
if (route_single_stmt(inst, rses, querybuf))

if (rses->expected_responses || rses->query_queue)
{
/** We are already processing a request from the client. Store the
* new query and wait for the previous one to complete. */
rses->query_queue = gwbuf_append(rses->query_queue, querybuf);
querybuf = NULL;
rval = 1;

if (rses->expected_responses == 0 && !route_stored_query(rses))
{
rval = 0;
}
}
else if (route_single_stmt(inst, rses, querybuf))
{
rval = 1;
}
Expand Down Expand Up @@ -652,6 +700,57 @@ static void diagnostics(MXS_ROUTER *instance, DCB *dcb)
}
}

/**
* @brief Check if we have received a complete reply from the backend
*
* @param bref Backend reference
* @param buffer Buffer containing the response
*
* @return True if the complete response has been received
*/
bool reply_is_complete(backend_ref_t* bref, GWBUF *buffer)
{

if (bref->reply_state == REPLY_STATE_START &&
!mxs_mysql_is_result_set(buffer))
{
/** Not a result set, we have the complete response */
LOG_RS(bref, REPLY_STATE_DONE);
bref->reply_state = REPLY_STATE_DONE;
}
else
{
int more;
int n_eof = bref->reply_state == REPLY_STATE_RSET_ROWS ? 1 : 0;
n_eof += modutil_count_signal_packets(buffer, 0, n_eof, &more);

mysql_server_cmd_t cmd = mxs_mysql_current_command(bref->bref_dcb->session);

if (n_eof == 0)
{
/** Waiting for the EOF packet after the column definitions */
LOG_RS(bref, REPLY_STATE_RSET_COLDEF);
bref->reply_state = REPLY_STATE_RSET_COLDEF;
}
else if (n_eof == 1 && cmd != MYSQL_COM_FIELD_LIST)
{
/** Waiting for the EOF packet after the rows */
LOG_RS(bref, REPLY_STATE_RSET_ROWS);
bref->reply_state = REPLY_STATE_RSET_ROWS;
}
else
{
/** We either have a complete result set or a response to
* a COM_FIELD_LIST command */
ss_dassert(n_eof == 2 || (n_eof == 1 && cmd == MYSQL_COM_FIELD_LIST));
LOG_RS(bref, REPLY_STATE_DONE);
bref->reply_state = REPLY_STATE_DONE;
}
}

return bref->reply_state == REPLY_STATE_DONE;
}

/**
* @brief Client Reply routine (API)
*
Expand Down Expand Up @@ -699,7 +798,15 @@ static void clientReply(MXS_ROUTER *instance,

/** Statement was successfully executed, free the stored statement */
session_clear_stmt(backend_dcb->session);
ss_dassert(bref->reply_state != REPLY_STATE_DONE);

if (reply_is_complete(bref, writebuf))
{
/** Got a complete reply, decrement expected response count */
router_cli_ses->expected_responses--;
ss_dassert(router_cli_ses->expected_responses >= 0);
ss_dassert(bref->reply_state == REPLY_STATE_DONE);
}
/**
* Active cursor means that reply is from session command
* execution.
Expand Down Expand Up @@ -751,14 +858,26 @@ static void clientReply(MXS_ROUTER *instance,
bref_clear_state(bref, BREF_WAITING_RESULT);
}

if (router_cli_ses->expected_responses == 0)
{
for (int i = 0; i < router_cli_ses->rses_nbackends; i++)
{
ss_dassert(router_cli_ses->rses_backend_ref[i].reply_state == REPLY_STATE_DONE);
}

route_stored_query(router_cli_ses);
}
else
{
ss_dassert(router_cli_ses->expected_responses > 0);
}
if (writebuf != NULL && client_dcb != NULL)
{
/** Write reply to client DCB */
MXS_SESSION_ROUTE_REPLY(backend_dcb->session, writebuf);
}

/** There is one pending session command to be executed. */
if (sescmd_cursor_is_active(scur))
else if (sescmd_cursor_is_active(scur))
{
bool succp;

Expand All @@ -773,40 +892,6 @@ static void clientReply(MXS_ROUTER *instance,
bref->ref->server->name, bref->ref->server->port);
}
}
else if (bref->bref_pending_cmd != NULL) /*< non-sescmd is waiting to be routed */
{
int ret;

CHK_GWBUF(bref->bref_pending_cmd);

if ((ret = bref->bref_dcb->func.write(bref->bref_dcb,
gwbuf_clone(bref->bref_pending_cmd))) == 1)
{
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
atomic_add_uint64(&inst->stats.n_queries, 1);
/**
* Add one query response waiter to backend reference
*/
bref_set_state(bref, BREF_QUERY_ACTIVE);
bref_set_state(bref, BREF_WAITING_RESULT);
}
else
{
char* sql = modutil_get_SQL(bref->bref_pending_cmd);

if (sql)
{
MXS_ERROR("Routing query \"%s\" failed.", sql);
MXS_FREE(sql);
}
else
{
MXS_ERROR("Failed to route query.");
}
}
gwbuf_free(bref->bref_pending_cmd);
bref->bref_pending_cmd = NULL;
}
}


Expand All @@ -821,7 +906,7 @@ static void clientReply(MXS_ROUTER *instance,
*/
static uint64_t getCapabilities(MXS_ROUTER* instance)
{
return RCAP_TYPE_STMT_INPUT | RCAP_TYPE_TRANSACTION_TRACKING;
return RCAP_TYPE_STMT_INPUT | RCAP_TYPE_TRANSACTION_TRACKING | RCAP_TYPE_STMT_OUTPUT;
}

/*
Expand Down Expand Up @@ -1467,6 +1552,10 @@ static bool reroute_stored_statement(ROUTER_CLIENT_SES *rses, backend_ref_t *old
if (bref->bref_dcb->func.write(bref->bref_dcb, stored))
{
MXS_INFO("Retrying failed read at '%s'.", bref->ref->server->unique_name);
ss_dassert(bref->reply_state == REPLY_STATE_DONE);
LOG_RS(bref, REPLY_STATE_START);
bref->reply_state = REPLY_STATE_START;
rses->expected_responses++;
success = true;
break;
}
Expand All @@ -1484,6 +1573,10 @@ static bool reroute_stored_statement(ROUTER_CLIENT_SES *rses, backend_ref_t *old
if (bref->bref_dcb->func.write(bref->bref_dcb, stored))
{
MXS_INFO("Retrying failed read at '%s'.", bref->ref->server->unique_name);
LOG_RS(bref, REPLY_STATE_START);
ss_dassert(bref->reply_state == REPLY_STATE_DONE);
bref->reply_state = REPLY_STATE_START;
rses->expected_responses++;
success = true;
}
}
Expand Down Expand Up @@ -1534,14 +1627,14 @@ static bool handle_error_new_connection(ROUTER_INSTANCE *inst,
}
CHK_BACKEND_REF(bref);

/**
* If query was sent through the bref and it is waiting for reply from
* the backend server it is necessary to send an error to the client
* because it is waiting for reply.
*/
if (BREF_IS_WAITING_RESULT(bref))
{
GWBUF *stored;
/**
* A query was sent through the backend and it is waiting for a reply.
* Try to reroute the statement to a working server or send an error
* to the client.
*/
GWBUF *stored = NULL;
const SERVER *target;

if (!session_take_stmt(backend_dcb->session, &stored, &target) ||
Expand All @@ -1550,12 +1643,31 @@ static bool handle_error_new_connection(ROUTER_INSTANCE *inst,
{
/**
* We failed to route the stored statement or no statement was
* stored for this server. Either way we can safely free the buffer.
* stored for this server. Either way we can safely free the buffer
* and decrement the expected response count.
*/
gwbuf_free(stored);
myrses->expected_responses--;

if (!sescmd_cursor_is_active(&bref->bref_sescmd_cur))
{
/**
* The backend was executing a command that requires a reply.
* Send an error to the client to let it know the query has
* failed.
*/
DCB *client_dcb = ses->client_dcb;
client_dcb->func.write(client_dcb, gwbuf_clone(errmsg));
}

DCB *client_dcb = ses->client_dcb;
client_dcb->func.write(client_dcb, gwbuf_clone(errmsg));
if (myrses->expected_responses == 0)
{
/**
* The response from this server was the last one, try to
* route any stored queries
*/
route_stored_query(myrses);
}
}
}

Expand Down Expand Up @@ -1698,6 +1810,7 @@ static bool create_backends(ROUTER_CLIENT_SES *rses, backend_ref_t** dest, int*
#endif
backend_ref[i].bref_state = 0;
backend_ref[i].ref = sref;
backend_ref[i].reply_state = REPLY_STATE_DONE;
/** store pointers to sescmd list to both cursors */
backend_ref[i].bref_sescmd_cur.scmd_cur_rses = rses;
backend_ref[i].bref_sescmd_cur.scmd_cur_active = false;
Expand Down

0 comments on commit 7eff0f9

Please sign in to comment.