Flaky streaming test #460

Closed
gvolpe opened this issue Feb 1, 2021 · 12 comments · Fixed by #560 or #782
Labels
help wanted Extra attention is needed pinned

Comments

@gvolpe
Member

gvolpe commented Feb 1, 2021

dev.profunktor.redis4cats.RedisStreamSpec:
==> X dev.profunktor.redis4cats.RedisStreamSpec.append/read to/from a stream  3.219s java.util.NoSuchElementException: null
    at fs2.Stream$CompileOps.$anonfun$lastOrError$3(Stream.scala:4329)
    at scala.Option.fold(Option.scala:251)
    at fs2.Stream$CompileOps.$anonfun$lastOrError$2(Stream.scala:4329)

https://github.com/profunktor/redis4cats/runs/1805553054?check_suite_focus=true

@gvolpe gvolpe added the help wanted Extra attention is needed label Feb 1, 2021
@stale

stale bot commented Apr 2, 2021

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@stale stale bot added the wontfix This will not be worked on label Apr 2, 2021
@gvolpe gvolpe added pinned and removed wontfix This will not be worked on labels Apr 2, 2021
@gvolpe gvolpe reopened this Jul 1, 2021
@gvolpe
Member Author

gvolpe commented Jul 1, 2021

Still seeing this

dev.profunktor.redis4cats.RedisStreamSpec:
==> X dev.profunktor.redis4cats.RedisStreamSpec.append/read to/from a stream  3.212s java.util.NoSuchElementException: null
    at fs2.Stream$CompileOps.$anonfun$lastOrError$3(Stream.scala:4401)
    at scala.Option.fold(Option.scala:251)
    at fs2.Stream$CompileOps.$anonfun$lastOrError$2(Stream.scala:4401)

@wiwa
Contributor

wiwa commented Feb 13, 2023

I was playing around with this. Somehow, setting the read to block = Some(1.millis) removes the flakiness (tested via 100+ runs). This doesn't make sense to me, since the default is block = Some(Duration.Zero), i.e. wait forever.

If we use N messages instead of 1, and say interruptAfter(1000.seconds), then .take(N - 1) makes it "work", while .take(N) leads to hanging forever.

Question: why is there a repeat here?

@gvolpe
Member Author

gvolpe commented Feb 14, 2023

> I was playing around with this. Somehow, setting the read to block = Some(1.millis) removes the flakiness (tested via 100+ runs). This doesn't make sense to me, since the default is block = Some(Duration.Zero), i.e. wait forever.

Fixes that rely on a sleep or similar are themselves flaky; that's why this test is marked as such.

> If we use N messages instead of 1, and say interruptAfter(1000.seconds), then .take(N - 1) makes it "work", while .take(N) leads to hanging forever.

Does take(N - 1) get you all the published messages? I think that would be missing the final message, no?

> Question: why is there a repeat here?

I can't remember any details of the streaming module, it's been too long, but since you seem to have it fresh I'll counter-ask, why shouldn't there be a repeat there? :)

@wiwa
Contributor

wiwa commented Feb 14, 2023

Yes, it misses the final message. It's less of a "here's a fix" and more of a "here are some things I noticed about it". That being said, the current test essentially says "block forever" until the interrupt, so I am specifically lowering the sleep from "infinite"/3.seconds to 1.millis.

The reason I don't expect a repeat is that (I believe) list <- Stream.eval(rawStreaming.xRead(offsets.values.toSet, block, count)) already gives me count messages -- I wouldn't expect more than that. But the repeat does let us continue the read from the initial offset...?

@gvolpe
Member Author

gvolpe commented Feb 14, 2023

> But the repeat does let us continue the read from the initial offset...?

No, it would not, but there could be something wrong related to the block and count arguments. I am not really familiar with the streaming API, it's mainly experimental in this library.

@wiwa
Contributor

wiwa commented Feb 14, 2023

I think I figured it out: redis/lettuce#1593

The block = Some(Duration.Zero) means that XREAD before XADD causes the client to block indefinitely, because async().xread() is only async from the point of view of Java, not the underlying Redis client. So, we need to use an async connection pool. Verified that this works if we just use a 2nd connection for XREAD.

As for why block = Some(1.millis) works: it's because XREAD returns with nothing, and the stream will continue XREADing nothing (i.e. 1000 reads/sec) until XADD is hit.

(Edit) This is also why my initial impression that repeat is redundant was wrong: XREAD's count is actually just a "maxCount", and the call returns with nothing as soon as the block timeout expires without data.
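
To make the count and repeat point concrete, here is a minimal sketch in plain Scala. It is a toy model, not the redis4cats or Lettuce API: a LinkedBlockingQueue stands in for the Redis stream, and PollingDemo, readOnce and readUntil are hypothetical names made up for illustration. It assumes only what is described above: a read with a short block returns nothing when the timeout expires, count is an upper bound, and some form of repeat is needed to keep reading.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import scala.annotation.tailrec

// Toy model only: the queue plays the role of a Redis stream.
object PollingDemo {

  // Stand-in for a single XREAD with BLOCK and COUNT: wait up to blockMillis
  // for a first entry, then return at most `count` entries (count >= 1).
  // Returns Nil when the block timeout expires with no data.
  def readOnce(stream: LinkedBlockingQueue[String], blockMillis: Long, count: Int): List[String] =
    Option(stream.poll(blockMillis, TimeUnit.MILLISECONDS)) match {
      case None => Nil
      case Some(first) =>
        // Non-blocking polls for whatever else is already there, capped by count.
        val rest = Iterator.continually(stream.poll()).take(count - 1).takeWhile(_ != null).toList
        first :: rest
    }

  // Stand-in for `repeat`: keep issuing reads until `wanted` entries were seen.
  // Also reports how many reads came back empty.
  def readUntil(
      stream: LinkedBlockingQueue[String],
      blockMillis: Long,
      count: Int,
      wanted: Int
  ): (List[String], Int) = {
    @tailrec
    def loop(acc: List[String], emptyReads: Int): (List[String], Int) =
      if (acc.size >= wanted) (acc, emptyReads)
      else
        readOnce(stream, blockMillis, count) match {
          case Nil   => loop(acc, emptyReads + 1) // timeout expired, read again
          case batch => loop(acc ++ batch, emptyReads)
        }
    loop(Nil, 0)
  }
}
```

In this model, calling readUntil with blockMillis = 1 on an empty queue spins at roughly a thousand empty reads per second until a producer adds something, which mirrors the "1000 reads/sec" behaviour described above.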

@gvolpe
Member Author

gvolpe commented Feb 15, 2023

I don't think repeat is wrong, as we'd want a continuous stream of data from the XREAD source, but definitely some improvements could be made to the logic behind block and count.

> So, we need to use an async connection pool. Verified that this works if we just use a 2nd connection for XREAD.

That makes sense, you can see how transactions and pipelining work in this library. Instead of adding two underlying connections, what you need is to create two different instances of Streaming via mkStreamingConnection, which share the same RedisClient, and use one for reading and the other for writing.
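
The blocking behaviour and the two-connection fix can be modelled without Redis at all. The sketch below is a toy model in plain Scala, not the redis4cats or Lettuce API: ToyConnection and BlockingDemo are hypothetical names, a single-threaded executor stands in for a connection that runs its commands one at a time, and a finite block is used instead of Duration.Zero so the demo terminates.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Toy model only: one worker thread per "connection", so commands run
// strictly in submission order, one at a time.
final class ToyConnection(stream: LinkedBlockingQueue[String]) {
  private val pool = Executors.newSingleThreadExecutor()
  private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

  // Stand-in for a blocking XREAD: waits up to `block` for an entry.
  def xread(block: FiniteDuration): Future[Option[String]] =
    Future(Option(stream.poll(block.toMillis, TimeUnit.MILLISECONDS)))

  // Stand-in for XADD.
  def xadd(value: String): Future[Unit] =
    Future(stream.put(value))

  def close(): Unit = pool.shutdownNow()
}

object BlockingDemo {

  // XREAD then XADD on the same connection: the write queues behind the read,
  // so the read can only end by timing out.
  def sharedConnection(block: FiniteDuration): Option[String] = {
    val conn = new ToyConnection(new LinkedBlockingQueue[String]())
    try {
      val read  = conn.xread(block)  // occupies the connection for up to `block`
      val write = conn.xadd("hello") // stuck behind the read
      val seen  = Await.result(read, block + 2.seconds)
      Await.ready(write, 2.seconds)  // the write only runs after the read gave up
      seen
    } finally conn.close()
  }

  // Same sequence, but reader and writer each get their own connection.
  def separateConnections(block: FiniteDuration): Option[String] = {
    val stream = new LinkedBlockingQueue[String]()
    val reader = new ToyConnection(stream)
    val writer = new ToyConnection(stream)
    try {
      val read = reader.xread(block)
      writer.xadd("hello")
      Await.result(read, block + 2.seconds)
    } finally {
      reader.close()
      writer.close()
    }
  }
}
```

With a shared connection the read comes back empty once its block expires. With an infinite block it would never come back, which is the hang seen in the test. With separate connections the read sees the entry as soon as the write lands.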

@wiwa
Contributor

wiwa commented Feb 15, 2023

Yep, fully agree with you. I meant to say that my impression was wrong (and that repeat, of some form, is correct). Sorry for confusion!

@gvolpe
Member Author

gvolpe commented Feb 15, 2023

Ah no problems, thanks for digging into it! I think what we can do immediately is to create a proper withRedisStream helper method for testing that returns two instances of Streaming for read/write. Let me know if you'd like to give that a go.

That may already fix the test, but obviously your findings on the usage of block and count need to be fixed as well.

@gvolpe
Member Author

gvolpe commented Feb 15, 2023

Documentation for streams needs to be marked as experimental as well, and we could document that two streaming connections need to be created (one for read; one for write), similar to how transactions/pipelining work.

@wiwa
Contributor

wiwa commented Feb 16, 2023

I think I'll have time before the weekend, will take a look in the direction you suggested.
