Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
6553c0b
Address lint warnings
vjn Apr 14, 2017
b5fc7b6
Added tests for the v/ version.
vjn Apr 18, 2017
0be928d
Added a test for ->take()
vjn Apr 20, 2017
b5e78bb
Fix open source build
lexs Apr 20, 2017
7cdceb1
Address lint warnings
vjn Apr 14, 2017
dc19683
Added tests for the v/ version.
vjn Apr 18, 2017
6702be3
Added a test for ->take()
vjn Apr 20, 2017
f63be2d
Switched CMake version, to get past travis build
vjn Apr 20, 2017
afde178
Merge branch 'flowable' of github.com:vjn/reactivesocket-cpp into flo…
vjn Apr 20, 2017
c4da7f6
Use std::size_t instead of size_t
vjn Apr 20, 2017
84cd455
Fix build error (detected in travis)
vjn Apr 21, 2017
1a92b80
Remove dependency on reactivestreams
vjn Apr 21, 2017
59772f8
Took out build-problematic typedef
vjn Apr 21, 2017
d82edfa
Moved Wrapper out of Flowable, to fix the build
vjn Apr 21, 2017
92a0e5e
Add workaround for (old) gcc bug
vjn Apr 21, 2017
74acf2c
Thread libraries for yarpl tests
vjn Apr 21, 2017
ce0513c
Merged tests, disabled failing one
vjn Apr 21, 2017
97364bb
Added yarpl/tests to travis
vjn Apr 21, 2017
ab0ebda
Enable java-client to cpp-server tck tests
Apr 21, 2017
1a21264
Address lint warnings
vjn Apr 14, 2017
2a5e8cd
Added tests for the v/ version.
vjn Apr 18, 2017
8124b52
Added a test for ->take()
vjn Apr 20, 2017
d43bf6e
Switched CMake version, to get past travis build
vjn Apr 20, 2017
8627d4a
Use std::size_t instead of size_t
vjn Apr 20, 2017
d27759f
Fix build error (detected in travis)
vjn Apr 21, 2017
2b5bd6c
Remove dependency on reactivestreams
vjn Apr 21, 2017
c459ce9
Took out build-problematic typedef
vjn Apr 21, 2017
6cb4983
Moved Wrapper out of Flowable, to fix the build
vjn Apr 21, 2017
2fdb1b7
Add workaround for (old) gcc bug
vjn Apr 21, 2017
41db3ef
Thread libraries for yarpl tests
vjn Apr 21, 2017
b192210
Merged tests, disabled failing one
vjn Apr 21, 2017
96948c3
Added yarpl/tests to travis
vjn Apr 21, 2017
7d9cedc
Merge branch 'flowable' of github.com:vjn/reactivesocket-cpp into flo…
vjn Apr 22, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ script:
- cmake .. -DCMAKE_BUILD_TYPE=$BUILD_TYPE -DCMAKE_CXX_FLAGS="${CXX_FLAGS}" -DCMAKE_EXE_LINKER_FLAGS="${CXX_LINKER_FLAGS}" -DMETA_CXX_STD=$CPP_VERSION
- make -j8
- ./tests
- ./experimental/yarpl/yarpl-tests
- cd ..
- ./scripts/prepare_tck_drivers.sh
- ./scripts/tck_test.sh -c cpp -s cpp
- ./scripts/tck_test.sh -c java -s java
- ./scripts/tck_test.sh -c java -s cpp
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ target_link_libraries(

add_dependencies(tcpresumeserver gmock)

# add_subdirectory(experimental/yarpl)
add_subdirectory(experimental/yarpl)

########################################
# Examples
Expand Down
10 changes: 8 additions & 2 deletions experimental/yarpl/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
cmake_minimum_required (VERSION 3.4)
cmake_minimum_required (VERSION 3.2)

# To debug the project, set the build type.
set(CMAKE_BUILD_TYPE Debug)
Expand All @@ -15,14 +15,16 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS 1)
# Common configuration for all build modes.
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -Wno-unused-parameter")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-weak-vtables -Wpadded")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-weak-vtables -Wno-padded")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fno-omit-frame-pointer")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -momit-leaf-frame-pointer")

# Configuration for Debug build mode.
#set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fsanitize=address")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG}")

