Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

zip 3 or more streams #159

Closed
xiaojian-hong opened this issue Apr 25, 2021 · 0 comments
Closed

zip 3 or more streams #159

xiaojian-hong opened this issue Apr 25, 2021 · 0 comments
Assignees
Labels
bug Something isn't working

Comments

@xiaojian-hong
Copy link
Member

I thought it would be straightforward, but I'm facing some difficulties...

I have a data source app that sends 5 packets encoded with 5 different keys:

func generateAndSendData(stream quic.Stream) {
    keys := []byte{0x10, 0x11, 0x13, 0x7E, 0x7F}

    for i, key := range keys {
        time.Sleep(100 * time.Millisecond)
        
        codec := y3.NewCodec(key)

        sendingBuf, _ := codec.Marshal(int64(i))

        _, err := stream.Write(sendingBuf)
        if err != nil {
            log.Printf("Couldn't send buffer with i=%v", i)
        } else {
            fmt.Print(".")
        }
    }
}

And the flow app that has 5 streams that are subscribed to each individual key and then zipped together:

var zipper = func(_ context.Context, a interface{}, b interface{}) (interface{}, error) {
    accumulator, ok := a.([]interface{})
    if !ok {
        fmt.Printf("No accumulator: %v + %v\n", a, b)
        return []interface{}{a, b}, nil
    }

    fmt.Printf("With accumulator: %v + %v\n", accumulator, b)
    accumulator = append(accumulator, b)
    return accumulator, nil
}

var convert = func(v []byte) (interface{}, error) {
    fmt.Printf("Got: %v\n", v)
	return y3.ToInt64(v)
}

// Handler will handle data in Rx way
func Handler(rxstream rx.RxStream) rx.RxStream {
	streamA:= rxstream.Subscribe(0x10).OnObserve(convert)
	streamB:= rxstream.Subscribe(0x11).OnObserve(convert)
	streamC:= rxstream.Subscribe(0x13).OnObserve(convert)
	streamD:= rxstream.Subscribe(0x7E).OnObserve(convert)
	streamE:= rxstream.Subscribe(0x7F).OnObserve(convert)

    return streamA.
        ZipFromIterable(streamB, zipper).
        ZipFromIterable(streamC, zipper).
        ZipFromIterable(streamD, zipper).
        ZipFromIterable(streamE, zipper).
        StdOut().
	Encode(0x11)
}

The problem, is that only the first two packets sent by the source are received on the flow. The order of zipping doesn't matter, the order of sending by the source matters only, for what I can see...

What can I do? (Also, I'm zipping sequentially, because there's no Merge operator... Is it really missing?)

Originally posted by @kuredoro in #158

@xiaojian-hong xiaojian-hong self-assigned this Apr 25, 2021
@xiaojian-hong xiaojian-hong added the bug Something isn't working label Apr 25, 2021
fanweixiao pushed a commit that referenced this issue Apr 30, 2021
* fix(rx): prevent blocking when Rx function returns an error
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

2 participants