Skip to content

Commit

Permalink
Fix channel (#475)
Browse files Browse the repository at this point in the history
* fixing rsocket/examples

* fixing implementation of the channel state machine

* PublisherBase.cpp
  • Loading branch information
lehecka committed May 22, 2017
1 parent 1a344a8 commit 80aa1e6
Show file tree
Hide file tree
Showing 17 changed files with 282 additions and 454 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ add_library(
src/statemachine/ChannelResponder.h
src/statemachine/ConsumerBase.cpp
src/statemachine/ConsumerBase.h
src/statemachine/PublisherBase.cpp
src/statemachine/PublisherBase.h
src/statemachine/RequestResponseRequester.cpp
src/statemachine/RequestResponseRequester.h
Expand Down
212 changes: 79 additions & 133 deletions src/statemachine/ChannelRequester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,180 +8,126 @@ namespace rsocket {
using namespace yarpl;
using namespace yarpl::flowable;

ChannelRequester::ChannelRequester(const ConsumerBase::Parameters& params)
: ConsumerBase(params), PublisherBase(/*initialRequestN=*/1) {}

void ChannelRequester::onSubscribe(
Reference<Subscription> subscription) noexcept {
CHECK(State::NEW == state_);
if (ConsumerBase::isTerminated()) {
subscription->cancel();
return;
}
publisherSubscribe(subscription);
// Request the first payload immediately.
subscription->request(1);
CHECK(!requested_);
publisherSubscribe(std::move(subscription));
}

void ChannelRequester::onNext(Payload request) noexcept {
switch (state_) {
case State::NEW: {
state_ = State::REQUESTED;
// FIXME: find a root cause of this asymmetry; the problem here is that
// the ConsumerBase::request might be delivered after the whole thing is
// shut down, if one uses InlineConnection.
size_t initialN = initialResponseAllowance_.drainWithLimit(
Frame_REQUEST_N::kMaxRequestN);
size_t remainingN = initialResponseAllowance_.drain();
// Send as much as possible with the initial request.
CHECK_GE(Frame_REQUEST_N::kMaxRequestN, initialN);
newStream(
StreamType::CHANNEL,
static_cast<uint32_t>(initialN),
std::move(request),
false);
// We must inform ConsumerBase about an implicit allowance we have
// requested from the remote end.
ConsumerBase::addImplicitAllowance(initialN);
// Pump the remaining allowance into the ConsumerBase _after_ sending the
// initial request.
if (remainingN) {
ConsumerBase::generateRequest(remainingN);
}
} break;
case State::REQUESTED: {
debugCheckOnNextOnError();
writePayload(std::move(request), 0);
break;
if(!requested_) {
requested_ = true;

size_t initialN = initialResponseAllowance_.drainWithLimit(
Frame_REQUEST_N::kMaxRequestN);
size_t remainingN = initialResponseAllowance_.drain();
// Send as much as possible with the initial request.
CHECK_GE(Frame_REQUEST_N::kMaxRequestN, initialN);
newStream(
StreamType::CHANNEL,
static_cast<uint32_t>(initialN),
std::move(request),
false);
// We must inform ConsumerBase about an implicit allowance we have
// requested from the remote end.
ConsumerBase::addImplicitAllowance(initialN);
// Pump the remaining allowance into the ConsumerBase _after_ sending the
// initial request.
if (remainingN) {
ConsumerBase::generateRequest(remainingN);
}
case State::CLOSED:
break;
return;
}

checkPublisherOnNext();
writePayload(std::move(request), false);
}

// TODO: consolidate code in onCompleteImpl, onErrorImpl, cancelImpl
void ChannelRequester::onComplete() noexcept {
releasePublisher();
switch (state_) {
case State::NEW:
state_ = State::CLOSED;
closeStream(StreamCompletionSignal::COMPLETE);
break;
case State::REQUESTED: {
state_ = State::CLOSED;
completeStream();
} break;
case State::CLOSED:
break;
if (!requested_) {
closeStream(StreamCompletionSignal::CANCEL);
return;
}
publisherComplete();
completeStream();
tryCompleteChannel();
}

void ChannelRequester::onError(const std::exception_ptr ex) noexcept {
releasePublisher();
switch (state_) {
case State::NEW:
state_ = State::CLOSED;
closeStream(StreamCompletionSignal::APPLICATION_ERROR);
break;
case State::REQUESTED: {
applicationError(folly::exceptionStr(ex).toStdString());
} break;
case State::CLOSED:
break;
if (!requested_) {
closeStream(StreamCompletionSignal::CANCEL);
return;
}
publisherComplete();
applicationError(folly::exceptionStr(ex).toStdString());
tryCompleteChannel();
}

void ChannelRequester::request(int64_t n) noexcept {
switch (state_) {
case State::NEW:
// The initial request has not been sent out yet, hence we must accumulate
// the unsynchronised allowance, portion of which will be sent out with
// the initial request frame, and the rest will be dispatched via
// ConsumerBase:request (ultimately by sending REQUEST_N frames).
initialResponseAllowance_.release(n);
break;
case State::REQUESTED:
ConsumerBase::generateRequest(n);
break;
case State::CLOSED:
break;
if (!requested_) {
// The initial request has not been sent out yet, hence we must accumulate
// the unsynchronised allowance, portion of which will be sent out with
// the initial request frame, and the rest will be dispatched via
// ConsumerBase:request (ultimately by sending REQUEST_N frames).
initialResponseAllowance_.release(n);
return;
}
checkConsumerRequest();
ConsumerBase::generateRequest(n);
}

void ChannelRequester::cancel() noexcept {
releaseConsumer();
switch (state_) {
case State::NEW:
state_ = State::CLOSED;
closeStream(StreamCompletionSignal::CANCEL);
break;
case State::REQUESTED: {
state_ = State::CLOSED;
cancelStream();
} break;
case State::CLOSED:
break;
if (!requested_) {
closeStream(StreamCompletionSignal::CANCEL);
return;
}
cancelConsumer();
cancelStream();
tryCompleteChannel();
}

void ChannelRequester::endStream(StreamCompletionSignal signal) {
switch (state_) {
case State::NEW:
case State::REQUESTED:
// Spontaneous ::endStream signal messagesns an error.
DCHECK(StreamCompletionSignal::COMPLETE != signal);
DCHECK(StreamCompletionSignal::CANCEL != signal);
state_ = State::CLOSED;
break;
case State::CLOSED:
break;
}
terminatePublisher(signal);
terminatePublisher();
ConsumerBase::endStream(signal);
}

void ChannelRequester::tryCompleteChannel() {
if (publisherClosed() && consumerClosed()) {
closeStream(StreamCompletionSignal::COMPLETE);
}
}

void ChannelRequester::handlePayload(
Payload&& payload,
bool complete,
bool flagsNext) {
bool end = false;
switch (state_) {
case State::NEW:
// Cannot receive a frame before sending the initial request.
CHECK(false);
break;
case State::REQUESTED:
if (complete) {
state_ = State::CLOSED;
end = true;
}
break;
case State::CLOSED:
break;
}
bool next) {
CHECK(requested_);
processPayload(std::move(payload), next);

processPayload(std::move(payload), flagsNext);

if (end) {
closeStream(StreamCompletionSignal::COMPLETE);
if (complete) {
completeConsumer();
tryCompleteChannel();
}
}

void ChannelRequester::handleError(folly::exception_wrapper errorPayload) {
switch (state_) {
case State::NEW:
// Cannot receive a frame before sending the initial request.
CHECK(false);
break;
case State::REQUESTED:
state_ = State::CLOSED;
ConsumerBase::onError(errorPayload);
closeStream(StreamCompletionSignal::ERROR);
break;
case State::CLOSED:
break;
}
void ChannelRequester::handleError(folly::exception_wrapper ex) {
CHECK(requested_);
errorConsumer(std::move(ex));
tryCompleteChannel();
}

void ChannelRequester::handleRequestN(uint32_t n) {
CHECK(requested_);
PublisherBase::processRequestN(n);
}

void ChannelRequester::handleCancel() {
CHECK(requested_);
publisherComplete();
tryCompleteChannel();
}
} // reactivesocket
12 changes: 4 additions & 8 deletions src/statemachine/ChannelRequester.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ class ChannelRequester : public ConsumerBase,
public PublisherBase,
public yarpl::flowable::Subscriber<Payload> {
public:
explicit ChannelRequester(const ConsumerBase::Parameters& params)
: ConsumerBase(params), PublisherBase(0) {}
explicit ChannelRequester(const ConsumerBase::Parameters& params);

private:
void onSubscribe(yarpl::Reference<yarpl::flowable::Subscription>
Expand All @@ -38,18 +37,15 @@ class ChannelRequester : public ConsumerBase,
void handlePayload(Payload&& payload, bool complete, bool flagsNext) override;
void handleRequestN(uint32_t n) override;
void handleError(folly::exception_wrapper errorPayload) override;
void handleCancel() override;

void endStream(StreamCompletionSignal) override;
void tryCompleteChannel();

/// State of the Channel requester.
enum class State : uint8_t {
NEW,
REQUESTED,
CLOSED,
} state_{State::NEW};
/// An allowance accumulated before the stream is initialised.
/// Remaining part of the allowance is forwarded to the ConsumerBase.
AllowanceSemaphore initialResponseAllowance_;
bool requested_{false};
};

} // reactivesocket

0 comments on commit 80aa1e6

Please sign in to comment.