Custom control flow #85

Closed
ajaychandran opened this Issue Nov 3, 2015 · 6 comments


ajaychandran commented Nov 3, 2015

Hi,

I am trying to write user space device drivers using Monifu.

I have a USB port exposed as an Observable and Subscriber for reading and writing bits, similar to a Scalaz Exchange.

The driver must sequence requests sent to the device, that is, once a request is written to the device, the next one should be written only after a response is received. Furthermore, if a response is not received within a specified timeout, the first request must be retried.

I do not want to add this sequencing logic to the client. A client should be able to send any number of requests, and the driver should sequence them appropriately.

Are there any examples for such custom processing? Any help will be much appreciated.


ajaychandran commented Dec 28, 2015

For posterity: I was able to solve the problem by atomically updating the driver state, defined as (last: Option[Request], queue: Queue[Request]). The state was updated whenever either a request or a response was received.
Requests were written and scheduled for retry only when last is non-empty. Note that this check must also be performed during response processing.
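
A minimal sketch of the atomic state update described above, using only the Scala and Java standard libraries rather than Monifu. The `Request` type, the `DriverStateSketch` object and the `onRequest`/`onResponse` names are assumptions made for illustration, and the retry timer is left out: each handler only returns the request, if any, that the caller would then write to the port and schedule for retry.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.collection.immutable.Queue

object DriverStateSketch {
  // Hypothetical request type; a real driver would use its own message type.
  final case class Request(id: Int)

  // last  = request currently awaiting a response
  // queue = requests accepted from clients but not yet written
  final case class State(last: Option[Request], queue: Queue[Request])

  private val state = new AtomicReference(State(None, Queue.empty))

  // Applies a pure transition in a compare-and-set loop and returns the
  // request (if any) that the caller must now write and schedule for retry.
  @tailrec
  private def transition(f: State => (State, Option[Request])): Option[Request] = {
    val current = state.get()
    val (next, toWrite) = f(current)
    if (state.compareAndSet(current, next)) toWrite else transition(f)
  }

  // A client submitted a request: write it now if nothing is in flight,
  // otherwise enqueue it.
  def onRequest(r: Request): Option[Request] = transition {
    case State(None, q) if q.isEmpty => (State(Some(r), q), Some(r))
    case State(last, q)              => (State(last, q.enqueue(r)), None)
  }

  // A response arrived: promote the next queued request, if there is one.
  def onResponse(): Option[Request] = transition {
    case State(_, q) if q.nonEmpty =>
      val (head, tail) = q.dequeue
      (State(Some(head), tail), Some(head))
    case State(_, q) => (State(None, q), None)
  }

  def snapshot: State = state.get()
}
```

Because both handlers go through the same compare-and-set loop, the write side and the read side can call them from different threads without a lock.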


alexandru commented Dec 28, 2015

@ajaychandran sorry for not helping you out with this. I haven't received the GitHub notification, saw your question later, postponed it, then forgot about it :-)

BTW, I would model the business logic itself by means of the scan operator, so I would keep a case class State(last: Option[Request], queue: Queue[Request]) as the accumulator of scan.
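
A rough sketch of the scan-based idea, with `scanLeft` on a plain `List` standing in for the Observable's scan operator. It assumes requests and responses can be merged into a single event stream; the `Event`, `Submitted`, `Responded` and `step` names are hypothetical, and timeouts are not modeled.

```scala
import scala.collection.immutable.Queue

object ScanSketch {
  // Hypothetical message and event types, for illustration only.
  final case class Request(id: Int)

  sealed trait Event
  final case class Submitted(request: Request) extends Event
  case object Responded extends Event

  final case class State(last: Option[Request], queue: Queue[Request])

  // Pure accumulator function: the same shape a scan operator expects.
  def step(state: State, event: Event): State = (state, event) match {
    // Nothing in flight: the new request becomes the in-flight one.
    case (State(None, q), Submitted(r)) if q.isEmpty => State(Some(r), q)
    // Something in flight: the new request waits in the queue.
    case (State(last, q), Submitted(r)) => State(last, q.enqueue(r))
    // Response received: promote the next queued request, if any.
    case (State(_, q), Responded) =>
      q.dequeueOption match {
        case Some((head, tail)) => State(Some(head), tail)
        case None               => State(None, q)
      }
  }

  val events: List[Event] =
    List(Submitted(Request(1)), Submitted(Request(2)), Responded, Responded)

  // scanLeft emits the seed followed by every intermediate state.
  val states: List[State] = events.scanLeft(State(None, Queue.empty))(step)
}
```

Here `states` holds five values (the seed plus one per event), and each change of `last` to a new `Some(request)` marks the point where that request would be written to the device.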


ajaychandran commented Dec 28, 2015

@alexandru No problem. The time spent coming up with the solution led to a deeper understanding of the library and the problem.

The scan operator would not work in this case, since both the observable and the observer need to update the state atomically.

A USB port is modeled as a Subject[BitVector, BitVector].
The driver transforms a port (by piping a codec at both ends) into a Subject[O, I] where O/I are output/input device messages. These unidirectional streams (write and read) need to coordinate using the atomic state (Option[O], Queue[O]) to achieve sequencing and retry.

PS: Thanks for creating this library. It is much easier to follow (IMO) than the competition and handles terminal conditions correctly*.

* There is a problem with monifu.concurrent.cancelables.RefCountCancelable (as of version 1.0). Executing the block below does not print anything.
  val ref = RefCountCancelable(println("completed"))
  Seq.fill(10)(ref.acquire()).foreach(_.cancel())

I am using a custom BooleanCancelable. See https://gist.github.com/ajaychandran/c2229a351e5bb68e208f


alexandru commented Dec 28, 2015

@ajaychandran the RefCountCancelable example you listed is behaving properly, at least under its current contract. In order for you to be shown a "completed" message, the RefCountCancelable reference itself needs to be canceled.

val ref = RefCountCancelable(println("completed"))
Seq.fill(10)(ref.acquire()).foreach(_.cancel())
ref.cancel() // <-- needs this call to work

You see, by doing ref.cancel() you're in effect saying that "I want to cancel, but only after all child references have been canceled as well". If there are no child references active, then the cancellation is immediate. If there are child references active, then the last child reference to cancel will also trigger the cancellation.
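
The contract described above can be illustrated with a toy, single-threaded model. This is not Monifu's implementation: `ToyRefCount` and its members are hypothetical names, child references are represented as plain functions, and calling `acquire()` after `cancel()` is not modeled.

```scala
object RefCountSketch {
  // Toy, single-threaded model of the contract; NOT Monifu's implementation.
  final class ToyRefCount(onCancel: () => Unit) {
    private var active = 0              // child references not yet canceled
    private var cancelRequested = false // has cancel() been called on the parent?
    private var done = false            // has the callback already run?

    // Runs the callback once, only when the parent was canceled
    // and no child reference is still active.
    private def fireIfReady(): Unit =
      if (cancelRequested && active == 0 && !done) {
        done = true
        onCancel()
      }

    // Returns a function that cancels the child reference (idempotent).
    def acquire(): () => Unit = {
      active += 1
      var released = false
      () =>
        if (!released) {
          released = true
          active -= 1
          fireIfReady()
        }
    }

    // "Cancel, but only after all child references have been canceled."
    def cancel(): Unit = {
      cancelRequested = true
      fireIfReady()
    }
  }
}
```

In this model, canceling ten acquired children without ever calling `cancel()` on the parent never runs the callback. Calling `cancel()` while a child is still active defers the callback until that child is canceled.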


ajaychandran commented Dec 28, 2015

I see now. That seems more reasonable.
Thanks.

PS: The class documentation is not clear on this.


alexandru commented Dec 28, 2015

@ajaychandran thanks, will update the api docs.

alexandru added a commit that referenced this issue Jun 29, 2016
