Skip to content

Commit

Permalink
[FLINK-6177] Add support for "Distributed Cache" in streaming applica…
Browse files Browse the repository at this point in the history
…tions

This closes apache#3741.
  • Loading branch information
Zohar Mizrahi authored and zentol committed May 3, 2017
1 parent 0dd85f0 commit 93b8d76
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.api.common.io.FileInputFormat;
Expand All @@ -46,6 +47,7 @@
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
Expand Down Expand Up @@ -136,6 +138,8 @@ public abstract class StreamExecutionEnvironment {
/** The time characteristic used by the data streams. */
private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;

protected final List<Tuple2<String, DistributedCache.DistributedCacheEntry>> cacheFile = new ArrayList<>();


// --------------------------------------------------------------------------------------------
// Constructor and Properties
Expand All @@ -148,6 +152,13 @@ public ExecutionConfig getConfig() {
return config;
}

/**
* Get the list of cached files that were registered for distribution among the task managers
*/
public List<Tuple2<String, DistributedCache.DistributedCacheEntry>> getCacheFile() {
return cacheFile;
}

/**
* Sets the parallelism for operations executed through this environment.
* Setting a parallelism of x here will cause all operators (such as map,
Expand Down Expand Up @@ -1774,4 +1785,41 @@ protected static void initializeContextEnvironment(StreamExecutionEnvironmentFac
protected static void resetContextEnvironment() {
contextEnvironmentFactory = null;
}

/**
* Registers a file at the distributed cache under the given name. The file will be accessible
* from any user-defined function in the (distributed) runtime under a local path. Files
* may be local files (as long as all relevant workers have access to it), or files in a distributed file system.
* The runtime will copy the files temporarily to a local cache, if needed.
* <p>
* The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
* {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access
* {@link org.apache.flink.api.common.cache.DistributedCache} via
* {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
*
* @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
* @param name The name under which the file is registered.
*/
public void registerCachedFile(String filePath, String name) {
registerCachedFile(filePath, name, false);
}

/**
* Registers a file at the distributed cache under the given name. The file will be accessible
* from any user-defined function in the (distributed) runtime under a local path. Files
* may be local files (as long as all relevant workers have access to it), or files in a distributed file system.
* The runtime will copy the files temporarily to a local cache, if needed.
* <p>
* The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
* {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access
* {@link org.apache.flink.api.common.cache.DistributedCache} via
* {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
*
* @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
* @param name The name under which the file is registered.
* @param executable flag indicating whether the file should be executable
*/
public void registerCachedFile(String filePath, String name, boolean executable) {
this.cacheFile.add(new Tuple2<>(name, new DistributedCache.DistributedCacheEntry(filePath, executable)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.commons.lang3.StringUtils;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
Expand Down Expand Up @@ -151,6 +152,11 @@ private JobGraph createJobGraph() {

configureCheckpointing();

// add registered cache file into job configuration
for (Tuple2<String, DistributedCache.DistributedCacheEntry> e : streamGraph.getEnvironment().getCacheFile()) {
DistributedCache.writeFileInfoToConfig(e.f0, e.f1, jobGraph.getJobConfiguration());
}

// set the ExecutionConfig last when it has been finalized
try {
jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
*/
def getConfig = javaEnv.getConfig

/**
* Gets cache files.
*/
def getCacheFile = javaEnv.getCacheFile

/**
* Sets the parallelism for operations executed through this environment.
* Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run
Expand Down Expand Up @@ -668,6 +673,49 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
}
f
}


/**
* Registers a file at the distributed cache under the given name. The file will be accessible
* from any user-defined function in the (distributed) runtime under a local path. Files
* may be local files (as long as all relevant workers have access to it), or files in a
* distributed file system. The runtime will copy the files temporarily to a local cache,
* if needed.
* <p>
* The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs
* via {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and
* provides access {@link org.apache.flink.api.common.cache.DistributedCache} via
* {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
*
* @param filePath The path of the file, as a URI (e.g. "file:///some/path" or
* "hdfs://host:port/and/path")
* @param name The name under which the file is registered.
*/
def registerCachedFile(filePath: String, name: String): Unit = {
javaEnv.registerCachedFile(filePath, name)
}


/**
* Registers a file at the distributed cache under the given name. The file will be accessible
* from any user-defined function in the (distributed) runtime under a local path. Files
* may be local files (as long as all relevant workers have access to it), or files in a
* distributed file system. The runtime will copy the files temporarily to a local cache,
* if needed.
* <p>
* The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs
* via {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and
* provides access {@link org.apache.flink.api.common.cache.DistributedCache} via
* {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
*
* @param filePath The path of the file, as a URI (e.g. "file:///some/path" or
* "hdfs://host:port/and/path")
* @param name The name under which the file is registered.
* @param executable flag indicating whether the file should be executable
*/
def registerCachedFile(filePath: String, name: String, executable: Boolean): Unit = {
javaEnv.registerCachedFile(filePath, name, executable)
}
}

object StreamExecutionEnvironment {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,63 +15,58 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.distributedCache;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
package org.apache.flink.test.distributedCache;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.util.Collector;
import org.junit.Test;

import static org.junit.Assert.assertTrue;

import java.io.*;

import java.util.*;

/**
* Tests the distributed cache by comparing a text file with a distributed copy.
*/
public class DistributedCacheTest extends JavaProgramTestBase {

public class DistributedCacheTest extends StreamingMultipleProgramsTestBase {
public static final String data
= "machen\n"
+ "zeit\n"
+ "heerscharen\n"
+ "keiner\n"
+ "meine\n";

protected String textPath;

@Override
protected void preSubmit() throws Exception {
textPath = createTempFile("count.txt", data);
@Test
public void testStreamingDistributedCache() throws Exception {
String textPath = createTempFile("count.txt", data);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.registerCachedFile(textPath, "cache_test");
env.readTextFile(textPath).flatMap(new WordChecker());
env.execute();
}

@Override
protected void testProgram() throws Exception {
@Test
public void testBatchDistributedCache() throws Exception {
String textPath = createTempFile("count.txt", data);
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.registerCachedFile(textPath, "cache_test");

List<Tuple1<String>> result = env
.readTextFile(textPath)
.flatMap(new WordChecker())
.collect();

compareResultAsTuples(result, data);
env.readTextFile(textPath).flatMap(new WordChecker()).count();
}

public static class WordChecker extends RichFlatMapFunction<String, Tuple1<String>> {
private static final long serialVersionUID = 1L;

private final Set<String> wordList = new HashSet<>();
private final List<String> wordList = new ArrayList<>();

@Override
public void open(Configuration conf) throws FileNotFoundException, IOException {
public void open(Configuration conf) throws IOException {
File file = getRuntimeContext().getDistributedCache().getFile("cache_test");
BufferedReader reader = new BufferedReader(new FileReader(file));
String tempString;
Expand All @@ -83,9 +78,10 @@ public void open(Configuration conf) throws FileNotFoundException, IOException {

@Override
public void flatMap(String word, Collector<Tuple1<String>> out) throws Exception {
if (wordList.contains(word)) {
out.collect(new Tuple1<>(word));
}
assertTrue("Unexpected word in stream! wordFromStream: " + word + ", shouldBeOneOf: " +
wordList.toString(), wordList.contains(word));

out.collect(new Tuple1<>(word));
}
}
}

0 comments on commit 93b8d76

Please sign in to comment.