euphoria-core: Consider adding support for side-outputs #124

Closed
vanekjar opened this issue May 10, 2017 · 1 comment

Comments

@vanekjar
Contributor

While discussing the implementation of counters (see issue #31), we realized they could be implemented as a side output of each operator. Side outputs, however, are a more general concept and deserve a separate issue.

The purpose of this issue is to consider implementing a concept similar to Apache Beam's additional outputs or Apache Flink's side outputs.

According to @je-ik:

We could design parallel outputs alongside the main output by naming them (and thereby construct what looks like a counter but is essentially nothing more than another Dataset). The code would then look like this:

  Flow flow = Flow.create();
  Dataset<Integer> input = ...;
  Dataset<Integer> output = FlatMap.of(input)
      .using((in, ctx) -> {
        ctx.collect( /* do whatever transformation of `in` */ );
        ctx.collect("input-elements", 1L);
      })
      .output();
  Dataset<Long> inputElements = flow.getNamedStream("input-elements");
  // now I can do whatever I want with this stream: window it as I wish, aggregate it by a function
  // of my choice and so on, and finally persist the dataset wherever I wish

So,

  • in the example above I used strings to identify the corresponding outputs, but that was only an example; it would need to be modified slightly to give the output Datasets strong typing, which points in the direction of tags in the Beam sense
  • the output would probably be neither keyed nor windowed; it would, of course, carry a timestamp
  • the output would be available via the Flow
  • the executor can tell whether an output is unused, because user code has to request it explicitly (via getTaggedStream)
  • what you do with the output Dataset is left to the user code, so you can use it for business logic (joining it with some "main" dataset) or for monitoring and debugging (storing it in an appropriate sink, e.g. Elasticsearch)

A slightly modified example, which covers the points above:

  Flow flow = Flow.create();
  Dataset<Integer> input = ...;
  NamedTag<Long> elementsTag = NamedTag.named("input-elements").typed(Long.class);
  Dataset<Integer> output = FlatMap.of(input)
      .using((in, ctx) -> {
        ctx.collect( /* do whatever transformation of `in` */ );
        ctx.collect(elementsTag, 1L);
      })
      .withNamedTags(elementsTag)
      .output();
  Dataset<Long> inputElements = flow.getTaggedStream(elementsTag);
  // now I can do whatever I want with this stream: window it as I wish, aggregate it by a function
  // of my choice and so on, and finally persist the dataset wherever I wish
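
To make the typed-tag idea concrete outside of Euphoria, here is a minimal self-contained sketch in plain Java. Everything in it is hypothetical and only for illustration: `SideOutputSketch`, `Context`, `flatMap`, `doubleAndCount` and the simplified `NamedTag.of` factory are assumptions, not Euphoria's API, and plain in-memory lists stand in for Datasets. It only shows how a tag carrying a type token could let user code read a side output without unchecked casts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

public class SideOutputSketch {

  /** Hypothetical typed tag; instances are compared by identity, so reuse the same object. */
  public static final class NamedTag<T> {
    public final String name;
    public final Class<T> type;

    private NamedTag(String name, Class<T> type) {
      this.name = name;
      this.type = type;
    }

    public static <T> NamedTag<T> of(String name, Class<T> type) {
      return new NamedTag<>(name, type);
    }
  }

  /** Hypothetical collector context: one main output plus any number of tagged side outputs. */
  public static final class Context<OUT> {
    public final List<OUT> main = new ArrayList<>();
    private final Map<NamedTag<?>, List<Object>> side = new HashMap<>();

    /** Emits an element to the main output. */
    public void collect(OUT element) {
      main.add(element);
    }

    /** Emits an element to the side output identified by the tag. */
    public <T> void collect(NamedTag<T> tag, T element) {
      side.computeIfAbsent(tag, t -> new ArrayList<>()).add(element);
    }

    /** Reads a side output; the tag's type token makes the cast a checked one. */
    public <T> List<T> getTaggedStream(NamedTag<T> tag) {
      List<T> result = new ArrayList<>();
      for (Object o : side.getOrDefault(tag, new ArrayList<>())) {
        result.add(tag.type.cast(o));
      }
      return result;
    }
  }

  /** Stand-in for a FlatMap operator: applies fn to every input element. */
  public static <IN, OUT> Context<OUT> flatMap(List<IN> input, BiConsumer<IN, Context<OUT>> fn) {
    Context<OUT> ctx = new Context<>();
    for (IN in : input) {
      fn.accept(in, ctx);
    }
    return ctx;
  }

  /** Doubles each element on the main output and emits 1L per element on the side output. */
  public static Context<Integer> doubleAndCount(List<Integer> input, NamedTag<Long> elementsTag) {
    return SideOutputSketch.<Integer, Integer>flatMap(input, (in, ctx) -> {
      ctx.collect(in * 2);
      ctx.collect(elementsTag, 1L);
    });
  }

  public static void main(String[] args) {
    NamedTag<Long> elementsTag = NamedTag.of("input-elements", Long.class);
    Context<Integer> ctx = doubleAndCount(Arrays.asList(1, 2, 3), elementsTag);
    System.out.println(ctx.main);                                // prints [2, 4, 6]
    System.out.println(ctx.getTaggedStream(elementsTag).size()); // prints 3
  }
}
```

In this sketch a tag that was never written to simply yields an empty list. A real executor could instead use the set of requested tags to skip materializing outputs nobody reads, as suggested in the list above.
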
@je-ik
Contributor

je-ik commented Nov 14, 2018

In roadmap.

@je-ik je-ik closed this as completed Nov 14, 2018