
Support scatter/gather operations with unknown number of splits #20

Open
samuell opened this issue Apr 22, 2016 · 10 comments

@samuell
Member

samuell commented Apr 22, 2016

Support scatter operation: Produce more than one output per task (sub-stream)
Right now, a task produces only one output target per out-port. We need to be able to sometimes produce more than one output per task. This is needed e.g. in scatter / split operations.

Support gather operation: Receiving more than one input before triggering task (sub-stream) #19
Right now, a task is triggered as soon as all target in-ports have received a target. We need to be able to sometimes receive more than one input before triggering a new task. This is needed e.g. in gather or reduce operations.

Note that scatter with a known number of splits is already possible, using parameter channels.
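For illustration, here is a minimal sketch of the known-splits case using plain Go channels (this is not the actual SciPipe parameter-channel API): one task is triggered per parameter value received.

```go
package main

import "fmt"

func main() {
	// Parameter channel: the number of splits is known up front (here, 3),
	// so we simply send one parameter per desired task.
	params := make(chan int)
	go func() {
		defer close(params)
		for i := 0; i < 3; i++ {
			params <- i
		}
	}()

	// One "task" is triggered per received parameter.
	for p := range params {
		fmt.Printf("running task for split %d\n", p)
	}
}
```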

@samuell changed the title from "Support scatter operation: Produce more than one input per task (sub-stream)" to "Support scatter/gather operations with unknown number of splits" on Apr 22, 2016
@samuell
Member Author

samuell commented Apr 22, 2016

Maybe this could be solved by using information packets (IPs) that allow a variable number of targets in them?

@samuell
Member Author

samuell commented Jun 9, 2016

One solution to this is now implemented with the FileSplitter and Merger components.

@samuell
Member Author

samuell commented Dec 6, 2016

This could probably be implemented with a field in the IP containing a channel that is used to read a sub-stream until this "inner" channel is closed.
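A rough sketch of what such an IP with an "inner" sub-stream channel could look like, using hypothetical types rather than the actual SciPipe IP struct:

```go
package main

import "fmt"

// IP is a hypothetical information packet. If SubStream is non-nil, the
// packet represents a sub-stream, and downstream processes read from the
// inner channel until it is closed instead of using Path directly.
type IP struct {
	Path      string
	SubStream chan *IP
}

func main() {
	sub := make(chan *IP)
	go func() {
		defer close(sub) // closing the inner channel ends the sub-stream
		sub <- &IP{Path: "part_1.txt"}
		sub <- &IP{Path: "part_2.txt"}
	}()

	outer := &IP{SubStream: sub}

	// A receiving process detects the sub-stream and drains it until closed.
	if outer.SubStream != nil {
		for ip := range outer.SubStream {
			fmt.Println("received sub-stream item:", ip.Path)
		}
	}
}
```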

@samuell
Member Author

samuell commented Dec 12, 2016

Excerpts from notes saved in my journal today:

Have been pondering this issue for several days now, without really getting anywhere ... until I realized now:

You have to do as in MapReduce / Spark etc. and insert "sorting keys", or whatever you want to call them, into each data item. This is because you don't really know beforehand what kind of "reduce" steps you want to run on the data. You might want to group on this and that, combine groups, and so on. It even turns out that this kind of grouping is at the core of what a lot of data science is about: summarizing and comparing various groupings of data! Thus, this topic is really important and deserves a general solution.

Now I'm mostly thinking about whether this should be done by manually inserting such keys on demand at selected places in a workflow, or if we could just save all parameter values sent to a task and use those as grouping/sorting keys when needed.

We should also remember that these (parameter values) need to be saved and logged for audit purposes, so it might be a good idea to keep them synced somehow. Or we could maybe allow creating aliases for keys, so that we have a fixed alias to use in downstream groupings/sortings etc., even if we want to change the name of a parameter upstream. Then, if we want to do this, we can just also update the alias for that parameter.
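To make the key idea a bit more concrete, here is a rough sketch with hypothetical names (taggedIP, groupBy): each IP carries a map of string keys, e.g. the parameter values used to produce it, and a downstream group-by step collects IPs sharing the same key value before emitting one group per value.

```go
package main

import "fmt"

// taggedIP is a hypothetical IP carrying arbitrary string keys, e.g. the
// parameter values that were used to produce it.
type taggedIP struct {
	Path string
	Keys map[string]string
}

// groupBy collects incoming IPs by the value of one key and returns the
// groups once the input channel is closed (a simple gather/reduce).
func groupBy(in <-chan taggedIP, key string) map[string][]taggedIP {
	groups := map[string][]taggedIP{}
	for ip := range in {
		groups[ip.Keys[key]] = append(groups[ip.Keys[key]], ip)
	}
	return groups
}

func main() {
	in := make(chan taggedIP)
	go func() {
		defer close(in)
		in <- taggedIP{Path: "a.txt", Keys: map[string]string{"sample": "s1"}}
		in <- taggedIP{Path: "b.txt", Keys: map[string]string{"sample": "s2"}}
		in <- taggedIP{Path: "c.txt", Keys: map[string]string{"sample": "s1"}}
	}()
	for sample, ips := range groupBy(in, "sample") {
		fmt.Println(sample, "->", len(ips), "files")
	}
}
```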

@monkeybutter

monkeybutter commented Jan 15, 2017

Hi Samuel,

I'm just reading your notes here and I find them very interesting. I'm currently trying to figure out how to solve a very similar problem: how to integrate a reduce operation naturally into a Go FBP data processing pipeline.

Something that I'm considering is to specify, in each IP belonging to the same reduce group, a reference to a channel that's used to perform the reduce operation. The tricky part is that the process in charge of doing the reduce operation has to maintain state during the operation (which cannot be shared) and reset itself or die once it completes and sends the result back to the network. This would mean creating processes on demand per reduce operation or group of IPs to be reduced, and I'm not sure how all this would fit within the FBP paradigm.

The proposal is still not clear in my head, so this is just a vague idea that has come up while reading your comments. I'll try to think more about this so I can propose something more elaborate, but I'd love to discuss this problem further.

@samuell
Member Author

samuell commented Jan 16, 2017

Hi @monkeybutter, and thanks for the very interesting input!

It is interesting that you propose using a channel for the reduce operation. I had not even thought about that option, so I will need to think more about this :)

Offhand though, it seems that specifying how to perform the reduce operation in the IPs at an early stage somewhat defeats the idea of keeping processes self-contained: that is, you would want to keep all the logic for a particular reduce operation fully contained within one process. Therefore, I think I'm leaning towards the key idea so far, since keys for data don't explicitly define how to do a reduce operation, but only provide the raw data that can be used for it. Please correct me if I misunderstood!

Regarding multiple reduce operations, and what you call "creating processes on demand": this is, as far as I can see, what the concept of FBP sub-networks is for, tightly coupled with sub-streams. That is, given some kind of delimiter for sub-streams (in J.P. Morrison's FBP book they are special open/close bracket IPs), a new sub-network will be "launched" (exactly as you are onto) once a sub-stream is opened. It will then run until the sub-stream is closed, whereafter it is terminated.
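To make the bracket-IP idea concrete, here is a minimal sketch with hypothetical types (not SciPipe code): open/close markers delimit sub-streams, and a fresh goroutine, standing in for a sub-network, is launched when a sub-stream opens and terminates when it sees the close bracket.

```go
package main

import (
	"fmt"
	"sync"
)

// packet is a hypothetical IP; Open/Close mark sub-stream brackets.
type packet struct {
	Data        string
	Open, Close bool
}

func main() {
	in := make(chan packet)
	go func() {
		defer close(in)
		for _, p := range []packet{
			{Open: true}, {Data: "a1"}, {Data: "a2"}, {Close: true},
			{Open: true}, {Data: "b1"}, {Close: true},
		} {
			in <- p
		}
	}()

	var wg sync.WaitGroup
	var sub chan packet
	for p := range in {
		switch {
		case p.Open:
			// Launch a new "sub-network" per sub-stream.
			sub = make(chan packet)
			wg.Add(1)
			go func(items <-chan packet) {
				defer wg.Done()
				n := 0
				for range items {
					n++
				}
				fmt.Println("sub-network done, processed", n, "packets")
			}(sub)
		case p.Close:
			close(sub) // terminates the sub-network for this sub-stream
		default:
			sub <- p
		}
	}
	wg.Wait()
}
```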

Many thanks for the input! This issue is my biggest concern at the moment, so I'm very eager to find a generic and elegant solution to it very soon. Let's keep thinking! :)

@monkeybutter

monkeybutter commented Jan 16, 2017

I agree that having to define the reduce operation as part of an IP defeats the idea of FBP. It's kind of introducing state in IPs instead of just keeping state at the network level.

I've been thinking more about the concept of gather/reduce operations inside networks. My first approach was to think about networks as long-lived processes in which IPs from different "tasks" (groups of IPs belonging to a reduce operation) arrive asynchronously. This is why I was thinking that IPs had to have some kind of identification to perform the reduce operation. Also, this is where my previous proposal of specifying a channel to perform the reduce operation comes from.

After some more thought, I've realised that if networks are conceived as short-lived processes and are dynamically created per "task", the reduce operation happens naturally as another process in the network. A reduce operation only produces an output when it has gone through all the IPs that constitute a "task" and the whole network terminates after this.
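A small sketch of that idea in plain Go, with hypothetical names not tied to any particular framework: the whole "network" for one task is a short-lived goroutine whose reduce step emits a single output once its input channel is closed, after which the network terminates.

```go
package main

import "fmt"

// runTaskNetwork is a hypothetical short-lived network created for one
// "task": it reduces all incoming values to a single result and then exits.
func runTaskNetwork(in <-chan int, out chan<- int) {
	sum := 0
	for v := range in { // consume every IP belonging to this task
		sum += v
	}
	out <- sum // a single output, emitted only after the input is closed
	close(out)
}

func main() {
	in, out := make(chan int), make(chan int)
	go runTaskNetwork(in, out)
	for _, v := range []int{1, 2, 3} {
		in <- v
	}
	close(in) // end of the task's IPs; the network terminates after reducing
	fmt.Println("reduced result:", <-out)
}
```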

I don't know how this concept of short-lived networks fits with your SciPipe design. As far as I know, instantiating a network seems to be very light in your design; it just requires instantiating a few channels, structs and goroutines. I wonder how much this is related to the concept of "sub-networks" that you mention in your comment. Reading your description of them, it seems quite related to the idea of short-lived networks that I'm trying to describe.

Anyway, I'm looking forward to reading your comments on this. I don't know if this is helping you figure out a solution to your original problem ("scatter/gather with an unknown number of splits"), but it has helped me a lot to think about how reduce operations can be introduced into an FBP network. Thanks for that.

@samuell
Member Author

samuell commented Jan 17, 2017

I've been thinking more about the concept of gather/reduce operations inside networks. My first approach was to think about networks as long-lived processes in which IPs from different "tasks" (groups of IPs belonging to a reduce operation) arrive asynchronously. This is why I was thinking that IPs had to have some kind of identification to perform the reduce operation. Also, this is where my previous proposal of specifying a channel to perform the reduce operation comes from.

I see.

After some more thought, I've realised that if networks are conceived as short-lived processes and are dynamically created per "task", the reduce operation happens naturally as another process in the network. A reduce operation only produces an output when it has gone through all the IPs that constitute a "task" and the whole network terminates after this.

Exactly!

I don't know how this concept of short-lived networks fits with your SciPipe design. As far as I know, instantiating a network seems to be very light in your design; it just requires instantiating a few channels, structs and goroutines. I wonder how much this is related to the concept of "sub-networks" that you mention in your comment. Reading your description of them, it seems quite related to the idea of short-lived networks that I'm trying to describe.

Yes, I think your "short-lived networks" and FBP sub-networks overlap quite exactly.

In SciPipe, I have example code showing how to wire up a sub-network (workflow) in the factory method of a "subnetwork process" here. But it does not yet support sub-streams, which is a big deal, and this is what I hope to solve with a sub-stream channel field in the IPs, as described above. I just hope I'm not introducing state in IPs that way somehow :)

@samuell
Member Author

samuell commented Jan 26, 2017

So a few relevant questions now seem to be:

  • What does the tagging operation look like, in practice?
  • What does a splitting component look like?
  • What does a parameter sweep component look like?
  • How are nested parameter sweeps implemented?

@samuell
Member Author

samuell commented Jan 27, 2017

Optimally, we would like to allow most of these operations using the standard SciProcess type. With this in mind, this issue depends on #30, and maybe on another feature: creating parameter sweeps by taking parameters via channels instead of as fixed values (so that a cross-product of all parameter combinations can be produced).
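A sketch of the cross-product idea with plain Go channels (crossProduct is a hypothetical helper, not an existing SciPipe component): two parameter channels are drained and every combination is emitted on the output.

```go
package main

import "fmt"

// crossProduct is a hypothetical component emitting every combination of
// parameters received on a and b. One input has to be buffered in memory,
// since its values are replayed for each value of the other input.
func crossProduct(a, b <-chan string) <-chan [2]string {
	out := make(chan [2]string)
	go func() {
		defer close(out)
		var bs []string
		for v := range b { // buffer one of the parameter streams
			bs = append(bs, v)
		}
		for av := range a {
			for _, bv := range bs {
				out <- [2]string{av, bv}
			}
		}
	}()
	return out
}

func main() {
	a, b := make(chan string), make(chan string)
	go func() { a <- "dose=1"; a <- "dose=2"; close(a) }()
	go func() { b <- "time=10"; b <- "time=20"; close(b) }()
	for combo := range crossProduct(a, b) {
		fmt.Println(combo[0], combo[1])
	}
}
```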
