Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

use queue instead of eio_nop to keep message order

  • Loading branch information...
commit b06abaa2983a2557cec97723b3dd190a162d78a0 1 parent 48f04bc
@hangqi hangqi authored
Showing with 69 additions and 28 deletions.
  1. +58 −27 src/hedwig.cpp
  2. +11 −1 src/hedwig.h
View
85 src/hedwig.cpp
@@ -21,7 +21,6 @@
#include <log4cxx/basicconfigurator.h>
#include <log4cxx/propertyconfigurator.h>
#include <log4cxx/helpers/exception.h>
-#include <set>
#include <hedwig/exceptions.h>
@@ -48,7 +47,7 @@ String::New(var)
#define NUM_LH(var) \
Number::New(var)
-// convert Hedwig::Message (protobuf message) to JS Object
+// Helper to convert Hedwig::Message (protobuf message) to JS Object
static Local<Object> ToJSObject(Hedwig::Message& message)
{
HandleScope scope;
@@ -130,9 +129,11 @@ bool HedwigConfiguration::getBool(const std::string & key, bool defaultVal) cons
inline void HedwigConfiguration::setStr(const std::string &key, const std::string &val) {
strMap.insert(std::pair<std::string, std::string>(key, val));
}
+
inline void HedwigConfiguration::setInt(const std::string &key, int val) {
intMap.insert(std::pair<std::string, int>(key, val));
}
+
inline void HedwigConfiguration::setBool(const std::string &key, bool val) {
boolMap.insert(std::pair<std::string, bool>(key, val));
}
@@ -217,6 +218,16 @@ HedwigMessageHandler::HedwigMessageHandler(JSWPtr &jsCallback)
HedwigMessageHandler::~HedwigMessageHandler() {
}
+std::queue<EIOConsumeData*> HedwigMessageHandler::req_msg_queue = std::queue<EIOConsumeData*>();
+pthread_mutex_t HedwigMessageHandler::queue_mutex = PTHREAD_MUTEX_INITIALIZER;
+ev_async HedwigMessageHandler::ev_hedwig_consume_notifier;
+
+void HedwigMessageHandler::Init() {
+ ev_async_init(&ev_hedwig_consume_notifier, HedwigMessageHandler::AfterConsume);
+ ev_async_start(EV_DEFAULT_UC_ &ev_hedwig_consume_notifier);
+ ev_unref(EV_DEFAULT_UC);
+}
+
void HedwigMessageHandler::consume(const std::string& topic, const std::string& subscriber, const Hedwig::Message& msg, Hedwig::OperationCallbackPtr& callback) {
EIOConsumeData *consumeData = new EIOConsumeData();
consumeData->callback = jsCallback;
@@ -225,39 +236,58 @@ void HedwigMessageHandler::consume(const std::string& topic, const std::string&
consumeData->message = msg;
consumeData->consumeCb = callback;
- LOG4CXX_INFO(logger, "consume called, start eio: " << consumeData->topic << " " << consumeData->subscriber << consumeData->message.msgid().localcomponent());
- eio_nop(EIO_PRI_DEFAULT, EIO_AfterConsume, consumeData);
+ LOG4CXX_INFO(logger, "consume called, push to queue, and notify ev_loop: " << consumeData->topic << " " << consumeData->subscriber << " " << consumeData->message.msgid().localcomponent());
+
+ pthread_mutex_lock(&queue_mutex);
+ req_msg_queue.push(consumeData);
+ pthread_mutex_unlock(&queue_mutex);
+
+ // notify v8 main thread that there is a new message on the queue.
ev_ref(EV_DEFAULT_UC);
+ ev_async_send(EV_DEFAULT_UC_ &ev_hedwig_consume_notifier);
}
-int HedwigMessageHandler::EIO_AfterConsume(eio_req *req) {
+void HedwigMessageHandler::AfterConsume(EV_P_ ev_async *watcher, int revents) {
HandleScope scope;
- ev_unref(EV_DEFAULT_UC);
- EIOConsumeData *consumeData = static_cast<EIOConsumeData *>(req->data);
-
- LOG4CXX_INFO(logger, "about consume callback to JS: " << consumeData->topic << " " << consumeData->subscriber << consumeData->message.msgid().localcomponent());
- Local<Value> argv[4];
- argv[0] = String::New(consumeData->topic.c_str());
- argv[1] = String::New(consumeData->subscriber.c_str());
- argv[2] = HedwigMessage::ToJSObject(consumeData->message);
- // wrap callback
- Local<Value> cbArgv[1];
- cbArgv[0] = External::New(&(consumeData->consumeCb));
- Persistent<Object> jsConsumeCallback(
- OperationCallbackWrapper::constructor_template->GetFunction()->NewInstance(1, cbArgv));
- argv[3] = Local<Object>::New(jsConsumeCallback);
-
- TryCatch try_catch;
+ assert(watcher == &ev_hedwig_consume_notifier);
+ assert(revents == EV_ASYNC);
+
+ // using while here because the ev_async_send is level trigger
+ while (true) {
+ EIOConsumeData *consumeData = NULL;
+ pthread_mutex_lock(&queue_mutex);
+ if (req_msg_queue.size() > 0) {
+ consumeData = req_msg_queue.front();
+ req_msg_queue.pop();
+ }
+ pthread_mutex_unlock(&queue_mutex);
+
+ if (! consumeData)
+ return;
+
+ ev_unref(EV_DEFAULT_UC);
+ LOG4CXX_INFO(logger, "about consume callback to JS: " << consumeData->topic << " " << consumeData->subscriber << " " << consumeData->message.msgid().localcomponent());
+ Local<Value> argv[4];
+ argv[0] = String::New(consumeData->topic.c_str());
+ argv[1] = String::New(consumeData->subscriber.c_str());
+ argv[2] = HedwigMessage::ToJSObject(consumeData->message);
+ // wrap callback
+ Local<Value> cbArgv[1];
+ cbArgv[0] = External::New(&(consumeData->consumeCb));
+ Persistent<Object> jsConsumeCallback(
+ OperationCallbackWrapper::constructor_template->GetFunction()->NewInstance(1, cbArgv));
+ argv[3] = Local<Object>::New(jsConsumeCallback);
- consumeData->callback->Call(Context::GetCurrent()->Global(), 4, argv);
+ TryCatch try_catch;
- if (try_catch.HasCaught()) {
- node::FatalException(try_catch);
- }
+ consumeData->callback->Call(Context::GetCurrent()->Global(), 4, argv);
- delete consumeData;
+ if (try_catch.HasCaught()) {
+ node::FatalException(try_catch);
+ }
- return 0;
+ delete consumeData;
+ }
}
// OperationCallback Wrapper
@@ -592,6 +622,7 @@ extern "C" {
static void init(Handle<Object> target) {
OperationCallbackWrapper::Init(target);
HedwigClient::Init(target);
+ HedwigMessageHandler::Init();
}
NODE_MODULE(hedwig, init);
}
View
12 src/hedwig.h
@@ -24,7 +24,9 @@
#include <hedwig/client.h>
#include <string>
+#include <queue>
#include <map>
+#include <pthread.h>
#define ADD_PROTOTYPE_METHOD(class, name, method) \
class ## _ ## name ## _symbol = NODE_PSYMBOL(#name); \
@@ -137,10 +139,18 @@ class HedwigMessageHandler : public Hedwig::MessageHandlerCallback {
virtual void consume(const std::string& topic, const std::string& subscriber, const Hedwig::Message& msg, Hedwig::OperationCallbackPtr& callback);
- static int EIO_AfterConsume(eio_req *req);
+ static void AfterConsume(EV_P_ ev_async *watcher, int revents);
+
+ static void Init();
protected:
JSWPtr jsCallback;
+
+ // ev_async for consume called in hedwig thread to main ev loop
+ static ev_async ev_hedwig_consume_notifier;
+ // message queue, and related lock
+ static std::queue<EIOConsumeData*> req_msg_queue;
+ static pthread_mutex_t queue_mutex;
};
// OperationCallback Wrapper
Please sign in to comment.
Something went wrong with that request. Please try again.