Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CReate a dynamic stream (Observable) #105

Closed
lmouline opened this issue Mar 24, 2015 · 5 comments
Closed

CReate a dynamic stream (Observable) #105

lmouline opened this issue Mar 24, 2015 · 5 comments

Comments

@lmouline
Copy link

Hello,

I would like to implement a simple example of CEP using your library. My example is the following : I have two streams (Observables). When I press the 'a' key, an event/message is created in one stream and when I press 'g' an event is created in the other stream. Then I would like to join this two streams and do something.

However, I don't know how to create an 'infinite' observable with your library. I think that I have to use the method rxcpp::observable<>::create(). But, I have no idea about which parameter I should use.

I have already implemented this example in JAVA : the parameter is an "Action1".

Is it the same thing in C++? Do you have any idea, or example, to help me?

Thank you in advance for your help,

Best regards,

kirkshoop pushed a commit to kirkshoop/RxCpp that referenced this issue Mar 24, 2015
@kirkshoop
Copy link
Member

I added a sample to show how this would look using std::cin. For other UI event systems create would add a keyboard event handler that would call dest::on_next().

@lmouline
Copy link
Author

Thank you! It works!
Actually it is the same thing in JAVA :)
If you want, I can send you my whole example when I finish.

@lmouline
Copy link
Author

Just another question : I am trying to use window_with_time method. But, I don't know which parameter I should use for the "Coordination".

I see that you use the same template name in the merge method, and you put an observable as parameter, it doesn't work with window_with_time method.

When I read your test code, I thing that the "Coordination" parameter should be a worker or a scheduler. However, I don't know how get this from observable.

Thank you again for your help.

(It's very hard to use your API without documentation for a non C++ expert :/)

@kirkshoop
Copy link
Member

Yes, documentation is needed.
The first step would be to get the infrastructure set up. Help selecting a doc generator and setting up the CMake to do the generation would be most welcome.

Coordinations allow the operators to delegate coordination of events from different threads. This way each operator can focus on the algorithm. This is the major concept in rxcpp that differs from the Rx implementations in other languages. I found the idea of having UI events from the same thread merged using atomic and mutex primitives was distasteful in C++.

The perf tests show ways to use Coordinations. group_by uses them in different ways to calculate pi.

coordination strategies

  • The observe_on_... strategy will use a queue to move calls from input observers on any thread to calls on another observer on the thread selected for output by the '. . .' part of the name. The output observer calls will always be from the same thread.
  • The serialize_... strategy will use a mutex to serialize calls from input observers on any thread to the output observer. The output observer calls will be from the thread calling the input, but not overlapped.

thread allocation strategies

  • The ..._new_thread strategy will create a new thread for each instance of the operator.
  • The ..._event_loop strategy will select one thread from a limited pool for each instance of the operator.
  • The ..._one_worker strategy shares one coordination strategy among multiple operators and multiple instances of an operator.

Examples:

Combining a coordination strategy with a thread allocation strategy results in a Coordination.

. . .
  merge(rxcpp::observe_on_new_thread()).
. . .

Or

. . .
  observe_on(rxcpp::serialize_new_thread()).
. . .

Or

. . .
  subscribe_on(rxcpp::observe_on_event_loop()).
. . .

@lmouline
Copy link
Author

lmouline commented Apr 2, 2015

Thank you for your help! It's work, again.

About the documentation, I could try to find a doc generator next week.

I close this issue.

@lmouline lmouline closed this as completed Apr 2, 2015
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants