[FLINK-5480] [savepoints] Add setUidHash method to DataStream API
This closes apache#3117.
StefanRRichter authored and joseprupi committed Feb 12, 2017
1 parent 907c986 commit f68fbe8
Showing 13 changed files with 296 additions and 30 deletions.
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.cassandra;

import com.datastax.driver.core.Cluster;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple;
@@ -85,6 +86,7 @@ public CassandraSink<IN> name(String name) {
* @param uid The unique user-specified ID of this transformation.
* @return The operator with the specified ID.
*/
@PublicEvolving
public CassandraSink<IN> uid(String uid) {
if (useDataStreamSink) {
getSinkTransformation().setUid(uid);
@@ -94,6 +96,36 @@ public CassandraSink<IN> uid(String uid) {
return this;
}

/**
* Sets a user-provided hash for this operator. The hash is used AS IS to create the JobVertexID.
*
* <p>The user-provided hash is an alternative to the generated hashes; it is used when identifying an
* operator through the default hash mechanics fails (e.g. because of changes between Flink versions).
*
* <p><strong>Important</strong>: this should be used as a workaround or for troubleshooting. The provided hash
* needs to be unique per transformation and job. Otherwise, job submission will fail. Furthermore, you cannot
* assign a user-specified hash to intermediate nodes in an operator chain; trying to do so will make your job fail.
*
* <p>A use case for this is migration between Flink versions or changing a job in a way that changes the
* automatically generated hashes. In this case, providing the previous hashes directly through this method (e.g.
* obtained from old logs) can help to re-establish a lost mapping from states to their target operator.
*
* @param uidHash The user-provided hash for this operator. This will become the JobVertexID, which is shown in the
*                logs and web UI.
* @return The operator with the user-provided hash set.
*/
@PublicEvolving
public CassandraSink<IN> setUidHash(String uidHash) {
if (useDataStreamSink) {
getSinkTransformation().setUidHash(uidHash);
} else {
getStreamTransformation().setUidHash(uidHash);
}
return this;
}

/**
* Sets the parallelism for this sink. The degree must be higher than zero.
*
@@ -130,7 +130,7 @@ private boolean generateNodeHash(
boolean isChainingEnabled) {

// Check for user-specified ID
String userSpecifiedHash = node.getTransformationId();
String userSpecifiedHash = node.getTransformationUID();

if (userSpecifiedHash == null) {
// Check that all input nodes have their hashes computed
@@ -192,7 +192,7 @@
* Generates a hash from a user-specified ID.
*/
private byte[] generateUserSpecifiedHash(StreamNode node, Hasher hasher) {
hasher.putString(node.getTransformationId(), Charset.forName("UTF-8"));
hasher.putString(node.getTransformationUID(), Charset.forName("UTF-8"));

return hasher.hash().asBytes();
}
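For reference, a standalone sketch of the hashing step above, assuming the supplied Hasher comes from Guava's murmur3_128 with seed 0 (the hash function this generator uses elsewhere); the uid value is made up:

import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;

import java.nio.charset.Charset;

public class UserSpecifiedHashSketch {
    public static void main(String[] args) {
        // Same steps as generateUserSpecifiedHash: hash the raw uid string as UTF-8.
        Hasher hasher = Hashing.murmur3_128(0).newHasher();
        hasher.putString("my-stateful-operator", Charset.forName("UTF-8")); // hypothetical uid
        byte[] hash = hasher.hash().asBytes(); // 16 bytes; this becomes the operator's JobVertexID

        // Hex-encode for display; a stable uid always yields the same hash.
        StringBuilder hex = new StringBuilder();
        for (byte b : hash) {
            hex.append(String.format("%02x", b));
        }
        System.out.println(hex);
    }
}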
@@ -76,6 +76,32 @@ public DataStreamSink<T> uid(String uid) {
return this;
}

/**
* Sets a user-provided hash for this operator. The hash is used AS IS to create the JobVertexID.
*
* <p>The user-provided hash is an alternative to the generated hashes; it is used when identifying an
* operator through the default hash mechanics fails (e.g. because of changes between Flink versions).
*
* <p><strong>Important</strong>: this should be used as a workaround or for troubleshooting. The provided hash
* needs to be unique per transformation and job. Otherwise, job submission will fail. Furthermore, you cannot
* assign a user-specified hash to intermediate nodes in an operator chain; trying to do so will make your job fail.
*
* <p>A use case for this is migration between Flink versions or changing a job in a way that changes the
* automatically generated hashes. In this case, providing the previous hashes directly through this method (e.g.
* obtained from old logs) can help to re-establish a lost mapping from states to their target operator.
*
* @param uidHash The user-provided hash for this operator. This will become the JobVertexID, which is shown in the
*                logs and web UI.
* @return The operator with the user-provided hash set.
*/
@PublicEvolving
public DataStreamSink<T> setUidHash(String uidHash) {
transformation.setUidHash(uidHash);
return this;
}

/**
* Sets the parallelism for this sink. The degree must be higher than zero.
*
@@ -87,6 +87,32 @@ public SingleOutputStreamOperator<T> uid(String uid) {
return this;
}

/**
* Sets a user-provided hash for this operator. The hash is used AS IS to create the JobVertexID.
*
* <p>The user-provided hash is an alternative to the generated hashes; it is used when identifying an
* operator through the default hash mechanics fails (e.g. because of changes between Flink versions).
*
* <p><strong>Important</strong>: this should be used as a workaround or for troubleshooting. The provided hash
* needs to be unique per transformation and job. Otherwise, job submission will fail. Furthermore, you cannot
* assign a user-specified hash to intermediate nodes in an operator chain; trying to do so will make your job fail.
*
* <p>A use case for this is migration between Flink versions or changing a job in a way that changes the
* automatically generated hashes. In this case, providing the previous hashes directly through this method (e.g.
* obtained from old logs) can help to re-establish a lost mapping from states to their target operator.
*
* @param uidHash The user-provided hash for this operator. This will become the JobVertexID, which is shown in the
*                logs and web UI.
* @return The operator with the user-provided hash set.
*/
@PublicEvolving
public SingleOutputStreamOperator<T> setUidHash(String uidHash) {
transformation.setUidHash(uidHash);
return this;
}
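To illustrate the new API, a minimal usage sketch (the pipeline and the 32-character hex hash below are made up; the hash must be valid hex of JobVertexID length, i.e. 16 bytes):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SetUidHashExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> mapped = env
                .fromElements("a", "b", "c")
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();
                    }
                })
                // Pin this operator to a JobVertexID recovered e.g. from old logs,
                // for when the auto-generated hash changed between Flink versions.
                .setUidHash("f303b679b6a7f3a2b0eb6d3dbb4e49e9"); // hypothetical 32-char hex hash

        mapped.print();
        env.execute("setUidHash usage sketch");
    }
}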

/**
* Sets the parallelism for this operator. The degree must be 1 or more.
*
@@ -17,18 +17,6 @@

package org.apache.flink.streaming.api.graph;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.InputFormat;
@@ -41,6 +29,7 @@
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -49,7 +38,6 @@
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
@@ -63,6 +51,18 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Class representing the streaming topology. It contains all the information
* necessary to build the jobgraph for the execution.
@@ -472,10 +472,17 @@ public void setInputFormat(Integer vertexID, InputFormat<?, ?> inputFormat) {
getStreamNode(vertexID).setInputFormat(inputFormat);
}

void setTransformationId(Integer nodeId, String transformationId) {
void setTransformationUID(Integer nodeId, String transformationId) {
StreamNode node = streamNodes.get(nodeId);
if (node != null) {
node.setTransformationUID(transformationId);
}
}

void setTransformationUserHash(Integer nodeId, String nodeHash) {
StreamNode node = streamNodes.get(nodeId);
if (node != null) {
node.setTransformationId(transformationId);
node.setUserHash(nodeHash);
}
}

@@ -208,7 +208,10 @@ private Collection<Integer> transform(StreamTransformation<?> transform) {
streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
}
if (transform.getUid() != null) {
streamGraph.setTransformationId(transform.getId(), transform.getUid());
streamGraph.setTransformationUID(transform.getId(), transform.getUid());
}
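// Forward a user-provided hash (set via setUidHash) to the stream graph node,
// independently of the user-provided uid.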
if (transform.getUserProvidedNodeHash() != null) {
streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
}

return transformedIds;
@@ -148,7 +148,7 @@ private boolean generateNodeHash(
boolean isChainingEnabled) {

// Check for user-specified ID
String userSpecifiedHash = node.getTransformationId();
String userSpecifiedHash = node.getTransformationUID();

if (userSpecifiedHash == null) {
// Check that all input nodes have their hashes computed
@@ -210,7 +210,7 @@
* Generates a hash from a user-specified ID.
*/
private byte[] generateUserSpecifiedHash(StreamNode node, Hasher hasher) {
hasher.putString(node.getTransformationId(), Charset.forName("UTF-8"));
hasher.putString(node.getTransformationUID(), Charset.forName("UTF-8"));

return hasher.hash().asBytes();
}
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.graph;

import org.apache.flink.util.StringUtils;

import java.util.HashMap;
import java.util.Map;

/**
* StreamGraphHasher that works with user-provided hashes. This is useful when we want to set (alternative) hashes
* explicitly, e.g. to provide manual backwards compatibility between versions when the mechanism for generating
* hashes has changed in an incompatible way.
*/
public class StreamGraphUserHashHasher implements StreamGraphHasher {

@Override
public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
HashMap<Integer, byte[]> hashResult = new HashMap<>();
for (StreamNode streamNode : streamGraph.getStreamNodes()) {

String userHash = streamNode.getUserHash();

if (null != userHash) {
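// A user hash maps to a whole JobVertexID, so it may only sit at the head of an
// operator chain; reject nodes that would be chained into their upstream input.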
for (StreamEdge inEdge : streamNode.getInEdges()) {
if (StreamingJobGraphGenerator.isChainable(inEdge, streamGraph)) {
throw new UnsupportedOperationException("Cannot assign user-specified hash "
+ "to intermediate node in chain. This will be supported in future "
+ "versions of Flink. As a work around start new chain at task "
+ streamNode.getOperatorName() + ".");
}
}

hashResult.put(streamNode.getId(), StringUtils.hexStringToByte(userHash));
}
}

return hashResult;
}
}
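As a small companion sketch, the hex-to-bytes conversion the hasher relies on, assuming a 32-character hex string (16 bytes, the length of a JobVertexID); the value is made up:

import org.apache.flink.util.StringUtils;

public class UserHashConversionSketch {
    public static void main(String[] args) {
        String userHash = "f303b679b6a7f3a2b0eb6d3dbb4e49e9"; // hypothetical
        byte[] bytes = StringUtils.hexStringToByte(userHash);
        System.out.println(bytes.length); // 16 -> usable as a JobVertexID
    }
}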
@@ -69,7 +69,8 @@ public class StreamNode implements Serializable {

private InputFormat<?, ?> inputFormat;

private String transformationId;
private String transformationUID;
private String userHash;

public StreamNode(StreamExecutionEnvironment env,
Integer id,
@@ -272,12 +273,20 @@ public void setStateKeySerializer(TypeSerializer<?> stateKeySerializer) {
this.stateKeySerializer = stateKeySerializer;
}

public String getTransformationId() {
return transformationId;
public String getTransformationUID() {
return transformationUID;
}

void setTransformationId(String transformationId) {
this.transformationId = transformationId;
void setTransformationUID(String transformationId) {
this.transformationUID = transformationId;
}

public String getUserHash() {
return userHash;
}

public void setUserHash(String userHash) {
this.userHash = userHash;
}

@Override
@@ -51,8 +51,8 @@
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -89,7 +89,7 @@ public class StreamingJobGraphGenerator {
public StreamingJobGraphGenerator(StreamGraph streamGraph) {
this.streamGraph = streamGraph;
this.defaultStreamGraphHasher = new StreamGraphHasherV2();
this.legacyStreamGraphHashers = Collections.<StreamGraphHasher>singletonList(new StreamGraphHasherV1());
this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphHasherV1(), new StreamGraphUserHashHasher());
}

private void init() {
@@ -185,7 +185,7 @@ private List<StreamEdge> createChain(
List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
if (isChainable(outEdge)) {
if (isChainable(outEdge, streamGraph)) {
chainableOutputs.add(outEdge);
} else {
nonChainableOutputs.add(outEdge);
@@ -426,7 +426,7 @@ private void connect(Integer headOfChain, StreamEdge edge) {
}
}

private boolean isChainable(StreamEdge edge) {
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
StreamNode upStreamVertex = edge.getSourceVertex();
StreamNode downStreamVertex = edge.getTargetVertex();
