New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Observable.doOnStartTask and Observable.doOnStartEval #704

Merged
merged 1 commit into from Sep 10, 2018

Conversation

Projects
None yet
2 participants
@ilya-murzinov
Contributor

ilya-murzinov commented Sep 3, 2018

Resolves #697

// protocol calls, then the behavior should be undefined.
try {
if (isStart) {
cb(elem).onErrorHandle(onError).runAsync

This comment has been minimized.

@ilya-murzinov

ilya-murzinov Sep 3, 2018

Contributor

Not sure about this 🤔

This comment has been minimized.

@ilya-murzinov

ilya-murzinov Sep 4, 2018

Contributor

I realised that this is incorrect, will fix shortly

@ilya-murzinov ilya-murzinov force-pushed the ilya-murzinov:task-do-on-start-eval branch from 68e68ea to 8e87de2 Sep 3, 2018

@codecov

This comment has been minimized.

codecov bot commented Sep 3, 2018

Codecov Report

Merging #704 into master will decrease coverage by 5.64%.
The diff coverage is 100%.

@@            Coverage Diff             @@
##           master     #704      +/-   ##
==========================================
- Coverage   90.59%   84.94%   -5.65%     
==========================================
  Files         391      392       +1     
  Lines       10991    11756     +765     
  Branches     2057     2146      +89     
==========================================
+ Hits         9957     9986      +29     
- Misses       1034     1770     +736

@ilya-murzinov ilya-murzinov force-pushed the ilya-murzinov:task-do-on-start-eval branch from 8e87de2 to 2e537d4 Sep 4, 2018

@Avasil

Thanks a lot for PR @ilya-murzinov
It's very close, I've left some comments

try {
if (isStart) {
val t = for {
_ <- cb(elem).onErrorHandle(ex => { onError(ex); Stop })

This comment has been minimized.

@Avasil

Avasil Sep 5, 2018

Collaborator

I'm not sure about this. @alexandru please correct me if I'm wrong.

Here we call out.OnError in case of error, return Stop but ignore it and still call out.OnNext which violates Observer contract (you shouldn't send anything after onError)

In my opinion we should short citcuit here in case of error. Ideally, we could write failing test now and then make it work after fix. Something similar to what you already have with hand written Subscriber but keeping track of onNext / onError calls

This comment has been minimized.

@ilya-murzinov

ilya-murzinov Sep 5, 2018

Contributor

Thanks @Avasil I've noticed this bug and was thinking about how to fix it after I pushed yesterday :D
And short-circuting in case of error makes perfect sense, I just couldn't figure out how to implement it in a best way.

@@ -1145,6 +1145,12 @@ abstract class Observable[+A] extends Serializable { self =>
final def doOnStart(cb: A => Unit): Observable[A] =
self.liftByOperator(new DoOnStartOperator[A](cb))
final def doOnStartTask(cb: A => Task[Unit]): Observable[A] =

This comment has been minimized.

@Avasil

Avasil Sep 5, 2018

Collaborator

Could you add scaladocs?
Similar to doOnStart but mentioning the difference and other variants (like doOnNextAck and others )

This comment has been minimized.

@ilya-murzinov

ilya-murzinov Sep 5, 2018

Contributor

sure, will do

@ilya-murzinov ilya-murzinov force-pushed the ilya-murzinov:task-do-on-start-eval branch from 2e537d4 to a17d6e8 Sep 5, 2018

// stream the error downstream if it happens, but if the
// error happens because of calls to `onNext` or other
// protocol calls, then the behavior should be undefined.
def tryExecute(): Task[Option[Ack]] = try {

This comment has been minimized.

@ilya-murzinov

ilya-murzinov Sep 5, 2018

Contributor

@Avasil idk how elegant this looks to you, please feel free to suggest improvements!

This comment has been minimized.

@Avasil

Avasil Sep 6, 2018

Collaborator

I think we don't need to use Option at all.

We could approach it like in this example (I've realized that tryExecute() is unused there, already fixed that with PR):
https://github.com/monix/monix/blob/master/monix-reactive/shared/src/main/scala/monix/reactive/internal/operators/EvalOnNextAckOperator.scala#L52

Something like this:

cb(elem)
  .flatMap(_ => Task.fromFuture(out.onNext(elem))) // success
  .onErrorHandle(ex => { onError(ex); Stop) } ) // failure

And also cb(elem) in try-catch to protect against functions like _ => throw ex

This comment has been minimized.

@ilya-murzinov

ilya-murzinov Sep 6, 2018

Contributor

I tried this one as well, but I didn't like the fact that onErrorHandle also catches errors from onNext, do you think this is fine?

This comment has been minimized.

@Avasil

Avasil Sep 6, 2018

Collaborator

Oh, you're right
Maybe we could match on attempt?

@ilya-murzinov ilya-murzinov force-pushed the ilya-murzinov:task-do-on-start-eval branch from a17d6e8 to b95c80a Sep 6, 2018

isStart = false
t.attempt.flatMap {

This comment has been minimized.

@ilya-murzinov

ilya-murzinov Sep 6, 2018

Contributor

@Avasil another try :D

This comment has been minimized.

@ilya-murzinov

ilya-murzinov Sep 6, 2018

Contributor

btw thanks for the hint!

This comment has been minimized.

@Avasil

Avasil Sep 6, 2018

Collaborator

Cool @ilya-murzinov !
I've added two more comments with nit picks but overall it looks great. :) Thanks a lot!

t.attempt.flatMap {
case Left(ex) => Task.eval { onError(ex); Stop }
case Right(()) => Task.deferFuture(out.onNext(elem))
}.runAsync

This comment has been minimized.

@Avasil

Avasil Sep 6, 2018

Collaborator

We could use small optimization like here - it will be synchronous in case of Stop and sometimes onNext is synchronous too and just returnsAck at the end

This comment has been minimized.

@ilya-murzinov

ilya-murzinov Sep 6, 2018

Contributor

yep, thanks for anther hint!

This comment has been minimized.

@ilya-murzinov

ilya-murzinov Sep 6, 2018

Contributor

tbh, I don't completely understand how it works...

ack.value match {
  case Some(Success(sync)) => sync
  case _ => ack
}

I mean, sync is Ack and ack is CancelableFuture[Ack], so how do types align here? 🤔

t.attempt.flatMap {
case Left(ex) => Task.eval { onError(ex); Stop }
case Right(()) => Task.deferFuture(out.onNext(elem))

This comment has been minimized.

@Avasil

Avasil Sep 6, 2018

Collaborator

We run it right away and need Task only for types so IMO we could use Task.fromFuture

This comment has been minimized.

@ilya-murzinov

ilya-murzinov Sep 6, 2018

Contributor

yep, makes sense, I just thought that you can't be "too lazy" in this case :D

@ilya-murzinov ilya-murzinov force-pushed the ilya-murzinov:task-do-on-start-eval branch from b95c80a to b388656 Sep 6, 2018

* only for the first element
* @see [[doOnStart]] for a simpler version that doesn't allow
* asynchronous execution, or
* [[doOnStartEval]] for a version that can do ev

This comment has been minimized.

@Avasil

Avasil Sep 6, 2018

Collaborator

Oh, and sorry for the nitpicks but something went wrong with the formatting here (looks better on doOnStartEval version)

This comment has been minimized.

@ilya-murzinov

ilya-murzinov Sep 6, 2018

Contributor

yeah, totally, my bad!

@Avasil

Avasil approved these changes Sep 6, 2018

@ilya-murzinov ilya-murzinov force-pushed the ilya-murzinov:task-do-on-start-eval branch from b388656 to b429a17 Sep 6, 2018

@ilya-murzinov ilya-murzinov force-pushed the ilya-murzinov:task-do-on-start-eval branch from b429a17 to 16d7643 Sep 6, 2018

@Avasil Avasil merged commit fd96400 into monix:master Sep 10, 2018

1 check passed

continuous-integration/travis-ci/pr The Travis CI build passed
Details
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment