Skip to content

Commit

Permalink
[FLINK-5507] [queryable state] Remove list variant of asQueryableState
Browse files Browse the repository at this point in the history
The queryable state "sink" using ListState stores all incoming data
forever and is never cleaned up. Eventually, it will consume too much
memory and is thus of limited use.

This closes apache#3129.
This closes apache#3120 (left over).
  • Loading branch information
Nico Kruber authored and joseprupi committed Feb 12, 2017
1 parent f501730 commit cdeaab5
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 155 deletions.
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
Expand Down Expand Up @@ -681,30 +680,6 @@ public QueryableStateStream<KEY, T> asQueryableState(
getKeyType().createSerializer(getExecutionConfig()));
}

/**
 * Publishes the keyed stream as a queryable ListState instance.
 *
 * <p>The value serializer exposed by the returned stream is read from the
 * given state descriptor after the descriptor has been initialized, so a
 * serializer that was explicitly set on the descriptor is the one that is
 * advertised to callers.
 *
 * @param queryableStateName Name under which to publish the queryable state instance
 * @param stateDescriptor State descriptor to create state instance from
 * @return Queryable state instance
 */
@PublicEvolving
public QueryableStateStream<KEY, T> asQueryableState(
        String queryableStateName,
        ListStateDescriptor<T> stateDescriptor) {

    transform("Queryable state: " + queryableStateName,
            getType(),
            new QueryableAppendingStateOperator<>(queryableStateName, stateDescriptor));

    // Make sure the descriptor carries a serializer before it is read below.
    stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());

    return new QueryableStateStream<>(
            queryableStateName,
            stateDescriptor.getSerializer(),
            getKeyType().createSerializer(getExecutionConfig()));
}

/**
* Publishes the keyed stream as a queryable FoldingState instance.
*
Expand Down
Expand Up @@ -20,10 +20,10 @@ package org.apache.flink.streaming.api.scala

import org.apache.flink.annotation.{Internal, Public, PublicEvolving}
import org.apache.flink.api.common.functions._
import org.apache.flink.api.common.state.{FoldingStateDescriptor, ListStateDescriptor, ReducingStateDescriptor, ValueStateDescriptor}
import org.apache.flink.api.common.state.{FoldingStateDescriptor, ReducingStateDescriptor, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.streaming.api.datastream.{QueryableStateStream, SingleOutputStreamOperator, DataStream => JavaStream, KeyedStream => KeyedJavaStream, WindowedStream => WindowedJavaStream}
import org.apache.flink.streaming.api.datastream.{QueryableStateStream, DataStream => JavaStream, KeyedStream => KeyedJavaStream, WindowedStream => WindowedJavaStream}
import org.apache.flink.streaming.api.functions.{ProcessFunction, RichProcessFunction}
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
Expand Down Expand Up @@ -503,30 +503,6 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
getKeyType.createSerializer(executionConfig))
}

/**
  * Publishes the keyed stream as a queryable ListState instance.
  *
  * @param queryableStateName Name under which to publish the queryable state instance
  * @param stateDescriptor State descriptor to create state instance from
  * @return Queryable state instance
  */
@PublicEvolving
def asQueryableState(
    queryableStateName: String,
    stateDescriptor: ListStateDescriptor[T]) : QueryableStateStream[K, T] = {

  val operatorName = s"Queryable state: $queryableStateName"
  transform(
    operatorName,
    new QueryableAppendingStateOperator(queryableStateName, stateDescriptor))(dataType)

  // The descriptor's serializer is only read after this initialization call.
  stateDescriptor.initializeSerializerUnlessSet(executionConfig)

  val valueSerializer = stateDescriptor.getSerializer
  val keySerializer = getKeyType.createSerializer(executionConfig)

  new QueryableStateStream(queryableStateName, valueSerializer, keySerializer)
}

/**
* Publishes the keyed stream as a queryable FoldingState instance.
*
Expand Down
Expand Up @@ -31,7 +31,6 @@
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
Expand Down Expand Up @@ -820,109 +819,6 @@ public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
}
}

/**
 * Tests simple list state queryable state instance. Each source emits
 * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
 * queried. The test succeeds after each subtask index is queried with
 * a list of size numElements + 1 (the extra element is the 0-value) and
 * the i-th element of each list carries the queried key and the value i.
 *
 * <p>Queries are retried until the list has the expected size or the test
 * deadline runs out. The job is cancelled and the client is shut down in
 * any case when the test finishes.
 */
@Test
public void testListState() throws Exception {
    // Config
    final Deadline deadline = TEST_TIMEOUT.fromNow();

    final int numElements = 128;

    final QueryableStateClient client = new QueryableStateClient(cluster.configuration());

    JobID jobId = null;
    try {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(NUM_SLOTS);
        // Very important, because cluster is shared between tests and we
        // don't explicitly check that all slots are available before
        // submitting.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000));

        DataStream<Tuple2<Integer, Long>> source = env
                .addSource(new TestAscendingValueSource(numElements));

        // List state
        ListStateDescriptor<Tuple2<Integer, Long>> listState = new ListStateDescriptor<>(
                "any",
                source.getType());

        QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
                source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
                    @Override
                    public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
                        return value.f0;
                    }
                }).asQueryableState("timon", listState);

        // Submit the job graph
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        jobId = jobGraph.getJobID();

        cluster.submitJobDetached(jobGraph);

        // Now query
        long expected = numElements + 1; // +1 for 0-value

        FiniteDuration retryDelay = new FiniteDuration(1, TimeUnit.SECONDS);
        for (int key = 0; key < NUM_SLOTS; key++) {
            final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
                    key,
                    queryableState.getKeySerializer(),
                    VoidNamespace.INSTANCE,
                    VoidNamespaceSerializer.INSTANCE);

            boolean success = false;
            while (deadline.hasTimeLeft() && !success) {
                Future<byte[]> future = getKvStateWithRetries(client,
                        jobId,
                        queryableState.getQueryableStateName(),
                        key,
                        serializedKey,
                        retryDelay);

                byte[] serializedValue = Await.result(future, deadline.timeLeft());

                List<Tuple2<Integer, Long>> list = KvStateRequestSerializer.deserializeList(
                        serializedValue,
                        queryableState.getValueSerializer());

                // Only verify the contents once the list has reached its
                // expected size; a shorter list leads to a retry.
                if (list.size() == expected) {
                    for (int i = 0; i < expected; i++) {
                        Tuple2<Integer, Long> elem = list.get(i);
                        assertEquals("Key mismatch", key, elem.f0.intValue());
                        assertEquals("Value mismatch", i, elem.f1.longValue());
                    }

                    success = true;
                } else {
                    // Retry
                    Thread.sleep(50);
                }
            }

            assertTrue("Did not succeed query", success);
        }
    } finally {
        try {
            // Free cluster resources
            if (jobId != null) {
                Future<CancellationSuccess> cancellation = cluster
                        .getLeaderGateway(deadline.timeLeft())
                        .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
                        .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));

                Await.ready(cancellation, deadline.timeLeft());
            }
        } finally {
            // Shut the client down even if cancelling the job threw (e.g. the
            // deadline already expired), so that it is not leaked.
            client.shutDown();
        }
    }
}

/**
* Tests simple folding state queryable state instance. Each source emits
* (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
Expand Down

0 comments on commit cdeaab5

Please sign in to comment.