
Split stream class #15

Closed
CJ-Wright opened this issue May 31, 2017 · 13 comments

Comments

@CJ-Wright
Member

Would it be possible to split the stream class into two classes?
The first class would hold the init, emit, child, and loop methods.
The second class would inherit from the first and implement map, sink, buffer, etc.

Inspiration:
I have a very specific data topology and I need the various functions to operate differently than they currently do. By splitting the class I'd be able to use the same base class and would only have to re-implement map, filter, etc. for my data needs.

A similar proposition; would it be possible to make the various internals of Streams hot swappable?
I don't need to change all the methods; for example, delay most likely could stay the same, as could buffer. But map, filter, sliding_window, etc. won't work for the event model data topology.

Thoughts?
@danielballan
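
A rough sketch of the split being proposed (illustrative only; the class and method names here are placeholders, not existing code):

class BaseStream(object):
    # plumbing only: wiring nodes together and pushing data through them
    def __init__(self, upstream=None):
        self.downstreams = []
        if upstream is not None:
            upstream.downstreams.append(self)

    def emit(self, x):
        # push x to every downstream node
        for node in self.downstreams:
            node.update(x)

    def update(self, x):
        self.emit(x)

class Stream(BaseStream):
    # the operators live in the subclass, so a different data topology can
    # re-implement map/filter/etc. without touching the plumbing above
    def map(self, func):
        return map_node(func, upstream=self)

class map_node(Stream):
    def __init__(self, func, upstream=None):
        self.func = func
        Stream.__init__(self, upstream)

    def update(self, x):
        self.emit(self.func(x))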

@mrocklin
Collaborator

mrocklin commented May 31, 2017 via email

@CJ-Wright
Member Author

The data exists as a generator which puts out (name, document) pairs.
There are four different name/document pairs:

  1. start: Metadata from the start of the experiment or processing (one per stream)
  2. descriptor: Describes the data held in the events (many per stream)
  3. events: the actual data (many per descriptor)
  4. stop: Finishing metadata (one per stream)

For map I need to issue new start, descriptor, and stop documents. The mapped function will only apply to the event documents, which will in turn issue new event documents. So map needs to be aware that it only applies the function to the event documents.

Similarly for filter, I need to issue new start, descriptor, and stop documents, while true filtering only applies to the event level.
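
A minimal sketch of what such a document-aware map could look like (hypothetical; the function name and document fields here are illustrative, not from any existing library):

def document_map(func, docs):
    # docs: an iterable of (name, document) pairs like the generator above
    for name, doc in docs:
        if name == 'event':
            # apply the user function to the event data only,
            # issuing a new event document
            new_doc = dict(doc)
            new_doc['data'] = {k: func(v) for k, v in doc['data'].items()}
            yield name, new_doc
        else:
            # start, descriptor, and stop documents are re-issued as-is
            # (a real implementation would mint new uids and record provenance)
            yield name, doc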

@CJ-Wright
Member Author

Sink is most likely ok.

@mrocklin
Collaborator

mrocklin commented May 31, 2017 via email

@CJ-Wright
Member Author

One of the issues with subclassing is that the Stream class is written into most of the methods. If I use my overridden map, then zip (which I don't override), and then map again, zip will have switched me over to plain Streams, which won't have my bespoke map method.
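
A toy illustration of the problem (simplified; not the actual Stream source): because the base class hard-codes which node class its methods construct, one un-overridden call drops you back to the base type.

class Stream(object):
    def map(self, func):
        # always constructs a base-class node, ignoring type(self)
        return Stream()

    def zip(self, other):
        return Stream()

class DocumentStream(Stream):
    def map(self, func):
        # document-aware map
        return DocumentStream()

s = DocumentStream()
a = s.map(lambda x: x)         # DocumentStream: the bespoke map is used
b = a.zip(DocumentStream())    # plain Stream: zip came from the base class
c = b.map(lambda x: x)         # plain Stream.map again: bespoke behaviour lost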

I tried to look at #13 but I'm not certain I understand it fully yet.

@mrocklin
Collaborator

@CJ-Wright, to give some context here: there are a number of cases where we want to have different map/filter/etc. behaviors (remote, batched, dataframes, with metadata, etc.). To add to the complexity, we sometimes want to apply multiple such behaviors at the same time (remote-batched). We're trying to think of a clean way to enable this kind of behavior more generally. (By "we" I mean myself, @ordirules, @danielballan, and now yourself.)

@CJ-Wright
Member Author

Hmm, ok. I will try to think on it.

@jrmlhermitte
Contributor

@CJ-Wright if I understand correctly, you want to use streams to treat the most general case possible of the event-based architecture (the link is there to give context again for any newcomer). Correct? That is very interesting.

As a suggestion for map, how about two operations? One that returns a 4-tuple of 2-tuple pairs; for example, the mapped function could return:

(('start', startdoc), ('descriptor', descriptordoc), ('event', eventdoc), ('stop', stopdoc))

followed by a concat stream (splits the tuples into individual elements in the stream).

You can use __stream_map__ to output the proper 4-tuple, and treat the input in the proper way you choose. For the same reason as the output, I would be tempted to use something similar to partition. With a little bit of work, I think this would do the job for this one case.

Here is an example of __stream_map__ being used here.
Note that __stream_map__ is commented out, but I have used it and it has worked just fine. See the decorator parse_streamdoc on that page for the implementation. Line 304 (note: this line number will change over time), result = f(*args, **kwargs), is where the function is actually run; the rest is details to get the inputs and outputs to come out right.
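
As a rough illustration of the 4-tuple suggestion (hypothetical names throughout; s_runs is assumed to be a stream of whole runs, and the concat/flatten step is assumed to split the returned tuple back into individual (name, doc) elements):

def my_function(x):
    # placeholder for the user's per-event computation
    return x

def process_run(run):
    # run: a 4-tuple of (name, doc) pairs for one scan, as suggested above
    (_, start), (_, descriptor), (_, event), (_, stop) = run
    new_event = dict(event)
    new_event['data'] = {k: my_function(v) for k, v in event['data'].items()}
    # return the 4-tuple of 2-tuple pairs
    return (('start', start), ('descriptor', descriptor),
            ('event', new_event), ('stop', stop))

# hypothetical pipeline:
#     s_runs.map(process_run).concat()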

I would also argue that treating streams in the most general case (start, stop, descriptor, and event documents, where the number of events is unknown until we receive a stop) might not be a good idea. My reasoning is that the stop implies a conditional statement. This isn't easy to delay or submit to a cluster, in my opinion (somewhere a machine will have to decide what code to run, which can create delays). I would rather aggregate results and then pass them through as closed entities (so the delays, from aggregation, happen in a known region of code). However, the general case is very interesting and may well lead to interesting ideas. I am just playing devil's advocate. @danielballan and @tacaswell may have some opinions on this particular paragraph, if they have time.

Please correct me if I misunderstood anything. I'm interested to hear your ideas.

@jrmlhermitte
Contributor

This just came across my mind. I think in your situation, you may want something similar to groupby (not quite), basically an accumulator that caches results in some dictionary. For example, for your events, a dictionary of uids: acc = {'f8213' : [eventdoc1, eventdoc2], '134af' : [eventdoc3, eventdoc4]}
You could do this with accumulate as of now. However, there is currently no way of sending external signals to the accumulator (similar to flush in collect), so this would have to be added. The idea is that you could use this accumulator in conjunction with a stream that filters stop events. That stream would then activate the flush method of the accumulator stream, with the uid of the stop event that describes the appropriate events. The flush would then cause the accumulator to emit that data as some list. You could then use zip to ensure things are arriving at the right time. Here is an example:

sin = Stream()
# split stream into a stream of starts, events and stops
s_starts = sin.filter(lambda x: x[0] == 'start')
s_events = sin.filter(lambda x: x[0] == 'event')
s_stops = sin.filter(lambda x: x[0] == 'stop')

def myacc(acc, doc):
    # accumulate docs of form doc = (name, document)
    # into acc: a dict keyed by uid where the documents are saved
    uid = doc[1]['uid']
    if uid not in acc:
        acc[uid] = list()
    acc[uid].append(doc[1])
    return acc

def myflushroutine(acc, uid):
    # pops data off
    # here what is returned would be emitted by the accumulator
    return acc.pop(uid)

# these will only emit when flushed
# (the flush= keyword does not exist yet; it would need to be written)
s_start_accum = s_starts.accumulate(myacc, start={}, flush=myflushroutine)
s_events_accum = s_events.accumulate(myacc, start={}, flush=myflushroutine)

# here x[1]['uid'] is the general uid referring to the collection of events,
# so it could be some other index. I'm assuming: stop -> ('stop', dict(uid=N, ...))
# and that when a stop emits, *all* of its events have already arrived.
s_stops.map(lambda x: s_start_accum.flush(x[1]['uid']))
s_stops.map(lambda x: s_events_accum.flush(x[1]['uid']))

# this will only emit when all three arrive
sout = s_start_accum.zip(s_events_accum, s_stops)

Anyway, I think this is logic that would help you for the event based model. Happy to hear thoughts.

For @mrocklin, would giving the accumulator some flush option make sense? (the user would have to implement this on their own; if not given, default behaviour is usual accumulator behaviour) Or else, would a more complex version of collect make more sense? (I feel collect and accumulate could converge)
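
For concreteness, here is a rough sketch of what such a flush-capable accumulator node might look like (hypothetical; nothing like this exists in the library yet, and the update/emit pattern of the base Stream class is assumed):

class accumulate_with_flush(Stream):
    # behaves like accumulate, but only emits when flush(key) is called
    # from outside, e.g. by a stream of stop documents
    def __init__(self, upstream, func, start, flush_func):
        self.func = func
        self.state = start
        self.flush_func = flush_func
        Stream.__init__(self, upstream)

    def update(self, x, who=None):
        # fold the new element into the state; do not emit yet
        self.state = self.func(self.state, x)

    def flush(self, key):
        # pop the finished group out of the state and emit it downstream
        result = self.flush_func(self.state, key)
        return self.emit(result)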

@CJ-Wright
Member Author

Yes I'd like to work with the event architecture (I don't know if I want the most general case yet (async everything), but moving in that direction may be good).

I'd prefer to not have all the duplication if possible, especially as one may not even have a stop document when running the analysis and in the most general case one could get different descriptors at different positions.

I don't really understand what you mean by "stop implies a conditional statement". I don't know how useful aggregation will be, especially if we want to use this data live during experiments (autonomous experimentation/variation of parameters). As for the delays I imagine that the machine making decisions will slow down the execution less than having to wait for the entire experiment to stop (depending on the experiment of course). Some of this name dependent processing is already done with bluesky callbacks. To put a bit of a blunt point on this, if you are willing to aggregate the results why not listify and use Dask?

@jrmlhermitte
Copy link
Contributor

I misunderstood you (I thought you meant async); the code was for that, sorry. (I still think it's interesting how streams seems to handle that, in my opinion, quite nicely.)

I'm mainly worried about the logic handling the full set of event documents being intertwined with other logic. I would still aggregate before doing anything else, but I'm definitely open to suggestions.

I should explain some context, as I think we may have different use cases. In my case, the start, event and stop document stream is either a full image or a time series of images. Thus, reading in an image is basically the same as reading the full set of start, events and stop. The decision making is outside of this logic. We can thus consider a "unit" in the stream to be the start, events and stop packaged together, and so aggregate them together. I don't think we'll need to step outside that assumption in our case. It seems like this may not be the case for you.

@CJ-Wright
Copy link
Member Author

Yeah, sorry, I should have specified async document generation/data acquisition.

@danielballan and I had a discussion about whether the stream logic can be separated from the document logic. He made a compelling case as to why it might not be, which I am slowly coming around to.

I feel that aggregation is rather limiting. For in-line data processing we can't wait for the stop document to come in (for an experiment that takes hours, this could take a while).

My current working model goes something along the lines of this:

  1. We need something that understands both streams and the document model, so make a new class which inherits from both. This way we can have document aware maps, filters, scans and the like. As much as I'd like to have these things separated there doesn't seem to be a clean way; too much of update has to be tailored to the document model.
  2. The class which understands documents looks a lot like CallbackBase: it has start, descriptor, event, and stop methods and a dispatcher which knows which documents to send to whom (see the sketch after this list).
  3. One problem with this is that in some cases we want to operate on multiple (name, doc) pairs simultaneously (e.g. I zip two streams together and now want to map a function of two arguments). So instead of taking in a single doc, the document methods now take in a tuple of docs.
  4. This becomes problematic with the events. Usually we want to perform 3 operations on events.
    1. Interrogate the event "guts": we look at the actual data in the event, and hand it over to something that either makes a judgment or performs an operation on it or something else.
    2. Issue a new event: we hand over some information which needs to be re-packaged into a new event and then issue that event.
    3. Pass the event through: Just allow the event to pass without doing anything to it. (This one is of dubious usefulness)
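
A minimal sketch of point 2 above (hypothetical; the class and its wiring are illustrative, assuming the base Stream's update/emit pattern):

class DocumentStream(Stream):
    # a Stream node that understands (name, doc) pairs, in the spirit of
    # bluesky's CallbackBase: dispatch each document to the matching method
    def update(self, x, who=None):
        # x may be a single (name, doc) pair, or a tuple of pairs when
        # several streams have been zipped together (point 3 above)
        pairs = (x,) if isinstance(x[0], str) else x
        name = pairs[0][0]
        docs = tuple(doc for _, doc in pairs)
        return getattr(self, name)(docs)

    def start(self, docs):
        return self.emit(('start', docs[0]))

    def descriptor(self, docs):
        return self.emit(('descriptor', docs[0]))

    def event(self, docs):
        # subclasses override this to interrogate the event "guts",
        # issue a new event, or just pass the event through (point 4 above)
        return self.emit(('event', docs[0]))

    def stop(self, docs):
        return self.emit(('stop', docs[0]))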

@CJ-Wright
Member Author

I'm going to close this as the register_api system seems to be working very well.