set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -DNDEBUG")

include_directories(${CMAKE_SOURCE_DIR})


Expand Down Expand Up @@ -83,6 +85,7 @@ add_library(
include/yarpl/v/Subscriber.h
include/yarpl/v/Subscribers.h
include/yarpl/v/Subscription.h
src/yarpl/v/Refcounted.cpp
)

target_include_directories(
Expand Down Expand Up @@ -134,12 +137,15 @@ add_executable(
test/flowable_operators/Flowable_Take_test.cpp
test/flowable_operators/Flowable_SubscribeOn_test.cpp
test/Scheduler_test.cpp
test/v/RefcountedTest.cpp
test/v/FlowableTest.cpp
)

target_link_libraries(
yarpl-tests
yarpl
${GMOCK_LIBS} # inherited from reactivesocket-cpp CMake
${CMAKE_THREAD_LIBS_INIT}
)

target_include_directories(
Expand Down
65 changes: 47 additions & 18 deletions experimental/yarpl/examples/FlowableVExamples.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "FlowableVExamples.h"

#include <iostream>
#include <sstream>
#include <string>
#include <thread>

Expand All @@ -22,6 +23,12 @@ auto printer() {
2 /* low [optional] batch size for demo */);
}

std::string getThreadId() {
std::ostringstream oss;
oss << std::this_thread::get_id();
return oss.str();
}

} // namespace

