Buffer all IPs and flush on disconnect #58

Closed
Member

kenhkan commented Dec 10, 2012

Phew. This is a big PR. So here we go:

This may be familiar to you:

class SomeComponent ...
...
@inPorts.in.on "beginGroup", (group) =>
  @groups.push(group)
@inPorts.in.on "data", (data) =>
  # Do something here
@inPorts.in.on "endGroup", (group) =>
  @groups.pop()
@inPorts.in.on "disconnect", =>
  # Do some more things here
...

This pattern is quite common in the existing libraries, and I find myself writing it over and over again. On top of that, when I need to manipulate the data in batch, for example examining the groups and deciding whether or not to forward the data, I need to do something like this:

class SomeComponent ...
...
@inPorts.in.on "beginGroup", (group) =>
  @groups.push(group)
@inPorts.in.on "data", (data) =>
  @data.push(data)
@inPorts.in.on "endGroup", (group) =>
  @groups.pop()
@inPorts.in.on "disconnect", =>
  # Do some tests/filters/manipulation/whatever here
  for group in @groups
    @outPorts.out.beginGroup(group)
  for datum in @data
    @outPorts.out.send(datum)
  for group in @groups
    @outPorts.out.endGroup()
  @outPorts.out.disconnect()
...

Repeating either of these patterns is not only tedious, it is also incorrect: you lose the structure of the groupings. Groups are supposed to provide something like an XML-style tree structure, but by the time you access the data this way, it has been flattened into a linear sequence.
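To make that loss concrete, here is a small, hypothetical trace of the second pattern in plain CoffeeScript (not NoFlo code), assuming a connection that nests group "b" inside group "a":

# Simulated incoming connection: ( "a" ( 1 ( "b" ( 2 ) ) ) )
events = [
  ["beginGroup", "a"]
  ["data", 1]
  ["beginGroup", "b"]
  ["data", 2]
  ["endGroup", "b"]
  ["endGroup", "a"]
]

groups = []
data = []
for [type, payload] in events
  switch type
    when "beginGroup" then groups.push payload
    when "data"       then data.push payload
    when "endGroup"   then groups.pop()

# By the time "disconnect" fires, only two flat arrays are left:
console.log groups  # []      - both groups were already popped
console.log data    # [1, 2]  - no record of which group each datum belonged to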

Hence this PR. It expands the role of the port so that each port keeps a buffer for its incoming/outgoing connection, and it flushes itself every time a connection is disconnected. The primary benefit is that until you disconnect, the data is still at hand and you can manipulate it. For example:

class SomeComponent ...
...
@inPorts.in.on "beginGroup", (group) =>
  @outPorts.out.beginGroup(group)
@inPorts.in.on "data", (data) =>
  groups = @outPorts.out.getGroups()
  # Only grouped data are forwarded
  if groups.length > 0
    @outPorts.out.send(data)
@inPorts.in.on "endGroup", (group) =>
  @outPorts.out.endGroup()
@inPorts.in.on "disconnect", =>
  buffer = @outPorts.out.getBuffer()
  # Do something with the buffer
  @outPorts.out.setBuffer(buffer)
  @outPorts.out.disconnect()
...

This is a trivial example, but it shows that I can manipulate all the "sent" groups/data before they actually get sent, which is great for batch operations.
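For reference, here is a minimal sketch of the buffering idea on the outgoing side, assuming a port that records its calls and only replays them on disconnect. The getGroups/getBuffer/setBuffer names mirror the example above; the rest is my own simplification, not the actual code in this PR:

class BufferedOutPort
  constructor: ->
    @buffer = []   # ordered [type, payload] pairs, preserving the nesting
    @groups = []   # groups currently open, for getGroups()

  beginGroup: (group) ->
    @groups.push group
    @buffer.push ["beginGroup", group]

  send: (data) ->
    @buffer.push ["data", data]

  endGroup: ->
    @groups.pop()
    @buffer.push ["endGroup", null]

  getGroups: -> @groups.slice 0
  getBuffer: -> @buffer
  setBuffer: (buffer) -> @buffer = buffer

  disconnect: ->
    # Flush: replay the buffered calls in their original order so the
    # beginGroup/data/endGroup nesting is preserved, then clear the buffer.
    for [type, payload] in @buffer
      switch type
        when "beginGroup" then @emit "begingroup", payload
        when "data"       then @emit "data", payload
        when "endGroup"   then @emit "endgroup"
    @buffer = []
    @emit "disconnect"

  # Stand-in for whatever the real Port does on its attached socket.
  emit: (args...) -> console.log args...

With something like this, everything the component has "sent" stays inspectable and replaceable via getBuffer()/setBuffer() until disconnect, which is exactly the manipulation window the example above relies on.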

Here are the side benefits:

  1. With this change, a downstream process is not aware that an IP is coming its way until the connection from its upstream process disconnects. Right now groups/data are forwarded immediately, so in a synchronous graph every process receives a beginGroup before it gets any data. That is not how the mental model of FBP should work (at least not mine :p)
  2. There is no initial downstream connection problem. See #40.
  3. It forces the programmer to write correct FBP programs: you have to disconnect for your packets to actually be sent.

This work was done a while back and I have been testing it in production, so it predates the style change and my discovery of the test directory. I will add tests for it, but first I need to know whether this is something NoFlo can benefit from. I considered making it a separate class BufferedPort extends Port, but I don't think that is necessary because the change is backward-compatible.

Let me know if I can clarify any aspect of this since this is a pretty big change in the codebase.

Member

kenhkan commented Dec 10, 2012

Oh sorry, I forgot to add: the tests don't pass because the test cases would have to be updated to account for the fact that IPs are now buffered, and AsyncComponent would also need to be updated for this change. I'm simply looking for feedback on the validity of this PR. If the idea is sound, I'll update the tests and the relevant code so that everything passes.

By the way, when I said backward-compatible, I meant the interface is backward-compatible. If a component currently expects IPs to be sent immediately without buffering, it will break. So I don't know whether it is good to enforce some best practices, like always disconnecting after sending and always expecting IPs to arrive as one connection rather than "split up" by type.

This is why I opened the PR for feedback. ;) Thanks!

Owner

bergie commented Jan 14, 2013

If we're buffering IPs, then another important thing to take care of is that they're sent out in the same order they were received. This would simplify a lot of flows.

For example, if I have a component that fetches the contents of a web page asynchronously, and I send it a bunch of URLs, right now the results are sent out in whatever order the async callbacks fired. Instead, they should be sent out in the order the component received the original URLs.
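A rough sketch of that ordering requirement (plain CoffeeScript, not NoFlo code): each input gets a sequence number, finished results wait in a buffer, and nothing is released until every earlier input has completed. fetchUrl here is a hypothetical async helper standing in for the page fetch.

class OrderedResults
  constructor: (@onResult) ->
    @nextIn = 0    # sequence number assigned to the next incoming URL
    @nextOut = 0   # sequence number of the next result allowed to leave
    @pending = {}  # finished results waiting for their turn

  add: (url) ->
    seq = @nextIn++
    fetchUrl url, (result) =>
      @pending[seq] = result
      @flush()

  flush: ->
    # Release results strictly in input order, regardless of callback order.
    while @pending[@nextOut] isnt undefined
      @onResult @pending[@nextOut]
      delete @pending[@nextOut]
      @nextOut++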

Member

kenhkan commented Jan 15, 2013

Hmmm, that's true. Do you think this is the only major con? I used to have this as a BufferedPort which I would instantiate instead of noflo.Port, but later I realized that the inconsistency between how the two handle flow really doesn't work out.

I haven't had a chance to play around with AsyncComponent, so I'm going to experiment with it, fix this problem, and write some test cases if buffering does sound like a good direction to pursue.

Owner

bergie commented Jan 22, 2013

A bit of a continuation on this, from the FBP book:

Now how do we prove to our satisfaction that this connection is processing our data correctly? Well, there are two constraints that apply to IPs passing between any two processes. If we use the names in the above example, then:

  • every IP must arrive at SELECTOR after it leaves FILTER
  • any pair of IPs leaving FILTER in a given sequence must arrive at SELECTOR in the same sequence

The first constraint is called the "flow" constraint, while the second is called the "order-preserving" constraint. If you think about it using a factory analogy, this is all you need to ensure correct processing. Suppose two processes A and B respectively generate and consume two IPs, X and Y. A will send X, then send Y; B will receive X, then receive Y (order-preserving constraint). Also, B must receive X after A sends it, and similarly for Y (flow constraint). This does not mean that B cannot issue its "receives" earlier - it will just be suspended until the data arrives. It also does not matter whether A sends Y out before or after B receives X. The system is perfectly free to do whatever results in the best performance. We can show this schematically - clearly the second diagonal line can slide forward or back in time without affecting the final result.

We should ensure that NoFlo obeys these rules, also with async components.

Owner

bergie commented Jan 22, 2013

...and even more from the FBP mailing list:

"Flow Based Programming" mentions three general constraints. To
these I have added two more. They are:

(FBP) The flow constraint: Basically this says a package cannot
be delivered until it has been sent. This may seem obvious but
it implies that data flows follow a flow diagram.

(FBP) The one place at a time constraint: In FBP an information
packet can only be in one place at a time - either in a queue,
within another IP, or within an auton. I distinguish between
documents and data; Documents are objects that can only be in
one place at a time. Data are not objects; they are only values.

(FBP) The consistent ordering constraint: The general statement
is that objects must be processed in the same order that they are
sent. The ordering constraint implies that packages must be
stored in queues before they are delivered.

There are three variants of this constraint, inport order, auton
order, and nest order.

Inport order: The inport queue (or its image in a larger queue)
preserves the order in which the packages were sent.

Auton order: Packages sent to an auton are processed in the order
they were sent. Auton order is usually associated with an auton
queue.

Nest order: A master queue holds all undelivered packages in the
order that they were sent.

Member

kenhkan commented Apr 19, 2013

It's been a while! Yes, the description of an FBP system makes a lot of sense here. Let me confirm I have a good grip on this before moving on to fix this.

Do you think the idea of buffering violates any of the constraints?

  1. Flow constraint: the receiving process does not get the second packet before the first because packets are put in a queue when buffered.
  2. Time constraint: the buffer is as described in the FBP book in the way that once you send one, it's in the buffer until it hits the receiving process. See below though.
  3. Ordering constraint: order should be preserved in the buffer as a queue.

The problem may lie in the fact that manipulation can take place after a packet has been "sent", since you can access the buffer after sending packets. Would that be one of your concerns? If so, would it be a good idea to make this a separate component built on top of noflo.Component that handles buffering, should anyone need to manipulate the entire stream rather than work packet by packet?

I obviously wrote it to suit my own needs, so there are a lot of blind spots for me. Please point them out, as I want to make this work for the NoFlo community.

Member

kenhkan commented Apr 20, 2013

Thinking more on this, I just want to blast this out to see if my thinking is logical.

The problem I'm trying to solve here is that FBP documents are sometimes hierarchical; that is, a sub-group is not necessarily the only child of its parent group. Under the way components are currently constructed, as soon as you don't send an IP out immediately after manipulating it on receipt, the paradigm breaks, because that pattern assumes the grouping structure is linear (i.e. each sub-group is the only child of its parent).

There are quite a few existing components that follow this pattern. One is the Throttle component: it pushes incoming IPs into a queue and releases them upon process(), but by that time the hierarchical structure has been lost. There should be a community standard for situations like this, even if we end up mandating that end users who program the resulting graph not use hierarchical grouping, although that is very unlikely and also very difficult to avoid.

If hierarchical grouping is allowed for end users, the biggest issue is that each receiving handler (i.e. beginGroup, data, or endGroup) can only manipulate each IP individually, given that (1) the handler has no information about the next IP unless it caches IPs itself within the process, and even if it does that, (2) it cannot act on any IP based on that information, because by the time it knows about the next IP, the IP in question has already been sent out.

This is obviously not a problem if you record each incoming IP and act on the current IP based on the IPs collected so far. The more likely case in real life is that you have to manipulate the IPs en masse within a particular group.

Another example would be turning all IPs in a specific grouping (possibly identified by some regex on the group) into an array. Now, one may argue that this should be implemented as a graph: a process extracts the matching group and forwards only that part to a process that combines the IPs into an array, and then yet another process puts the array (now just one IP) back into the spot from which the original IPs were extracted.
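For concreteness, the middle step of that graph could look something like the following sketch, assuming the component API of the time; this is my own illustration, not an existing component:

noflo = require "noflo"

class CollectToArray extends noflo.Component
  constructor: ->
    @collected = []
    @inPorts =
      in: new noflo.Port()
    @outPorts =
      out: new noflo.Port()

    @inPorts.in.on "data", (data) =>
      @collected.push data
    @inPorts.in.on "disconnect", =>
      # The whole connection becomes a single array IP.
      @outPorts.out.send @collected
      @outPorts.out.disconnect()
      @collected = []

exports.getComponent = -> new CollectToArray

Here the upstream extractor is assumed to have already stripped the groups, so this component only needs to collect one connection's data into a single IP; the downstream process would then splice the array back into the original grouping.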

I'm completely okay with this approach, and I agree that although it greatly complicates things, it does align with the spirit of FBP. Of course a generalized version of the above can be reused as a graph. What I'm really looking for is a standardized way to handle this: if the graph approach is acceptable and preferred, we should not encourage component authors to write components that postpone processing of a connection carrying a hierarchical grouping structure.

If that is not the desirable approach, then the solution is what I was trying to accomplish here: a Port that lets programmers preserve the hierarchy while manipulating the IPs before sending them out. Though I must say I'm having second thoughts, as I think the graph approach may be better.

Is there anything I'm missing? I feel like this could be brought over to the FBP Google Group, but I'm afraid it is too contextual or too NoFlo-specific. Do you think that would be appropriate?

Member

kenhkan commented Apr 25, 2013

I realized that there is a better way to implement this functionality: as an external package. I have it implemented and am just testing it internally; I will post an update to this thread once it's good to go. Keeping NoFlo's core small is a better route now that I see how it should work. Closing this, but I will post updates on that package.

kenhkan closed this Apr 25, 2013

kenhkan deleted the kenhkan:buffer_all_ips_and_flush_on_disconnect branch Apr 25, 2013

Owner

bergie commented Apr 26, 2013

Thanks for the work on this! Let me know how that external package proceeds :-)
