Skip to content

Commit

Permalink
Redis transaction support. (#12)
Browse files Browse the repository at this point in the history
* change redis txn and support watch

* update redis multi unit test
  • Loading branch information
MrGuin committed Jan 10, 2024
1 parent f99612d commit 85c5c3a
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 40 deletions.
39 changes: 35 additions & 4 deletions src/brpc/policy/redis_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class RedisConnContext : public Destroyable {
public:
explicit RedisConnContext(const RedisService* rs)
: redis_service(rs)
, in_transaction(false)
, batched_size(0) {}

~RedisConnContext();
Expand All @@ -68,7 +69,10 @@ class RedisConnContext : public Destroyable {
const RedisService* redis_service;
// If user starts a transaction, transaction_handler indicates the
// handler pointer that runs the transaction command.
std::unique_ptr<RedisCommandHandler> transaction_handler;
std::unique_ptr<TransactionHandler> transaction_handler;
// Whether this connection has begun a transaction. If true, the commands
// received will be handled by transaction_handler.
bool in_transaction;
// >0 if command handler is run in batched mode.
int batched_size;

Expand All @@ -82,15 +86,33 @@ int ConsumeCommand(RedisConnContext* ctx,
butil::IOBufAppender* appender) {
RedisReply output(&ctx->arena);
RedisCommandHandlerResult result = REDIS_CMD_HANDLED;
if (ctx->transaction_handler) {
if (ctx->in_transaction) {
assert(ctx->transaction_handler != nullptr);
result = ctx->transaction_handler->Run(args, &output, flush_batched);
if (result == REDIS_CMD_HANDLED) {
ctx->transaction_handler.reset(NULL);
ctx->in_transaction = false;
} else if (result == REDIS_CMD_BATCHED) {
LOG(ERROR) << "BATCHED should not be returned by a transaction handler.";
return -1;
}
} else {
}
else if (args[0] == "watch" || args[0] == "unwatch") {
if (!ctx->transaction_handler) {
ctx->transaction_handler.reset(ctx->redis_service->NewTransactionHandler());
ctx->in_transaction = false;
}
if (!ctx->transaction_handler) {
output.SetError("ERR Transaction not supported.");
} else {
result = ctx->transaction_handler->Run(args, &output, flush_batched);
if (result == REDIS_CMD_BATCHED) {
LOG(ERROR) << "BATCHED should not be returned by a transaction handler.";
return -1;
}
}
}
else {
RedisCommandHandler* ch = ctx->redis_service->FindCommandHandler(args[0]);
if (!ch) {
char buf[64];
Expand All @@ -103,7 +125,16 @@ int ConsumeCommand(RedisConnContext* ctx,
LOG(ERROR) << "CONTINUE should not be returned in a batched process.";
return -1;
}
ctx->transaction_handler.reset(ch->NewTransactionHandler());
if (ctx->transaction_handler == nullptr) {
ctx->transaction_handler.reset(ctx->redis_service->NewTransactionHandler());
}
if (ctx->transaction_handler != nullptr) {
ctx->transaction_handler->Begin();
ctx->in_transaction = true;
}
else {
output.SetError("ERR Transaction not supported.");
}
} else if (result == REDIS_CMD_BATCHED) {
ctx->batched_size++;
}
Expand Down
5 changes: 5 additions & 0 deletions src/brpc/redis.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,11 @@ RedisCommandHandler* RedisService::FindCommandHandler(const butil::StringPiece&
return NULL;
}

TransactionHandler* RedisService::NewTransactionHandler() const {
LOG(ERROR) << "NewTransactionHandler is not implemented";
return NULL;
}

RedisCommandHandler* RedisCommandHandler::NewTransactionHandler() {
LOG(ERROR) << "NewTransactionHandler is not implemented";
return NULL;
Expand Down
11 changes: 11 additions & 0 deletions src/brpc/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ std::ostream& operator<<(std::ostream& os, const RedisRequest&);
std::ostream& operator<<(std::ostream& os, const RedisResponse&);

class RedisCommandHandler;
class TransactionHandler;

// Container of CommandHandlers.
// Assign an instance to ServerOption.redis_service to enable redis support.
Expand All @@ -231,6 +232,9 @@ class RedisService {
// Call this function to register `handler` that can handle command `name`.
bool AddCommandHandler(const std::string& name, RedisCommandHandler* handler);

// Create a transaction handler to handle commands inside a transaction.
virtual TransactionHandler* NewTransactionHandler() const;

// This function should not be touched by user and used by brpc deverloper only.
RedisCommandHandler* FindCommandHandler(const butil::StringPiece& name) const;

Expand All @@ -243,6 +247,8 @@ enum RedisCommandHandlerResult {
REDIS_CMD_HANDLED = 0,
REDIS_CMD_CONTINUE = 1,
REDIS_CMD_BATCHED = 2,
REDIS_CMD_TXN_START = 3,
REDIS_CMD_TXN_FINISH = 4,
};

// The Command handler for a redis request. User should impletement Run().
Expand Down Expand Up @@ -289,6 +295,11 @@ class RedisCommandHandler {
virtual RedisCommandHandler* NewTransactionHandler();
};

class TransactionHandler : public RedisCommandHandler {
public:
virtual bool Begin() = 0;
};

} // namespace brpc

#endif // BRPC_REDIS_H
82 changes: 46 additions & 36 deletions test/brpc_redis_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,47 @@ butil::Mutex s_mutex;
std::unordered_map<std::string, std::string> m;
std::unordered_map<std::string, int64_t> int_map;

class MultiTransactionHandler : public brpc::TransactionHandler {
public:
brpc::RedisCommandHandlerResult Run(const std::vector<butil::StringPiece>& args,
brpc::RedisReply* output,
bool flush_batched) {
if (args[0] == "multi") {
output->SetError("ERR duplicate multi");
return brpc::REDIS_CMD_CONTINUE;
}
if (args[0] != "exec") {
std::vector<std::string> comm;
for (int i = 0; i < (int)args.size(); ++i) {
comm.push_back(args[i].as_string());
}
_commands.push_back(comm);
output->SetStatus("QUEUED");
return brpc::REDIS_CMD_CONTINUE;
}
output->SetArray(_commands.size());
s_mutex.lock();
for (size_t i = 0; i < _commands.size(); ++i) {
if (_commands[i][0] == "incr") {
int64_t value;
value = ++int_map[_commands[i][1]];
(*output)[i].SetInteger(value);
} else {
(*output)[i].SetStatus("unknown command");
}
}
s_mutex.unlock();
return brpc::REDIS_CMD_HANDLED;
}

bool Begin() override {
return true;
}

private:
std::vector<std::vector<std::string> > _commands;
};

class RedisServiceImpl : public brpc::RedisService {
public:
RedisServiceImpl()
Expand Down Expand Up @@ -862,6 +903,11 @@ class RedisServiceImpl : public brpc::RedisService {
}
}

brpc::TransactionHandler* NewTransactionHandler() const override {

return new MultiTransactionHandler;
}

std::vector<std::vector<std::string> > _batched_command;
int _batch_count;
};
Expand Down Expand Up @@ -1088,42 +1134,6 @@ class MultiCommandHandler : public brpc::RedisCommandHandler {
RedisCommandHandler* NewTransactionHandler() override {
return new MultiTransactionHandler;
}

class MultiTransactionHandler : public brpc::RedisCommandHandler {
public:
brpc::RedisCommandHandlerResult Run(const std::vector<butil::StringPiece>& args,
brpc::RedisReply* output,
bool flush_batched) {
if (args[0] == "multi") {
output->SetError("ERR duplicate multi");
return brpc::REDIS_CMD_CONTINUE;
}
if (args[0] != "exec") {
std::vector<std::string> comm;
for (int i = 0; i < (int)args.size(); ++i) {
comm.push_back(args[i].as_string());
}
_commands.push_back(comm);
output->SetStatus("QUEUED");
return brpc::REDIS_CMD_CONTINUE;
}
output->SetArray(_commands.size());
s_mutex.lock();
for (size_t i = 0; i < _commands.size(); ++i) {
if (_commands[i][0] == "incr") {
int64_t value;
value = ++int_map[_commands[i][1]];
(*output)[i].SetInteger(value);
} else {
(*output)[i].SetStatus("unknown command");
}
}
s_mutex.unlock();
return brpc::REDIS_CMD_HANDLED;
}
private:
std::vector<std::vector<std::string> > _commands;
};
};

TEST_F(RedisTest, server_command_continue) {
Expand Down

0 comments on commit 85c5c3a

Please sign in to comment.