Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ script:
- cmake .. -DCMAKE_BUILD_TYPE=$BUILD_TYPE -DCMAKE_CXX_FLAGS="${CXX_FLAGS}" -DCMAKE_EXE_LINKER_FLAGS="${CXX_LINKER_FLAGS}" -DMETA_CXX_STD=$CPP_VERSION
- make -j8
- ./tests
- ./experimental/yarpl/yarpl-tests
- cd ..
- ./scripts/prepare_tck_drivers.sh
- ./scripts/tck_test.sh -c cpp -s cpp
Expand Down
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ target_link_libraries(

add_dependencies(tcpresumeserver gmock)

# add_subdirectory(experimental/yarpl)
add_subdirectory(experimental/yarpl)

########################################
# Examples
Expand Down
10 changes: 8 additions & 2 deletions experimental/yarpl/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
cmake_minimum_required (VERSION 3.4)
cmake_minimum_required (VERSION 3.2)

# To debug the project, set the build type.
set(CMAKE_BUILD_TYPE Debug)
Expand All @@ -15,14 +15,16 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS 1)
# Common configuration for all build modes.
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -Wno-unused-parameter")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-weak-vtables -Wpadded")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-weak-vtables -Wno-padded")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fno-omit-frame-pointer")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -momit-leaf-frame-pointer")

# Configuration for Debug build mode.
#set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fsanitize=address")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG}")

set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -DNDEBUG")

include_directories(${CMAKE_SOURCE_DIR})


Expand Down Expand Up @@ -83,6 +85,7 @@ add_library(
include/yarpl/v/Subscriber.h
include/yarpl/v/Subscribers.h
include/yarpl/v/Subscription.h
src/yarpl/v/Refcounted.cpp
)

target_include_directories(
Expand Down Expand Up @@ -134,12 +137,15 @@ add_executable(
test/flowable_operators/Flowable_Take_test.cpp
test/flowable_operators/Flowable_SubscribeOn_test.cpp
test/Scheduler_test.cpp
test/v/RefcountedTest.cpp
test/v/FlowableTest.cpp
)

target_link_libraries(
yarpl-tests
yarpl
${GMOCK_LIBS} # inherited from reactivesocket-cpp CMake
${CMAKE_THREAD_LIBS_INIT}
)

target_include_directories(
Expand Down
65 changes: 47 additions & 18 deletions experimental/yarpl/examples/FlowableVExamples.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "FlowableVExamples.h"

#include <iostream>
#include <sstream>
#include <string>
#include <thread>

Expand All @@ -22,6 +23,12 @@ auto printer() {
2 /* low [optional] batch size for demo */);
}

std::string getThreadId() {
std::ostringstream oss;
oss << std::this_thread::get_id();
return oss.str();
}

} // namespace

