Channel with transducer example fails #5

Open
rikkertkoppes opened this issue Sep 25, 2014 · 2 comments

@rikkertkoppes

The channel-with-transducer example from your blog seems broken (in both Firefox and regenerator):

Full test:

var csp = require('js-csp');

function compose(f,g) {
    return function(x) {
        return f(g(x));
    }
}

function map(op) {
    return function(reduce) {
        return function(a,x) {
            return reduce(a, op(x));
        }
    }
}

function filter(pre) {
    return function(reduce) {
        return function(a,x) {
            return pre(x)? reduce(a,x): a;
        }
    }
}

var xform = compose(
    map(function(x) {
        return x * 2
    }),
    filter(function(x) {
        return x > 5
    })
);

var ch = csp.chan(1, xform);

csp.go(function*() {
    yield csp.put(ch, 1);
    yield csp.put(ch, 2);
    yield csp.put(ch, 3);
    yield csp.put(ch, 4);
});

csp.go(function*() {
    while(!ch.closed) {
        console.log(yield csp.take(ch));
    }
});

It outputs

2 6 8

It seems the "2" is not even processed by the xform.
Any idea what's going on?
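
For comparison, here is a minimal sketch of what the same transducer produces when it is applied to a plain array instead of a channel. The `append` reducing function is a stand-in I am assuming for the channel buffer; it is not part of js-csp. Under that assumption, the values 1 through 4 should come out as 6 and 8 only, so the "2" in the output above is unexpected.

```javascript
// Same transducer helpers as in the report, applied to a plain array.
function compose(f, g) {
    return function(x) { return f(g(x)); };
}
function map(op) {
    return function(reduce) {
        return function(a, x) { return reduce(a, op(x)); };
    };
}
function filter(pre) {
    return function(reduce) {
        return function(a, x) { return pre(x) ? reduce(a, x) : a; };
    };
}

var xform = compose(
    map(function(x) { return x * 2; }),
    filter(function(x) { return x > 5; })
);

// Hypothetical reducing function: appends to an array (stands in for the buffer).
function append(acc, x) {
    acc.push(x);
    return acc;
}

// 1 -> 2 (dropped), 2 -> 4 (dropped), 3 -> 6 (kept), 4 -> 8 (kept)
var expected = [1, 2, 3, 4].reduce(xform(append), []);
console.log(expected); // logs the array [6, 8]
```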

@rikkertkoppes
Author

I've been researching this a bit. It all works fine with maps, but fails with filters when they actually start filtering.

The core of the problem is that the first goroutine yields on the channel put, but the value never actually gets put because the filter drops it. The goroutine nevertheless stays suspended and only resumes after the next successful put (one that makes it through the filter).

The result is that the whole channel is out of sync.

I have no idea how to resolve it, though. Resuming the goroutine manually (next()) when the filter blocks seems awkward.

Maybe the solution lies in delegating the yield (yield*)? That would require rewriting channel.put and delegating the yield all the way down to the adder (the transducer on the channel), which would then decide whether to yield or not.
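
To make the delegation idea concrete, here is a rough sketch using plain generators only. It does not touch js-csp; `maybePut`, `producer`, and the `{ op: 'put' }` instruction shape are all hypothetical names made up for illustration. The point is just that with `yield*`, the inner generator decides whether the outer one suspends at all.

```javascript
// Hypothetical inner generator: yields a "put" instruction only when the
// value passes the predicate. Dropped values cause no yield, so the caller
// is never suspended for them.
function* maybePut(value, accepts) {
    if (accepts(value)) {
        yield { op: 'put', value: value };
    }
}

// Hypothetical producer: doubles each value, then delegates the yield.
function* producer() {
    for (var i = 1; i <= 4; i++) {
        yield* maybePut(i * 2, function(x) { return x > 5; });
    }
}

// Drain the generator and collect the values it actually yielded.
var instructions = Array.from(producer()).map(function(instr) {
    return instr.value;
});
console.log(instructions); // logs the array [6, 8]
```

Whether this approach would fit the way js-csp drives its goroutines is an open question; the sketch only shows the generator mechanics.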

@ubolonton

The problem is that the reducing function add doesn't sit on the "straight-from-put-to-pending-take" path, where the buffer is not taken into account.

In core.async this was fixed by forcing all values to go through the reducing function first: clojure/core.async@ad0ca86#diff-619df063e9a19af51078b3b476c49d0cL36
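
A toy model may help show the difference. This is a sketch under loud assumptions: `makeToyChannel` is a made-up, callback-based channel with an unbounded buffer, not js-csp's or core.async's real code, and the put/take interleaving in `run` is hand-picked to resemble the report rather than taken from the actual scheduler. With the shortcut enabled, a put that finds a waiting taker hands over the raw value; with the shortcut disabled, every value goes through the reducing function first.

```javascript
// Toy model for illustration only; this is NOT js-csp's real implementation.
function map(op) {
    return function(reduce) {
        return function(a, x) { return reduce(a, op(x)); };
    };
}
function filter(pre) {
    return function(reduce) {
        return function(a, x) { return pre(x) ? reduce(a, x) : a; };
    };
}
// Same pipeline as in the report: double first, then keep values above 5.
function xform(reduce) {
    return map(function(x) { return x * 2; })(
        filter(function(x) { return x > 5; })(reduce));
}

function makeToyChannel(xform, routeThroughXform) {
    var buffer = [];        // transformed values waiting for a taker
    var pendingTakes = [];  // callbacks of takers waiting for a value
    var add = xform(function(buf, x) { buf.push(x); return buf; });

    function flush() {
        while (buffer.length > 0 && pendingTakes.length > 0) {
            pendingTakes.shift()(buffer.shift());
        }
    }
    return {
        put: function(value) {
            if (!routeThroughXform && pendingTakes.length > 0) {
                // Shortcut path: the raw value skips the reducing function.
                pendingTakes.shift()(value);
                return;
            }
            add(buffer, value); // may add nothing if the filter drops it
            flush();
        },
        take: function(callback) {
            pendingTakes.push(callback);
            flush();
        }
    };
}

function run(routeThroughXform) {
    var ch = makeToyChannel(xform, routeThroughXform);
    var received = [];
    function record(v) { received.push(v); }
    ch.put(1);        // no taker waiting yet
    ch.take(record);  // a taker now waits
    ch.put(2);        // arrives while the taker is waiting
    ch.put(3);
    ch.put(4);
    ch.take(record);
    ch.take(record);
    return received;
}

var buggy = run(false); // shortcut enabled
var fixed = run(true);  // everything goes through the reducing function
console.log(buggy, fixed); // logs [2, 6, 8] and [6, 8]
```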

I still feel something is off in core.async though, specifically not being able to use a transducer on a buffer-less channel.
