Skip to content

Commit

Permalink
fix fabric8io#4910: using tar for most uploads
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed Mar 22, 2023
1 parent 300d21f commit ba8e635
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import io.fabric8.kubernetes.client.dsl.TailPrettyLoggable;
import io.fabric8.kubernetes.client.dsl.TimeTailPrettyLoggable;
import io.fabric8.kubernetes.client.dsl.TtyExecErrorChannelable;
import io.fabric8.kubernetes.client.dsl.TtyExecErrorable;
import io.fabric8.kubernetes.client.dsl.TtyExecOutputErrorable;
import io.fabric8.kubernetes.client.dsl.TtyExecable;
import io.fabric8.kubernetes.client.dsl.internal.ExecWebSocketListener;
Expand Down Expand Up @@ -562,13 +561,13 @@ public PodOperationsImpl redirectingInput(Integer bufferSize) {
}

@Override
public TtyExecErrorable writingOutput(OutputStream out) {
public PodOperationsImpl writingOutput(OutputStream out) {
checkForPiped(out);
return new PodOperationsImpl(getContext().toBuilder().output(new StreamContext(out)).build(), context);
}

@Override
public TtyExecErrorable redirectingOutput() {
public PodOperationsImpl redirectingOutput() {
return new PodOperationsImpl(getContext().toBuilder().output(new StreamContext()).build(), context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand All @@ -32,7 +31,6 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPOutputStream;

import static io.fabric8.kubernetes.client.dsl.internal.core.v1.PodOperationsImpl.shellQuote;

Expand All @@ -47,20 +45,30 @@ public static boolean upload(PodOperationsImpl operation, Path pathToUpload)
throws IOException {

if (Utils.isNotNullOrEmpty(operation.getContext().getFile()) && pathToUpload.toFile().isFile()) {
return uploadFile(operation, pathToUpload);
return uploadTar(operation, getDirectoryFromFile(operation),
tar -> addFileToTar(null, new File(operation.getContext().getFile()).getName(), pathToUpload.toFile(), tar));
} else if (Utils.isNotNullOrEmpty(operation.getContext().getDir()) && pathToUpload.toFile().isDirectory()) {
return uploadDirectory(operation, pathToUpload);
return uploadTar(operation, operation.getContext().getDir(), tar -> {
for (File file : pathToUpload.toFile().listFiles()) {
addFileToTar(null, file.getName(), file, tar);
}
});
}
throw new IllegalArgumentException("Provided arguments are not valid (file, directory, path)");
}

private interface UploadProcessor {
private static String getDirectoryFromFile(PodOperationsImpl operation) {
return Optional.ofNullable(new File(operation.getContext().getFile()).getParent()).orElse("/");
}

private interface UploadProcessor<T extends OutputStream> {

void process(OutputStream out) throws IOException;
void process(T out) throws IOException;

}

private static boolean upload(PodOperationsImpl operation, String command, UploadProcessor processor) throws IOException {
private static boolean upload(PodOperationsImpl operation, String command, UploadProcessor<OutputStream> processor)
throws IOException {
operation = operation.redirectingInput().terminateOnError();
CompletableFuture<Integer> exitFuture;
try (ExecWatch execWatch = operation.exec("sh", "-c", command)) {
Expand All @@ -85,43 +93,35 @@ public static boolean uploadFileData(PodOperationsImpl operation, InputStream in
return upload(operation, command, os -> InputStreamPumper.transferTo(inputStream, os::write));
}

private static boolean uploadFile(PodOperationsImpl operation, Path pathToUpload)
throws IOException {
try (final FileInputStream fis = new FileInputStream(pathToUpload.toFile())) {
return uploadFileData(operation, fis);
}
}

private static boolean uploadDirectory(PodOperationsImpl operation, Path pathToUpload)
private static boolean uploadTar(PodOperationsImpl operation, String directory,
UploadProcessor<TarArchiveOutputStream> processor)
throws IOException {

final String command = String.format(
"mkdir -p %1$s && tar -C %1$s -xzf -", shellQuote(operation.getContext().getDir()));
final String command = uploadTarCommand(directory);

return upload(operation, command, os -> {
try (final GZIPOutputStream gzip = new GZIPOutputStream(os);
final TarArchiveOutputStream tar = new TarArchiveOutputStream(gzip)) {
try (final TarArchiveOutputStream tar = new TarArchiveOutputStream(os)) {
tar.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX);
for (File file : pathToUpload.toFile().listFiles()) {
addFileToTar(null, file, tar);
}
tar.flush();
processor.process(tar);
}
});
}

private static void addFileToTar(String rootTarPath, File file, TarArchiveOutputStream tar)
throws IOException {
static String uploadTarCommand(String directory) {
return String.format("mkdir -p %1$s && tar -C %1$s -xmf -", shellQuote(directory));
}

final String fileName = Optional.ofNullable(rootTarPath).orElse("") + TAR_PATH_DELIMITER + file.getName();
private static void addFileToTar(String rootTarPath, String fileName, File file, TarArchiveOutputStream tar)
throws IOException {
tar.putArchiveEntry(new TarArchiveEntry(file, fileName));
if (file.isFile()) {
Files.copy(file.toPath(), tar);
tar.closeArchiveEntry();
} else if (file.isDirectory()) {
tar.closeArchiveEntry();
String dirRootPath = Optional.ofNullable(rootTarPath).orElse("") + TAR_PATH_DELIMITER + fileName;
for (File fileInDirectory : file.listFiles()) {
addFileToTar(fileName, fileInDirectory, tar);
addFileToTar(dirRootPath, fileInDirectory.getName(), fileInDirectory, tar);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,13 @@ void uploadInvalidParametersShouldThrowException(@TempDir Path pathToUpload) {
void upload_withFile_shouldUploadFile() throws IOException, InterruptedException {
final Path toUpload = new File(PodUpload.class.getResource("/upload/upload-sample.txt").getFile())
.toPath();
uploadFileAndVerify(() -> PodUpload.upload(operation, toUpload));
uploadFileAndVerify(() -> PodUpload.upload(operation, toUpload), false);
}

@Test
void uploadFileData_whenByteArrayInputStreamProvided_shouldUploadFile() throws IOException, InterruptedException {
InputStream inputStream = new ByteArrayInputStream("test data".getBytes());
uploadFileAndVerify(() -> PodUpload.uploadFileData(operation, inputStream));
uploadFileAndVerify(() -> PodUpload.uploadFileData(operation, inputStream), true);
}

@Test
Expand Down Expand Up @@ -183,7 +183,8 @@ void createExecCommandForUpload_withMultipleSingleQuotesInPath() {
assertThat(result).isEqualTo("mkdir -p '/tmp/f\'\\'\'o\'\\'\'o' && cat - > '/tmp/f\'\\'\'o\'\\'\'o/c\'\\'\'p.log'");
}

void uploadFileAndVerify(PodUploadTester<Boolean> fileUploadMethodToTest) throws IOException, InterruptedException {
void uploadFileAndVerify(PodUploadTester<Boolean> fileUploadMethodToTest, boolean stream)
throws IOException, InterruptedException {
operation = operation.file("/mock/dir/file");
WebSocket.Builder builder = mock(WebSocket.Builder.class, Mockito.RETURNS_SELF);
when(builder.buildAsync(any())).thenAnswer(newWebSocket -> {
Expand Down Expand Up @@ -212,9 +213,15 @@ void uploadFileAndVerify(PodUploadTester<Boolean> fileUploadMethodToTest) throws
assertEquals(
"https://openshift.com:8443/api/v1/namespaces/default/pods?fieldSelector=metadata.name%3Dpod&timeoutSeconds=600&allowWatchBookmarks=true&watch=true",
captor.getAllValues().get(0).toString());
assertEquals(
"https://openshift.com:8443/api/v1/namespaces/default/pods/pod/exec?command=sh&command=-c&command=mkdir%20-p%20%27%2Fmock%2Fdir%27%20%26%26%20cat%20-%20%3E%20%27%2Fmock%2Fdir%2Ffile%27&container=container&stdin=true&stderr=true",
captor.getAllValues().get(1).toString());
if (stream) {
assertEquals(
"https://openshift.com:8443/api/v1/namespaces/default/pods/pod/exec?command=sh&command=-c&command=mkdir%20-p%20%27%2Fmock%2Fdir%27%20%26%26%20cat%20-%20%3E%20%27%2Fmock%2Fdir%2Ffile%27&container=container&stdin=true&stderr=true",
captor.getAllValues().get(1).toString());
} else {
assertEquals(
"https://openshift.com:8443/api/v1/namespaces/default/pods/pod/exec?command=sh&command=-c&command=mkdir%20-p%20%27%2Fmock%2Fdir%27%20%26%26%20tar%20-C%20%27%2Fmock%2Fdir%27%20-xmf%20-&container=container&stdin=true&stderr=true",
captor.getAllValues().get(1).toString());
}
verify(mockWebSocket, atLeast(1)).send(any(ByteBuffer.class));
}

Expand Down Expand Up @@ -249,7 +256,7 @@ private void uploadDirectoryAndVerify(PodUploadTester<Boolean> directoryUpload)
"https://openshift.com:8443/api/v1/namespaces/default/pods?fieldSelector=metadata.name%3Dpod&timeoutSeconds=600&allowWatchBookmarks=true&watch=true",
captor.getAllValues().get(0).toString());
assertEquals(
"https://openshift.com:8443/api/v1/namespaces/default/pods/pod/exec?command=sh&command=-c&command=mkdir%20-p%20%27%2Fmock%2Fdir%27%20%26%26%20tar%20-C%20%27%2Fmock%2Fdir%27%20-xzf%20-&container=container&stdin=true&stderr=true",
"https://openshift.com:8443/api/v1/namespaces/default/pods/pod/exec?command=sh&command=-c&command=mkdir%20-p%20%27%2Fmock%2Fdir%27%20%26%26%20tar%20-C%20%27%2Fmock%2Fdir%27%20-xmf%20-&container=container&stdin=true&stderr=true",
captor.getAllValues().get(1).toString());
verify(mockWebSocket, atLeast(1)).send(any(ByteBuffer.class));
}
Expand Down

0 comments on commit ba8e635

Please sign in to comment.