Skip to content

Commit

Permalink
[draft] filter expired data when restore
Browse files Browse the repository at this point in the history
  • Loading branch information
ljz2051 committed Sep 1, 2023
1 parent ed4937c commit 7a058fd
Show file tree
Hide file tree
Showing 15 changed files with 232 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.RestoredStateTransformer.RestoredStateTransformerFactory;
import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
import org.apache.flink.runtime.state.internal.InternalKvState;

Expand All @@ -45,7 +46,10 @@ default <N, SV, S extends State, IS extends S> IS createOrUpdateInternalState(
@Nonnull StateDescriptor<S, SV> stateDesc)
throws Exception {
return createOrUpdateInternalState(
namespaceSerializer, stateDesc, StateSnapshotTransformFactory.noTransform());
namespaceSerializer,
stateDesc,
StateSnapshotTransformFactory.noTransform(),
RestoredStateTransformerFactory.noTransform());
}

/**
Expand All @@ -64,7 +68,8 @@ default <N, SV, S extends State, IS extends S> IS createOrUpdateInternalState(
<N, SV, SEV, S extends State, IS extends S> IS createOrUpdateInternalState(
@Nonnull TypeSerializer<N> namespaceSerializer,
@Nonnull StateDescriptor<S, SV> stateDesc,
@Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory)
@Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory,
@Nonnull RestoredStateTransformerFactory<SV> restoredStateTransformerFactory)
throws Exception;

/**
Expand All @@ -85,14 +90,15 @@ default <N, SV, SEV, S extends State, IS extends S> IS createOrUpdateInternalSta
@Nonnull TypeSerializer<N> namespaceSerializer,
@Nonnull StateDescriptor<S, SV> stateDesc,
@Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory,
@Nonnull RestoredStateTransformerFactory<SV> restoredStateTransformerFactory,
boolean allowFutureMetadataUpdates)
throws Exception {
if (allowFutureMetadataUpdates) {
throw new UnsupportedOperationException(
this.getClass().getName() + "doesn't support to allow future metadata update");
} else {
return createOrUpdateInternalState(
namespaceSerializer, stateDesc, snapshotTransformFactory);
namespaceSerializer, stateDesc, snapshotTransformFactory, restoredStateTransformerFactory);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.flink.runtime.state;

import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
Expand All @@ -30,9 +32,10 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/**
* Compound meta information for a registered state in a keyed state backend. This combines all
Expand All @@ -47,6 +50,7 @@ public class RegisteredKeyValueStateBackendMetaInfo<N, S> extends RegisteredStat
@Nonnull private final StateSerializerProvider<N> namespaceSerializerProvider;
@Nonnull private final StateSerializerProvider<S> stateSerializerProvider;
@Nonnull private StateSnapshotTransformFactory<S> stateSnapshotTransformFactory;
@Nullable private final Time stateTtlTime;

public RegisteredKeyValueStateBackendMetaInfo(
@Nonnull StateDescriptor.Type stateType,
Expand All @@ -59,22 +63,25 @@ public RegisteredKeyValueStateBackendMetaInfo(
name,
StateSerializerProvider.fromNewRegisteredSerializer(namespaceSerializer),
StateSerializerProvider.fromNewRegisteredSerializer(stateSerializer),
StateSnapshotTransformFactory.noTransform());
StateSnapshotTransformFactory.noTransform(),
null);
}

public RegisteredKeyValueStateBackendMetaInfo(
@Nonnull StateDescriptor.Type stateType,
@Nonnull String name,
@Nonnull TypeSerializer<N> namespaceSerializer,
@Nonnull TypeSerializer<S> stateSerializer,
@Nonnull StateSnapshotTransformFactory<S> stateSnapshotTransformFactory) {
@Nonnull StateSnapshotTransformFactory<S> stateSnapshotTransformFactory,
@Nonnull StateTtlConfig stateTtlConfig) {

this(
stateType,
name,
StateSerializerProvider.fromNewRegisteredSerializer(namespaceSerializer),
StateSerializerProvider.fromNewRegisteredSerializer(stateSerializer),
stateSnapshotTransformFactory);
stateSnapshotTransformFactory,
stateTtlConfig.isEnabled() ? stateTtlConfig.getTtl() : null);
}

@SuppressWarnings("unchecked")
Expand All @@ -96,7 +103,14 @@ public RegisteredKeyValueStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot sna
snapshot.getTypeSerializerSnapshot(
StateMetaInfoSnapshot.CommonSerializerKeys
.VALUE_SERIALIZER))),
StateSnapshotTransformFactory.noTransform());
StateSnapshotTransformFactory.noTransform(),

(snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TTL) != null)
? Time.milliseconds(
Long.parseLong(
Preconditions.checkNotNull(
snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TTL))))
: null);

Preconditions.checkState(
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE == snapshot.getBackendStateType());
Expand All @@ -107,20 +121,26 @@ private RegisteredKeyValueStateBackendMetaInfo(
@Nonnull String name,
@Nonnull StateSerializerProvider<N> namespaceSerializerProvider,
@Nonnull StateSerializerProvider<S> stateSerializerProvider,
@Nonnull StateSnapshotTransformFactory<S> stateSnapshotTransformFactory) {
@Nonnull StateSnapshotTransformFactory<S> stateSnapshotTransformFactory,
@Nullable Time stateTtlTime) {

super(name);
this.stateType = stateType;
this.namespaceSerializerProvider = namespaceSerializerProvider;
this.stateSerializerProvider = stateSerializerProvider;
this.stateSnapshotTransformFactory = stateSnapshotTransformFactory;
this.stateTtlTime = stateTtlTime;
}

@Nonnull
public StateDescriptor.Type getStateType() {
return stateType;
}

public Optional<Time> getStateTtlTime() {
return stateTtlTime != null ? Optional.of(stateTtlTime) : Optional.empty();
}

@Nonnull
public TypeSerializer<N> getNamespaceSerializer() {
return namespaceSerializerProvider.currentSchemaSerializer();
Expand Down Expand Up @@ -258,10 +278,12 @@ && getStateType() != StateDescriptor.Type.UNKNOWN) {

@Nonnull
private StateMetaInfoSnapshot computeSnapshot() {
Map<String, String> optionsMap =
Collections.singletonMap(
StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE.toString(),
stateType.toString());
Map<String, String> optionsMap = new HashMap<>(2);
optionsMap.put(StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE.toString(), stateType.toString());
if (stateTtlTime != null) {
optionsMap.put(StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TTL.toString(),
String.valueOf(stateTtlTime.toMilliseconds()));
}
Map<String, TypeSerializer<?>> serializerMap = CollectionUtil.newHashMapWithExpectedSize(2);
Map<String, TypeSerializerSnapshot<?>> serializerConfigSnapshotsMap =
CollectionUtil.newHashMapWithExpectedSize(2);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.apache.flink.runtime.state;

import javax.annotation.Nullable;

public interface RestoredStateTransformer<T> {
@Nullable
T filterOrTransform(@Nullable T value);

interface RestoredStateTransformerFactory<T> {
RestoredStateTransformerFactory<?> NO_TRANSFORM = createNoTransform();

static <T> RestoredStateTransformerFactory<T> noTransform() {
return (RestoredStateTransformerFactory<T>) NO_TRANSFORM;
}

static <T> RestoredStateTransformerFactory<T> createNoTransform() {
return restoredMetaInfo -> new RestoredStateTransformer<T>() {
@Nullable
@Override
public T filterOrTransform(@Nullable T value) {
return value;
}
};
}

RestoredStateTransformer<T> createRestoredStateTransformer(
RegisteredKeyValueStateBackendMetaInfo<?, ?> restoredMetaInfo);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RestoredStateTransformer.RestoredStateTransformerFactory;
import org.apache.flink.runtime.state.SavepointResources;
import org.apache.flink.runtime.state.SnapshotExecutionType;
import org.apache.flink.runtime.state.SnapshotResult;
Expand Down Expand Up @@ -258,14 +259,20 @@ private <N, V> StateTable<K, N, V> tryRegisterStateTable(
: restoredKvMetaInfo;

stateTable.setMetaInfo(restoredKvMetaInfo);
if (restoredKvMetaInfo.getStateTtlTime().isPresent()
&& restoredKvMetaInfo.getStateTtlTime().get().toMilliseconds() < stateDesc.getTtlConfig().getTtl().toMilliseconds()) {
//TODO iterator all key-value data, and filter the expired data
}

} else {
RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo =
new RegisteredKeyValueStateBackendMetaInfo<>(
stateDesc.getType(),
stateDesc.getName(),
namespaceSerializer,
newStateSerializer,
snapshotTransformFactory);
snapshotTransformFactory,
stateDesc.getTtlConfig());

newMetaInfo =
allowFutureMetadataUpdates
Expand Down Expand Up @@ -308,10 +315,11 @@ public <N> Stream<Tuple2<K, N>> getKeysAndNamespaces(String state) {
public <N, SV, SEV, S extends State, IS extends S> IS createOrUpdateInternalState(
@Nonnull TypeSerializer<N> namespaceSerializer,
@Nonnull StateDescriptor<S, SV> stateDesc,
@Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory)
@Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory,
@Nonnull RestoredStateTransformerFactory<SV> restoredStateTransformerFactory)
throws Exception {
return createOrUpdateInternalState(
namespaceSerializer, stateDesc, snapshotTransformFactory, false);
namespaceSerializer, stateDesc, snapshotTransformFactory, restoredStateTransformerFactory, false);
}

@Override
Expand All @@ -320,6 +328,7 @@ public <N, SV, SEV, S extends State, IS extends S> IS createOrUpdateInternalStat
@Nonnull TypeSerializer<N> namespaceSerializer,
@Nonnull StateDescriptor<S, SV> stateDesc,
@Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory,
@Nonnull RestoredStateTransformerFactory<SV> restoredStateTransformerFactory,
boolean allowFutureMetadataUpdates)
throws Exception {
StateTable<K, N, SV> stateTable =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ private void readStateHandleStateData(
try (InputStream kgCompressionInStream =
streamCompressionDecorator.decorateWithCompression(fsDataInputStream)) {

registeredKVStates.get(kvStatesById.get("")).metaInfo.getStateSnapshotTransformFactory().createForDeserializedState().get().filterOrTransform()

readKeyGroupStateData(
kgCompressionInStream, kvStatesById, keyGroupIndex, numStates, readVersion);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ public enum CommonOptionsKeys {
* operator state is distributed on restore
*/
OPERATOR_STATE_DISTRIBUTION_MODE,
/** Key to define the state ttl of a key/value keyed-state */
KEYED_STATE_TTL,
}

