Consuming the output of sumByKey #651

Open
johnynek opened this issue Jan 29, 2016 · 23 comments
Comments

@johnynek
Collaborator

Now away from Twitter, where insane data rates might not be the problem, I'd like the ability to have stricter semantics on sumByKey.

Right now, we don't guarantee that each value into a summer emits one output; indeed, offline, each key per batch emits at most one key/value event.

It would be nice for that not to be the case. I'd like to see the full stream of outputs from a sumByKey, especially when used with a join.

I guess that:

  1. Most users don't consume the output of sumByKey.
  2. Most users want the behavior I mention when they do consume the event stream.
  3. For the few users that don't want all updates, and want a cache-enabled view, we give an option to keep current behavior.

In the case that there is no downstream consumer (the common case) we continue to do the map-side aggregation and the batched aggregation.

Impacts: Twitter summingbird hadoop jobs would need to add an Option or may take longer to run in some cases.
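For concreteness, here is a minimal sketch of the two semantics (plain Scala, not the actual Summingbird API; batched and scanned are made-up names for illustration):

// Plain-Scala illustration only: "batched" mimics today's offline behavior
// (at most one (oldValue, newValue) event per key per batch), "scanned" mimics
// the proposed behavior (one event per input value).
object SumByKeySemanticsSketch {
  val input: List[(String, Int)] = List(("k", 1), ("k", 2), ("j", 5), ("k", 3))

  // Current offline behavior: collapse all deltas for a key within the batch,
  // so each key emits at most one (oldValue, newValue) event.
  def batched(prev: Map[String, Int]): Map[String, (Option[Int], Int)] =
    input.groupBy(_._1).map { case (k, kvs) =>
      val old = prev.get(k)
      (k, (old, old.getOrElse(0) + kvs.map(_._2).sum))
    }

  // Proposed behavior: emit one (oldValue, newValue) event per input value.
  def scanned(prev: Map[String, Int]): List[(String, (Option[Int], Int))] =
    input.foldLeft((prev, List.empty[(String, (Option[Int], Int))])) {
      case ((state, out), (k, v)) =>
        val old  = state.get(k)
        val next = old.getOrElse(0) + v
        (state.updated(k, next), (k, (old, next)) :: out)
    }._2.reverse

  def main(args: Array[String]): Unit = {
    println(batched(Map.empty))  // two events: one for "k", one for "j"
    println(scanned(Map.empty))  // four events, one per input tuple
  }
}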

Thoughts? @ianoc @isnotinvain @rubanm

@ianoc
Collaborator

ianoc commented Jan 29, 2016

How about having scanByKey instead? Less API breakage, and it sounds like the behavior you want is a scan?


@jnievelt
Contributor

This seems to be a store detail, right? E.g., we could implement a scalding.Store with a no-op partialMerge and a full scan in merge?

Online seems like it should be possible as well, though we've repeatedly run into the issue that those oldValues are reset when rolling over the batch threshold (e.g., hourly). I'm not sure if you'd want to include trying to tackle that as part of this.

@isnotinvain
Contributor

After some discussion with @johnynek on gitter, I feel like adding a new method probably makes more sense. Something like .scanByKey as @ianoc mentioned.

We could debate whether .sumByKey should have the proposed semantics and come up with a new name for what is currently .sumByKey, or we can do the easier thing, which is to leave .sumByKey alone and add .scanByKey (side note -- is it really a scan? It also has the side effect of summing, right? So maybe a different name is needed).

But I think it's not great to make the behavior of .sumByKey tunable via an option. If we do this, then @johnynek's use case of relying on the new behavior becomes very fragile. So I think if we want 2 behaviors we should have 2 methods.

@johnynek
Collaborator Author

Both suggestions are good ones.

We can certainly get this behavior with a certain store implementation. That would be opt in and safe.

We could also push this behavior on all systems with a method (scanSum, scanByKey, something like this).

We could even do both: refactor so we could make a store do this, then have a method that makes sure it always happens.

My concern about using an option to enable this is that it is complex (and has to be coordinated with the storm-side options). scanSum (scanningSumByKey?), on the other hand, allows you to express logic that would be incorrect with the caching, e.g.:

src.map(fn)
  .sumByKey(store)
  .mapValues(_ => 1)
  .sumByKey(store2) 

now store2 has non-deterministic values.
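To spell that out with a toy trace (hypothetical flush schedules, plain Scala, not real planner behavior): store2 effectively counts how many events the first summer emits, and that count depends on when the cache flushes.

object NonDeterministicCountSketch {
  val updates = List.fill(5)(1) // five increments for a single key

  // If the first summer's cache flushes every `flushEvery` updates, it emits
  // that many collapsed events instead of one event per update.
  def emissions(flushEvery: Int): Int = updates.grouped(flushEvery).size

  def main(args: Array[String]): Unit = {
    // mapValues(_ => 1).sumByKey(store2) counts emissions, so store2 sees:
    println(emissions(1)) // 5 -- no caching, one event per input (deterministic)
    println(emissions(2)) // 3 -- cache flushed every 2 updates
    println(emissions(5)) // 1 -- whole batch collapsed
  }
}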

Basically, to argue for my original point again: consuming the output of sumByKey seems pretty unsafe in the presence of caching. At Stripe we do it to get a queue of events signaling that a key was updated, but we don't trust the value. Not great.

So, I'd still like people to defend having caching when we consume the output of sumByKey. I feel like that API is really unsafe, and adding a safe one with a new name is a real bummer (especially given how rarely this was done in jobs at Twitter, probably due in part to the unsafety).

@jnievelt the issue of consuming the output online was addressed here, but I dropped the ball: #547

@johnynek
Collaborator Author

Another idea is to put a node in the graph for local summing:

// maybe reduces the number of events by combining events into the later event.
.sumByLocalKeys: Producer[P, (K, V)] 
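Roughly, the idea is just a map-side collapse with no store involved; a sketch of what the combining step could look like (hypothetical helper, assuming Algebird's Semigroup; not a real planner node):

import com.twitter.algebird.Semigroup

// Hypothetical local pre-sum: within one task, collapse the buffered values
// for each key into the "later" event before anything hits the wire or a store.
def sumLocalBuffer[K, V: Semigroup](buffer: Seq[(K, V)]): Seq[(K, V)] =
  buffer.groupBy(_._1).iterator.map { case (k, kvs) =>
    (k, kvs.map(_._2).reduce(Semigroup.plus(_, _)))
  }.toSeq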

@ianoc
Collaborator

ianoc commented Jan 29, 2016

So I don't believe a store works for sumByKey as @jnievelt mentions, because in summingbird the store is a dumb thing, per se. The bigger impact is the map-side caching of it: you cannot have the full stream along with a map-side cache.

Having sumByKey chained on is akin, with weaker semantics, to the windowed summing used in Spark Streaming and in Flink. That is, you are effectively emitting partial updates, though here we do not have a contract on their arrival rates. So things like threshold detection, or using the latest value of a store and continuing on, are possible.

Wouldn't putting a node in the graph for local summing involve changing every summingbird topology in the wild at Twitter? That's a pretty breaking change.

Isn't consuming the output without caching possible today, so that you get an emission for every input tuple? Change the batching sizes to 1?

@jnievelt
Contributor

A proper scan can't really leverage sumOption, for whatever that's worth. The caching also helps cut down on request rates to the store, though it's possible to do that caching and still emit everything. I have seen, however, cases where folks want some in-stream aggregation, so they'll do sumByKey on an empty/no-op store and discard the oldValue.

One question I'd offer: if we were going to build something completely new here, would it be a scan? Would a fold/etc. be better, or something like mapGroup?
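For what it's worth, the difference between those options on a single key's values (plain Scala collections, just to fix terminology):

val values  = List(1, 2, 3)      // deltas seen for one key in a batch
val initial = 0                  // prior value from the store, if any

// scan: a running total emitted after every input value -- the "full stream
// of outputs" behavior (3 emissions), but it can't use sumOption internally.
val scanned = values.scanLeft(initial)(_ + _).tail  // List(1, 3, 6)

// fold: only the final total for the key (1 emission) -- closest to the
// current offline batch behavior, and free to use sumOption.
val folded = values.foldLeft(initial)(_ + _)        // 6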

@ianoc
Collaborator

ianoc commented Jan 29, 2016

(A quick scan of internal dashboards for Heron showed several jobs using this feature consuming streams at maybe not breakneck levels, but one I just looked at had about a 10k/sec ingestion rate.)

[screenshot: internal dashboard, 2016-01-28]

@johnynek
Collaborator Author

one more note: if no one consumes the output of the sumByKey, you are always free to cache, so I had assumed that to be the common case. But I guess not with Tsar...

@johnynek
Collaborator Author

I bet Tsar really wants sumByLocalKeys (discard the Option and you get something like that).

Shame we can't be more direct.

@sritchie
Collaborator

@johnynek Didn't we add something early on to disable map-side caching in the storm aggregator if anyone consumed the output of sumByKey? I know we talked about that. I can have a look in the code to figure out where that decision's made; I'm guessing that's all different with the new graph optimization stuff you all added.

I know we recognized this early and intended to kill the buffering feature if consumers existed.

@ianoc
Collaborator

ianoc commented Jan 29, 2016

I don't think you could consume from a summer back then at all, we only supported 1 sumByKey, and those didn't have outputs in the planned graphs afaik.

@isnotinvain
Contributor

If we go the route of having 2 methods, couldn't the one with untrustworthy return values simply not have a return value?

@ianoc
Collaborator

ianoc commented Jan 29, 2016

Not easily if it's in use today for that value. It's still a large change to code bases. Though if we wanted to terminate it, I do like the idea of changing its type signature to a producer of unit... but that would require manual changes to anywhere that does use it for any purpose.

@isnotinvain
Contributor

Well, for better migration we could have .sumByKey keep its return value and return all the deltas (so sumByKey becomes the proposed scanByKey), and then have the more efficient .unitSumByKey or something, which has no return value, that users can switch to if needed?

@ianoc
Collaborator

ianoc commented Jan 29, 2016

I'm not sure what you're suggesting there, tbh.

@isnotinvain
Contributor

@ianoc oh I meant:

  1. change .sumByKey to have the new behavior that @johnynek wants of returning all deltas, not arbitrarily collapsed deltas, which is a type-safe change but potentially not performance-safe.

  2. add a new method, like .unitSumByKey (needs a better name), that returns nothing (a producer of unit or something) to signal that the summing is done only as a side effect on the store. Anyone using .sumByKey that doesn't read the output and has performance issues would switch to this one.

@ianoc
Collaborator

ianoc commented Feb 2, 2016

Both of those suggestions have massive migration pain @ Twitter, I would guesstimate. That would be my reservation. If you're volunteering for the rollout, Oscar's original suggestion is fine with me....

@johnynek
Collaborator Author

johnynek commented Feb 2, 2016

I actually think the change might not be that bad. If no one consumes the output, no change. If they do, they either sumByKey again or .write. If they sumByKey, that one can do caching/collapsing. If they write, they will get all the events.

If @isnotinvain could help test this, I'd love to simplify the semantics.

@ianoc
Collaborator

ianoc commented Feb 2, 2016

I guess the problem is if they do consume it, say. Then, if the input to the first sumByKey is large, there will be no map-side reduce/caching occurring in that stream. The QPS to the store will skyrocket, no?

@isnotinvain
Contributor

So there's a way to do this with zero migration, which would be to leave .sumByKey alone and add .unitSumByKey and .scanSumByKey. The only unfortunate thing about that is that .sumByKey is the best name and now it's taken by the not-best method. It just seems to me that the behavior of the current .sumByKey isn't really desirable; it should either be exact or have no useful return value at all (I think? Unless there's a use case for the non-deterministic output of the current .sumByKey?)
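Roughly the split would look like this (hypothetical signatures; Store, Producer, and KeyedProducer below are stand-ins, not the real Summingbird types, which also thread a platform and a Semigroup through):

trait Store[K, V]
trait Producer[T]
trait KeyedProducer[K, V] {
  // today's behavior: map-side caching allowed, deltas may be collapsed
  def sumByKey(store: Store[K, V]): KeyedProducer[K, (Option[V], V)]

  // summing purely as a side effect on the store; no consumable output
  def unitSumByKey(store: Store[K, V]): Producer[Unit]

  // exact semantics: one (oldValue, newValue) event per input value
  def scanSumByKey(store: Store[K, V]): KeyedProducer[K, (Option[V], V)]
}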

@johnynek
Collaborator Author

johnynek commented Feb 3, 2016

@ianoc can't local caching prevent the store from seeing much more traffic? We don't need to talk to the store for each update. But the output of the flatMappers will increase.

It'd work like this: fetch a key, thread it through a ton of values, write back to the store, and emit the values. This was designed for this: https://github.com/twitter/storehaus/blob/develop/storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/BufferingStore.scala
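A minimal sketch of that pattern (a mutable in-memory map standing in for the store; this is not storehaus's BufferingStore, just the shape of "fetch once, thread the buffered deltas, write once, emit everything"):

import scala.collection.mutable

def flushKey[K](
    store: mutable.Map[K, Long],
    key: K,
    buffered: Seq[Long]
): Seq[(Option[Long], Long)] = {
  val initial = store.get(key)            // one read per flush
  var prev    = initial
  val outputs = buffered.map { delta =>   // still emit every intermediate value
    val next = prev.getOrElse(0L) + delta
    val out  = (prev, next)               // (oldValue, newValue) per input
    prev = Some(next)
    out
  }
  outputs.lastOption.foreach { case (_, last) => store(key) = last } // one write per flush
  outputs
}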

@ianoc
Collaborator

ianoc commented Feb 3, 2016

Few challenges with that...

  1. If it loses all map-side combination we suffer hot-key issues in that single bolt; the serialization alone from that could overwhelm things. Ultimately, however, we lack good metrics to see how widespread this issue might be; the metrics that are available @ Twitter are averages, iirc. I just opened one of the larger Tsar/SB topologies at Twitter here: mapper output is 7 million tuples per second, and the map-side caches are having a 10x effect before hitting the wire. This job, however, does not consume from the sumByKey.

  2. Yep, we should always be able to bolt on our reduce-side aggregators. I think we will want to make the existing ones available rather than using anything in BufferingStore, since there are multiple variants, and changes to those performance curves that are too wide will be problematic for rollout, I suspect.

(1) is probably the most problematic, I think, since if it breaks things as deployed there are no obvious workarounds in place short of job restructuring.

(2) is probably a decent binary-breaking change, depending on how we do it.
