Skip to content

Commit

Permalink
RSocketServer Setup callback accepts ctor params (#506)
Browse files Browse the repository at this point in the history
* adding RSocketNetworkStats interface for on{Connected, Disconnected, Closed} callbacks

* accpting network stats in the RSocketClient ctor

* Separating RSocket connection lifetime management into RSocketConnectionManager

* adding RSocketConnectionManager to RSocketClient

* RSocketServer Setup callback accepts ctor params
  • Loading branch information
lehecka authored Jun 7, 2017
1 parent 091dfb2 commit c0ce719
Show file tree
Hide file tree
Showing 23 changed files with 254 additions and 144 deletions.
4 changes: 3 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,9 @@ add_library(
src/internal/ScheduledSingleSubscription.cpp
src/internal/ScheduledSingleSubscription.h
src/internal/RSocketConnectionManager.cpp
src/internal/RSocketConnectionManager.h)
src/internal/RSocketConnectionManager.h
src/RSocketSetup.cpp
src/RSocketSetup.h)

target_include_directories(ReactiveSocket PUBLIC "${PROJECT_SOURCE_DIR}/yarpl/include")
target_include_directories(ReactiveSocket PUBLIC "${PROJECT_SOURCE_DIR}/yarpl/src")
Expand Down
10 changes: 5 additions & 5 deletions examples/channel-hello-world/ChannelHelloWorld_Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ using namespace yarpl::flowable;

DEFINE_int32(port, 9898, "port to connect to");

class HelloChannelRequestHandler : public rsocket::RSocketResponder {
class HelloChannelRequestResponder : public rsocket::RSocketResponder {
public:
/// Handles a new inbound Stream requested by the other end.
yarpl::Reference<Flowable<rsocket::Payload>> handleRequestChannel(
Expand Down Expand Up @@ -49,13 +49,13 @@ int main(int argc, char* argv[]) {
auto rs = RSocket::createServer(
std::make_unique<TcpConnectionAcceptor>(std::move(opts)));

// global request handler
auto handler = std::make_shared<HelloChannelRequestHandler>();
// global request responder
auto responder = std::make_shared<HelloChannelRequestResponder>();

auto* rawRs = rs.get();
auto serverThread = std::thread([rawRs, handler] {
auto serverThread = std::thread([rawRs, responder] {
// start accepting connections
rawRs->startAndPark([handler](auto& setupParams) { return handler; });
rawRs->startAndPark([responder](RSocketSetup& setup) { setup.createRSocket(responder); });
});

// Wait for a newline on the console to terminate the server.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,22 @@ int main(int argc, char* argv[]) {
// RSocket server accepting on TCP
auto rs = RSocket::createServer(
std::make_unique<TcpConnectionAcceptor>(std::move(opts)));
// global request handlers
auto textHandler = std::make_shared<TextRequestHandler>();
auto jsonHandler = std::make_shared<JsonRequestHandler>();
// global request responders
auto textResponder = std::make_shared<TextRequestResponder>();
auto jsonResponder = std::make_shared<JsonRequestResponder>();
// start accepting connections
rs->startAndPark(
[textHandler, jsonHandler](auto& setupParams)
-> std::shared_ptr<RSocketResponder> {
if (setupParams.dataMimeType == "text/plain") {
[textResponder, jsonResponder](auto& setup) {
if (setup.params().dataMimeType == "text/plain") {
LOG(INFO) << "Connection Request => text/plain MimeType";
return textHandler;
} else if (setupParams.dataMimeType == "application/json") {
setup.createRSocket(textResponder);
} else if (setup.params().dataMimeType == "application/json") {
LOG(INFO) << "Connection Request => application/json MimeType";
return jsonHandler;
setup.createRSocket(jsonResponder);
} else {
LOG(INFO) << "Connection Request => Unsupported MimeType"
<< setupParams.dataMimeType;
throw UnsupportedSetupError("Unknown MimeType");
<< setup.params().dataMimeType;
setup.error(UnsupportedSetupError("Unknown MimeType"));
}
});
}
4 changes: 2 additions & 2 deletions examples/conditional-request-handling/JsonRequestHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ using namespace yarpl::flowable;

/// Handles a new inbound Stream requested by the other end.
yarpl::Reference<Flowable<rsocket::Payload>>
JsonRequestHandler::handleRequestStream(Payload request, StreamId streamId) {
LOG(INFO) << "JsonRequestHandler.handleRequestStream " << request;
JsonRequestResponder::handleRequestStream(Payload request, StreamId streamId) {
LOG(INFO) << "JsonRequestResponder.handleRequestStream " << request;

// string from payload data
auto requestString = request.moveDataToString();
Expand Down
2 changes: 1 addition & 1 deletion examples/conditional-request-handling/JsonRequestHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include "src/Payload.h"
#include "src/RSocket.h"

class JsonRequestHandler : public rsocket::RSocketResponder {
class JsonRequestResponder : public rsocket::RSocketResponder {
public:
/// Handles a new inbound Stream requested by the other end.
yarpl::Reference<yarpl::flowable::Flowable<rsocket::Payload>>
Expand Down
4 changes: 2 additions & 2 deletions examples/conditional-request-handling/TextRequestHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ using namespace yarpl::flowable;

/// Handles a new inbound Stream requested by the other end.
yarpl::Reference<Flowable<rsocket::Payload>>
TextRequestHandler::handleRequestStream(Payload request, StreamId streamId) {
LOG(INFO) << "TextRequestHandler.handleRequestStream " << request;
TextRequestResponder::handleRequestStream(Payload request, StreamId streamId) {
LOG(INFO) << "TextRequestResponder.handleRequestStream " << request;

// string from payload data
auto requestString = request.moveDataToString();
Expand Down
2 changes: 1 addition & 1 deletion examples/conditional-request-handling/TextRequestHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include "src/Payload.h"
#include "src/RSocket.h"

class TextRequestHandler : public rsocket::RSocketResponder {
class TextRequestResponder : public rsocket::RSocketResponder {
public:
/// Handles a new inbound Stream requested by the other end.
yarpl::Reference<yarpl::flowable::Flowable<rsocket::Payload>>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ using namespace yarpl::single;

DEFINE_int32(port, 9898, "port to connect to");

class HelloFireAndForgetRequestHandler : public rsocket::RSocketResponder {
class HelloFireAndForgetRequestResponder : public rsocket::RSocketResponder {
public:
void handleFireAndForget(Payload request, StreamId streamId) override {
std::cout << "Received fireAndForget: " << request.moveDataToString()
Expand All @@ -37,13 +37,13 @@ int main(int argc, char* argv[]) {
auto rs = RSocket::createServer(
std::make_unique<TcpConnectionAcceptor>(std::move(opts)));

// global request handler
auto handler = std::make_shared<HelloFireAndForgetRequestHandler>();
// global request responder
auto responder = std::make_shared<HelloFireAndForgetRequestResponder>();

auto rawRs = rs.get();
auto serverThread = std::thread([=] {
// start accepting connections
rawRs->startAndPark([handler](auto& setupParams) { return handler; });
rawRs->startAndPark([responder](auto& setup) { setup.createRSocket(responder); });
});

// Wait for a newline on the console to terminate the server.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ using namespace yarpl::single;

DEFINE_int32(port, 9898, "port to connect to");

class HelloRequestResponseRequestHandler : public rsocket::RSocketResponder {
class HelloRequestResponseRequestResponder : public rsocket::RSocketResponder {
public:
Reference<Single<Payload>> handleRequestResponse(
Payload request,
StreamId streamId) override {
std::cout << "HelloRequestResponseRequestHandler.handleRequestResponse "
std::cout << "HelloRequestResponseRequestResponder.handleRequestResponse "
<< request << std::endl;

// string from payload data
Expand Down Expand Up @@ -52,13 +52,13 @@ int main(int argc, char* argv[]) {
auto rs = RSocket::createServer(
std::make_unique<TcpConnectionAcceptor>(std::move(opts)));

// global request handler
auto handler = std::make_shared<HelloRequestResponseRequestHandler>();
// global request responder
auto responder = std::make_shared<HelloRequestResponseRequestResponder>();

auto rawRs = rs.get();
auto serverThread = std::thread([=] {
// start accepting connections
rawRs->startAndPark([handler](auto& setupParams) { return handler; });
rawRs->startAndPark([responder](auto& setup) { setup.createRSocket(responder); });
});

// Wait for a newline on the console to terminate the server.
Expand Down
10 changes: 5 additions & 5 deletions examples/stream-hello-world/StreamHelloWorld_Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ using namespace yarpl::flowable;

DEFINE_int32(port, 9898, "port to connect to");

class HelloStreamRequestHandler : public rsocket::RSocketResponder {
class HelloStreamRequestResponder : public rsocket::RSocketResponder {
public:
/// Handles a new inbound Stream requested by the other end.
yarpl::Reference<Flowable<rsocket::Payload>> handleRequestStream(
rsocket::Payload request,
rsocket::StreamId streamId) override {
std::cout << "HelloStreamRequestHandler.handleRequestStream " << request
std::cout << "HelloStreamRequestResponder.handleRequestStream " << request
<< std::endl;

// string from payload data
Expand Down Expand Up @@ -50,13 +50,13 @@ int main(int argc, char* argv[]) {
auto rs = RSocket::createServer(
std::make_unique<TcpConnectionAcceptor>(std::move(opts)));

// global request handler
auto handler = std::make_shared<HelloStreamRequestHandler>();
// global request responder
auto responder = std::make_shared<HelloStreamRequestResponder>();

auto rawRs = rs.get();
auto serverThread = std::thread([=] {
// start accepting connections
rawRs->startAndPark([handler](auto& setupParams) { return handler; });
rawRs->startAndPark([responder](auto& setup) { setup.createRSocket(responder); });
});

// Wait for a newline on the console to terminate the server.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ using namespace yarpl::observable;

DEFINE_int32(port, 9898, "port to connect to");

class PushStreamRequestHandler : public rsocket::RSocketResponder {
class PushStreamRequestResponder : public rsocket::RSocketResponder {
public:
/// Handles a new inbound Stream requested by the other end.
yarpl::Reference<Flowable<Payload>> handleRequestStream(
Payload request,
rsocket::StreamId streamId) override {
std::cout << "PushStreamRequestHandler.handleRequestStream " << request
std::cout << "PushStreamRequestResponder.handleRequestStream " << request
<< std::endl;

// string from payload data
Expand Down Expand Up @@ -82,13 +82,13 @@ int main(int argc, char* argv[]) {
auto rs = RSocket::createServer(
std::make_unique<TcpConnectionAcceptor>(std::move(opts)));

// global request handler
auto handler = std::make_shared<PushStreamRequestHandler>();
// global request responder
auto responder = std::make_shared<PushStreamRequestResponder>();

auto rawRs = rs.get();
auto serverThread = std::thread([=] {
// start accepting connections
rawRs->startAndPark([handler](auto& setupParams) { return handler; });
rawRs->startAndPark([responder](auto& setup) { setup.createRSocket(responder); });
});

// Wait for a newline on the console to terminate the server.
Expand Down
6 changes: 3 additions & 3 deletions src/RSocketClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ RSocketClient::~RSocketClient() {
VLOG(1) << "Destroying RSocketClient";
}

folly::Future<std::shared_ptr<RSocketRequester>> RSocketClient::connect(
folly::Future<std::unique_ptr<RSocketRequester>> RSocketClient::connect(
SetupParameters setupParameters,
std::shared_ptr<RSocketResponder> responder,
std::unique_ptr<KeepaliveTimer> keepaliveTimer,
std::shared_ptr<RSocketStats> stats,
std::shared_ptr<RSocketNetworkStats> networkStats) {
VLOG(2) << "Starting connection";

folly::Promise<std::shared_ptr<RSocketRequester>> promise;
folly::Promise<std::unique_ptr<RSocketRequester>> promise;
auto future = promise.getFuture();

connectionFactory_->connect([
Expand Down Expand Up @@ -71,7 +71,7 @@ folly::Future<std::shared_ptr<RSocketRequester>> RSocketClient::connect(

rs->connectClientSendSetup(std::move(connection), std::move(setupParameters));

auto rsocket = std::make_shared<RSocketRequester>(std::move(rs), eventBase);
auto rsocket = std::make_unique<RSocketRequester>(std::move(rs), eventBase);
promise.setValue(std::move(rsocket));
});

Expand Down
2 changes: 1 addition & 1 deletion src/RSocketClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class RSocketClient {
* To destruct a single RSocketRequester sooner than the RSocketClient
* call RSocketRequester.close().
*/
folly::Future<std::shared_ptr<RSocketRequester>> connect(
folly::Future<std::unique_ptr<RSocketRequester>> connect(
SetupParameters setupParameters = SetupParameters(),
std::shared_ptr<RSocketResponder> responder = std::shared_ptr<RSocketResponder>(),
std::unique_ptr<KeepaliveTimer> keepaliveTimer = std::unique_ptr<KeepaliveTimer>(),
Expand Down
2 changes: 1 addition & 1 deletion src/RSocketRequester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ RSocketRequester::~RSocketRequester() {

if (stateMachine_) {
eventBase_.add([stateMachine = std::move(stateMachine_)] {
VLOG(2) << "Destroying RSocketStateMachine on EventBase";
VLOG(2) << "Releasing RSocketStateMachine on EventBase";
});
}
}
Expand Down
Loading

0 comments on commit c0ce719

Please sign in to comment.