Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
MXS-4209: Close LocalClients on first response
When the response to the KILL command is received, the client can be
immediately closed. By using the cherry-picked LocalClient notification
callbacks from 6.4, the code can now properly close the connections even
when the TCP connection won't be closed. This also has the added benefit
of making the KILL commands faster as they are now fully event driven
and don't poll for the completion.
  • Loading branch information
markus456 committed Jul 21, 2022
1 parent 881117b commit 51eabc8
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 57 deletions.
2 changes: 2 additions & 0 deletions include/maxscale/protocol/mariadb/client_connection.hh
Expand Up @@ -117,6 +117,8 @@ private:
SpecialCmdRes handle_query_kill(GWBUF* read_buffer, uint32_t packet_len);
void add_local_client(LocalClient* client);
bool have_local_clients();
void kill_complete(bool send_ok, LocalClient* client);
void maybe_send_kill_response(bool send_ok);

void track_transaction_state(MXS_SESSION* session, GWBUF* packetbuf);
void execute_kill_all_others(uint64_t target_id, uint64_t keep_protocol_thread_id, kill_type_t type);
Expand Down
137 changes: 80 additions & 57 deletions server/modules/protocol/MariaDB/mariadb_client.cc
Expand Up @@ -1735,61 +1735,53 @@ void MariaDBClientConnection::execute_kill(std::shared_ptr<KillInfo> info, bool
auto origin = mxs::RoutingWorker::get_current();

auto func = [this, info, ref, origin, send_ok]() {
// First, gather the list of servers where the KILL should be sent
mxs::RoutingWorker::execute_concurrently(
[info, ref]() {
dcb_foreach_local(info->cb, info.get());
});

// TODO: This doesn't handle the case where a session is moved from one worker to another while
// this was being executed on the MainWorker.

// Then move execution back to the original worker to keep all connections on the same thread
origin->call(
[this, info, ref, origin, send_ok]() {
for (const auto& a : info->targets)
// First, gather the list of servers where the KILL should be sent
mxs::RoutingWorker::execute_concurrently(
[info, ref]() {
dcb_foreach_local(info->cb, info.get());
});

// TODO: This doesn't handle the case where a session is moved from one worker to another while
// this was being executed on the MainWorker.

// Then move execution back to the original worker to keep all connections on the same thread
origin->call(
[this, info, ref, origin, send_ok]() {
for (const auto& a : info->targets)
{
if (LocalClient* client = LocalClient::create(info->session, a.first))
{
if (client->connect())
{
client->set_notify(
[this, send_ok, client](GWBUF*, const mxs::ReplyRoute&, const mxs::Reply&){
kill_complete(send_ok, client);
}, [this, send_ok, client](GWBUF*, mxs::Target*, const mxs::Reply&){
kill_complete(send_ok, client);
});

// TODO: There can be multiple connections to the same server. Currently only one
// connection per server is killed.
MXS_INFO("KILL on '%s': %s", a.first->name(), a.second.c_str());
client->queue_query(modutil_create_query(a.second.c_str()));

mxb_assert(ref->state() != MXS_SESSION::State::STOPPING);
add_local_client(client);
}
else
{
if (LocalClient* client = LocalClient::create(info->session, a.first))
{
client->connect();
// TODO: There can be multiple connections to the same server. Currently only one
// connection per server is killed.
MXS_INFO("KILL on '%s': %s", a.first->name(), a.second.c_str());
client->queue_query(modutil_create_query(a.second.c_str()));
client->queue_query(mysql_create_com_quit(NULL, 0));

mxb_assert(ref->state() != MXS_SESSION::State::STOPPING);
add_local_client(client);
}
delete client;
}
}
}

// Now wait for the COM_QUIT to close the connections.
auto wait_for_conns = [this, ref, send_ok](auto action){
bool rv = true;

if (action == mxb::Worker::Call::CANCEL || !this->have_local_clients())
{
// Check if the DCB is still open. If MaxScale is shutting down, the DCB is
// already closed when this callback is called and an error about a write to a
// closed DCB would be logged.
if (!m_dcb->is_closed() && send_ok)
{
this->write_ok_packet(1);
}

session_put_ref(ref);
MXS_INFO("All KILL commands finished");
rv = false;
}

return rv;
};

// TODO: Polling for this is slow. A callback in the LocalClient's destructor would be
// better as it would close the connection as soon as possible.
origin->dcall(100, wait_for_conns);
}, mxs::RoutingWorker::EXECUTE_AUTO);
};
// If we ended up not sending any KILL commands, the OK packet can be generated immediately.
maybe_send_kill_response(send_ok);

// The reference can now be freed as the execution is back on the worker that owns it
session_put_ref(ref);
}, mxs::RoutingWorker::EXECUTE_AUTO);
};

mxs::MainWorker::get()->execute(func, mxb::Worker::EXECUTE_QUEUED);
}
Expand Down Expand Up @@ -2531,15 +2523,46 @@ bool MariaDBClientConnection::send_mysql_err_packet(int packet_number, int in_af
void MariaDBClientConnection::add_local_client(LocalClient* client)
{
// Prune stale LocalClients before adding the new one
auto it = std::remove_if(m_local_clients.begin(), m_local_clients.end(), [](const auto& client) {
return !client->is_open();
});

m_local_clients.erase(it, m_local_clients.end());
m_local_clients.erase(
std::remove_if(m_local_clients.begin(), m_local_clients.end(), [](const auto& client) {
return !client->is_open();
}), m_local_clients.end());

m_local_clients.emplace_back(client);
}

void MariaDBClientConnection::kill_complete(bool send_ok, LocalClient* client)
{
// This needs to be executed once we return from the clientReply or the handleError callback of the
// LocalClient. In 6.4 this can be changed to use Worker::lcall to make it execute right after the
// function returns into epoll.
auto fn = [=]() {
m_local_clients.erase(
std::remove_if(m_local_clients.begin(), m_local_clients.end(), [&](const auto& c) {
return c.get() == client;
}), m_local_clients.end());
maybe_send_kill_response(send_ok);
};

m_session->worker()->execute(fn, mxb::Worker::EXECUTE_QUEUED);
}

void MariaDBClientConnection::maybe_send_kill_response(bool send_ok)
{
if (!have_local_clients())
{
// Check if the DCB is still open. If MaxScale is shutting down, the DCB is
// already closed when this callback is called and an error about a write to a
// closed DCB would be logged.
if (!m_dcb->is_closed() && send_ok)
{
write_ok_packet(1);
}

MXS_INFO("All KILL commands finished");
}
}

bool MariaDBClientConnection::have_local_clients()
{
return std::any_of(m_local_clients.begin(), m_local_clients.end(), std::mem_fn(&LocalClient::is_open));
Expand Down

0 comments on commit 51eabc8

Please sign in to comment.