Skip to content

[FLINK-37294][state] Support state migration between disabling and enabling ttl in HeapKeyedStateBackend #26651

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

hejufang
Copy link
Contributor

@hejufang hejufang commented Jun 8, 2025

[FLINK-37294][state] Support state migration between disabling and enabling ttl in HeapKeyedStateBackend

What is the purpose of the change

Support state migration between disabling and enabling ttl in HeapKeyedStateBackend

Brief change log

Add migrateTtlValue in AbstractHeapState. When the state TTL switch changes, trigger the migration of state data.

Verifying this change

This change is already covered by existing tests, such as StateBackendMigrationTestBase#testStateMigrationAfterChangingTTLFromEnablingToDisabling and StateBackendMigrationTestBase#testStateMigrationAfterChangingTTLFromDisablingToEnabling.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Copy link
Collaborator

flinkbot commented Jun 8, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@hejufang hejufang changed the title [FLINK-37294][state] Support state migration between disabling and enabling ttl in RocksDBKeyedStateBackend [FLINK-37294][state] Support state migration between disabling and enabling ttl in HeapKeyedStateBackend Jun 8, 2025
TypeSerializer<SV> newSerializer,
TtlTimeProvider ttlTimeProvider) {

Preconditions.checkArgument(priorSerializer instanceof TtlAwareSerializer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: in the variable names we use prior and new and previous and current. I suggest being consistent;
previousSerializer previousAwareSerializer
currentSerializer currentTtlAwareSerializer

@@ -212,7 +214,7 @@ private <N, V> StateTable<K, N, V> tryRegisterStateTable(
StateDescriptor<?, V> stateDesc,
@Nonnull StateSnapshotTransformFactory<V> snapshotTransformFactory,
boolean allowFutureMetadataUpdates)
throws StateMigrationException {
throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we changing this away from putting out the specific exception - which seems best practise to me.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@davidradl migrateTtlAwareStateValues will also throw FlinkRuntimeException

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd agree keep the original exception, as the FlinkRuntimeException is uncheck one and there is no need to claim throwing it.

+ previousStateSerializer
+ ").");
} else if (stateCompatibility.isCompatibleAfterMigration()) {
migrateStateValues(stateDesc, previousStateSerializer, newStateSerializer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens if the state is not compatible, I suggest we should at least log for that case - or should there be an error in that case?

@@ -299,6 +292,56 @@ private <N, V> StateTable<K, N, V> tryRegisterStateTable(
return stateTable;
}

/** Only triggering state migration when the state TTL is turned on or off is supported. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need the text is supported - I am not sure what this means.

TypeSerializer<List<V>> newSerializer,
TtlTimeProvider ttlTimeProvider) {

Preconditions.checkArgument(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks like there is suplication of code, can we re arrange the code so the following is not repeated for the different cases


              priorSerializer instanceof TtlAwareSerializer.TtlAwareListSerializer);
        Preconditions.checkArgument(
                newSerializer instanceof TtlAwareSerializer.TtlAwareListSerializer);

        TtlAwareSerializer<V, ?> priorTtlAwareElementSerializer =
                ((TtlAwareSerializer.TtlAwareListSerializer<V>) priorSerializer)
                        .getElementSerializer();
        TtlAwareSerializer<V, ?> newTtlAwareElementSerializer =
                ((TtlAwareSerializer.TtlAwareListSerializer<V>) newSerializer)
                        .getElementSerializer();

@xiangyuf
Copy link
Contributor

@hejufang Hi, I did some improvement for your implementation in this PR(#26674). I've also changed this commit as a co-authored commit. PTAL.

…abling ttl in HeapKeyedStateBackend

Co-authored-by: hejufang <hejufang@bytedance.com>
Co-authored-by: Xiangyu Feng <xiangyu0xf@gmail.com>
@hejufang
Copy link
Contributor Author

@hejufang Hi, I did some improvement for your implementation in this PR(#26674). I've also changed this commit as a co-authored commit. PTAL.

@xiangyuf Thank you for the improvements. I have push the new commit to the current branch, and we can continue to track this PR.

@xiangyuf
Copy link
Contributor

Cool, I've closed another PR.

@xiangyuf
Copy link
Contributor

@Zakelly Kindly remind for review.

"State should be an AbstractRocksDBState but is " + state);
}
AbstractHeapState<K, N, V> heapState = (AbstractHeapState<K, N, V>) state;
TtlAwareSerializer<V, ?> previousTtlAwareSerializer =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variable previousTtlAwareSerializer is not used. You can safely delete this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment resolved

Copy link
Contributor

@Zakelly Zakelly left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR, overall looks good

@@ -212,7 +214,7 @@ private <N, V> StateTable<K, N, V> tryRegisterStateTable(
StateDescriptor<?, V> stateDesc,
@Nonnull StateSnapshotTransformFactory<V> snapshotTransformFactory,
boolean allowFutureMetadataUpdates)
throws StateMigrationException {
throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd agree keep the original exception, as the FlinkRuntimeException is uncheck one and there is no need to claim throwing it.


while (iterator.hasNext()) {
final StateEntry<K, N, V> entry = iterator.next();
stateTable.put(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It there any concurrency issue when performing stateTable.put() over the iteration?
If so, I'd suggest a new inner transform method to in-place update all the value of inner entries

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants