Feature request: monadic interface and CompletableFuture compatibility of Future type #2105

Closed
jongwook opened this issue Jan 9, 2014 · 30 comments · Fixed by #11607

@jongwook

jongwook commented Jan 9, 2014

As discussed in the groups post, a monadic interface for the Future type would make it easier to compose asynchronous operations.

Monad is a concept in functional programming, which can be roughly understood as a generic type that adds a behavior on an existing type and supports transformation methods like map, flatMap, etc. (disclaimer: not at all a precise definition).

When Future type is monadic, those methods allow us to combine Futures to build more complex asynchronous operations. Scala Future is an example of a monadic interface. JDK8's CompletableFuture has similar methods (thenApply ≈ map(), thenCompose ≈ flatMap(), etc.)
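A minimal sketch of that correspondence, using only the JDK. The `lengthAsync` helper and the `ComposeSketch` class are hypothetical names made up for illustration; `thenApply` plays the role of map (plain function) and `thenCompose` the role of flatMap (function that itself returns a future).

```java
import java.util.concurrent.CompletableFuture;

final class ComposeSketch {
    // Hypothetical asynchronous step, standing in for a real async call.
    static CompletableFuture<Integer> lengthAsync(String s) {
        return CompletableFuture.supplyAsync(s::length);
    }

    static CompletableFuture<String> describe(CompletableFuture<String> name) {
        return name
                .thenApply(String::trim)                  // map: String -> String
                .thenCompose(ComposeSketch::lengthAsync)  // flatMap: String -> CompletableFuture<Integer>
                .thenApply(n -> "length=" + n);           // map: Integer -> String
    }

    public static void main(String[] args) {
        // "  netty " is trimmed to "netty", so this prints length=5
        System.out.println(describe(CompletableFuture.completedFuture("  netty ")).join());
    }
}
```

Using `thenApply` where `thenCompose` is needed would yield a nested `CompletableFuture<CompletableFuture<Integer>>`, which is the usual reason flatMap exists as a separate operation.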

So it is desirable for Netty Future to have various composing functionality like the above examples. Some tricks might be needed since Netty 5 will still be JDK6 compatible but it'd be nice to make Netty Future compatible with JDK8 CompletableFuture or CompletionStage

As a possible next step, Monadic interfaces for collection types such as ChannelGroup will provide more convenient facilities for use with asynchronous operations. For example, using RxJava's Observable API,

channels.flatMap(Channel::close)
    .doOnNext(done -> logger.info("just closed a channel"))
    .subscribe( ... )

will close all channels in a ChannelGroup while logging each time when a channel is closed.

For this we also need a "collection of futures" monad type. This might complicate the util.concurrent package but it will make various combination/filtering/transforming scenarios possible as seen in RxJava Observable.

@trustin
Member

trustin commented Jan 10, 2014

Well put. Are you interested in working on this, @jongwook ?

@jongwook
Author

I don't have any experience of OSS contribution yet, but it looks like a good place to start.
Let me give it a shot on this. Thanks!

@trustin
Member

trustin commented Jan 14, 2014

No worries, @jongwook. Please feel free to add me to your Google Talk / Kakao / LINE contact list and sync up with me.

@normanmaurer
Member

Same here… if you feel like you need help feel free to reach out.


@jongwook
Author

Well... like I explained in the email I sent to Trustin, I figured that this task is closely linked to the core concurrency architecture of Netty, so resolving this issue would require significant changes to it and is beyond my current capabilities.
Sorry about my hastiness and I hope I'll be able to participate in other issues in the future.

@trustin
Member

trustin commented Feb 11, 2014

No worries JW

@Scottmitch Scottmitch modified the milestones: 5.0.0.Alpha3, 5.0.0.Alpha2 Sep 23, 2014
@chrisprobst
Contributor

How actively is this topic considered for Netty 5 ?
Obviously, Java 8 provides interesting capabilities (CompletionStage...) which somehow replace certain things in Netty (Almost futures in general).

Or to put it another way: Will Netty 5 be based on Java 8 ?

@trustin
Member

trustin commented Aug 29, 2015

@chrisprobst It's on the back burner, but it would be awesome if someone led this effort.

@trustin
Member

trustin commented Aug 29, 2015

We are not sure about Java 8 requirement of Netty 5 yet, but we can at least make sure it works well with lambda expressions and it adopts all the goodies introduced in Java 8 such as CompletionStage and more, while ensuring future API compatibility.

@trustin trustin modified the milestone: 5.0.0.Alpha3 Mar 7, 2016
@jimmiebfulton

I'm currently building a fairly sophisticated messaging system (Kafka-esque) with CompletableFuture as part of the interface. Nothing fancy. My protocol includes a correlationId that gets stored in a map with a CompletableFuture. The future gets completed on the return response. Works quite nicely. I didn't want to expose a 3rd-party library as part of its public API, and I want the API to feel fairly modern, so I translate between Netty and CompletableFuture for sends and such. What's the current thinking for Netty 5 regarding Java 8? I'm willing to prototype/get the ball rolling, but I want to make sure that my efforts aren't wasted if this isn't a goal of the project. I realize that there would be some breaking API changes, but that appears to be the case for Netty 5, regardless. Would it make sense to bundle the breaking API changes into the next major release? I'd also be interested in being part of the efforts to get Netty 5 across the finish line, as well. Say the word, and I'll get crackin'. ;-)
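The correlation-id pattern described above can be sketched roughly as follows, using only the JDK. This is not the poster's code: `PendingRequests`, `expect` and `onResponse` are hypothetical names, and the response type is assumed to be a plain `String` for brevity.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

final class PendingRequests {
    private final ConcurrentMap<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong();

    /** Allocates the id that the caller writes into the outgoing message. */
    long nextCorrelationId() {
        return nextId.incrementAndGet();
    }

    /** Registers a future that completes when the matching response arrives. */
    CompletableFuture<String> expect(long correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        // Drop the entry however the future ends (response, failure, or cancel).
        future.whenComplete((result, error) -> pending.remove(correlationId, future));
        return future;
    }

    /** Called from the inbound handler; returns false for unknown or duplicate ids. */
    boolean onResponse(long correlationId, String payload) {
        CompletableFuture<String> future = pending.remove(correlationId);
        return future != null && future.complete(payload);
    }

    int size() {
        return pending.size();
    }
}
```

In a real system `onResponse` would typically be invoked on an event loop thread, so callbacks attached to the returned future without an `Async` variant would run there too.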

@Scottmitch
Member

What's the current thinking for Netty 5 regarding Java 8

The master branch (and what was Netty 5) has been deprecated and is no longer supported. See #4466

@jimmiebfulton

jimmiebfulton commented May 11, 2016

Oh, wow. Sorry about that. Thanks for the response, @Scottmitch. I understand the motivation for removing the branch as a maintained version, and I've wondered myself how onerous it was maintaining that branch long term. At the same time, I was really looking forward to the simplified handler hierarchy, as well as other small fixes. I've been interested in Netty for quite some time, but never had a project that required it. It also looked like a steep-ish learning curve for someone peeking in casually from time to time. Now that I'm employing it seriously in a project, it's actually rather straightforward. However, it does feel like the learning curve need not appear so steep. It's the small things. Simplified handler hierarchy, smoothing out the rough edges such as channelRead0(), etc, would go a long way.

I took a cursory dive into the code yesterday, doing some rough sketching by duplicating methods that take ChannelPromise and reworking them with CompletableFuture variants. I haven't explored the implications on the threading model (and realize there are devils in those details), but strictly from a high level view, the APIs and usage are largely compatible. The APIs/functionality are quite similar, in fact. I also noticed opportunities for simplifying aspects taken care of by CompletableFuture.

One observation: Some of the various implementations of promises/futures are designed as "carriers" of variables, vs returning the results of completed work. An example: It seems that overwhelmingly, the chief use of ChannelFuture#channel() is to get access to the channel after a bind, and that most other usages already have the channel available by other means, anyway. Now that I've become familiar with CompletableFuture, it now strikes me as odd that I can "get" the channel before the work is completed during bind(). Removing #channel() might simplify things quite a bit.

In a CompletableFuture world:

ServerBootstrap#bind() returns CompletableFuture<Channel>.

bootstrap.bind().thenAccept(channel -> {
    // Channel is only available when bind is complete.
    // Less confusion to new users about the contract of #bind().  
    // The channel is available when it's available, and lessens opportunities for mistakes early on in the learning process.
});

Or:

Channel channel = bootstrap.bind().join();

A challenge would be on how to address ChannelProgressiveFuture, which not only carries state, but variable state. Not sure how this would be addressed, yet, but perhaps another mechanism might be employed for what seems like a usecase-specific type. Some type of listener that can be registered on the specific handlers that require this? Dunno.

Otherwise, most of the futures appear to be used for notifications that work has been completed, and would be rather straight-forward to replace with CompletableFuture<Void>.

As indicated in my first post, there'd be no way around API breakage without really cluttering the API. I was hoping that Netty 5 might be the "modernize and/or make breaking API changes as needed" release. Is that door shut? Is there a path forward for Java 8? As we move increasingly to an asynchronous programming world, it seems there might be opportunity for synergy with other async frameworks/code that employ the same future model, and I don't see a stronger/more natural candidate than CompletableFuture at this time.

I'm still fairly newish to Netty, so forgive me if my assessments are off base and/or naive.

Thanks!

@jimmiebfulton

jimmiebfulton commented May 12, 2016

And just playing around... as a transition/refactoring strategy? This allows the outward-facing APIs to be introduced before reworking internals...

AbstractBootstrap:

public CompletableFuture<Channel> bind() {
    validate();
    SocketAddress localAddress = this.localAddress;
    if (localAddress == null) {
        throw new IllegalStateException("localAddress not set");
    }
    CompletableFuture<Channel> future = new CompletableFuture<>();
    doBind(localAddress).addListener((ChannelFuture f) -> {
        if (f.isCancelled()) {
            future.cancel(false);
        } else if (f.cause() != null) {
            future.completeExceptionally(f.cause());
        } else {
            future.complete(f.channel());
        }
    });
    return future;
}

@Test
public void testLateRegisterSuccess() throws Exception {
    TestEventLoopGroup group = new TestEventLoopGroup();
    try {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(group);
        bootstrap.channel(LocalServerChannel.class);
        bootstrap.childHandler(new DummyHandler());
        bootstrap.localAddress(new LocalAddress("1"));
        CompletableFuture<Channel> future = bootstrap.bind();
        assertFalse(future.isDone());
        group.promise.setSuccess();
        final BlockingQueue<Boolean> queue = new LinkedBlockingQueue<Boolean>();
        future.thenAccept(channel -> {
            queue.add(channel.eventLoop().inEventLoop(Thread.currentThread()));
            queue.add(future.isDone());
        });
        assertTrue(queue.take());
        assertTrue(queue.take());
    } finally {
        group.shutdownGracefully();
        group.terminationFuture().sync();
    }
}

(Passes)

@Scottmitch
Member

I was really looking forward to the simplified handler hierarchy,

Yes, we were also bummed to give up some of these improvements, but in the end we came to the conclusion that the improvements did not justify maintaining a new major version.

methods that take ChannelPromise and reworking them with CompletableFuture variants

The threading characteristics of ChannelPromise and CompletableFuture are quite different in general. The order in which listeners are notified also differs (ChannelPromise = fifo, CompletableFuture = lifo when not complete, similar to fifo when complete). The API of CompletableFuture (and CompletionStage) is also very broad and exposes methods which in my experience can be problematic for users (missing exceptions, etc...).
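One plausible reading of the "missing exceptions" concern, sketched with the JDK only: an exception thrown inside a `thenApply` callback is captured in the returned future rather than thrown or logged, so it goes unnoticed unless some later stage observes it. `SwallowedFailure` and `parse` are hypothetical names used for illustration.

```java
import java.util.concurrent.CompletableFuture;

final class SwallowedFailure {
    static CompletableFuture<Integer> parse(String s) {
        // If Integer.parseInt throws, the exception is stored in the returned future;
        // it is not thrown to the caller of parse().
        return CompletableFuture.completedFuture(s).thenApply(Integer::parseInt);
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> f = parse("not a number");
        // No stack trace has been printed at this point; the failure is silent.
        System.out.println(f.isCompletedExceptionally()); // true

        // The failure only becomes visible once a stage explicitly handles it.
        f.exceptionally(t -> {
            System.err.println("failed: " + t);
            return -1;
        });
    }
}
```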

An example: It seems that overwhelmingly, the chief use of ChannelFuture#channel() is to get access to the channel after a bind

More analysis would be needed but it is possible we could get rid of this interface once we have a release which allows API breaking changes. We can consider this as a feature for the next major release.

@dimitarg

I don't think that Netty needs to implement CompletionStage in order to provide map, bind, and unit. As long as the combinators are there, as a user I personally don't care at all if they are the JDK ones.
Maybe it makes sense to split the issue in two.

I translate between Netty and *** some monad-like async abstraction *** for sends and such

Such wrappers always create GC pressure though. Nice to have it in Netty - no extra objects needed.

@dimitarg

dimitarg commented Nov 25, 2016

[EDIT this code is incorrect, disregard the comment]

Something I drafted up really quickly, might help as a very rough idea to whoever might take up implementing the canonical monadic operations for netty.Future:
https://gist.github.com/dimitarg/0d14f4b28719a8e5138198de9a551656

Not sure if those are the threading semantics one would always want, but at least this way the user has full control. Can be bound to the event loop etc. I guess the intended use is to create only one such object per executor.

@pschichtel

What about a separate module that provides conversion methods?

Something like this could be in there:

public static <T> CompletableFuture<T> toJava(Future<T> nettyFuture) {
    final CompletableFuture<T> future = new CompletableFuture<>();

    nettyFuture.addListener((Future<T> f) -> {
        if (f.isSuccess()) {
            future.complete(f.getNow());
        } else {
            future.completeExceptionally(f.cause());
        }
    });

    return future;
}

@slandelle
Contributor

@pschichtel IMHO, such code is not completely right (I do similar things in AHC API and I'm considering fixing it). If you return a CompletableFuture, people might expect cancel to work, while there's no way it would cancel the underlying Netty future. Returning CompletionStage instead would work, then.
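The cancellation gap can be shown without Netty at all. In this minimal sketch a plain `CompletableFuture` stands in for the underlying operation (an assumption made so the example stays self-contained); `CancelSketch` is a hypothetical name. Cancelling the derived future leaves the source untouched.

```java
import java.util.concurrent.CompletableFuture;

final class CancelSketch {
    /** Returns {derivedCancelled, sourceDone} after cancelling only the derived future. */
    static boolean[] cancelDerived() {
        CompletableFuture<String> source = new CompletableFuture<>(); // stand-in for the real operation
        CompletableFuture<Integer> derived = source.thenApply(String::length);

        derived.cancel(true); // only marks 'derived' as cancelled

        return new boolean[] { derived.isCancelled(), source.isDone() };
    }

    public static void main(String[] args) {
        boolean[] r = cancelDerived();
        System.out.println("derived cancelled: " + r[0]); // true
        System.out.println("source done: " + r[1]);       // false, the source is still pending
    }
}
```

Declaring the return type as `CompletionStage<T>` avoids advertising a `cancel` method on the type, although callers can still reach one through `toCompletableFuture()`.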

@dimitarg

dimitarg commented Jul 11, 2017

The adapter / wrapper is the general approach most netty-based frameworks take, but it has a few problems:

  • The threading semantics are hard to get right since there's a mismatch between the two models
  • Unnecessary object allocation
  • In some cases unnecessary context switching.

It will be awesome if one can provide a module with static methods map(), flatMap(), unit() / fork() for netty futures; then netty.concurrent can be used as a general purpose implementation that is also compositional.

Going further down the line, the Scala libraries based on Netty can then provide Functor, Applicative and Monad typeclass instances directly for Netty Future by simply reusing the above definitions.

I tried to play around with this idea, and what stopped me is that I realised I don't really understand exactly how netty's Future works. Is there documentation on the threading semantics, i.e. what happens when, and on which thread? Or would it be best to reverse-engineer it with some tests?

I will also once again reiterate that providing a monadic interface to netty futures, and integration with CompletableFuture are two completely separate aspects. My personal conviction is that the latter aspect brings little value. That is mostly because the CompletableFuture API is too huge and convoluted, and hopelessly uncanonical.

@normanmaurer
Member

@dimitarg actually it is quite easy... All the notifications happen in the EventExecutor thread that is used by the Future.

@johnou
Contributor

johnou commented Jul 17, 2017

@slandelle please give me a heads up if / when you change that, we use it extensively in our project.

The important thing to remember when using conversion methods like the ones in Async Http Client is that the returned CompletableFuture may complete on one of the Netty event loop threads, so the thenXAsync methods come in handy if you need to perform blocking I/O with the response.
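A rough, JDK-only illustration of that threading point. The single-threaded executor named `fake-event-loop` is an assumption standing in for a Netty event loop, and `OffloadSketch` is a hypothetical name. A callback attached with `thenApply` before completion runs on the completing thread, while `thenApplyAsync` with an explicit executor moves the work elsewhere.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class OffloadSketch {
    /** Returns the thread names that ran the plain and the async callback, in that order. */
    static String[] run() {
        ExecutorService eventLoop =
                Executors.newSingleThreadExecutor(task -> new Thread(task, "fake-event-loop"));
        ExecutorService blockingPool =
                Executors.newSingleThreadExecutor(task -> new Thread(task, "blocking-pool"));
        try {
            CompletableFuture<String> response = new CompletableFuture<>();

            // Both callbacks are registered while the future is still incomplete.
            CompletableFuture<String> plain =
                    response.thenApply(body -> Thread.currentThread().getName());
            CompletableFuture<String> offloaded =
                    response.thenApplyAsync(body -> Thread.currentThread().getName(), blockingPool);

            // Complete the future from the "event loop" thread, as a conversion adapter would.
            CompletableFuture.runAsync(() -> response.complete("payload"), eventLoop).join();

            return new String[] { plain.join(), offloaded.join() };
        } finally {
            eventLoop.shutdown();
            blockingPool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = run();
        System.out.println("thenApply ran on:      " + names[0]); // fake-event-loop
        System.out.println("thenApplyAsync ran on: " + names[1]); // blocking-pool
    }
}
```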

@dimitarg

dimitarg commented Aug 3, 2017

@normanmaurer Thanks! It is as you say, and that is a good approach. The user should fork explicitly if they so desire.

Well, then, the monadic interface should follow the exact same semantics. Here is an initial implementation:

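package playground;

import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;

import java.util.function.Function;
import java.util.function.Supplier;

public class FutureMonad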
{
    public static <A, B> Future<B> map(Future<A> fut, Function<A, B> f)
    {
        Promise<B> result = ImmediateEventExecutor.INSTANCE.newPromise();
        failOrEffect(fut, result, () ->result.setSuccess(f.apply(fut.getNow())));
        return result;
    }


    public static <A,B> Future<B> flatMap(Future<A> fut, Function<A, Future<B>> f)
    {
        Promise<B> promise = ImmediateEventExecutor.INSTANCE.newPromise();
        failOrEffect(fut, promise, () -> {
            Future<B> fut2 = f.apply(fut.getNow());
            failOrEffect(fut2, promise, () -> promise.setSuccess(fut2.getNow()));
        });

        return promise;
    }

    public static <A> Future<A> unit(A a)
    {
        return ImmediateEventExecutor.INSTANCE.newSucceededFuture(a);
    }

    public static <A> Future<A> fork(Future<A> fut, EventExecutor ex)
    {
        Promise<A> result = ex.newPromise();
        failOrEffect(fut, result, () -> result.setSuccess(fut.getNow()));
        return result;
    }

    public static <A> Future<A> fork(A a, EventExecutor ex)
    {
        return ex.newSucceededFuture(a);
    }

    public static <A> Future<A> fork(Supplier<A> f, EventExecutor ex)
    {
        return ex.submit(() -> f.get());
    }


    private static void failOrEffect(Future<?> upstream, Promise<?> promise, Effect f)
    {
        upstream.addListener(x ->
        {
            if (upstream.isCancelled())
            {
                promise.cancel(false);
            }
            else if (upstream.cause() != null)
            {
                promise.setFailure(x.cause());
            }
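            else
            {
                try
                {
                    f.run();
                }
                catch (Throwable t)
                {
                    // If the mapping function throws, fail the promise so it does not stay pending forever.
                    promise.setFailure(t);
                }
            }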
        });
    }


    interface Effect
    {
        void run();
    }
}

Let's put it to use:

package playground;

import io.netty.util.concurrent.DefaultEventExecutor;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.Future;

import java.util.concurrent.Callable;

import static playground.FutureMonad.*;

public class Main
{
    public static void main(String[] args) throws InterruptedException
    {
        DefaultEventExecutor mainEx = new DefaultEventExecutor(new DefaultThreadFactory("single-producer"));
        DefaultEventExecutorGroup userEx = new DefaultEventExecutorGroup(10,
                new DefaultThreadFactory("first-user-group")
        );

        DefaultEventExecutorGroup userEx2 = new DefaultEventExecutorGroup(10,
                new DefaultThreadFactory("second-user-group"));


        Future<Integer> upstream = mainEx.submit(logItAndValue(3, "producer in mainEx"));

        Future<Integer> mappedMain = map(upstream, result -> {
            logCurrThread("map for mainEx");
            return result;
        });

        Future<Integer> forked = fork(mappedMain, userEx.next());

        map(forked, val -> {
            logCurrThread("map of forked");
            return val;
        });

        Future<Integer> userBlocking = userEx.submit(() -> simulateBlocking(42, 200));

        Future<Integer> upstreamFlatmapBlocking = flatMap(upstream, x -> userBlocking);

        map(upstreamFlatmapBlocking, result -> {
            logCurrThread("map of upstream flatMapped with blocking");
            return result;
        });

        Future<Integer> forkedUser2 = fork(() -> simulateBlocking(42, 100), userEx2.next());

        Future<Integer> backUser1 = flatMap(forkedUser2, res -> fork(() -> simulateBlocking(res, 100), userEx.next()));

        map(backUser1, res -> {
            logCurrThread("mapping forked user 2 forked back to user 1");
            return res;
        });

        Future<Integer> blabla = flatMap(backUser1, res -> unit(res));

        map(blabla, x-> {
            logCurrThread("map after flatMap with unit");
            return "ok";
        });

        mainEx.shutdownGracefully().await();
        userEx.shutdownGracefully().await();
        userEx2.shutdownGracefully().await();
    }


    private static  <A> Callable<A> logItAndValue(A value, String msg)
    {
        return () -> {
            logCurrThread(msg);
            return value;
        };
    }

    private static <A> A simulateBlocking(A val, long sleep)
    {
        try
        {
            logCurrThread("blocking stuff going on");
            Thread.sleep(sleep);
            return val;
        }
        catch (InterruptedException e)
        {
            throw new RuntimeException(e);
        }

    }

    private static void logCurrThread(String message)
    {
        System.out.println(Thread.currentThread().getName()  + " : " + message);
    }


}

This outputs (ordering aside):

single-producer-1-1 : producer in mainEx
single-producer-1-1 : map for mainEx
first-user-group-3-1 : map of forked
first-user-group-3-2 : blocking stuff going on
second-user-group-4-1 : blocking stuff going on
first-user-group-3-3 : blocking stuff going on
first-user-group-3-2 : map of upstream flatMapped with blocking
first-user-group-3-3 : mapping forked user 2 forked back to user 1
first-user-group-3-3 : map after flatMap with unit

Nothing surprising here. Simple semantics - thread context never changes unless user states explicitly, either via fork, or via returning a Future spawned from a new executor via flatMap.

I'm pretty happy with this. I will write some tests and probably put this in production next week. Right now we are using Guava for this and I'd really like to remove it because it takes up 3mb in my distro :)

Can you comment if this code seems correct, at first glance?

Do you think this is worthy of living in Netty common, or somewhere else in Netty? If so I will be happy to send a PR.

I think it is quite common nowadays to want to do this sort of stuff in your app layer. Might be useful.

@johnou
Contributor

johnou commented Aug 3, 2017

Future is awful, requires the user to continually poll for when it is ready.

@dimitarg

dimitarg commented Aug 3, 2017

This is io.netty.util.concurrent.Future ?

@slandelle
Contributor

The awfully complex thing with Future composition is cancellation.
Lightbend people discarded the problem and dropped cancel from Scala Future and use CompletionStage from their Java APIs.
IIRC, Finagle Future has composite cancel support, might be worth getting Twitter people in the loop.

@dimitarg

dimitarg commented Aug 3, 2017

I've updated the failOrEffect implementation to handle cancellation in a more sensible manner. If the upstream future is cancelled, the result returned by map and flatMap will be cancelled, too.

    public static void main(String[] args) throws InterruptedException
    {
        DefaultEventExecutorGroup userEx = new DefaultEventExecutorGroup(10,
                new DefaultThreadFactory("first-user-group")
        );

        Future<Integer> willBeCancelled = userEx.next().newPromise();
        Future<Integer> mapped = map(willBeCancelled, x -> x.hashCode());



        System.out.println("able to cancel:" + willBeCancelled.cancel(true));
        mapped.await();

        System.out.println("mapped cancelled: " + mapped.isCancelled());
        System.out.println("mapped cause: " + mapped.cause());
        System.out.println("mapped done: " + mapped.isDone());
        System.out.println("mapped result: " + mapped.getNow());




        userEx.shutdownGracefully().await();
    }

This outputs:

able to cancel:true
mapped cancelled: true
mapped cause: java.util.concurrent.CancellationException
mapped done: true
mapped result: null

@dimitarg

dimitarg commented Aug 3, 2017

@slandelle I'm not sure I completely understand your comment. Just to reiterate, what at least I am aiming for here, is monadic interface for n.u.c.Future, and not a complete overhaul of the netty concurrency facilities. That's way past my capabilities, and, I think, the scope of the issue.

@dimitarg

dimitarg commented Aug 10, 2017

I've started working on the approach I described above in this repo:

https://github.com/novarto-oss/netty-future-monad

What I have so far is the monad implementation, as well as property tests that check the implementation obeys left identity, right identity and associativity.
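For readers unfamiliar with the three laws, here is a small JDK-only sketch that checks them for `CompletableFuture`, with `completedFuture` as unit and `thenCompose` as flatMap. It is not taken from the linked repository; `MonadLawsSketch` and its sample functions `f` and `g` are hypothetical, and comparing joined results is a simplification that ignores failure and threading behaviour.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class MonadLawsSketch {
    static <A> CompletableFuture<A> unit(A a) {
        return CompletableFuture.completedFuture(a);
    }

    static <A, B> CompletableFuture<B> flatMap(CompletableFuture<A> m,
                                               Function<A, CompletableFuture<B>> f) {
        return m.thenCompose(f);
    }

    /** Checks the three laws for one sample value by comparing the eventual results. */
    static boolean lawsHoldFor(int a) {
        Function<Integer, CompletableFuture<Integer>> f = x -> unit(x + 1);
        Function<Integer, CompletableFuture<Integer>> g = x -> unit(x * 2);
        CompletableFuture<Integer> m = unit(a);

        // Left identity: flatMap(unit(a), f) == f(a)
        boolean leftIdentity = flatMap(unit(a), f).join().equals(f.apply(a).join());

        // Right identity: flatMap(m, unit) == m
        boolean rightIdentity = flatMap(m, x -> unit(x)).join().equals(m.join());

        // Associativity: flatMap(flatMap(m, f), g) == flatMap(m, x -> flatMap(f(x), g))
        boolean associativity = flatMap(flatMap(m, f), g).join()
                .equals(flatMap(m, x -> flatMap(f.apply(x), g)).join());

        return leftIdentity && rightIdentity && associativity;
    }

    public static void main(String[] args) {
        System.out.println(lawsHoldFor(20)); // true
    }
}
```

A property-based test would run the same checks over many generated values and functions instead of a single sample.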

What is coming up is JMH benches that show how this performs compared to an identical approach with guava futures, and java std CompletableFuture.

If the numbers turn out acceptable, I'll document and publish to artifactory / bintray.

chrisvest added a commit that referenced this issue Aug 26, 2021
…1607)

Motivation:
Making futures easier to compose, combine, and extend is useful to have as part of the API, since implementing this correctly and efficiently can be tricky.

Modification:
Add `Future.map(Function<V,R>) -> Future<R>` and `Future.flatMap(Function<V,Future<R>>) -> Future<R>` default methods to the `Future` interface.
These methods return a new Future instance that will be completed when the original future completes, with the result processed through the given mapping function.
These two methods take care to propagate cancellation and exceptions correctly:
Cancellation propagates both ways between the new and original future.
Failures only propagate from the original future to the returned new Future instance.

Result:
A few convenient methods for modifying and composing futures.

This PR fixes #8523, and perhaps also #2105
@chrisvest
Contributor

map and flatMap methods got merged into master — closing this.
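The propagation rules described in the commit message (cancellation in both directions, failures only downstream) can be illustrated on top of `CompletableFuture`. This is a sketch of the idea under that assumption, not the code that was merged into Netty; `MapSketch.map` is a hypothetical helper.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class MapSketch {
    static <V, R> CompletableFuture<R> map(CompletableFuture<V> source,
                                           Function<? super V, ? extends R> fn) {
        CompletableFuture<R> result = new CompletableFuture<>();

        // Downstream: cancellation, failure, and success flow from source to result.
        source.whenComplete((value, error) -> {
            if (source.isCancelled()) {
                result.cancel(false);
            } else if (error != null) {
                result.completeExceptionally(error);
            } else {
                try {
                    result.complete(fn.apply(value));
                } catch (Throwable t) {
                    // A throwing mapper fails only the result; the source already succeeded.
                    result.completeExceptionally(t);
                }
            }
        });

        // Upstream: only cancellation flows from result back to source.
        result.whenComplete((value, error) -> {
            if (result.isCancelled()) {
                source.cancel(false); // no-op if the source is already complete
            }
        });

        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<Integer> mapped = map(source, String::length);
        mapped.cancel(false);
        System.out.println(source.isCancelled()); // true
    }
}
```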