void FlowableVExamples::run() {
Expand Down Expand Up @@ -56,22 +63,44 @@ void FlowableVExamples::run() {

std::cout << "take example: 3 out of 10 items" << std::endl;
Flowables::range(1, 11)->take(3)->subscribe(printer<int64_t>());
}

// ThreadScheduler scheduler;

// FlowablesC::range(1, 10)
// ->subscribeOn(scheduler)
// ->map([](auto i) {
// std::this_thread::sleep_for(std::chrono::milliseconds(400));
// return "mapped->" + std::to_string(i);
// })
// ->take(2)
// ->subscribe(Subscribers::create<std::string>([](auto t) {
// std::cout << "Value received after scheduling: " << t << std::endl;
// }));

// // wait to see above async example
// /* sleep override */
// std::this_thread::sleep_for(std::chrono::milliseconds(1300));
//}
auto flowable = Flowable<int>::create(
[total=0](Subscriber<int>& subscriber, int64_t requested) mutable {
subscriber.onNext(12345678);
subscriber.onError(std::make_exception_ptr(
std::runtime_error("error")));
return std::make_tuple(int64_t{1}, false);
}
);

auto subscriber = Subscribers::create<int>(
[](int next) {
std::cout << "@next: " << next << std::endl;
},
[](std::exception_ptr eptr) {
try {
std::rethrow_exception(eptr);
} catch (const std::exception& exception) {
std::cerr << " exception: " << exception.what() << std::endl;
} catch (...) {
std::cerr << " !unknown exception!" << std::endl;
}
},
[] {
std::cout << "Completed." << std::endl;
}
);

flowable->subscribe(subscriber);

ThreadScheduler scheduler;

std::cout << "subscribe_on example" << std::endl;
Flowables::just({ "0: ", "1: ", "2: " })
->map([](const char* p) { return std::string(p); })
->map([](std::string log) { return log + " on " + getThreadId(); })
->subscribeOn(scheduler)
->subscribe(printer<std::string>());
std::cout << " waiting on " << getThreadId() << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
2 changes: 1 addition & 1 deletion experimental/yarpl/examples/yarpl-playground.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

int main() {
std::cout << "*** Run yarpl::flowable::v examples ***" << std::endl;
// FlowableVExamples::run();
FlowableVExamples::run();
// std::cout << "*** Run ObservableExamples ***" << std::endl;
// ObservableExamples::run();
// std::cout << "*** Run FlowableExamples ***" << std::endl;
Expand Down
73 changes: 43 additions & 30 deletions experimental/yarpl/include/yarpl/v/Flowable.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include <type_traits>
#include <utility>

#include "reactivestreams/ReactiveStreams.h"
#include "yarpl/Scheduler.h"
#include "yarpl/utils/type_traits.h"

#include "Refcounted.h"
Expand All @@ -21,17 +21,17 @@ class Flowable : public virtual Refcounted {
static const auto CANCELED = std::numeric_limits<int64_t>::min();
static const auto NO_FLOW_CONTROL = std::numeric_limits<int64_t>::max();

using Subscriber = Subscriber<T>;

virtual void subscribe(Reference<Subscriber>) = 0;
virtual void subscribe(Reference<Subscriber<T>>) = 0;

template <typename Function>
auto map(Function&& function);

auto take(int64_t);

auto subscribeOn(Scheduler&);

/**
* Create a flowable from an emitter.
* \brief Create a flowable from an emitter.
*
* \param emitter function that is invoked to emit values to a subscriber.
* The emitter's signature is:
Expand All @@ -46,48 +46,32 @@ class Flowable : public virtual Refcounted {
*
* \return a handle to a flowable that will use the emitter.
*/
template<typename Emitter>
class EmitterWrapper;

template <
typename Emitter,
typename = typename std::enable_if<std::is_callable<
Emitter(Subscriber&, int64_t),
Emitter(Subscriber<T>&, int64_t),
std::tuple<int64_t, bool>>::value>::type>
static auto create(Emitter&& emitter);

private:
virtual std::tuple<int64_t, bool> emit(Subscriber&, int64_t) {
virtual std::tuple<int64_t, bool> emit(Subscriber<T>&, int64_t) {
return std::make_tuple(static_cast<int64_t>(0), false);
}

template <typename Emitter>
class Wrapper : public Flowable {
public:
Wrapper(Emitter&& emitter) : emitter_(std::forward<Emitter>(emitter)) {}

virtual void subscribe(Reference<Subscriber> subscriber) {
new SynchronousSubscription(this, std::move(subscriber));
}

virtual std::tuple<int64_t, bool> emit(
Subscriber& subscriber,
int64_t requested) {
return emitter_(subscriber, requested);
}

private:
Emitter emitter_;
};

/**
* Manager for a flowable subscription.
*
* This is synchronous: the emit calls are triggered within the context
* of a request(n) call.
*/
class SynchronousSubscription : public Subscription, public Subscriber {
class SynchronousSubscription : public Subscription, public Subscriber<T> {
public:
SynchronousSubscription(
Reference<Flowable> flowable,
Reference<Subscriber> subscriber)
Reference<Subscriber<T>> subscriber)
: flowable_(std::move(flowable)), subscriber_(std::move(subscriber)) {
subscriber_->onSubscribe(Reference<Subscription>(this));
}
Expand Down Expand Up @@ -208,7 +192,7 @@ class Flowable : public virtual Refcounted {
std::mutex processing_;

Reference<Flowable> flowable_;
Reference<Subscriber> subscriber_;
Reference<Subscriber<T>> subscriber_;
};
};

Expand All @@ -218,11 +202,34 @@ class Flowable : public virtual Refcounted {

namespace yarpl {

template <typename T>
template <typename Emitter>
class Flowable<T>::EmitterWrapper : public Flowable<T> {
public:
explicit EmitterWrapper(Emitter&& emitter)
: emitter_(std::forward<Emitter>(emitter)) {}

virtual void subscribe(Reference<Subscriber<T>> subscriber) {
new SynchronousSubscription(
Reference<Flowable>(this), std::move(subscriber));
}

virtual std::tuple<int64_t, bool> emit(
Subscriber<T>& subscriber,
int64_t requested) {
return emitter_(subscriber, requested);
}

private:
Emitter emitter_;
};

template <typename T>
template <typename Emitter, typename>
auto Flowable<T>::create(Emitter&& emitter) {
return Reference<Flowable<T>>(
new Flowable<T>::Wrapper<Emitter>(std::forward<Emitter>(emitter)));
new Flowable<T>::EmitterWrapper<Emitter>(
std::forward<Emitter>(emitter)));
}

template <typename T>
Expand All @@ -239,4 +246,10 @@ auto Flowable<T>::take(int64_t limit) {
new TakeOperator<T>(Reference<Flowable<T>>(this), limit));
}

template<typename T>
auto Flowable<T>::subscribeOn(Scheduler& scheduler) {
return Reference<Flowable<T>>(
new SubscribeOnOperator<T>(Reference<Flowable<T>>(this), scheduler));
}

} // yarpl
Loading