New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ZStream#zipWithLatest #1482
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needs a quick rebase after the rename of InputStream
to Pull
. Also left two minor comments.
@@ -1530,6 +1537,24 @@ class ZStream[-R, +E, +A](val process: ZManaged[R, E, Pull[R, E, A]]) extends Se | |||
*/ | |||
final def runDrain: ZIO[R, E, Unit] = run(Sink.drain) | |||
|
|||
final def schedule[R1 <: R, E1 >: E, B](schedule: ZSchedule[R1, A, B]): ZStream[R1 with Clock, E1, B] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is really nice. Can it also replace spaced
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I'll try to 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's some weird behaviour when I use a schedule, where the second stream in my test makes progress much faster than the first one in the init phase. I will investigate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Found it. The schedule will now contribute to deciding the end of the stream. So, if the schedule ends after 2 repeats, only 3 elements will be produced. It seems more intuitive to me (see the definition of fixed
), but I'm keen to hear your opinion. If ok I'll move on to look at spaced
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I actually found the previous behavior, in which decision.cont == false
means that the schedule is restarted, to be more intuitive. This is also more in line with aggregateWithin
, which restarts the schedule when it stops.
Could you explain what's the advantage?
By the way, we can attain this behavior with the previous formulation if you emit the schedule's output into the stream. We can then use takeWhile
or collectWhile
to end the stream when the schedule ends.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hehe, I had a realization myself. There are two ways to apply a schedule to a stream with regards to the schedule's cont
.
Given a stream A B C
and a schedule (Schedule.exponential(2.millis) && Schedule.recur(2) && Schedule.const("Done")).map(_._2)
, these are the interpretations:
-
As long as the schedule wants to continue, repeat the current element after a delay:
A |-2ms-| A |-4ms-| A |-8ms-| "Done" B |-2ms-| B |-4ms-| B |-8ms-| "Done" ...
-
As long as the schedule wants to continue, emit the stream's next element after a delay:
A |-2ms-| B |-4ms-| C |-8ms-| "Done"
The first interpretation is the existing spaced
. I guess that this implementation should be the second interpretation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dang, I was quite happy with the terseness of scheduleWith
😅 there may be some confusion as to which does what, are you ok with naming the one that acts on each elements scheduleElementsWith
and the one that acts on the whole stream scheduleWith
(I also think it would fix the confusion between ZStream.spaced
and ZSchedule.spaced
)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, those names are good! We can bring ZStream#repeat
inline with that too:
- The signature can be changed to:
final def repeat[R1 <: R, B](schedule: ZSchedule[R1, Any, B]): ZStream[R1 with Clock, E, Either[B, A]]
- And a
repeatWith
variant can be added.
Can be done in a follow-up of course :-)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's do in a follow-up, I'd like to see where the ticket on combinators is going.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ending with the schedule is more "foundational", since you can turn a terminating schedule into a forever schedule with Schedule#forever
. However it is less convenient. Usually this argues for two methods: one to do the more essential thing, and the other, more visible and obvious one, to do the Schedule#forever
thing.
for { | ||
is <- self.mergeEither(that).process | ||
state <- Ref.make[(Option[A], Option[B])]((None, None)).toManaged_ | ||
pull: Pull[R1, E1, C] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: you could just put the pull
expression directly in the yield
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's kind of a pattern in the ZStream
class, I think it's a little easier to read.
@iravid I think this is ready for a final review, other schedule-based combinators can be documented in the ticket? I'm happy to pick them up right after... but I wonder if sth more useful would be to convert tests to the new framework? |
@regiskuckaertz Help with the tests conversion would be much much appreciated 🙏🏻 but really, pick up whatever you fancy the most :-) |
@regiskuckaertz could you amend a commit to re-trigger the CI? |
for { | ||
decision <- schedule.update(a, sched) | ||
_ <- clock.sleep(decision.delay) | ||
_ <- state.set((!decision.cont, decision.state, Some(decision.finish))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm didn't we say that the schedule should not stop the stream?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I see that my example was unclear. In this formulation, I meant that the schedule should restart between the elements. So for a stream "A B C A B C"
, this should be result:
A |-2ms-| B |-4ms-| C |-8ms-| "Done" A |-2ms-| B |-4ms-| C |-8ms-| "Done"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope, this is implement for point 2 above:
- As long as the schedule wants to continue, emit the stream's next element after a delay
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And the short circuiting can be attained by collectWhile { case Right(v) => v }
, which would stop on the first Left
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(sorry for dragging this!)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, let's merge this and address in a follow-up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is the semantics of scheduleElements
, isn't it? 🤔 I could add a test to prove that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah not quite. scheduleElements
repeats every element and restarts
Thanks 🙏 |
Resolves #1204
Resolves #1492
Happy to close this if #1270 is updated (cc @contrun )
I have also added a few combinators I couldn't find how to express easily, but if I missed something I'll get rid of them.