void FlowableVExamples::run() {
Expand Down Expand Up @@ -56,22 +63,44 @@ void FlowableVExamples::run() {

std::cout << "take example: 3 out of 10 items" << std::endl;
Flowables::range(1, 11)->take(3)->subscribe(printer<int64_t>());
}

// ThreadScheduler scheduler;

// FlowablesC::range(1, 10)
// ->subscribeOn(scheduler)
// ->map([](auto i) {
// std::this_thread::sleep_for(std::chrono::milliseconds(400));
// return "mapped->" + std::to_string(i);
// })
// ->take(2)
// ->subscribe(Subscribers::create<std::string>([](auto t) {
// std::cout << "Value received after scheduling: " << t << std::endl;
// }));

// // wait to see above async example
// /* sleep override */
// std::this_thread::sleep_for(std::chrono::milliseconds(1300));
//}
auto flowable = Flowable<int>::create(
[total=0](Subscriber<int>& subscriber, int64_t requested) mutable {
subscriber.onNext(12345678);
subscriber.onError(std::make_exception_ptr(
std::runtime_error("error")));
return std::make_tuple(int64_t{1}, false);
}
);

auto subscriber = Subscribers::create<int>(
[](int next) {
std::cout << "@next: " << next << std::endl;
},
[](std::exception_ptr eptr) {
try {
std::rethrow_exception(eptr);
} catch (const std::exception& exception) {
std::cerr << " exception: " << exception.what() << std::endl;
} catch (...) {
std::cerr << " !unknown exception!" << std::endl;
}
},
[] {
std::cout << "Completed." << std::endl;
}
);

flowable->subscribe(subscriber);

ThreadScheduler scheduler;

std::cout << "subscribe_on example" << std::endl;
Flowables::just({ "0: ", "1: ", "2: " })
->map([](const char* p) { return std::string(p); })
->map([](std::string log) { return log + " on " + getThreadId(); })
->subscribeOn(scheduler)
->subscribe(printer<std::string>());
std::cout << " waiting on " << getThreadId() << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
2 changes: 1 addition & 1 deletion experimental/yarpl/examples/yarpl-playground.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

int main() {
std::cout << "*** Run yarpl::flowable::v examples ***" << std::endl;
// FlowableVExamples::run();
FlowableVExamples::run();
// std::cout << "*** Run ObservableExamples ***" << std::endl;
// ObservableExamples::run();
// std::cout << "*** Run FlowableExamples ***" << std::endl;
Expand Down
73 changes: 43 additions & 30 deletions experimental/yarpl/include/yarpl/v/Flowable.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include <type_traits>
#include <utility>

#include "reactivestreams/ReactiveStreams.h"
#include "yarpl/Scheduler.h"
#include "yarpl/utils/type_traits.h"

#include "Refcounted.h"
Expand All @@ -21,17 +21,17 @@ class Flowable : public virtual Refcounted {
static const auto CANCELED = std::numeric_limits<int64_t>::min();
static const auto NO_FLOW_CONTROL = std::numeric_limits<int64_t>::max();

using Subscriber = Subscriber<T>;

virtual void subscribe(Reference<Subscriber>) = 0;
virtual void subscribe(Reference<Subscriber<T>>) = 0;

template <typename Function>
auto map(Function&& function);

auto take(int64_t);

auto subscribeOn(Scheduler&);

/**
* Create a flowable from an emitter.
* \brief Create a flowable from an emitter.
*
* \param emitter function that is invoked to emit values to a subscriber.
* The emitter's signature is:
Expand All @@ -46,48 +46,32 @@ class Flowable : public virtual Refcounted {
*
* \return a handle to a flowable that will use the emitter.
*/
template<typename Emitter>
class EmitterWrapper;

template <
typename Emitter,
typename = typename std::enable_if<std::is_callable<
Emitter(Subscriber&, int64_t),
Emitter(Subscriber<T>&, int64_t),
std::tuple<int64_t, bool>>::value>::type>
static auto create(Emitter&& emitter);

private:
virtual std::tuple<int64_t, bool> emit(Subscriber&, int64_t) {
virtual std::tuple<int64_t, bool> emit(Subscriber<T>&, int64_t) {
return std::make_tuple(static_cast<int64_t>(0), false);
}

template <typename Emitter>
class Wrapper : public Flowable {
public:
Wrapper(Emitter&& emitter) : emitter_(std::forward<Emitter>(emitter)) {}

virtual void subscribe(Reference<Subscriber> subscriber) {
new SynchronousSubscription(this, std::move(subscriber));
}

virtual std::tuple<int64_t, bool> emit(
Subscriber& subscriber,
int64_t requested) {
return emitter_(subscriber, requested);
}

private:
Emitter emitter_;
};

/**
* Manager for a flowable subscription.
*
* This is synchronous: the emit calls are triggered within the context
* of a request(n) call.
*/
class SynchronousSubscription : public Subscription, public Subscriber {
class SynchronousSubscription : public Subscription, public Subscriber<T> {
public:
SynchronousSubscription(
Reference<Flowable> flowable,
Reference<Subscriber> subscriber)
Reference<Subscriber<T>> subscriber)
: flowable_(std::move(flowable)), subscriber_(std::move(subscriber)) {
subscriber_->onSubscribe(Reference<Subscription>(this));
}
Expand Down Expand Up @@ -208,7 +192,7 @@ class Flowable : public virtual Refcounted {
std::mutex processing_;

Reference<Flowable> flowable_;
Reference<Subscriber> subscriber_;
Reference<Subscriber<T>> subscriber_;
};
};

Expand All @@ -218,11 +202,34 @@ class Flowable : public virtual Refcounted {

namespace yarpl {

template <typename T>
template <typename Emitter>
class Flowable<T>::EmitterWrapper : public Flowable<T> {
public:
explicit EmitterWrapper(Emitter&& emitter)
: emitter_(std::forward<Emitter>(emitter)) {}

virtual void subscribe(Reference<Subscriber<T>> subscriber) {
new SynchronousSubscription(
Reference<Flowable>(this), std::move(subscriber));
}

virtual std::tuple<int64_t, bool> emit(
Subscriber<T>& subscriber,
int64_t requested) {
return emitter_(subscriber, requested);
}

private:
Emitter emitter_;
};

template <typename T>
template <typename Emitter, typename>
auto Flowable<T>::create(Emitter&& emitter) {
return Reference<Flowable<T>>(
new Flowable<T>::Wrapper<Emitter>(std::forward<Emitter>(emitter)));
new Flowable<T>::EmitterWrapper<Emitter>(
std::forward<Emitter>(emitter)));
}

template <typename T>
Expand All @@ -239,4 +246,10 @@ auto Flowable<T>::take(int64_t limit) {
new TakeOperator<T>(Reference<Flowable<T>>(this), limit));
}

template<typename T>
auto Flowable<T>::subscribeOn(Scheduler& scheduler) {
return Reference<Flowable<T>>(
new SubscribeOnOperator<T>(Reference<Flowable<T>>(this), scheduler));
}

} // yarpl
Loading