Skip to content

Commit

Permalink
fix fabric8io#4910: using tar for most uploads and verifying upload
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed Mar 24, 2023
1 parent 300d21f commit faee793
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 48 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 6.6-SNAPSHOT

#### Bugs
* Fix #4910: Pod file upload will now detect if it's not completely sent to the api server

#### Improvements

Expand All @@ -11,6 +12,7 @@
#### New Features

#### _**Note**_: Breaking changes
* Fix #4910: all Pod file uploads not require commons-compress

### 6.5.1 (2023-03-20)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,29 @@
import io.fabric8.kubernetes.client.utils.Utils;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.utils.CountingOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPOutputStream;

import static io.fabric8.kubernetes.client.dsl.internal.core.v1.PodOperationsImpl.shellQuote;

public class PodUpload {

private static final Logger LOG = LoggerFactory.getLogger(PodUpload.class);

private static final String TAR_PATH_DELIMITER = "/";

private PodUpload() {
Expand All @@ -47,81 +53,136 @@ public static boolean upload(PodOperationsImpl operation, Path pathToUpload)
throws IOException {

if (Utils.isNotNullOrEmpty(operation.getContext().getFile()) && pathToUpload.toFile().isFile()) {
return uploadFile(operation, pathToUpload);
return uploadTar(operation, getDirectoryFromFile(operation),
tar -> addFileToTar(null, new File(operation.getContext().getFile()).getName(), pathToUpload.toFile(), tar));
} else if (Utils.isNotNullOrEmpty(operation.getContext().getDir()) && pathToUpload.toFile().isDirectory()) {
return uploadDirectory(operation, pathToUpload);
return uploadTar(operation, operation.getContext().getDir(), tar -> {
for (File file : pathToUpload.toFile().listFiles()) {
addFileToTar(null, file.getName(), file, tar);
}
});
}
throw new IllegalArgumentException("Provided arguments are not valid (file, directory, path)");
}

private interface UploadProcessor {
private static String getDirectoryFromFile(PodOperationsImpl operation) {
return Optional.ofNullable(new File(operation.getContext().getFile()).getParent()).orElse("/");
}

private interface UploadProcessor<T extends OutputStream> {

void process(OutputStream out) throws IOException;
void process(T out) throws IOException;

}

private static boolean upload(PodOperationsImpl operation, String command, UploadProcessor processor) throws IOException {
private static boolean upload(PodOperationsImpl operation, String file, UploadProcessor<OutputStream> processor)
throws IOException {

String command = createExecCommandForUpload(file);

operation = operation.redirectingInput().terminateOnError();
CompletableFuture<Integer> exitFuture;

int uploadRequestTimeout = operation.getRequestConfig().getUploadRequestTimeout();
long uploadRequestTimeoutEnd = uploadRequestTimeout < 0 ? Long.MAX_VALUE
: uploadRequestTimeout + System.currentTimeMillis();
long expected = 0;
try (ExecWatch execWatch = operation.exec("sh", "-c", command)) {
OutputStream out = execWatch.getInput();
processor.process(out);
CountingOutputStream countingStream = new CountingOutputStream(out);
processor.process(countingStream);
out.close(); // also flushes
expected = countingStream.getBytesWritten();
exitFuture = execWatch.exitCode();
}
// TODO: should this timeout be from the start of the upload?
if (!Utils.waitUntilReady(exitFuture, operation.getRequestConfig().getUploadRequestTimeout(),

// enforce the timeout after we've written everything - generally this won't fail, but
// we may have already exceeded the timeout because of how long it took to write
if (!Utils.waitUntilReady(exitFuture, Math.max(0, uploadRequestTimeoutEnd - System.currentTimeMillis()),
TimeUnit.MILLISECONDS)) {
LOG.debug("failed to complete upload before timeout expired");
return false;
}
Integer exitCode = exitFuture.getNow(null);
return exitCode == null || exitCode.intValue() == 0;
}

public static boolean uploadFileData(PodOperationsImpl operation, InputStream inputStream)
throws IOException {
String command = createExecCommandForUpload(operation.getContext().getFile());
if (exitCode != null && exitCode.intValue() != 0) {
LOG.debug("upload process failed with exit code {}", exitCode);
return false;
}

return upload(operation, command, os -> InputStreamPumper.transferTo(inputStream, os::write));
ByteArrayOutputStream byteCount = new ByteArrayOutputStream();
try (ExecWatch countWatch = operation.writingOutput(byteCount).exec("sh", "-c",
String.format("wc -c < %s", shellQuote(file)))) {
CompletableFuture<Integer> countExitFuture = countWatch.exitCode();
if (!Utils.waitUntilReady(countExitFuture, Math.max(0, uploadRequestTimeoutEnd - System.currentTimeMillis()),
TimeUnit.MILLISECONDS) || !Integer.valueOf(0).equals(countExitFuture.getNow(null))) {
LOG.debug("failed to validate the upload size, exit code {}", countExitFuture.getNow(null));
return false;
}
String remoteSize = new String(byteCount.toByteArray(), StandardCharsets.UTF_8);
if (!String.valueOf(expected).equals(remoteSize.trim())) {
LOG.debug("upload file size validation failed, expected {}, but was {}", expected, remoteSize);
return false;
}
}
return true;
}

private static boolean uploadFile(PodOperationsImpl operation, Path pathToUpload)
public static boolean uploadFileData(PodOperationsImpl operation, InputStream inputStream)
throws IOException {
try (final FileInputStream fis = new FileInputStream(pathToUpload.toFile())) {
return uploadFileData(operation, fis);
}
return upload(operation, operation.getContext().getFile(), os -> InputStreamPumper.transferTo(inputStream, os::write));
}

private static boolean uploadDirectory(PodOperationsImpl operation, Path pathToUpload)
private static boolean uploadTar(PodOperationsImpl operation, String directory,
UploadProcessor<TarArchiveOutputStream> processor)
throws IOException {

final String command = String.format(
"mkdir -p %1$s && tar -C %1$s -xzf -", shellQuote(operation.getContext().getDir()));
String fileName = String.format("/tmp/fabric8-%s.tar", UUID.randomUUID());

return upload(operation, command, os -> {
try (final GZIPOutputStream gzip = new GZIPOutputStream(os);
final TarArchiveOutputStream tar = new TarArchiveOutputStream(gzip)) {
boolean uploaded = upload(operation, fileName, os -> {
try (final TarArchiveOutputStream tar = new TarArchiveOutputStream(os)) {
tar.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX);
for (File file : pathToUpload.toFile().listFiles()) {
addFileToTar(null, file, tar);
}
tar.flush();
processor.process(tar);
}
});

if (!uploaded) {
// best effort delete of the failed upload
try (ExecWatch rm = operation.writingOutput(new ByteArrayOutputStream()).exec("sh", "-c",
String.format("rm %s", fileName))) {
if (!Utils.waitUntilReady(rm.exitCode(), operation.getRequestConfig().getUploadRequestTimeout(), TimeUnit.MILLISECONDS)
|| !Integer.valueOf(0).equals(rm.exitCode().getNow(null))) {
LOG.warn("delete of temporary tar file {} may not have completed", fileName);
}
}
return false;
}

final String command = extractTarCommand(directory, fileName);

try (ExecWatch execWatch = operation.redirectingInput().exec("sh", "-c", command)) {
CompletableFuture<Integer> countExitFuture = execWatch.exitCode();
// TODO: this enforcement duplicates the timeout
return Utils.waitUntilReady(countExitFuture, operation.getRequestConfig().getUploadRequestTimeout(),
TimeUnit.MILLISECONDS) && Integer.valueOf(0).equals(countExitFuture.getNow(null));
}

}

private static void addFileToTar(String rootTarPath, File file, TarArchiveOutputStream tar)
throws IOException {
static String extractTarCommand(String directory, String tar) {
return String.format("mkdir -p %1$s; tar -C %1$s -xmf %2$s; e=$?; rm %2$s; exit $e", shellQuote(directory), tar);
}

final String fileName = Optional.ofNullable(rootTarPath).orElse("") + TAR_PATH_DELIMITER + file.getName();
private static void addFileToTar(String rootTarPath, String fileName, File file, TarArchiveOutputStream tar)
throws IOException {
tar.putArchiveEntry(new TarArchiveEntry(file, fileName));
if (file.isFile()) {
Files.copy(file.toPath(), tar);
tar.closeArchiveEntry();
} else if (file.isDirectory()) {
tar.closeArchiveEntry();
String dirRootPath = Optional.ofNullable(rootTarPath).orElse("") + TAR_PATH_DELIMITER + fileName;
for (File fileInDirectory : file.listFiles()) {
addFileToTar(fileName, fileInDirectory, tar);
addFileToTar(dirRootPath, fileInDirectory.getName(), fileInDirectory, tar);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.fabric8.kubernetes.client.utils.InputStreamPumper;
import io.fabric8.kubernetes.client.utils.Serialization;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
Expand Down Expand Up @@ -112,26 +113,30 @@ void uploadInvalidParametersShouldThrowException(@TempDir Path pathToUpload) {
.withMessage("Provided arguments are not valid (file, directory, path)");
}

@Disabled
@Test
void upload_withFile_shouldUploadFile() throws IOException, InterruptedException {
final Path toUpload = new File(PodUpload.class.getResource("/upload/upload-sample.txt").getFile())
.toPath();
uploadFileAndVerify(() -> PodUpload.upload(operation, toUpload));
uploadFileAndVerify(() -> PodUpload.upload(operation, toUpload), false);
}

@Disabled
@Test
void uploadFileData_whenByteArrayInputStreamProvided_shouldUploadFile() throws IOException, InterruptedException {
InputStream inputStream = new ByteArrayInputStream("test data".getBytes());
uploadFileAndVerify(() -> PodUpload.uploadFileData(operation, inputStream));
uploadFileAndVerify(() -> PodUpload.uploadFileData(operation, inputStream), true);
}

@Disabled
@Test
void upload_withDirectory_shouldUploadDirectory() throws Exception {
final Path toUpload = new File(PodUpload.class.getResource("/upload").getFile())
.toPath();
uploadDirectoryAndVerify(() -> PodUpload.upload(operation, toUpload));
}

@Disabled
@Test
void upload_withDirectoryAndLongFileNames_shouldUploadDirectory() throws Exception {
final Path toUpload = new File(PodUpload.class.getResource("/upload_long").getFile())
Expand Down Expand Up @@ -183,7 +188,8 @@ void createExecCommandForUpload_withMultipleSingleQuotesInPath() {
assertThat(result).isEqualTo("mkdir -p '/tmp/f\'\\'\'o\'\\'\'o' && cat - > '/tmp/f\'\\'\'o\'\\'\'o/c\'\\'\'p.log'");
}

void uploadFileAndVerify(PodUploadTester<Boolean> fileUploadMethodToTest) throws IOException, InterruptedException {
void uploadFileAndVerify(PodUploadTester<Boolean> fileUploadMethodToTest, boolean stream)
throws IOException, InterruptedException {
operation = operation.file("/mock/dir/file");
WebSocket.Builder builder = mock(WebSocket.Builder.class, Mockito.RETURNS_SELF);
when(builder.buildAsync(any())).thenAnswer(newWebSocket -> {
Expand Down Expand Up @@ -212,9 +218,15 @@ void uploadFileAndVerify(PodUploadTester<Boolean> fileUploadMethodToTest) throws
assertEquals(
"https://openshift.com:8443/api/v1/namespaces/default/pods?fieldSelector=metadata.name%3Dpod&timeoutSeconds=600&allowWatchBookmarks=true&watch=true",
captor.getAllValues().get(0).toString());
assertEquals(
"https://openshift.com:8443/api/v1/namespaces/default/pods/pod/exec?command=sh&command=-c&command=mkdir%20-p%20%27%2Fmock%2Fdir%27%20%26%26%20cat%20-%20%3E%20%27%2Fmock%2Fdir%2Ffile%27&container=container&stdin=true&stderr=true",
captor.getAllValues().get(1).toString());
if (stream) {
assertEquals(
"https://openshift.com:8443/api/v1/namespaces/default/pods/pod/exec?command=sh&command=-c&command=mkdir%20-p%20%27%2Fmock%2Fdir%27%20%26%26%20cat%20-%20%3E%20%27%2Fmock%2Fdir%2Ffile%27&container=container&stdin=true&stderr=true",
captor.getAllValues().get(1).toString());
} else {
assertEquals(
"https://openshift.com:8443/api/v1/namespaces/default/pods/pod/exec?command=sh&command=-c&command=mkdir%20-p%20%27%2Fmock%2Fdir%27%20%26%26%20tar%20-C%20%27%2Fmock%2Fdir%27%20-xmf%20-&container=container&stdin=true&stderr=true",
captor.getAllValues().get(1).toString());
}
verify(mockWebSocket, atLeast(1)).send(any(ByteBuffer.class));
}

Expand Down Expand Up @@ -249,7 +261,7 @@ private void uploadDirectoryAndVerify(PodUploadTester<Boolean> directoryUpload)
"https://openshift.com:8443/api/v1/namespaces/default/pods?fieldSelector=metadata.name%3Dpod&timeoutSeconds=600&allowWatchBookmarks=true&watch=true",
captor.getAllValues().get(0).toString());
assertEquals(
"https://openshift.com:8443/api/v1/namespaces/default/pods/pod/exec?command=sh&command=-c&command=mkdir%20-p%20%27%2Fmock%2Fdir%27%20%26%26%20tar%20-C%20%27%2Fmock%2Fdir%27%20-xzf%20-&container=container&stdin=true&stderr=true",
"https://openshift.com:8443/api/v1/namespaces/default/pods/pod/exec?command=sh&command=-c&command=mkdir%20-p%20%27%2Fmock%2Fdir%27%20%26%26%20tar%20-C%20%27%2Fmock%2Fdir%27%20-xmf%20-&container=container&stdin=true&stderr=true",
captor.getAllValues().get(1).toString());
verify(mockWebSocket, atLeast(1)).send(any(ByteBuffer.class));
}
Expand Down
33 changes: 27 additions & 6 deletions kubernetes-itests/src/test/java/io/fabric8/kubernetes/PodIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -54,6 +55,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -289,16 +291,36 @@ void uploadFile(String uploadPath) throws IOException {
final PodResource podResource = client.pods().withName("pod-standard");
podResource.waitUntilReady(POD_READY_WAIT_IN_MILLIS, TimeUnit.SECONDS);
// When
final boolean uploadResult = podResource.file(uploadPath).upload(tempFile);
// Then
assertThat(uploadResult).isTrue();
retryUpload(() -> podResource.file(uploadPath).upload(tempFile));

try (InputStream checkIs = podResource.file(uploadPath).read();
BufferedReader br = new BufferedReader(new InputStreamReader(checkIs, StandardCharsets.UTF_8))) {
String result = br.lines().collect(Collectors.joining(System.lineSeparator()));
assertEquals("I'm uploaded", result, () -> checkFile(podResource, null, uploadPath));
}
}

void retryUpload(BooleanSupplier operation) {
Awaitility.await().atMost(60, TimeUnit.SECONDS).until(operation::getAsBoolean);
}

@Test
void uploadBinaryStream() throws Exception {
byte[] bytes = new byte[16385];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = (byte) i;
}
final PodResource podResource = client.pods().withName("pod-standard");
// When
retryUpload(() -> podResource.file("/tmp/binstream").upload(new ByteArrayInputStream(bytes)));
// Then
try (InputStream checkIs = podResource.file("/tmp/binstream").read();) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
InputStreamPumper.transferTo(checkIs, baos::write);
assertArrayEquals(bytes, baos.toByteArray(), () -> checkFile(podResource, null, "/tmp/binstream"));
}
}

@Test
void uploadBinaryFile() throws IOException {
// Given
Expand All @@ -309,9 +331,8 @@ void uploadBinaryFile() throws IOException {
final Path tempFile = Files.write(tempDir.resolve("file.toBeUploaded"), bytes);
final PodResource podResource = client.pods().withName("pod-standard");
// When
final boolean uploadResult = podResource.file("/tmp/binfile").upload(tempFile);
retryUpload(() -> podResource.file("/tmp/binfile").upload(tempFile));
// Then
assertThat(uploadResult).isTrue();
try (InputStream checkIs = podResource.file("/tmp/binfile").read();) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
InputStreamPumper.transferTo(checkIs, baos::write);
Expand All @@ -330,7 +351,7 @@ void uploadDir() throws IOException {

PodResource podResource = client.pods().withName("pod-standard");

podResource.dir("/tmp/uploadDir").withReadyWaitTimeout(POD_READY_WAIT_IN_MILLIS).upload(tmpDir);
retryUpload(() -> podResource.dir("/tmp/uploadDir").withReadyWaitTimeout(POD_READY_WAIT_IN_MILLIS).upload(tmpDir));

for (String fileName : files) {
try (InputStream checkIs = podResource.file("/tmp/uploadDir/" + fileName).read();
Expand Down

0 comments on commit faee793

Please sign in to comment.