add pipeline-async functions #26

Closed
zeroware opened this issue Nov 27, 2014 · 10 comments

Comments

@zeroware
Contributor

I'm trying to apply an async function to every value of a channel.
In FRP this operation is typically called flatMap.
It takes a function that returns a channel.

Here is a draft of what it might look like:

var csp = require('js-csp');

/**
 *
 * @param {function} f
 * @param {Channel} input
 * @param {Channel} [output]
 * @return {Channel}
 */
function flatMap(f, input, output) {

    if (!output) {
        output = csp.chan();
    }

    csp.go(function* () {
        while(true) {
            var value = yield csp.take(input);

            if (value === csp.CLOSED) {
                output.close();
                break;
            }

            var mapResult = f(value);

            while (true) {
                var outValue = yield csp.take(mapResult);
                if (outValue === csp.CLOSED) {
                    break;
                } else {
                    yield csp.put(output, outValue);
                }
            }

        }
    });

    return output;
}

Is there another way to compose existing functions to produce this kind of result?

EDIT: I realized that this implementation is closer to flatMapConcat than flatMap, since each returned channel is fully drained before the next input value is taken.
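For contrast with the draft above, here is a minimal sketch of the merging variant, where every inner channel is drained concurrently and results arrive in completion order. It deliberately avoids js-csp so that it runs on its own: `chan`, `CLOSED`, `flatMapMerge`, `delayed`, `collect` and `demo` are all hypothetical names, and the stand-in channel has an unbounded buffer, so unlike a real CSP channel its `put` never blocks.

```javascript
// Tiny promise-based channel: a stand-in for csp.chan(), NOT the js-csp API.
// The buffer is unbounded, so put() never blocks (no back-pressure).
const CLOSED = Symbol('CLOSED');

function chan() {
  const buffer = [];
  const takers = [];
  let closed = false;
  return {
    put(value) {
      if (closed) return;
      const taker = takers.shift();
      if (taker) taker(value); else buffer.push(value);
    },
    take() {
      if (buffer.length > 0) return Promise.resolve(buffer.shift());
      if (closed) return Promise.resolve(CLOSED);
      return new Promise((resolve) => takers.push(resolve));
    },
    close() {
      closed = true;
      while (takers.length > 0) takers.shift()(CLOSED);
    },
  };
}

// Merging flatMap: each inner channel gets its own draining task,
// so values reach `output` in completion order, not input order.
function flatMapMerge(f, input, output = chan()) {
  const drains = [];
  (async () => {
    for (;;) {
      const value = await input.take();
      if (value === CLOSED) break;
      const inner = f(value);
      drains.push((async () => {
        for (;;) {
          const out = await inner.take();
          if (out === CLOSED) return;
          output.put(out);
        }
      })());
    }
    // Close the output only once the input and every inner channel are done.
    await Promise.all(drains);
    output.close();
  })();
  return output;
}

// Hypothetical f: emits `${label}!` after `ms` milliseconds, then closes.
function delayed([label, ms]) {
  const ch = chan();
  setTimeout(() => { ch.put(label + '!'); ch.close(); }, ms);
  return ch;
}

// Reads a channel until it closes and returns everything it saw.
async function collect(ch) {
  const seen = [];
  for (;;) {
    const value = await ch.take();
    if (value === CLOSED) return seen;
    seen.push(value);
  }
}

async function demo() {
  const input = chan();
  input.put(['slow', 50]);
  input.put(['fast', 5]);
  input.close();
  return collect(flatMapMerge(delayed, input));
}

demo().then((result) => console.log(result)); // logs [ 'fast!', 'slow!' ]
```

The draft above would emit the slow result first, because it finishes draining one inner channel before taking the next input value.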

@zeroware
Contributor Author

zeroware commented Dec 8, 2014

Anyone?

@discreteinfinity

In the code above, if 'f' were an async function, the variable 'mapResult' would be undefined. In fact, even if 'f' were a synchronous function, an error would be thrown when csp.take(mapResult) executes.
I think what you want is to call csp.putAsync inside the callback of whatever async function you are using. You could then take from the channel(s) receiving the async results in a separate subprocess.

@zeroware
Contributor Author

zeroware commented Dec 8, 2014

@discreteinfinity What I want is a function that takes one input channel and applies the synchronous function "f" to each value from that channel.
The contract of "f" is that its return value must be a channel.
Every returned channel is then merged into the (optionally provided) output channel, which is finally returned.

This is the equivalent of flatMap in other FRP frameworks such as Bacon.js.

@jlongster
Collaborator

@zeroware Sorry, I was at a work week last week and today I've been catching up on work stuff. I will respond more thoroughly later tonight or tomorrow.

I haven't fully read this issue, but a flatMap equivalent would be the mapcat transducer (see #26 (comment)).

var ch = chan(1, t.mapcat(foo));

Reading your comment above, that isn't the same though. It will only merge in concrete arrays, not channels. If you want that, you should look at the higher-order mapcatFrom and others: https://github.com/ubolonton/js-csp/blob/master/doc/advanced.md#mapcatfromf-ch-bufferorn.

These APIs will probably be slightly changed soon to make things as simple/uniform as possible, but the above should work.
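To make the "concrete arrays, not channels" point concrete, here is a small sketch of map-then-concatenate semantics in plain JavaScript. It uses neither js-csp nor a transducer library; the `mapcat` helper and the sample data are hypothetical and only illustrate the behaviour on arrays.

```javascript
// Map-then-concatenate over concrete arrays (plain JavaScript, no libraries).
// `f` must return an array; the returned arrays are flattened by one level.
function mapcat(f, values) {
  return values.reduce((acc, value) => acc.concat(f(value)), []);
}

const words = mapcat((line) => line.split(' '), ['a b', 'c']);
console.log(words); // logs [ 'a', 'b', 'c' ]
```

If `f` returned a channel instead of an array, there would be nothing to concatenate synchronously, which is why a channel-aware operation is needed for the use case in this issue.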

@zeroware
Contributor Author

zeroware commented Dec 8, 2014

@jlongster Thanks for answering. I don't know if mapcat can do it.
Here is my exact use case :

/**
 * Returns a channel containing the HTTP response of a GET request to url
 *
 * @param {string} url
 * @return {Channel}
 */
var getUrlContent = function(url) {
//...
};

var chan = csp.chan();

// yield is only valid inside a generator, so the puts run in a go block.
csp.go(function* () {
    yield csp.put(chan, "http://example.org");
    yield csp.put(chan, "http://example.org/foo");
});

// flatMap takes the mapping function first, then the input channel.
var responseChan = flatMap(getUrlContent, chan);

// responseChan will be filled with the HTTP responses for the URLs put on the input channel.

@ubolonton
Contributor

The current functions cannot do it (not directly).
I think what you want is similar to core.async's pipeline-async, which hasn't been ported yet.
I will look into it soon.
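For readers unfamiliar with core.async, its documented pipeline-async contract is: at most n calls to the async function af are in flight, af receives an input value plus a result channel that it must close when done, and outputs are delivered in input order. The sketch below illustrates those semantics only. It is not the js-csp port and does not use the js-csp API; `chan`, `CLOSED`, `pipelineAsync`, `slowDouble`, `collect` and `demo` are hypothetical names, and the stand-in channel has an unbounded buffer.

```javascript
// Stand-in channel with an unbounded buffer; NOT the js-csp API.
const CLOSED = Symbol('CLOSED');

function chan() {
  const buffer = [];
  const takers = [];
  let closed = false;
  return {
    put(value) {
      if (closed) return;
      const taker = takers.shift();
      if (taker) taker(value); else buffer.push(value);
    },
    take() {
      if (buffer.length > 0) return Promise.resolve(buffer.shift());
      if (closed) return Promise.resolve(CLOSED);
      return new Promise((resolve) => takers.push(resolve));
    },
    close() {
      closed = true;
      while (takers.length > 0) takers.shift()(CLOSED);
    },
  };
}

// af(value, result) must put its result(s) on `result` and then close it.
function pipelineAsync(n, to, af, from) {
  const pending = chan(); // result channels, queued in input order
  let inFlight = 0;
  let wake = null;        // resolves the reader's wait for a free slot

  // Reader: start one af call per input value, at most n in flight.
  (async () => {
    for (;;) {
      const value = await from.take();
      if (value === CLOSED) break;
      if (inFlight >= n) await new Promise((resolve) => { wake = resolve; });
      inFlight += 1;
      const result = chan();
      pending.put(result);
      af(value, result);
    }
    pending.close();
  })();

  // Writer: drain result channels strictly in the order they were queued.
  (async () => {
    for (;;) {
      const result = await pending.take();
      if (result === CLOSED) break;
      for (;;) {
        const out = await result.take();
        if (out === CLOSED) break;
        to.put(out);
      }
      inFlight -= 1;
      if (wake) { const resume = wake; wake = null; resume(); }
    }
    to.close();
  })();

  return to;
}

// Hypothetical af: later inputs finish sooner, to show that order is kept.
function slowDouble(value, result) {
  setTimeout(() => { result.put(value * 2); result.close(); }, 40 - value * 10);
}

// Reads a channel until it closes and returns everything it saw.
async function collect(ch) {
  const seen = [];
  for (;;) {
    const value = await ch.take();
    if (value === CLOSED) return seen;
    seen.push(value);
  }
}

async function demo() {
  const from = chan();
  [1, 2, 3].forEach((value) => from.put(value));
  from.close();
  return collect(pipelineAsync(2, chan(), slowDouble, from));
}

demo().then((result) => console.log(result)); // logs [ 2, 4, 6 ]
```

One simplification in this sketch: a slot is freed only when the oldest outstanding result has been drained, which keeps the bound at n but may start new work slightly later than necessary.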

zeroware changed the title from "flatMap like operation" to "add pipeline-async functions" on Dec 9, 2014
@zeroware
Contributor Author

zeroware commented Dec 9, 2014

@ubolonton Thanks, I looked at the core.async library and pipeline-async is what I'm looking for.
I will try to open a pull request if I manage to implement it successfully.

@discreteinfinity

@zeroware Sorry, I guess I didn't quite understand how flatMap worked, but seeing the spec for pipeline-async has made things clearer for me. Anyway, I'm excited to hear that something like pipeline-async may be making its way into js-csp!

@zeroware
Contributor Author

Initial pipeline implementation : #28

@ubolonton
Contributor

I merged the pipeline implementation. We just need some docs/examples now.
