creating an observable from C callback functions #496

Closed
therealkenc opened this issue Dec 21, 2023 · 7 comments
Labels
question Further information is requested

Comments

@therealkenc

I cannot figure out the correct way to create an observable from a C-style emitter with callback functions. Example shape:

typedef void (*StringCallback)(const char* s);
typedef void (*DoneCallback)();
// we don't know how this function is implemented, only that it calls onstring()
// a number of times and ondone() once, almost certainly from another thread.
void do_async_thing(StringCallback onstring, DoneCallback ondone);

I have a solution of sorts, but it is almost certainly "wrong" as it involves creating a dynamic object and cleaning up. Rather than lead with my incorrect approach, better to just ask flatly how to turn the above into an observable to which I can subscribe. The key point in the setup above is that all the "state" is maintained inside the black box which is do_async_thing(). It takes care of whatever state it needs to emit C strings, I don't. I am sure the solution is "obvious" once I see it, but I've tried at length and appear to be missing something fundamental. TIA.

@victimsnino
Owner

victimsnino commented Dec 22, 2023

Hi!

Yeah, pretty interesting task. Because a C-style function pointer is stateless (I mean, we can't pass a lambda with a capture) and the API doesn't provide an extra user_object parameter (for example, void(const char* s, void* user_object)) to carry state, there is only one possible approach: use global variables/functions. Like this:

https://rpp.godbolt.org/z/5azdnh67h

There are 2 options; I'm not sure which one better fits your needs. The first looks much more straightforward; the second is more "reactive" (the call to do_async_thing happens only after the first subscription).
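For readers who cannot open the link, the global-state idea can be sketched without RPP. Everything here is an illustrative assumption rather than the linked code: `do_async_thing` is a toy implementation that emits two strings from a worker thread, and `g_worker`, `g_on_string`, `g_on_done` and `collect_all` are hypothetical names.

```cpp
#include <cassert>
#include <functional>
#include <future>
#include <string>
#include <thread>
#include <vector>

typedef void (*StringCallback)(const char* s);
typedef void (*DoneCallback)();

static std::thread g_worker;  // toy only: lets the caller join the worker thread

// Toy stand-in for the black box: emits two strings from another thread.
void do_async_thing(StringCallback onstring, DoneCallback ondone) {
    assert(!g_worker.joinable());  // the toy supports one run at a time
    g_worker = std::thread([onstring, ondone] {
        onstring("a");
        onstring("b");
        ondone();
    });
}

// With no context pointer, globals are the only place the callbacks can find state.
static std::function<void(const char*)> g_on_string;
static std::function<void()> g_on_done;

std::vector<std::string> collect_all() {
    std::vector<std::string> out;
    std::promise<void> finished;
    g_on_string = [&out](const char* s) { out.emplace_back(s); };
    g_on_done = [&finished] { finished.set_value(); };

    // Capture-less lambdas convert implicitly to plain function pointers.
    do_async_thing([](const char* s) { g_on_string(s); }, [] { g_on_done(); });

    finished.get_future().wait();  // block until ondone has fired
    g_worker.join();
    g_on_string = nullptr;  // drop the references to the locals above
    g_on_done = nullptr;
    return out;
}
```

The single pair of globals is also the limitation of this approach: only one run can be in flight at a time.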

If this is not a suitable solution, feel free to provide some additional details/requirements =)

@victimsnino victimsnino added the question Further information is requested label Dec 22, 2023
@therealkenc
Author

Thank you so much for taking a look. I can give you a void* context on the API. I was trying to make the example simpler by omitting it, but I think that obscured rather than simplified. Using a global publish_subject is kind of like a new() with only one slot.

Here is what I have:

https://rpp.godbolt.org/z/9MxsYhGhK

Crux:

auto create_observable() {
    auto onstring = [](const void* ctx, const char* s) {
        subject_from_ctx(ctx)->get_observer().on_next(s);
    };
    auto ondone = [](const void* ctx) {
        auto subject = subject_from_ctx(ctx);
        subject->get_observer().on_completed();
        delete subject;  // <--- this I don't love
    };
    auto subject = new rpp::subjects::publish_subject<const char*>();
    do_async_thing(subject, onstring, ondone);
    return subject->get_observable();
}

It is so "awesome" that it causes clang to crash. GCC will eat it.

The crash is bad enough, but from a pattern standpoint, creating a new subject just so I can delete it feels like I am doing it wrong.

@victimsnino
Owner

victimsnino commented Dec 22, 2023

In this case you can do it even more easily:
https://rpp.godbolt.org/z/jcjz4K5xP

In short: we make a raw detached copy of the original observer instead of adding extra indirection via a subject.
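Without opening the link, the idea reads roughly as follows: each subscription heap-allocates its own copy of the observer, hands the pointer to the C API as the context, and frees it when the done callback fires. The sketch below is library-free. `Observer`, `subscribe`, `g_starts`, the `void*`-context signature of `do_async_thing` and its synchronous toy body are all assumptions made for illustration, not RPP's types or the linked code.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Assumed shape of the C API once a context pointer is available.
typedef void (*StringCallback)(void* ctx, const char* s);
typedef void (*DoneCallback)(void* ctx);

static int g_starts = 0;  // counts how often the toy source is started

// Toy stand-in for the black box; emits synchronously to keep the sketch short.
void do_async_thing(void* ctx, StringCallback onstring, DoneCallback ondone) {
    ++g_starts;
    onstring(ctx, "a");
    onstring(ctx, "b");
    ondone(ctx);
}

// Hypothetical minimal observer, standing in for a library observer.
struct Observer {
    std::function<void(const char*)> on_next;
    std::function<void()> on_completed;
};

// Each call starts the source again with its own detached observer copy.
void subscribe(Observer observer) {
    assert(observer.on_next && observer.on_completed);  // both callbacks must be set
    auto owned = std::make_unique<Observer>(std::move(observer));
    do_async_thing(
        owned.release(),  // the C side holds the pointer until ondone fires
        [](void* ctx, const char* s) { static_cast<Observer*>(ctx)->on_next(s); },
        [](void* ctx) {
            // Reclaim ownership so the copy is freed after completion.
            std::unique_ptr<Observer> reclaimed(static_cast<Observer*>(ctx));
            reclaimed->on_completed();
        });
}
```

The allocation is still there, but it is tied to one subscription and released by the done callback rather than by a bare delete on a shared subject.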

But there is a difference in behavior:

  • You start do_async_thing for each new observer instead of "sharing" the same do_async_thing among observers of the same observable.
  • To get the previous behavior, add | rpp::operators::publish() | rpp::operators::ref_count(). It works the same way, but more elegantly: it subscribes to the original observable on the first subscription and then shares that subscription with any later subscribers.
  • If you still need to start do_async_thing before the first subscription, call .connect() on the result of | rpp::operators::publish() (it returns a connectable observable).

So, I mean this:

const auto connectable = observable | rpp::ops::publish();
connectable.connect(); // connect() subscribes "nothing" to your observable, which starts do_async_thing

// then subscribe as usual
connectable.subscribe(....);

or

const auto resulting_observable = observable | rpp::ops::publish() | rpp::ops::ref_count();

resulting_observable.subscribe(....); // this one initiates do_async_thing
resulting_observable.subscribe(....); // this one just connects to previous do_async_thing
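The sharing behavior of the second variant can be illustrated without the library. This is a sketch of the idea only, not RPP's implementation: `SharedSource`, `Observer`, `fire` and the `void*`-context signature are hypothetical, the toy `do_async_thing` merely records its callbacks so that emissions can be triggered later, and the single-threaded code omits the locking that a source emitting from another thread would need.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Assumed shape of the C API with a context pointer.
typedef void (*StringCallback)(void* ctx, const char* s);
typedef void (*DoneCallback)(void* ctx);

static int g_starts = 0;  // counts how often the toy source is started
static void* g_ctx = nullptr;
static StringCallback g_onstring = nullptr;
static DoneCallback g_ondone = nullptr;

// Toy stand-in: records the callbacks; fire() imitates emissions arriving later.
void do_async_thing(void* ctx, StringCallback onstring, DoneCallback ondone) {
    ++g_starts;
    g_ctx = ctx;
    g_onstring = onstring;
    g_ondone = ondone;
}

void fire() {
    assert(g_onstring && g_ondone);  // the source must have been started
    g_onstring(g_ctx, "a");
    g_onstring(g_ctx, "b");
    g_ondone(g_ctx);
}

// Hypothetical minimal observer, standing in for a library observer.
struct Observer {
    std::function<void(const char*)> on_next;
    std::function<void()> on_completed;
};

// The first subscriber starts the source; later subscribers join the same run.
// The object must outlive the run, because the source keeps a pointer to it.
class SharedSource {
public:
    void subscribe(Observer observer) {
        observers_.push_back(std::move(observer));
        if (observers_.size() == 1) {
            do_async_thing(
                this,
                [](void* ctx, const char* s) {
                    for (auto& o : static_cast<SharedSource*>(ctx)->observers_) o.on_next(s);
                },
                [](void* ctx) {
                    auto* self = static_cast<SharedSource*>(ctx);
                    for (auto& o : self->observers_) o.on_completed();
                    self->observers_.clear();  // a later subscriber starts a new run
                });
        }
    }

private:
    std::vector<Observer> observers_;
};
```

With two subscribers and one call to `fire()`, both observers see the same emissions while `do_async_thing` is started once.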

BTW: looks like clang is crashing due to latches being used in an unexpected way

@therealkenc
Author

therealkenc commented Dec 22, 2023

Thank you! I can work with this.

BTW: looks like clang is crashing due to latches being used in an unexpected way

Interesting, I did not expect that was the problem. Is capturing not allowed (or not well formed)? Using your setup:

    std::string huh = "huh";
    create_observable()
        | rpp::ops::as_blocking()
        | rpp::ops::subscribe(
            [](const std::string& s) {
                std::cout << "got: " << s << std::endl;
            },
            [=]() {
                std::cout << huh << " completed" << std::endl;
            });
    std::cout << "and we're done" << std::endl;

This takes down clang too, though I can't say I understand why.

@victimsnino
Owner

Yeah, looks interesting. I'll have to check it later.

@victimsnino
Owner

Made an attempt to fix.

@victimsnino
Owner

Fixed on godbolt
https://rpp.godbolt.org/z/5jrGcq4os

(compilation results for exactly the same source are cached, so you have to modify it in some way, e.g. add a space anywhere)


2 participants