Support reducer state #11

Merged
merged 4 commits into from Dec 17, 2018

2 participants
@juhoautio
Contributor

commented Nov 19, 2018

The reducer state is stored by Flink internally as "window-contents". Created a unit test to read reducer state & added static accessors in KeyedStateReader for convenience.

@gyfora

Contributor

commented Nov 19, 2018

I will try to take a look today!

@gyfora

Contributor

commented Nov 19, 2018

I think reducing states in general are stored in a different format, much like list states.

@gyfora

Contributor

commented Nov 19, 2018

You need the reducer itself to be able to combine the values, as the state probably contains the individual elements to be reduced.

@juhoautio

Contributor Author

commented Nov 19, 2018

Ok, I think I can manage additional reducing of values read from state if that's needed.

But first: how to read the values from state?
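
If the state did turn out to hold individual elements, combining them after reading would amount to a plain fold. A minimal sketch in plain Java, assuming the values have already been read into a list; `ReducerSketch`, `reduceAll`, and the use of `BinaryOperator` as a stand-in for Flink's `ReduceFunction` are illustrative assumptions, not part of this PR:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.BinaryOperator;

// Hypothetical helper: folds values read from state with a user-supplied reducer.
final class ReducerSketch {

    // Returns empty if there were no values, otherwise the left-to-right reduction.
    static <T> Optional<T> reduceAll(List<T> values, BinaryOperator<T> reducer) {
        return values.stream().reduce(reducer);
    }

    public static void main(String[] args) {
        // Stand-in for elements read from state.
        List<Integer> readFromState = Arrays.asList(1, 2, 3);
        System.out.println(reduceAll(readFromState, Integer::sum)); // Optional[6]
    }
}
```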

@gyfora

Contributor

commented Nov 19, 2018

Ah sorry, maybe I was wrong here. I think the problem with your example is that you are using the MapStateReader. The reducer state is not a MapState; it's a reducing state. Maybe the ValueStateReader would work better.

@gyfora

Contributor

commented Nov 19, 2018

So I'm not really sure why the test case is constructed this way when you are trying to test reading the reducing state.

I think you probably copied the map state reading test, which won't work because map states are stored very differently from reducing states.

@juhoautio

Contributor Author

commented Nov 19, 2018

Thanks, that's exactly the kind of insight that I was looking for! I realize now that it makes sense to try reading the reducing state with a ValueStateReader. The fact that my reduced value happens to be a map has nothing to do with an actual MapState.
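
The distinction can be illustrated with a toy model in plain Java. It assumes that a reducing state keeps one already-reduced value per key, which is why it can be read like a value state even when that value is itself a map. `ReducingStateModel` and its methods are hypothetical and are not Flink or KeyedStateReader code:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Toy model (not Flink code): a reducing state keeps ONE reduced value per key.
final class ReducingStateModel<K, V> {

    private final Map<K, V> valuePerKey = new HashMap<>();
    private final BinaryOperator<V> reducer;

    ReducingStateModel(BinaryOperator<V> reducer) {
        this.reducer = reducer;
    }

    // Merges the new element into the stored value instead of appending it.
    void add(K key, V value) {
        valuePerKey.merge(key, value, reducer);
    }

    // Reading is a single-value lookup, the same shape as reading a value state.
    V get(K key) {
        return valuePerKey.get(key);
    }

    public static void main(String[] args) {
        // The reduced value happens to be a Map, but it is stored as one opaque value.
        ReducingStateModel<String, Map<String, String>> state =
                new ReducingStateModel<>((first, second) -> first);
        Map<String, String> a = new HashMap<>();
        a.put("1", "1");
        Map<String, String> b = new HashMap<>();
        b.put("1", "2");
        state.add("1", a);
        state.add("1", b);
        System.out.println(state.get("1")); // {1=1}
    }
}
```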

@gyfora

Contributor

commented Nov 19, 2018

Cool. I would try to simplify the test case so that it doesn't include so many operators, only the window and the minimal other stuff :) Maybe that helps with further debugging.

Successfully read reducer state of Map<String, String>
Running the test prints:

	countState = [{2=3}, {1=1}]

&

	mapValues = [{2=3}, {1=1}]

Which is correct: because keyBy is by key only, it should randomly pick one map with key=1 (of the 3 input maps) and one map with key=2 (only 1 such input map).
@juhoautio

Contributor Author

commented Dec 13, 2018

@gyfora thanks, I added a commit.

Now the test successfully reads reducer state of Map<String, String> \o/


Running the test prints:

countState = [{2=3}, {1=1}]

&

mapValues = [{2=3}, {1=1}]

Which is correct, because keyBy is by key only => it should:

  • randomly pick one map with key=1 (of the 3 input maps)
  • and one map with key=2 (only 1 such input map)
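
The expected result described above can be reproduced with a toy model in plain Java. The input maps below are hypothetical (the actual test inputs are not shown in this thread), and `KeyByPickOne` keeps the first map seen per key, whereas in a real job the surviving map depends on arrival order:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model (not Flink code) of keyBy followed by a reducer that keeps one element per key.
final class KeyByPickOne {

    // Groups single-entry maps by their only key and keeps the first map seen for each key.
    static Map<String, Map<String, String>> pickOnePerKey(List<Map<String, String>> input) {
        Map<String, Map<String, String>> result = new TreeMap<>();
        for (Map<String, String> element : input) {
            String key = element.keySet().iterator().next();
            result.putIfAbsent(key, element);
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical input: three maps with key "1" and one map with key "2".
        List<Map<String, String>> input = Arrays.asList(
                Collections.singletonMap("1", "1"),
                Collections.singletonMap("1", "2"),
                Collections.singletonMap("1", "3"),
                Collections.singletonMap("2", "3"));
        System.out.println(pickOnePerKey(input).values()); // [{1=1}, {2=3}]
    }
}
```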

If you have any further suggestions as I start to clean this up (and try to use this for my actual use case with a rather big state), please let me know!

@gyfora

Contributor

commented Dec 14, 2018

Sounds good. Then I guess you don't need to keep the changes to the AbstractMapStateReader. Or do they fix any issue?

It might make sense to add a static helper to the KeyedStateReader class so users wouldn't have to specify "reducer-state-name" etc.
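
A rough sketch of what such a helper could look like, in plain Java. `ReaderSpec`, `forState`, and `forReducerState` are hypothetical names chosen for illustration and are not the actual KeyedStateReader API; the only detail taken from this PR is the internal state name "window-contents":

```java
// Hypothetical sketch, not the actual KeyedStateReader API.
final class ReaderSpec<V> {

    // Internal Flink name under which the reducer state is stored, as noted in this PR.
    static final String REDUCER_STATE_NAME = "window-contents";

    final String stateName;
    final Class<V> valueType;

    private ReaderSpec(String stateName, Class<V> valueType) {
        this.stateName = stateName;
        this.valueType = valueType;
    }

    // General entry point: the caller must know the state name.
    static <V> ReaderSpec<V> forState(String stateName, Class<V> valueType) {
        return new ReaderSpec<>(stateName, valueType);
    }

    // Convenience entry point: the internal name is filled in for the caller.
    static <V> ReaderSpec<V> forReducerState(Class<V> valueType) {
        return forState(REDUCER_STATE_NAME, valueType);
    }

    public static void main(String[] args) {
        System.out.println(forReducerState(String.class).stateName); // window-contents
    }
}
```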

@juhoautio juhoautio force-pushed the juhoautio:reducing_state branch from b381be0 to a364013 Dec 14, 2018

@juhoautio juhoautio changed the title WIP support reducing state Support reading reducer state Dec 14, 2018

@juhoautio juhoautio changed the title Support reading reducer state Support reducer state Dec 14, 2018

@juhoautio juhoautio force-pushed the juhoautio:reducing_state branch from a364013 to b99043d Dec 14, 2018

@juhoautio

Contributor Author

commented Dec 14, 2018

@gyfora this one's ready for review. Thanks for guiding me through to make this work!

@juhoautio juhoautio force-pushed the juhoautio:reducing_state branch from b99043d to 49a17a6 Dec 14, 2018

@juhoautio juhoautio force-pushed the juhoautio:reducing_state branch from 49a17a6 to a7f789f Dec 14, 2018

@gyfora gyfora merged commit c4a4287 into king:master Dec 17, 2018

@gyfora

Contributor

commented Dec 17, 2018

Looks good, thank you for experimenting with this!

I will start working on a Builder for creating the reader during the holidays; we are starting to have too many static methods :)
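
For illustration only, a builder could replace a growing set of static factory methods roughly as follows. Everything here (`ReaderConfig`, `Output`, `forState`, `output`, `build`) is a hypothetical sketch in plain Java and says nothing about the Builder that was eventually written:

```java
// Hypothetical sketch of a builder; names and options are illustrative only.
final class ReaderConfig {

    // What the reader should emit for each state entry.
    enum Output { VALUES, KEY_VALUE_PAIRS }

    final String stateName;
    final Output output;

    private ReaderConfig(String stateName, Output output) {
        this.stateName = stateName;
        this.output = output;
    }

    // Single entry point; options are chained instead of encoded in method names.
    static Builder forState(String stateName) {
        return new Builder(stateName);
    }

    static final class Builder {
        private final String stateName;
        private Output output = Output.VALUES; // default when nothing is chosen

        private Builder(String stateName) {
            this.stateName = stateName;
        }

        Builder output(Output output) {
            this.output = output;
            return this;
        }

        ReaderConfig build() {
            return new ReaderConfig(stateName, output);
        }
    }

    public static void main(String[] args) {
        ReaderConfig config = ReaderConfig.forState("window-contents")
                .output(Output.KEY_VALUE_PAIRS)
                .build();
        System.out.println(config.stateName + " " + config.output); // window-contents KEY_VALUE_PAIRS
    }
}
```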

@juhoautio juhoautio deleted the juhoautio:reducing_state branch Dec 17, 2018


public abstract class KeyedStateReader<K, V, O> extends RichFlatMapFunction<KeyedStateRow, O>
        implements ResultTypeQueryable<O> {

    private static final Logger LOGGER = LoggerFactory.getLogger(KeyedStateReader.class);
    private static final long serialVersionUID = 1L;

    // The name "window-contents" appears as a plain string in many places in the Flink source code.
    // There is no constant in Flink that we could refer to.
    private static final String REDUCER_STATE_NAME = "window-contents";

@juhoautio

juhoautio Mar 6, 2019

Author Contributor

@gyfora, now that I'm looking at this, the correct term would be "window state" instead of "reducer state", because it's not specific to the reducer type of window operator. Is it OK if I make a PR to adjust the terminology?

@gyfora

gyfora Mar 11, 2019

Contributor

sounds good!
