Skip to content

Commit

Permalink
fix fabric8io#4910: speculative changes for upload
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed Mar 13, 2023
1 parent ddfab72 commit 69be736
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import io.fabric8.kubernetes.client.dsl.TailPrettyLoggable;
import io.fabric8.kubernetes.client.dsl.TimeTailPrettyLoggable;
import io.fabric8.kubernetes.client.dsl.TtyExecErrorChannelable;
import io.fabric8.kubernetes.client.dsl.TtyExecErrorable;
import io.fabric8.kubernetes.client.dsl.TtyExecOutputErrorable;
import io.fabric8.kubernetes.client.dsl.TtyExecable;
import io.fabric8.kubernetes.client.dsl.internal.ExecWebSocketListener;
Expand Down Expand Up @@ -562,13 +561,13 @@ public PodOperationsImpl redirectingInput(Integer bufferSize) {
}

@Override
public TtyExecErrorable writingOutput(OutputStream out) {
public PodOperationsImpl writingOutput(OutputStream out) {
checkForPiped(out);
return new PodOperationsImpl(getContext().toBuilder().output(new StreamContext(out)).build(), context);
}

@Override
public TtyExecErrorable redirectingOutput() {
public PodOperationsImpl redirectingOutput() {
return new PodOperationsImpl(getContext().toBuilder().output(new StreamContext()).build(), context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,32 @@
*/
package io.fabric8.kubernetes.client.dsl.internal.uploadable;

import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.dsl.ExecWatch;
import io.fabric8.kubernetes.client.dsl.internal.core.v1.PodOperationsImpl;
import io.fabric8.kubernetes.client.utils.InputStreamPumper;
import io.fabric8.kubernetes.client.utils.Utils;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.archivers.tar.TarConstants;
import org.apache.commons.compress.utils.CountingOutputStream;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPOutputStream;

import static io.fabric8.kubernetes.client.dsl.internal.core.v1.PodOperationsImpl.shellQuote;

public class PodUpload {

private static final String FABRIC8DONE = ".fabric8done";
private static final String TAR_PATH_DELIMITER = "/";

private PodUpload() {
Expand All @@ -47,90 +50,117 @@ public static boolean upload(PodOperationsImpl operation, Path pathToUpload)
throws IOException {

if (Utils.isNotNullOrEmpty(operation.getContext().getFile()) && pathToUpload.toFile().isFile()) {
return uploadFile(operation, pathToUpload);
return uploadTar(operation, getDirectoryFromFile(operation),
tar -> addFileToTar(null, new File(operation.getContext().getFile()).getName(), pathToUpload.toFile(), tar));
} else if (Utils.isNotNullOrEmpty(operation.getContext().getDir()) && pathToUpload.toFile().isDirectory()) {
return uploadDirectory(operation, pathToUpload);
return uploadTar(operation, operation.getContext().getDir(), tar -> {
for (File file : pathToUpload.toFile().listFiles()) {
addFileToTar(null, file.getName(), file, tar);
}
});
}
throw new IllegalArgumentException("Provided arguments are not valid (file, directory, path)");
}

private interface UploadProcessor {
private static String getDirectoryFromFile(PodOperationsImpl operation) {
return Optional.ofNullable(new File(operation.getContext().getFile()).getParent()).orElse("/");
}

private interface UploadProcessor<T extends OutputStream> {

void process(OutputStream out) throws IOException;
void process(T out) throws IOException;

}

private static boolean upload(PodOperationsImpl operation, String command, UploadProcessor processor) throws IOException {
operation = operation.redirectingInput().terminateOnError();
private static boolean upload(PodOperationsImpl operation, String command, UploadProcessor<OutputStream> processor)
throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
operation = operation.redirectingInput().writingOutput(baos).terminateOnError();
CompletableFuture<Integer> exitFuture;
try (ExecWatch execWatch = operation.exec("sh", "-c", command)) {
OutputStream out = execWatch.getInput();
processor.process(out);
out.close(); // also flushes
exitFuture = execWatch.exitCode();
ExecWatch execWatch = operation.exec("sh", "-c", command);
OutputStream out = execWatch.getInput();
processor.process(out);
out.close(); // also flushes
exitFuture = execWatch.exitCode();
while (baos.size() == 0) {
try {
Thread.sleep(50L);
} catch (InterruptedException e) {
}
}
// TODO: should this timeout be from the start of the upload?
if (!Utils.waitUntilReady(exitFuture, operation.getRequestConfig().getUploadRequestTimeout(),
TimeUnit.MILLISECONDS)) {
if (!baos.toString().trim().equals(FABRIC8DONE)) {
return false;
}

// TODO: should this timeout be from the start of the upload?
//if (!Utils.waitUntilReady(exitFuture, operation.getRequestConfig().getUploadRequestTimeout(),
// TimeUnit.MILLISECONDS)) {
// return false;
//}
Integer exitCode = exitFuture.getNow(null);
return exitCode == null || exitCode.intValue() == 0;
}

public static boolean uploadFileData(PodOperationsImpl operation, InputStream inputStream)
throws IOException {
String command = createExecCommandForUpload(operation.getContext().getFile());

return upload(operation, command, os -> InputStreamPumper.transferTo(inputStream, os::write));
return uploadTar(operation, getDirectoryFromFile(operation),
tar -> addFileToTar(new File(operation.getContext().getFile()).getName(), inputStream, tar));
}

private static boolean uploadFile(PodOperationsImpl operation, Path pathToUpload)
private static boolean uploadTar(PodOperationsImpl operation, String directory,
UploadProcessor<TarArchiveOutputStream> processor)
throws IOException {
try (final FileInputStream fis = new FileInputStream(pathToUpload.toFile())) {
return uploadFileData(operation, fis);
}
}

private static boolean uploadDirectory(PodOperationsImpl operation, Path pathToUpload)
throws IOException {

final String command = String.format(
"mkdir -p %1$s && tar -C %1$s -xzf -", shellQuote(operation.getContext().getDir()));
final String command = uploadTarCommand(directory);

return upload(operation, command, os -> {
try (final GZIPOutputStream gzip = new GZIPOutputStream(os);
final TarArchiveOutputStream tar = new TarArchiveOutputStream(gzip)) {
try (final TarArchiveOutputStream tar = new TarArchiveOutputStream(os)) {
tar.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX);
for (File file : pathToUpload.toFile().listFiles()) {
addFileToTar(null, file, tar);
}
processor.process(tar);
tar.putArchiveEntry(new TarArchiveEntry(FABRIC8DONE));
tar.closeArchiveEntry();
tar.flush();
}
});
}

private static void addFileToTar(String rootTarPath, File file, TarArchiveOutputStream tar)
static String uploadTarCommand(String directory) {
return String.format(
"{ while [[ ! -f %1$s/%2$s ]]; do sleep 1; done; rm %1$s/%2$s; echo %2$s; } & mkdir -p %1$s && tar -C %1$s -xf -",
shellQuote(directory), FABRIC8DONE);
}

private static void addFileToTar(String fileName, InputStream file, TarArchiveOutputStream tar)
throws IOException {
TarArchiveEntry archiveEntry = new TarArchiveEntry(fileName);
archiveEntry.setSize(TarConstants.MAXSIZE);
tar.putArchiveEntry(archiveEntry);
CountingOutputStream countingOutputStream = new CountingOutputStream(tar);
InputStreamPumper.transferTo(file, countingOutputStream::write);
archiveEntry.setSize(countingOutputStream.getBytesWritten());
try {
Field field = TarArchiveOutputStream.class.getDeclaredField("currSize");
field.setAccessible(true);
field.set(tar, countingOutputStream.getBytesWritten());
} catch (Exception e) {
throw KubernetesClientException.launderThrowable(e);
}
tar.closeArchiveEntry();
}

final String fileName = Optional.ofNullable(rootTarPath).orElse("") + TAR_PATH_DELIMITER + file.getName();
private static void addFileToTar(String rootTarPath, String fileName, File file, TarArchiveOutputStream tar)
throws IOException {
tar.putArchiveEntry(new TarArchiveEntry(file, fileName));
if (file.isFile()) {
Files.copy(file.toPath(), tar);
tar.closeArchiveEntry();
} else if (file.isDirectory()) {
tar.closeArchiveEntry();
String dirRootPath = Optional.ofNullable(rootTarPath).orElse("") + TAR_PATH_DELIMITER + fileName;
for (File fileInDirectory : file.listFiles()) {
addFileToTar(fileName, fileInDirectory, tar);
addFileToTar(dirRootPath, fileInDirectory.getName(), fileInDirectory, tar);
}
}
}

static String createExecCommandForUpload(String file) {
String directoryTrimmedFromFilePath = file.substring(0, file.lastIndexOf('/'));
final String directory = directoryTrimmedFromFilePath.isEmpty() ? "/" : directoryTrimmedFromFilePath;
return String.format(
"mkdir -p %s && cat - > %s", shellQuote(directory), shellQuote(file));
}

}

0 comments on commit 69be736

Please sign in to comment.