Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add thread safe guard to throw exception when share DBConnection cross threads #635

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion common/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ libswsscommon_la_SOURCES = \
warm_restart.cpp \
luatable.cpp \
countertable.cpp \
redisutility.cpp
redisutility.cpp \
threadsafeguard.cpp

libswsscommon_la_CXXFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(LIBNL_CFLAGS) $(CODE_COVERAGE_CXXFLAGS)
libswsscommon_la_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(LIBNL_CPPFLAGS) $(CODE_COVERAGE_CPPFLAGS)
Expand Down
7 changes: 7 additions & 0 deletions common/dbconnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -410,10 +410,12 @@ RedisContext::~RedisContext()

RedisContext::RedisContext()
: m_conn(NULL)
, m_running(false)
{
}

RedisContext::RedisContext(const RedisContext &other)
: m_running(false)
{
auto octx = other.getContext();
const char *unixPath = octx->unix_sock.path;
Expand Down Expand Up @@ -478,6 +480,11 @@ void RedisContext::setClientName(const string& clientName)
r.checkStatusOK();
}

std::atomic<bool> &RedisContext::getRunningFlag()
{
return m_running;
}

string RedisContext::getClientName()
{
string command("CLIENT GETNAME");
Expand Down
9 changes: 9 additions & 0 deletions common/dbconnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
#include <utility>
#include <memory>
#include <mutex>
#include <atomic>

#include <hiredis/hiredis.h>
#include "rediscommand.h"
#include "redisreply.h"
#include "threadsafeguard.h"
#define EMPTY_NAMESPACE std::string()

namespace swss {
Expand Down Expand Up @@ -127,6 +129,8 @@ class RedisContext

std::string getClientName();

std::atomic<bool> &getRunningFlag();

protected:
RedisContext();
void initContext(const char *host, int port, const timeval *tv);
Expand All @@ -135,6 +139,11 @@ class RedisContext

private:
redisContext *m_conn;

/*
* Use this flag and ThreadSafeGuard to check and throw exception when multiple thread run command on same redis context.
*/
std::atomic<bool> m_running;
};

class DBConnector : public RedisContext
Expand Down
2 changes: 2 additions & 0 deletions common/redisreply.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ inline void guard(FUNC func, const char* command)

RedisReply::RedisReply(RedisContext *ctx, const RedisCommand& command)
{
ThreadSafeGuard safeguard(ctx->getRunningFlag(), string(command.c_str()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not use std::mutex ? and mutex guard ? instead of implementing yourself

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I know the libswsscommon is not thread safe by design, so we add this class to catch the multi-thread scenario and throw exception.

If we add lock here, it will impact the critical performance, like route sync or warm reboot.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i dont think std::mutext will impact performace here, mutext is really quick, implementation internal is on atomic exchange, its 1 assembly instruction, so this is not performance critical here, and using already given tools make it simpler and better understanding for someone that is new to the code

if you are throwing here then how this will be handled in production scenario? just crash ?

Copy link
Contributor Author

@liuh-80 liuh-80 Jun 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After discussion with QI, I have better undertsand about whu not use mutex, mutext need context switch which take microseconds, and there a some critical scenario can't accept this delay for example update thousands of routes.

Here is my closed PR for add a mutex, which contains more information:
#634

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that depends on the mutex, you could have potentially spin lock since this operation could be very fast, and you dont need to use recursive_mutex as in showed PR, since you will not have recursive call here, still for me using std::mutex here is better than implementing it from scratch, also why crash on multiple thread access? and not wait ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ThreadSafeGuard is not a lock, it's just check race condition and throw exception when it happens.
Also, the code inside RedisContext ctor sometimes very slowly, for example run a LUA script.

Here is an example how crash happen:
caclmgrd currently share same RedisContext with multiple thread:
Thread 1: run 'keys' command
Thread 2: run 'hgetall' command

Then when thread 1 read command result, it read the 'keys' result, however the thread 1 process the result as 'hgetall' result, so the code crash.

int rc = redisAppendFormattedCommand(ctx->getContext(), command.c_str(), command.length());
if (rc != REDIS_OK)
{
Expand All @@ -51,6 +52,7 @@ RedisReply::RedisReply(RedisContext *ctx, const RedisCommand& command)

RedisReply::RedisReply(RedisContext *ctx, const string& command)
{
ThreadSafeGuard safeguard(ctx->getRunningFlag(), command);
int rc = redisAppendCommand(ctx->getContext(), command.c_str());
if (rc != REDIS_OK)
{
Expand Down
26 changes: 26 additions & 0 deletions common/threadsafeguard.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#include <iostream>
#include <sstream>
#include <system_error>

#include "common/logger.h"
#include "common/threadsafeguard.h"

using namespace std;
using namespace swss;

ThreadSafeGuard::ThreadSafeGuard(atomic<bool> &running, const string &info)
:m_running(running)
{
auto currentlyRunning = m_running.exchange(true);
if (currentlyRunning)
{
string errmsg = "Current thread '" + info + "' conflict with other thread.";
SWSS_LOG_ERROR("%s", errmsg.c_str());
throw system_error(make_error_code(errc::operation_in_progress), errmsg.c_str());
}
}

ThreadSafeGuard::~ThreadSafeGuard()
{
m_running = false;
}
15 changes: 15 additions & 0 deletions common/threadsafeguard.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#pragma once

namespace swss {

class ThreadSafeGuard
{
public:
ThreadSafeGuard(std::atomic<bool> &running, const std::string &info);
~ThreadSafeGuard();

private:
std::atomic<bool> &m_running;
};

}
18 changes: 18 additions & 0 deletions tests/redis_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1057,3 +1057,21 @@ TEST(Connector, hmset)
// test empty multi hash
db.hmset({});
}

TEST(Connector, ThreadSafeGuard)
{
DBConnector db("TEST_DB", 0, true);

string exceptionMessage;
try
{
ThreadSafeGuard safeguard1(db.getRunningFlag(), "command 1");
ThreadSafeGuard safeguard2(db.getRunningFlag(), "command 2");
}
catch (const system_error& ex)
{
exceptionMessage = ex.what();
}

ASSERT_EQ(exceptionMessage, "Current thread 'command 2' conflict with other thread.: Operation now in progress");
}