/** Predefined keys for the most common serializer types in the meta info. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.RestoredStateTransformer;
import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.util.FlinkRuntimeException;
Expand Down Expand Up @@ -220,7 +221,9 @@ TtlStateContext<OIS, V> createTtlStateContext(StateDescriptor<TTLS, TTLV> ttlDes
OIS originalState =
(OIS)
stateBackend.createOrUpdateInternalState(
namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory());
namespaceSerializer, ttlDescriptor,
getSnapshotTransformFactory(),
getStateRestoreTransformerFactory());
return new TtlStateContext<>(
originalState,
ttlConfig,
Expand Down Expand Up @@ -268,6 +271,11 @@ private StateSnapshotTransformFactory<?> getSnapshotTransformFactory() {
}
}

private RestoredStateTransformer.RestoredStateTransformerFactory<TtlValue<SV>> getStateRestoreTransformerFactory() {
return new TtlStateRestoreTransformer.Factory<>(timeProvider);
}


/**
* Serializer for user state value with TTL. Visibility is public for usage with external tools.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package org.apache.flink.runtime.state.ttl;

import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RestoredStateTransformer;

import javax.annotation.Nullable;

public class TtlStateRestoreTransformer<T> implements RestoredStateTransformer<TtlValue<T>> {
private final TtlTimeProvider ttlTimeProvider;
private long oldTtlInMilliseconds;

public TtlStateRestoreTransformer(TtlTimeProvider ttlTimeProvider, RegisteredKeyValueStateBackendMetaInfo<?, ?> oldMetaInfo) {
this.ttlTimeProvider = ttlTimeProvider;
this.oldTtlInMilliseconds = oldMetaInfo.getStateTtlTime().get().toMilliseconds();
}

@Nullable
@Override
public TtlValue<T> filterOrTransform(@Nullable TtlValue<T> value) {
if (value != null && TtlUtils.expired(value.getLastAccessTimestamp(), oldTtlInMilliseconds, ttlTimeProvider)) {
return null;
}
return value;
}


public static class Factory<TTLV> implements RestoredStateTransformerFactory<TtlValue<TTLV>> {
private final TtlTimeProvider ttlTimeProvider;

public Factory(TtlTimeProvider ttlTimeProvider) {
this.ttlTimeProvider = ttlTimeProvider;
}

@Override
public RestoredStateTransformer<TtlValue<TTLV>> createRestoredStateTransformer(
RegisteredKeyValueStateBackendMetaInfo<?, ?> restoredMetaInfo) {
return new TtlStateRestoreTransformer<>(ttlTimeProvider, restoredMetaInfo);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,16 @@ protected void takeAndRestoreSnapshot(int numberOfKeyGroupsAfterRestore) throws
restoreSnapshot(sbetc.takeSnapshot(), numberOfKeyGroupsAfterRestore);
}

protected void setTtlConfig(StateTtlConfig.UpdateType updateType,
StateTtlConfig.StateVisibility visibility,
long ttl) throws Exception {
this.ttlConfig = getConfBuilder(ttl)
.setUpdateType(updateType)
.setStateVisibility(visibility)
.disableCleanupInBackground()
.build();
}

private void restoreSnapshot(KeyedStateHandle snapshot, int numberOfKeyGroups)
throws Exception {
sbetc.createAndRestoreKeyedStateBackend(numberOfKeyGroups, snapshot);
Expand Down Expand Up @@ -296,6 +306,28 @@ public void testRelaxedExpirationOnRead() throws Exception {
assertEquals("Expired state should be cleared on access", ctx().emptyValue, ctx().get());
}

@Test
public void test() throws Exception {
initTest(
StateTtlConfig.UpdateType.OnCreateAndWrite,
StateTtlConfig.StateVisibility.NeverReturnExpired);
timeProvider.time = 0;
ctx().update(ctx().updateEmpty);

timeProvider.time += 50;
assertEquals(UNEXPIRED_AVAIL, ctx().getUpdateEmpty, ctx().get());

timeProvider.time += TTL * 2; // previous value has been expired.

setTtlConfig(
StateTtlConfig.UpdateType.OnCreateAndWrite,
StateTtlConfig.StateVisibility.NeverReturnExpired, TTL * 10);
takeAndRestoreSnapshot();

timeProvider.time += 1;
assertEquals(EXPIRED_AVAIL, ctx().emptyValue, ctx().get());
}

@Test
public void testExpirationTimestampOverflow() throws Exception {
initTest(
Expand Down

0 comments on commit 7a058fd

Please sign in to comment.