Add support for progressive call results #9
Thank you for your support and interest. Progressive call results is a key feature and is fairly high on my list. I suspect that it is at most a few days of coding and testing to add to bonefish. I should have some spare time over the next few weeks to make it happen.
\o/
Looks like I will have time on Saturday to start looking into this.
We're going to hold you to that, Dave! My recommendation is underpromise & overdeliver :)
I got this work started on Sat. I should be able to wrap it up next weekend.
Dave, if this requires more work still, can you push your (presumably unfinished) code to your branch so potentially interested parties could take it from there?
Sure thing. At some point tomorrow I should be able to push what I have.
Awesome, thanks.
Still working on it. Just need a bit of time over the weekend to finish.
@pramukta @estan I have pushed a patch to https://github.com/davidchappelle/bonefish.git for progressive call result support. Once you clone that repo just checkout the feature/progressive_call_results branch. I haven't had a chance to do much in terms of testing as I am on the road now until 11:30pm. I have a corresponding autobahn-cpp commit in progress as well and should have that in tomorrow. If someone wants to test things out with autobahn-python in the meantime that would be great.
This is embarrassing, but I'm actually having trouble checking out the branch. It's not visible in my clone.
Anyone know what the problem might be? The branch is clearly visible in the GitHub UI.
@estan, probably because it's in Dave's own fork :) You could add his repo as a remote.
@jrogers facepalm. Thanks :)
@davidchappelle, I may be wrong, but shouldn't that be `result_message->get_zone()` rather than `yield_message->get_zone()`?
Yes, you are correct.
Alright, if I change that to `result_message->get_zone()`, I still haven't quite figured out what happens. I'm testing right now with the progress example. The debug output from bonefish when running it:
The result object doesn't look right.
Looks like the result details are invalid. Maybe they're not getting set correctly.
I haven't used msgpack before, but this check in `wamp_result_message::set_details` looks inverted: shouldn't `details.type != msgpack::type::MAP` be `==`?
If I change that, it seems to get a little further in processing the YIELDs.
Yep, good catch. Apologies for the bugs; time crunch this morning.
That is, it seems it's able to send the first RESULT to the caller, but after that the invocation seems gone in the router.
Heh, no problem :)
I guess it's the unconditional erasure of the invocation at the end of `wamp_dealer::process_yield_message`.
Anyway, I've got to go to bed, but this looks promising. Thanks!
I just had to quickly test, and with the patch below it seems to work:

```diff
diff --git a/src/bonefish/dealer/wamp_dealer.cpp b/src/bonefish/dealer/wamp_dealer.cpp
index ecd67c6..1a179a0 100644
--- a/src/bonefish/dealer/wamp_dealer.cpp
+++ b/src/bonefish/dealer/wamp_dealer.cpp
@@ -466,7 +466,7 @@ void wamp_dealer::process_yield_message(const wamp_session_id& session_id,
     std::unique_ptr<wamp_result_message> result_message(
             new wamp_result_message(std::move(yield_message->release_zone())));
     result_message->set_request_id(dealer_invocation->get_request_id());
-    result_message->set_details(result_details.marshal(yield_message->get_zone()));
+    result_message->set_details(result_details.marshal(result_message->get_zone()));
     result_message->set_arguments(yield_message->get_arguments());
     result_message->set_arguments_kw(yield_message->get_arguments_kw());
@@ -483,9 +483,11 @@ void wamp_dealer::process_yield_message(const wamp_session_id& session_id,
     // will detach the session. When this happens the pending invocations
     // be cleaned up. So we don't use an iterator here to erase the pending
     // invocation because it may have just been invalidated above.
-    m_pending_callee_invocations[session_id].erase(request_id);
-    m_pending_caller_invocations[session->get_session_id()].erase(request_id);
-    m_pending_invocations.erase(request_id);
+    if (!yield_options.get_option_or("progress", false)) {
+        m_pending_callee_invocations[session_id].erase(request_id);
+        m_pending_caller_invocations[session->get_session_id()].erase(request_id);
+        m_pending_invocations.erase(request_id);
+    }
 }
 
 void wamp_dealer::send_error(const std::unique_ptr<wamp_transport>& transport,
diff --git a/src/bonefish/messages/wamp_result_message.hpp b/src/bonefish/messages/wamp_result_message.hpp
index 3ec1af1..802a591 100644
--- a/src/bonefish/messages/wamp_result_message.hpp
+++ b/src/bonefish/messages/wamp_result_message.hpp
@@ -163,7 +163,7 @@ inline void wamp_result_message::set_request_id(const wamp_request_id& request_i
 inline void wamp_result_message::set_details(const msgpack::object& details)
 {
-    if (details.type != msgpack::type::MAP) {
+    if (details.type == msgpack::type::MAP) {
         m_details = msgpack::object(details, get_zone());
     } else {
         throw std::invalid_argument("invalid details");
```
But I have no idea if this is the right approach.
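The control flow of the conditional erase in that patch can be sketched language-neutrally. Below is a minimal Python model (all names are hypothetical, not bonefish's actual API): the dealer keeps its pending-invocation bookkeeping across progressive YIELDs and only drops it on the final one.

```python
# Minimal model of the dealer's pending-invocation cleanup.
# All names are hypothetical; this only illustrates the control flow of
# erasing state on a final YIELD while keeping it for progressive ones.

pending_invocations = {}  # request_id -> invocation state

def register_call(request_id, caller):
    pending_invocations[request_id] = {"caller": caller}

def process_yield(request_id, options):
    invocation = pending_invocations.get(request_id)
    if invocation is None:
        # Normal condition: invocation timed out or the session closed.
        return None
    result = {"caller": invocation["caller"],
              "progress": bool(options.get("progress", False))}
    # Only a final YIELD (no "progress" flag) ends the call and releases
    # the bookkeeping; a progressive YIELD keeps the invocation alive.
    if not options.get("progress", False):
        del pending_invocations[request_id]
    return result

register_call(42, caller="session-1")
print(process_yield(42, {"progress": True}))  # progressive: state kept
print(process_yield(42, {}))                  # final: state erased
print(process_yield(42, {}))                  # gone: returns None
```

Without the `progress` condition, the first RESULT would erase the invocation and every subsequent YIELD would hit the "gone" branch, which matches the behavior observed above.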
@davidchappelle, I'm a little unsure on how important the order of things in `process_yield_message` is. If it's not important that they are removed there and then, then I think the function could be simplified a little with:

```diff
diff --git a/src/bonefish/dealer/wamp_dealer.cpp b/src/bonefish/dealer/wamp_dealer.cpp
index ecd67c6..c86a9e7 100644
--- a/src/bonefish/dealer/wamp_dealer.cpp
+++ b/src/bonefish/dealer/wamp_dealer.cpp
@@ -435,11 +435,12 @@ void wamp_dealer::process_yield_message(const wamp_session_id& session_id,
     // We can't have a pending invocation without a pending caller/callee
     // as they are tracked in a synchronized manner. So to have one without
     // the other is considered to be an error.
-    auto pending_callee_invocations_itr =
-            m_pending_callee_invocations.find(session_itr->second->get_session_id());
-    if (pending_callee_invocations_itr == m_pending_callee_invocations.end()) {
+    if (!m_pending_callee_invocations.count(session_itr->second->get_session_id())) {
         throw std::logic_error("dealer pending callee invocations out of sync");
     }
+    if (!m_pending_caller_invocations.count(session->get_session_id())) {
+        throw std::logic_error("dealer pending caller invocations out of sync");
+    }
 
     wamp_yield_options yield_options;
     yield_options.unmarshal(yield_message->get_options());
@@ -452,21 +453,12 @@ void wamp_dealer::process_yield_message(const wamp_session_id& session_id,
     wamp_result_details result_details;
     if (yield_options.get_option_or("progress", false)) {
         result_details.set_detail("progress", true);
-    } else {
-        pending_callee_invocations_itr->second.erase(request_id);
     }
 
-    auto pending_caller_invocations_itr =
-            m_pending_caller_invocations.find(session->get_session_id());
-    if (pending_caller_invocations_itr == m_pending_caller_invocations.end()) {
-        throw std::logic_error("dealer pending caller invocations out of sync");
-    }
-    pending_caller_invocations_itr->second.erase(request_id);
-
     std::unique_ptr<wamp_result_message> result_message(
             new wamp_result_message(std::move(yield_message->release_zone())));
     result_message->set_request_id(dealer_invocation->get_request_id());
-    result_message->set_details(result_details.marshal(yield_message->get_zone()));
+    result_message->set_details(result_details.marshal(result_message->get_zone()));
     result_message->set_arguments(yield_message->get_arguments());
     result_message->set_arguments_kw(yield_message->get_arguments_kw());
@@ -483,9 +475,11 @@ void wamp_dealer::process_yield_message(const wamp_session_id& session_id,
     // will detach the session. When this happens the pending invocations
     // be cleaned up. So we don't use an iterator here to erase the pending
     // invocation because it may have just been invalidated above.
-    m_pending_callee_invocations[session_id].erase(request_id);
-    m_pending_caller_invocations[session->get_session_id()].erase(request_id);
-    m_pending_invocations.erase(request_id);
+    if (!yield_options.get_option_or("progress", false)) {
+        m_pending_callee_invocations[session_id].erase(request_id);
+        m_pending_caller_invocations[session->get_session_id()].erase(request_id);
+        m_pending_invocations.erase(request_id);
+    }
 }
 
 void wamp_dealer::send_error(const std::unique_ptr<wamp_transport>& transport,
```
It's not terribly nice with the two `get_option_or("progress", false)` calls, though.
Also, those two tests that check that the callee/caller invocation maps are in sync could perhaps also check the request_id.
I mean, so the two tests would be something like:

```cpp
// We can't have a pending invocation without a pending caller/callee
// as they are tracked in a synchronized manner. So to have one without
// the other is considered to be an error.
auto pending_callee_invocations_itr =
        m_pending_callee_invocations.find(session_itr->second->get_session_id());
if (pending_callee_invocations_itr == m_pending_callee_invocations.end() ||
        !pending_callee_invocations_itr->second.count(request_id)) {
    throw std::logic_error("dealer pending callee invocations out of sync");
}
auto pending_caller_invocations_itr =
        m_pending_caller_invocations.find(session->get_session_id());
if (pending_caller_invocations_itr == m_pending_caller_invocations.end() ||
        !pending_caller_invocations_itr->second.count(request_id)) {
    throw std::logic_error("dealer pending caller invocations out of sync");
}
```
The reason that the three erases were called where they were was to keep them in one place. These two calls at the bottom of the method seem to have been redundant:

```cpp
m_pending_caller_invocations[session->get_session_id()].erase(request_id);
```

So they can just be removed. Also, good catch on checking the request_id. Can you create a pull request for my fork so that I can pull in your changes?
Ah. I'll create a pull request. But, regarding making the request_id check at the top an assert: if the comment at that check is true, perhaps that's not a good idea? The comment reads:

```cpp
// It is considered to be a normal condition if we cannot find the
// associated invocation. Typically this occurs if the invocation
// timed out or if the callers session has ended.
BONEFISH_TRACE("%1%, %2%", *session_itr->second % *yield_message);
const auto request_id = yield_message->get_request_id();
auto pending_invocations_itr = m_pending_invocations.find(request_id);
if (pending_invocations_itr == m_pending_invocations.end()) {
    BONEFISH_TRACE("unable to find invocation ... timed out or session closed");
    return;
}
```
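The distinction being discussed here (a lookup miss that is a normal race versus one that can only be an internal bug) can be sketched as follows. This is a hypothetical Python stand-in, not bonefish code:

```python
# A missing invocation is a normal race (timeout / closed session) and
# gets an early return; a caller/callee map that disagrees with the
# invocation map is a broken invariant, which is what an assert is for.

def process_yield(pending_invocations, pending_callee, request_id, session_id):
    if request_id not in pending_invocations:
        return None  # normal: invocation timed out or caller went away
    # Invariant: the callee map is maintained in lockstep with
    # pending_invocations, so a miss here can only be a dealer bug.
    assert session_id in pending_callee, "pending callee invocations out of sync"
    return pending_invocations[request_id]

print(process_yield({}, {}, 1, "s"))                  # normal miss: None
print(process_yield({1: "inv"}, {"s": {1}}, 1, "s"))  # found
```

Keeping the first check a soft return while turning the others into asserts matches the comment quoted above.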
Or did you mean asserts would be more correct for the other invocation checks?
Okay, created a pull request: https://github.com/davidchappelle/bonefish/pull/1. Let me know there if I misunderstood something.
Yes, for the other invocation checks. They are conditions that should never happen.
Great. I changed them to asserts in the PR.
Hey, so I have been testing out the progressive call results branch on @davidchappelle's account and found an interesting bug. I'm still investigating, trying to get an isolated case, but the error looks like this:
The context for this is a WAMP connection between autobahn|js and bonefish, with the RPC endpoint being provided by autobahn|python. This means the frontend serialization scheme is JSON, while the backend scheme is msgpack. The message_id (25) looks like it is being sent out as big-endian and then read in as a little-endian uint64 or some such.
I'm still attempting to isolate a small test case, but figured this much info is worth sharing until then in case the problem or solution is obvious to someone else. Thanks.
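The byte-order theory is easy to sanity-check numerically: if a uint64 written big-endian is misread as little-endian, a small id becomes a huge one because its low byte lands in the most significant position. A stdlib-only illustration (the value 25 matches the message_id above; everything else is illustrative):

```python
import struct

# Write request id 25 as a big-endian uint64, then misread the same
# eight bytes as little-endian: the 0x19 byte moves to the top.
request_id = 25
wire = struct.pack(">Q", request_id)   # b'\x00\x00\x00\x00\x00\x00\x00\x19'
misread = struct.unpack("<Q", wire)[0]
print(misread)                         # 1801439850948198400 == 25 << 56
```

So if the garbled id in the logs were exactly `25 << 56`, that would strongly suggest a byte-swap somewhere in the serialization path.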
Since bonefish uses msgpack-c for all internal serialization, the msgpack-to-JSON conversion would be the place to look.
Yeah, weird that it would show up for progressive call results and not the regular ones. I'll continue to look.
Hmm, this may just be a logging artifact too.
The request id comes from the cached dealer invocation. Would be curious to see whether the sync check fires:

```cpp
// We can't have a pending invocation without a pending caller/callee
assert(m_pending_callee_invocations.count(session_itr->second->get_session_id()));
```
Also, running bonefish under valgrind might be helpful in finding the problem.
Sorry, got distracted with other things, but I was able to confirm that forcing both sides to use JSON makes the error go away.
Okay, so those ids I posted don't even match, so that was a completely ridiculous dead end. Not sure what the deal is.
If you can post detailed steps on how to reproduce, I can probably take a look.
Okay, I think I finally have the test case. Here's a slightly modified version of the autobahn progress example: bonefish-progressive-call-error.zip Here's how to reproduce:
I've tried this test case on Mac OS X with nodejs v5.3, though the first time I saw the error it was with bonefish and python running on Linux. I was also able to see the error with everything running on Linux, but haven't run the test case in that environment yet.
Of course, just as I send that, new info: changing the "value" key to a unicode string makes it work. Update: returning raw string values from anything on the AutobahnPython (msgpack) side to an AutobahnJS (JSON) component results in the serialization error. Was able to reproduce by removing some of the unicode conversions.
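This finding matches how JSON encoders behave in general: msgpack can carry raw byte strings, but JSON has no representation for them. A stdlib-only illustration (no msgpack involved; Python `bytes` stands in for msgpack's raw type):

```python
import json

# A unicode string round-trips through JSON fine...
print(json.dumps({"value": "frame-1"}))

# ...but a raw byte string (what a msgpack raw value decodes to when it
# isn't marked as text) cannot be encoded as JSON at all.
try:
    json.dumps({"value": b"frame-1"})
except TypeError as exc:
    print("serialization error:", exc)
```

So a router bridging a msgpack callee to a JSON caller has to decide how to render raw values it relays, which is consistent with the unicode-string workaround above.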
Good to know, @pramukta. I think I will put in some better handling for the unhandled exception that you are seeing. Then I will put together a pull request.
Great to finally see this in the light, David (and all you others, of course)! It looks great.
We're currently using Crossbar at work, but treating it as a "black box" and not making use of its application component hosting features (and many other features), so another, slimmer router is of interest to us.
One thing we need from the Advanced Profile though is progressive call results (a stable part of the spec), as we're using them to deliver frames from an image detector. I don't know how high up on your agenda this is, but I'm filing this issue to keep track.
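For reference, this is what the requested feature looks like at the protocol level: the caller sets `receive_progress` on the CALL, and the dealer relays each progressive YIELD as a RESULT whose details carry `progress: true`, followed by one final RESULT without it. A sketch of the message sequence using the WAMP message codes (CALL = 48, RESULT = 50); the request id, procedure name, and payloads are made up for illustration:

```python
# Sketch of the progressive-call-results flow as seen by the caller.
# Message codes follow the WAMP spec (CALL = 48, RESULT = 50); the ids,
# procedure name, and payloads are illustrative only.

CALL, RESULT = 48, 50

call = [CALL, 1001, {"receive_progress": True}, "com.example.detect_frames", []]

results = [
    [RESULT, 1001, {"progress": True}, ["frame-1"]],
    [RESULT, 1001, {"progress": True}, ["frame-2"]],
    [RESULT, 1001, {}, ["done"]],  # final result: no progress flag
]

def is_final(result_msg):
    # A RESULT without details.progress == True ends the call.
    return not result_msg[2].get("progress", False)

finals = [is_final(r) for r in results]
print(finals)  # [False, False, True]
```

The frame-delivery use case above maps directly onto this: each detected frame rides in a progressive RESULT, and the call stays open until the detector sends its final one.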