-
Notifications
You must be signed in to change notification settings - Fork 13.6k
[FLINK-37294][state] Support state migration between disabling and enabling ttl in HeapKeyedStateBackend #26651
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
TypeSerializer<SV> newSerializer, | ||
TtlTimeProvider ttlTimeProvider) { | ||
|
||
Preconditions.checkArgument(priorSerializer instanceof TtlAwareSerializer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: in the variable names we use prior and new and previous and current. I suggest being consistent;
previousSerializer previousAwareSerializer
currentSerializer currentTtlAwareSerializer
@@ -212,7 +214,7 @@ private <N, V> StateTable<K, N, V> tryRegisterStateTable( | |||
StateDescriptor<?, V> stateDesc, | |||
@Nonnull StateSnapshotTransformFactory<V> snapshotTransformFactory, | |||
boolean allowFutureMetadataUpdates) | |||
throws StateMigrationException { | |||
throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are we changing this away from putting out the specific exception - which seems best practise to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@davidradl migrateTtlAwareStateValues will also throw FlinkRuntimeException
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd agree keep the original exception, as the FlinkRuntimeException
is uncheck one and there is no need to claim throwing it.
+ previousStateSerializer | ||
+ ")."); | ||
} else if (stateCompatibility.isCompatibleAfterMigration()) { | ||
migrateStateValues(stateDesc, previousStateSerializer, newStateSerializer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what happens if the state is not compatible, I suggest we should at least log for that case - or should there be an error in that case?
@@ -299,6 +292,56 @@ private <N, V> StateTable<K, N, V> tryRegisterStateTable( | |||
return stateTable; | |||
} | |||
|
|||
/** Only triggering state migration when the state TTL is turned on or off is supported. */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need the text is supported
- I am not sure what this means.
TypeSerializer<List<V>> newSerializer, | ||
TtlTimeProvider ttlTimeProvider) { | ||
|
||
Preconditions.checkArgument( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it looks like there is suplication of code, can we re arrange the code so the following is not repeated for the different cases
priorSerializer instanceof TtlAwareSerializer.TtlAwareListSerializer);
Preconditions.checkArgument(
newSerializer instanceof TtlAwareSerializer.TtlAwareListSerializer);
TtlAwareSerializer<V, ?> priorTtlAwareElementSerializer =
((TtlAwareSerializer.TtlAwareListSerializer<V>) priorSerializer)
.getElementSerializer();
TtlAwareSerializer<V, ?> newTtlAwareElementSerializer =
((TtlAwareSerializer.TtlAwareListSerializer<V>) newSerializer)
.getElementSerializer();
…abling ttl in HeapKeyedStateBackend Co-authored-by: hejufang <hejufang@bytedance.com> Co-authored-by: Xiangyu Feng <xiangyu0xf@gmail.com>
Cool, I've closed another PR. |
@Zakelly Kindly remind for review. |
"State should be an AbstractRocksDBState but is " + state); | ||
} | ||
AbstractHeapState<K, N, V> heapState = (AbstractHeapState<K, N, V>) state; | ||
TtlAwareSerializer<V, ?> previousTtlAwareSerializer = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Variable previousTtlAwareSerializer
is not used. You can safely delete this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comment resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, overall looks good
@@ -212,7 +214,7 @@ private <N, V> StateTable<K, N, V> tryRegisterStateTable( | |||
StateDescriptor<?, V> stateDesc, | |||
@Nonnull StateSnapshotTransformFactory<V> snapshotTransformFactory, | |||
boolean allowFutureMetadataUpdates) | |||
throws StateMigrationException { | |||
throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd agree keep the original exception, as the FlinkRuntimeException
is uncheck one and there is no need to claim throwing it.
|
||
while (iterator.hasNext()) { | ||
final StateEntry<K, N, V> entry = iterator.next(); | ||
stateTable.put( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It there any concurrency issue when performing stateTable.put()
over the iteration?
If so, I'd suggest a new inner transform method to in-place update all the value of inner entries
[FLINK-37294][state] Support state migration between disabling and enabling ttl in HeapKeyedStateBackend
What is the purpose of the change
Support state migration between disabling and enabling ttl in HeapKeyedStateBackend
Brief change log
Add
migrateTtlValue
inAbstractHeapState
. When the state TTL switch changes, trigger the migration of state data.Verifying this change
This change is already covered by existing tests, such as
StateBackendMigrationTestBase#testStateMigrationAfterChangingTTLFromEnablingToDisabling
andStateBackendMigrationTestBase#testStateMigrationAfterChangingTTLFromDisablingToEnabling
.Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: noDocumentation