Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RSocket with Flowable. #356

Merged
merged 3 commits into from
Apr 26, 2017
Merged

RSocket with Flowable. #356

merged 3 commits into from
Apr 26, 2017

Conversation

benjchristensen
Copy link
Contributor

Update the RSocket APIs to have RSocketRequester and RSocketRequestHandler using Flowable.

Copy link
Contributor

@lehecka lehecka left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the change. I didn't find any issues. It looks you will have to rebase anyways so you can address the nits...

LOG(INFO) << "JsonRequestHandler.handleRequestStream " << request;

// string from payload data
auto pds = request.moveDataToString();
auto requestString = std::string(pds, request.data->length());
const char* p = reinterpret_cast<const char*>(request.data->data());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I guess you got tired of writing this code so you eventually created Payload::cloneDataToString method.
Do you want to use it over here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This (and all other callers) can just use Payload::moveDataToString() as they consume the Payload.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand the question in light of this portion of code.

I created cloneDataToString() because I couldn't get it to work in places where const was required, so I needed a method that copied instead of moved from a const Payload.

The code in question is in ExampleSubscriber:

void ExampleSubscriber::onNext(const Payload& element) noexcept {

That expects a const ref, and because of that I can't move data from it.

screen shot 2017-04-25 at 1 16 02 pm

LOG(INFO) << "TextRequestHandler.handleRequestStream " << request;

// string from payload data
auto pds = request.moveDataToString();
auto requestString = std::string(pds, request.data->length());
const char* p = reinterpret_cast<const char*>(request.data->data());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same question here

LOG(INFO) << "HelloStreamRequestHandler.handleRequestStream " << request;

// string from payload data
const char* p = reinterpret_cast<const char*>(request.data->data());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

yarpl::Reference<yarpl::Flowable<reactivesocket::Payload>>
HelloStreamRequestHandler::handleRequestStream(
reactivesocket::Payload request,
reactivesocket::StreamId streamId) {
LOG(INFO) << "HelloStreamRequestHandler.handleRequestStream " << request;

// string from payload data
const char* p = reinterpret_cast<const char*>(request.data->data());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and here

* nature of RSocket, this can be used on the client as well.
*/
class RSocketRequestHandler {
public:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am happy to see how this interface doesn't have to do anything with setup/resume initialization.

@@ -43,6 +43,13 @@ std::string Payload::moveDataToString() {
return data->moveToFbString().toStdString();
}

std::string Payload::cloneDataToString() const {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

implementing the << overload is a bit more idiomatic (and yields nicer code). Like here: https://github.com/ReactiveSocket/reactivesocket-cpp/blob/f1b862a23bc951bbe5869d03321eafa4f583a2a1/src/Common.h#L32

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was matching the moveDataToString signature. Seems confusing to have one as a name with "move" and another with an operator overload.

Copy link
Contributor Author

@benjchristensen benjchristensen Apr 25, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way, I did not mean we should not have the << overload. That would be great to have.

It is somewhat confusing though since this type has both data and metadata. So it seems it would have to work for only one of them, or concat them.

@benjchristensen
Copy link
Contributor Author

server

$ ./example_conditional-request-handling-server 
I0425 13:31:09.476050 2478965696 RSocketServer.cpp:106] RSocketServer => initialize connection acceptor on start
I0425 13:31:09.477674 2478965696 RSocketServer.cpp:108] RSocketServer => initialize connection acceptor on start
I0425 13:31:09.480690 2478965696 TcpConnectionAcceptor.cpp:87] Starting TCP listener on port 9898 with 1 request threads
I0425 13:31:09.480919 2478965696 TcpConnectionAcceptor.cpp:111] ConnectionAcceptor => leave start
I0425 13:31:09.488426 106487808 TcpConnectionAcceptor.cpp:107] Listening on [::]:9898

client

$ ./example_conditional-request-handling-client 
I0425 13:31:31.763978 2478965696 TcpConnectionFactory.cpp:91] ConnectionFactory creation => host: localhost port: 9898
I0425 13:31:31.769127 2478965696 RSocketClient.cpp:16] RSocketClient => created
I0425 13:31:31.769189 2478965696 RSocketClient.cpp:20] RSocketClient => start connection with Future
I0425 13:31:31.769390 192020480 TcpConnectionFactory.cpp:35] ConnectionFactory => starting socket
I0425 13:31:31.769476 192020480 TcpConnectionFactory.cpp:38] ConnectionFactory => attempting connection to [::1]:9898
I0425 13:31:31.770026 192020480 TcpConnectionFactory.cpp:43] ConnectionFactory  => DONE connect
I0425 13:31:31.770179 192020480 TcpConnectionFactory.cpp:54] ConnectionFactory => socketCallback => Success
I0425 13:31:31.770336 192020480 RSocketClient.cpp:27] RSocketClient => onConnect received DuplexConnection
I0425 13:31:31.772184 192020480 TcpConnectionFactory.cpp:29] SocketConnectorAndCallback => destroy
I0425 13:31:31.772197 2478965696 ConditionalRequestHandling_Client.cpp:30] ------------------ Hello Bob!
I0425 13:31:31.772361 2478965696 ExampleSubscriber.cpp:18] ExampleSubscriber 0x61000000f640 created with =>   Initial Request: 5  Threshold for re-request: 3  Num to Take: 6
I0425 13:31:31.772508 2478965696 ExampleSubscriber.cpp:63] ExampleSubscriber 0x61000000f640 block thread
I0425 13:31:31.772656 192020480 ExampleSubscriber.cpp:26] ExampleSubscriber 0x61000000f640 onSubscribe
I0425 13:31:31.774497 192020480 ExampleSubscriber.cpp:33] ExampleSubscriber 0x61000000f640 onNext as string: Hello Bob 1!
I0425 13:31:31.774621 192020480 ExampleSubscriber.cpp:33] ExampleSubscriber 0x61000000f640 onNext as string: Hello Bob 2!
I0425 13:31:31.774646 192020480 ExampleSubscriber.cpp:38] ExampleSubscriber 0x61000000f640 requesting 2 more items
I0425 13:31:31.774693 192020480 ExampleSubscriber.cpp:33] ExampleSubscriber 0x61000000f640 onNext as string: Hello Bob 3!
I0425 13:31:31.774718 192020480 ExampleSubscriber.cpp:33] ExampleSubscriber 0x61000000f640 onNext as string: Hello Bob 4!
I0425 13:31:31.774751 192020480 ExampleSubscriber.cpp:38] ExampleSubscriber 0x61000000f640 requesting 2 more items
I0425 13:31:31.774780 192020480 ExampleSubscriber.cpp:33] ExampleSubscriber 0x61000000f640 onNext as string: Hello Bob 5!
I0425 13:31:31.774827 192020480 ExampleSubscriber.cpp:51] ExampleSubscriber 0x61000000f640 onComplete
I0425 13:31:31.774896 2478965696 ExampleSubscriber.cpp:68] ExampleSubscriber 0x61000000f640 unblocked
I0425 13:31:31.774931 2478965696 ConditionalRequestHandling_Client.cpp:37] ------------------ Hello Jane!
I0425 13:31:31.775010 192020480 ExampleSubscriber.cpp:51] ExampleSubscriber 0x61000000f640 onComplete
I0425 13:31:31.775017 2478965696 ExampleSubscriber.cpp:18] ExampleSubscriber 0x61000000f540 created with =>   Initial Request: 5  Threshold for re-request: 3  Num to Take: 6
I0425 13:31:31.775081 2478965696 ExampleSubscriber.cpp:63] ExampleSubscriber 0x61000000f540 block thread
I0425 13:31:31.775146 192020480 ExampleSubscriber.cpp:26] ExampleSubscriber 0x61000000f540 onSubscribe
I0425 13:31:31.776119 192020480 ExampleSubscriber.cpp:33] ExampleSubscriber 0x61000000f540 onNext as string: Hello Jane 1!
I0425 13:31:31.776248 192020480 ExampleSubscriber.cpp:33] ExampleSubscriber 0x61000000f540 onNext as string: Hello Jane 2!
I0425 13:31:31.776273 192020480 ExampleSubscriber.cpp:38] ExampleSubscriber 0x61000000f540 requesting 2 more items
I0425 13:31:31.776554 192020480 ExampleSubscriber.cpp:33] ExampleSubscriber 0x61000000f540 onNext as string: Hello Jane 3!
I0425 13:31:31.776582 192020480 ExampleSubscriber.cpp:51] ExampleSubscriber 0x61000000f540 onComplete
I0425 13:31:31.776702 2478965696 ExampleSubscriber.cpp:68] ExampleSubscriber 0x61000000f540 unblocked
I0425 13:31:31.776738 2478965696 RSocketClient.cpp:53] RSocketClient => destroy
I0425 13:31:31.776820 192020480 ExampleSubscriber.cpp:51] ExampleSubscriber 0x61000000f540 onComplete
I0425 13:31:31.776859 192020480 RSocketRequester.cpp:21] RSocketRequester => destroy on EventBase
I0425 13:31:31.776922 192020480 RSocketRequester.cpp:37] RSocketRequester => destroy
I0425 13:31:31.777216 2478965696 TcpConnectionFactory.cpp:97] ConnectionFactory => destroy

