Skip to content

Commit

Permalink
Fix the duplicate consumer_tag problem
Browse files Browse the repository at this point in the history
  • Loading branch information
taozhijiang committed Apr 19, 2018
1 parent 73480a1 commit 0c784ee
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 12 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
*.vpj
*.vpw
*.vpwhist
*.vpwhistu
*.vtg
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ Another RabbitMQ C++ client wrapper based on librabbitmq-c.
Already used in production environment.

No document, please read examples for reference, but I really recommend it to you!

Append: Better usage example, please refer to the project [tibank](https://github.com/taozhijiang/tibank).
22 changes: 13 additions & 9 deletions RabbitMQ.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ amqp_channel_t RabbitMQHelper::createChannel() {
return -1;
}

boost::shared_ptr<RabbitChannel> pChannel;
std::shared_ptr<RabbitChannel> pChannel;
pChannel.reset(new RabbitChannel(t, *this));
if (!pChannel || pChannel->initChannel() < 0) {
freeChannelId(t);
Expand All @@ -70,7 +70,7 @@ amqp_channel_t RabbitMQHelper::createChannel() {
}

int RabbitMQHelper::closeChannel(amqp_channel_t channel){
std::map<amqp_channel_t, boost::shared_ptr<RabbitChannel> >::iterator it;
std::map<amqp_channel_t, std::shared_ptr<RabbitChannel> >::iterator it;

it = channels_.find(channel);
if (it == channels_.end()) {
Expand All @@ -82,7 +82,7 @@ int RabbitMQHelper::closeChannel(amqp_channel_t channel){
}

int RabbitMQHelper::freeChannel(amqp_channel_t channel) {
std::map<amqp_channel_t, boost::shared_ptr<RabbitChannel> >::iterator it;
std::map<amqp_channel_t, std::shared_ptr<RabbitChannel> >::iterator it;

it = channels_.find(channel);
if (it == channels_.end()) {
Expand All @@ -96,7 +96,7 @@ int RabbitMQHelper::freeChannel(amqp_channel_t channel) {
}

bool RabbitMQHelper::isChannelOpen(amqp_channel_t channel) {
std::map<amqp_channel_t, boost::shared_ptr<RabbitChannel> >::iterator it;
std::map<amqp_channel_t, std::shared_ptr<RabbitChannel> >::iterator it;
it = channels_.find(channel);
if (it == channels_.end())
return false;
Expand All @@ -116,7 +116,7 @@ void RabbitMQHelper::closeConnection() {
//
// though when connection close, close all channel. though the lib did these implicity
//
std::map<amqp_channel_t, boost::shared_ptr<RabbitChannel> >::iterator it;
std::map<amqp_channel_t, std::shared_ptr<RabbitChannel> >::iterator it;
for (it=channels_.begin(); it!=channels_.end(); ++it) {
if (it->second)
it->second->closeChannel();
Expand Down Expand Up @@ -703,10 +703,10 @@ int RabbitChannel::basicPublish(const std::string &exchange_name,
* wasn't routed to a queue, so the message is returned */
// read the return message
{
amqp_message_t message;
amqp_rpc_reply_t res = amqp_read_message(mqHelper_.connection_, frame.channel, &message, 0);
amqp_message_t message_dummy;
amqp_rpc_reply_t res = amqp_read_message(mqHelper_.connection_, frame.channel, &message_dummy, 0);
if (AMQP_RESPONSE_NORMAL == res.reply_type)
amqp_destroy_message(&message);
amqp_destroy_message(&message_dummy);
}
LOG_API("basic.return called!");
goto connection_err;
Expand Down Expand Up @@ -843,7 +843,11 @@ int RabbitChannel::basicConsume(const std::string &queue,

amqp_basic_consume_t consume = {};
consume.queue = amqp_cstring_bytes(queue.c_str());
consume.consumer_tag = amqp_cstring_bytes(consumer_tag.c_str());
if (consumer_tag.empty() || consumer_tag == "*") {
consume.consumer_tag = amqp_empty_bytes;
} else {
consume.consumer_tag = amqp_cstring_bytes(consumer_tag.c_str());
}
consume.no_local = no_local;
consume.no_ack = no_ack;
consume.exclusive = exclusive;
Expand Down
6 changes: 3 additions & 3 deletions RabbitMQ.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
#include <set>
#include <map>

#include <boost/shared_ptr.hpp>
#include <memory>
#include <boost/function.hpp>

#define LOG_API printf
Expand Down Expand Up @@ -76,7 +76,7 @@ struct RabbitMessage {
};


typedef boost::shared_ptr<RabbitChannel> RabbitChannelPtr;
typedef std::shared_ptr<RabbitChannel> RabbitChannelPtr;
typedef boost::function<bool (RabbitChannelPtr, void*)> RabbitChannelSetupFunc;

class RabbitMQHelper {
Expand Down Expand Up @@ -188,7 +188,7 @@ class RabbitMQHelper {

amqp_channel_t max_channel_id_;
std::set<amqp_channel_t> channel_ids_;
std::map<amqp_channel_t, boost::shared_ptr<RabbitChannel> > channels_;
std::map<amqp_channel_t, std::shared_ptr<RabbitChannel> > channels_;
};


Expand Down

0 comments on commit 0c784ee

Please sign in to comment.