diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index 5ac5c4ea4fde93..c0ffb54e023865 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -182,14 +182,17 @@ public void setLocalJarPath(Path localJarPath) { this.flinkJarPath = localJarPath; } + /** + * Adds the given files to the list of files to ship. + * + *

Note that any file matching "flink-dist*.jar" will be excluded from the upload by + * {@link #uploadAndRegisterFiles(Collection, FileSystem, Path, ApplicationId, List, Map, StringBuilder)} + * since we upload the Flink uber jar ourselves and do not need to deploy it multiple times. + * + * @param shipFiles files to ship + */ public void addShipFiles(List shipFiles) { - for (File shipFile: shipFiles) { - // remove uberjar from ship list (by default everything in the lib/ folder is added to - // the list of files to ship, but we handle the uberjar separately. - if (!(shipFile.getName().startsWith("flink-dist") && shipFile.getName().endsWith("jar"))) { - this.shipFiles.add(shipFile); - } - } + this.shipFiles.addAll(shipFiles); } public void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded) { @@ -1048,6 +1051,27 @@ private static Path setupSingleLocalResource( return resource.f0; } + /** + * Recursively uploads (and registers) any (user and system) files in shipFiles except + * for files matching "flink-dist*.jar" which should be uploaded separately. + * + * @param shipFiles + * files to upload + * @param fs + * file system to upload to + * @param targetHomeDir + * remote home directory to upload to + * @param appId + * application ID + * @param remotePaths + * paths of the remote resources (uploaded resources will be added) + * @param localResources + * map of resources (uploaded resources will be added) + * @param envShipFileList + * list of shipped files in a format understood by {@link Utils#createTaskExecutorContext} + * + * @return list of class paths with the the proper resource keys from the registration + */ static List uploadAndRegisterFiles( Collection shipFiles, FileSystem fs, @@ -1068,40 +1092,48 @@ static List uploadAndRegisterFiles( @Override public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs) throws IOException { - java.nio.file.Path relativePath = parentPath.relativize(file); - - String key = relativePath.toString(); - try { - Path remotePath = setupSingleLocalResource( - key, - fs, - appId, - new Path(file.toUri()), - localResources, - targetHomeDir, - relativePath.getParent().toString()); - remotePaths.add(remotePath); - envShipFileList.append(key).append("=").append(remotePath).append(","); - - // add files to the classpath - classPaths.add(key); - - return FileVisitResult.CONTINUE; - } catch (URISyntaxException e) { - throw new IOException(e); + + if (!(file.getFileName().startsWith("flink-dist") && + file.getFileName().endsWith("jar"))) { + + java.nio.file.Path relativePath = parentPath.relativize(file); + + String key = relativePath.toString(); + try { + Path remotePath = setupSingleLocalResource( + key, + fs, + appId, + new Path(file.toUri()), + localResources, + targetHomeDir, + relativePath.getParent().toString()); + remotePaths.add(remotePath); + envShipFileList.append(key).append("=") + .append(remotePath).append(","); + + // add files to the classpath + classPaths.add(key); + } catch (URISyntaxException e) { + throw new IOException(e); + } } + + return FileVisitResult.CONTINUE; } }); } else { - Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath()); - String key = shipFile.getName(); - Path remotePath = setupSingleLocalResource( - key, fs, appId, shipLocalPath, localResources, targetHomeDir, ""); - remotePaths.add(remotePath); - envShipFileList.append(key).append("=").append(remotePath).append(","); - - // add files to the classpath - classPaths.add(key); + if (!(shipFile.getName().startsWith("flink-dist") && shipFile.getName().endsWith("jar"))) { + Path shipLocalPath = new Path(shipFile.toURI()); + String key = shipFile.getName(); + Path remotePath = setupSingleLocalResource( + key, fs, appId, shipLocalPath, localResources, targetHomeDir, ""); + remotePaths.add(remotePath); + envShipFileList.append(key).append("=").append(remotePath).append(","); + + // add files to the classpath + classPaths.add(key); + } } }