the logs from server processing the client request:

I0425 13:31:31.770418 107024384 TcpConnectionAcceptor.cpp:25] Accepting TCP connection on FD 13
I0425 13:31:31.770781 107024384 RSocketServer.cpp:155] RSocketServer => received new connection
I0425 13:31:31.770840 107024384 RSocketServer.cpp:157] RSocketServer => going to accept duplex connection
I0425 13:31:31.772869 107024384 RSocketServer.cpp:114] RSocketServer => received new setup payload
I0425 13:31:31.772949 107024384 ConditionalRequestHandling_Server.cpp:39] Connection Request => text/plain MimeType
I0425 13:31:31.772980 107024384 RSocketServer.cpp:128] RSocketServer => received request handler
I0425 13:31:31.773599 107024384 TextRequestHandler.cpp:15] TextRequestHandler.handleRequestStream [metadata: <null> data: 3]
I0425 13:31:31.775804 107024384 TextRequestHandler.cpp:15] TextRequestHandler.handleRequestStream [metadata: <null> data: 4]
I0425 13:31:31.778566 107024384 RSocketServer.cpp:192] Removed ReactiveSocket

@benjchristensen
Copy link
Contributor Author

Example code showing how Flowable can now be used:

class HelloStreamRequestHandler : public rsocket::RSocketRequestHandler {
 public:
  /// Handles a new inbound Stream requested by the other end.
  yarpl::Reference<yarpl::Flowable<reactivesocket::Payload>>
  handleRequestStream(
      reactivesocket::Payload request,
      reactivesocket::StreamId streamId) override {
    LOG(INFO) << "HelloStreamRequestHandler.handleRequestStream " << request;

    // string from payload data
    auto requestString = request.moveDataToString();

    return Flowables::range(1, 10)->map([name = std::move(requestString)](
        int64_t v) {
      std::stringstream ss;
      ss << "Hello " << name << " " << v << "!";
      std::string s = ss.str();
      return Payload(s, "metadata");
    });
  }
};

@facebook-github-bot
Copy link

@lehecka has imported this pull request. If you are a Facebook employee, you can view this diff on Phabricator.

@lehecka lehecka changed the title RSocket with Flowable RSocket with Flowable. Apr 26, 2017
@lehecka lehecka merged commit a8ef49e into rsocket:master Apr 26, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants