Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
victimsnino committed Jun 13, 2024
1 parent bf7348e commit 4086b96
Showing 1 changed file with 44 additions and 50 deletions.
94 changes: 44 additions & 50 deletions src/tests/rppgrpc/test_async_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include <rpp/observers/mock_observer.hpp>
#include <rpp/operators/map.hpp>
#include <rpp/operators/observe_on.hpp>
#include <rpp/schedulers/new_thread.hpp>
#include <rpp/subjects/publish_subject.hpp>

#include <grpc++/server_builder.h>
Expand Down Expand Up @@ -52,7 +54,7 @@ TEST_CASE("async client reactor")
grpc::ClientContext ctx{};

const auto bidi_reactor = new rppgrpc::client_bidi_reactor<Request, Response>();
bidi_reactor->get_observable() | rpp::ops::map([](const Response& out) { return out.value(); }) | rpp::ops::subscribe(out_mock);
bidi_reactor->get_observable() | rpp::ops::map([](const Response& out) { return out.value(); }) | rpp::ops::observe_on(rpp::schedulers::new_thread{}) | rpp::ops::subscribe(out_mock);
subj.get_observable() | rpp::ops::map([](int v) { Request request{}; request.set_value(v); return request; }) | rpp::ops::subscribe(bidi_reactor->get_observer());


Expand Down Expand Up @@ -121,63 +123,55 @@ TEST_CASE("async client reactor")
}
SECTION("client-side read + completion")
{
const auto initial_call = NAMED_REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_))
.RETURN(grpc::Status::OK)
.LR_SIDE_EFFECT({
Response response{};
std::cout << std::this_thread::get_id() << __LINE__ << std::endl;
REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_))
.RETURN(grpc::Status::OK)
.LR_SIDE_EFFECT({
Response response{};

for (int v : {1, 2, 3})
{
response.set_value(v);
_2->Write(response);
}
});
bidi_reactor->init();

REQUIRE_CALL(*out_mock, on_next_rvalue(1)).IN_SEQUENCE(s);
REQUIRE_CALL(*out_mock, on_next_rvalue(2)).IN_SEQUENCE(s);
REQUIRE_CALL(*out_mock, on_next_rvalue(3)).IN_SEQUENCE(s);
const auto completed = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s);

wait(completed);
}
SECTION("client-side read-write + completeion")
{
REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_))
.RETURN(grpc::Status::OK)
.LR_SIDE_EFFECT({
Request request{};
while (_2->Read(&request))
{
Response response{};
response.set_value(request.value() * 10);
_2->Write(response);
}
});

for (int v : {1, 2, 3})
{
std::cout << std::this_thread::get_id() << __LINE__ << std::endl;
response.set_value(v);
_2->Write(response);
}
std::cout << std::this_thread::get_id() << __LINE__ << std::endl;
});
std::cout << std::this_thread::get_id() << __LINE__ << std::endl;
bidi_reactor->init();

std::thread{[&] {
REQUIRE_CALL(*out_mock, on_next_rvalue(1)).IN_SEQUENCE(s);
REQUIRE_CALL(*out_mock, on_next_rvalue(2)).IN_SEQUENCE(s);
REQUIRE_CALL(*out_mock, on_next_rvalue(3)).IN_SEQUENCE(s);
const auto completed = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s);
REQUIRE_CALL(*out_mock, on_next_rvalue(10)).IN_SEQUENCE(s);
subj.get_observer().on_next(1);

REQUIRE_CALL(*out_mock, on_next_rvalue(20)).IN_SEQUENCE(s);
subj.get_observer().on_next(2);

std::cout << std::this_thread::get_id() << __LINE__ << std::endl;
const auto completed = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s);
subj.get_observer().on_completed();

wait(initial_call);
}}.join();
wait(completed);
}
// SECTION("successful read-write")
// {
// REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_))
// .RETURN(grpc::Status::OK)
// .LR_SIDE_EFFECT({
// Request request{};
// while(_2->Read(&request)) {
// Response response{};
// response.set_value(request.value()*10);
// _2->Write(response);
// }
// });

// bidi_reactor->init();

// REQUIRE_CALL(*out_mock, on_next_rvalue(10)).IN_SEQUENCE(s);
// subj.get_observer().on_next(1);

// REQUIRE_CALL(*out_mock, on_next_rvalue(20)).IN_SEQUENCE(s);
// subj.get_observer().on_next(2);

// const auto completed = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s);
// subj.get_observer().on_completed();

// wait(completed);
// }
}


// auto validate_write = [&](auto& stream_mock, auto*& reactor) {
// SECTION("write to stream")
// {
Expand Down

0 comments on commit 4086b96

Please sign in to comment.