Skip to content
Permalink
Browse files

FilePart and MultipartFile provide transferTo(Path) variant

Also, ZeroCopyHttpOutputMessage provides writeWith(Path, int, int), enforcing that variant as the implementation target in 5.1 (analogous to FilePart).

Issue: SPR-16925
  • Loading branch information...
jhoeller committed Jun 13, 2018
1 parent c38cb43 commit 1e5f8cc2321d0f466cfb3133d169781c480ab3c7
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
package org.springframework.http;

import java.io.File;
import java.nio.file.Path;

import reactor.core.publisher.Mono;

@@ -25,6 +26,7 @@
* file transfers.
*
* @author Arjen Poutsma
* @author Juergen Hoeller
* @since 5.0
* @see <a href="https://en.wikipedia.org/wiki/Zero-copy">Zero-copy</a>
*/
@@ -38,6 +40,19 @@
* @param count the number of bytes to be transferred
* @return a publisher that indicates completion or error.
*/
Mono<Void> writeWith(File file, long position, long count);
default Mono<Void> writeWith(File file, long position, long count) {
return writeWith(file.toPath(), position, count);
}

/**
* Use the given {@link Path} to write the body of the message to the underlying
* HTTP layer.
* @param file the file to transfer
* @param position the position within the file from which the transfer is to begin
* @param count the number of bytes to be transferred
* @return a publisher that indicates completion or error.
* @since 5.1
*/
Mono<Void> writeWith(Path file, long position, long count);

}
@@ -16,8 +16,8 @@

package org.springframework.http.client.reactive;

import java.io.File;
import java.net.URI;
import java.nio.file.Path;
import java.util.Collection;

import io.netty.buffer.ByteBuf;
@@ -97,8 +97,8 @@ public URI getURI() {
}

@Override
public Mono<Void> writeWith(File file, long position, long count) {
return doCommit(() -> this.outbound.sendFile(file.toPath(), position, count).then());
public Mono<Void> writeWith(Path file, long position, long count) {
return doCommit(() -> this.outbound.sendFile(file, position, count).then());
}

@Override
@@ -17,6 +17,7 @@
package org.springframework.http.codec.multipart;

import java.io.File;
import java.nio.file.Path;

import reactor.core.publisher.Mono;

@@ -25,6 +26,7 @@
* a multipart request.
*
* @author Rossen Stoyanchev
* @author Juergen Hoeller
* @since 5.0
*/
public interface FilePart extends Part {
@@ -38,10 +40,26 @@
* Convenience method to copy the content of the file in this part to the
* given destination file. If the destination file already exists, it will
* be truncated first.
* <p>The default implementation delegates to {@link #transferTo(Path)}.
* @param dest the target file
* @return completion {@code Mono} with the result of the file transfer,
* possibly {@link IllegalStateException} if the part isn't a file
* @see #transferTo(Path)
*/
Mono<Void> transferTo(File dest);
default Mono<Void> transferTo(File dest) {
return transferTo(dest.toPath());
}

/**
* Convenience method to copy the content of the file in this part to the
* given destination file. If the destination file already exists, it will
* be truncated first.
* @param dest the target file
* @return completion {@code Mono} with the result of the file transfer,
* possibly {@link IllegalStateException} if the part isn't a file
* @since 5.1
* @see #transferTo(File)
*/
Mono<Void> transferTo(Path dest);

}
@@ -16,14 +16,14 @@

package org.springframework.http.codec.multipart;

import java.io.File;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.List;
@@ -90,19 +90,14 @@ public boolean canRead(ResolvableType elementType, @Nullable MediaType mediaType


@Override
public Flux<Part> read(ResolvableType elementType, ReactiveHttpInputMessage message,
Map<String, Object> hints) {

public Flux<Part> read(ResolvableType elementType, ReactiveHttpInputMessage message, Map<String, Object> hints) {
return Flux.create(new SynchronossPartGenerator(message, this.bufferFactory, this.streamStorageFactory));
}


@Override
public Mono<Part> readMono(ResolvableType elementType, ReactiveHttpInputMessage message,
Map<String, Object> hints) {

return Mono.error(new UnsupportedOperationException(
"Can't read a multipart request body into a single Part."));
public Mono<Part> readMono(ResolvableType elementType, ReactiveHttpInputMessage message, Map<String, Object> hints) {
return Mono.error(new UnsupportedOperationException("Cannot read multipart request body into single Part"));
}


@@ -118,15 +113,14 @@ public boolean canRead(ResolvableType elementType, @Nullable MediaType mediaType

private final PartBodyStreamStorageFactory streamStorageFactory;


SynchronossPartGenerator(ReactiveHttpInputMessage inputMessage, DataBufferFactory bufferFactory,
PartBodyStreamStorageFactory streamStorageFactory) {

this.inputMessage = inputMessage;
this.bufferFactory = bufferFactory;
this.streamStorageFactory = streamStorageFactory;
}


@Override
public void accept(FluxSink<Part> emitter) {
HttpHeaders headers = this.inputMessage.getHeaders();
@@ -189,14 +183,12 @@ public void accept(FluxSink<Part> emitter) {

private final AtomicInteger terminated = new AtomicInteger(0);


FluxSinkAdapterListener(FluxSink<Part> sink, DataBufferFactory factory, MultipartContext context) {
this.sink = sink;
this.bufferFactory = factory;
this.context = context;
}


@Override
public void onPartFinished(StreamStorage storage, Map<String, List<String>> headers) {
HttpHeaders httpHeaders = new HttpHeaders();
@@ -250,16 +242,14 @@ public void onNestedPartFinished() {

private final DataBufferFactory bufferFactory;


AbstractSynchronossPart(HttpHeaders headers, DataBufferFactory bufferFactory) {
Assert.notNull(headers, "HttpHeaders is required");
Assert.notNull(bufferFactory, "'bufferFactory' is required");
Assert.notNull(bufferFactory, "DataBufferFactory is required");
this.name = MultipartUtils.getFieldName(headers);
this.headers = headers;
this.bufferFactory = bufferFactory;
}


@Override
public String name() {
return this.name;
@@ -280,14 +270,12 @@ DataBufferFactory getBufferFactory() {

private final StreamStorage storage;


SynchronossPart(HttpHeaders headers, StreamStorage storage, DataBufferFactory factory) {
super(headers, factory);
Assert.notNull(storage, "'storage' is required");
Assert.notNull(storage, "StreamStorage is required");
this.storage = storage;
}


@Override
public Flux<DataBuffer> content() {
return DataBufferUtils.readInputStream(getStorage()::getInputStream, getBufferFactory(), 4096);
@@ -301,33 +289,28 @@ protected StreamStorage getStorage() {

private static class SynchronossFilePart extends SynchronossPart implements FilePart {

private static final OpenOption[] FILE_CHANNEL_OPTIONS = {
StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE };

private static final OpenOption[] FILE_CHANNEL_OPTIONS =
{StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE};

private final String filename;


SynchronossFilePart(HttpHeaders headers, String filename, StreamStorage storage,
DataBufferFactory factory) {

SynchronossFilePart(HttpHeaders headers, String filename, StreamStorage storage, DataBufferFactory factory) {
super(headers, storage, factory);
this.filename = filename;
}


@Override
public String filename() {
return this.filename;
}

@Override
public Mono<Void> transferTo(File destination) {
public Mono<Void> transferTo(Path dest) {
ReadableByteChannel input = null;
FileChannel output = null;
try {
input = Channels.newChannel(getStorage().getInputStream());
output = FileChannel.open(destination.toPath(), FILE_CHANNEL_OPTIONS);
output = FileChannel.open(dest, FILE_CHANNEL_OPTIONS);
long size = (input instanceof FileChannel ? ((FileChannel) input).size() : Long.MAX_VALUE);
long totalWritten = 0;
while (totalWritten < size) {
@@ -366,13 +349,11 @@ public String filename() {

private final String content;


SynchronossFormFieldPart(HttpHeaders headers, DataBufferFactory bufferFactory, String content) {
super(headers, bufferFactory);
this.content = content;
}


@Override
public String value() {
return this.content;
@@ -16,7 +16,7 @@

package org.springframework.http.server.reactive;

import java.io.File;
import java.nio.file.Path;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpResponseStatus;
@@ -112,8 +112,8 @@ protected void applyCookies() {
}

@Override
public Mono<Void> writeWith(File file, long position, long count) {
return doCommit(() -> this.response.sendFile(file.toPath(), position, count).then());
public Mono<Void> writeWith(Path file, long position, long count) {
return doCommit(() -> this.response.sendFile(file, position, count).then());
}

private static Publisher<ByteBuf> toByteBufs(Publisher<? extends DataBuffer> dataBuffers) {
@@ -16,10 +16,10 @@

package org.springframework.http.server.reactive;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

import io.undertow.server.HttpServerExchange;
@@ -106,10 +106,10 @@ protected void applyCookies() {
}

@Override
public Mono<Void> writeWith(File file, long position, long count) {
public Mono<Void> writeWith(Path file, long position, long count) {
return doCommit(() ->
Mono.defer(() -> {
try (FileChannel source = FileChannel.open(file.toPath(), StandardOpenOption.READ)) {
try (FileChannel source = FileChannel.open(file, StandardOpenOption.READ)) {
StreamSinkChannel destination = this.exchange.getResponseChannel();
Channels.transferBlocking(destination, source, position, count);
return Mono.empty();
@@ -19,10 +19,13 @@
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;

import org.springframework.core.io.InputStreamSource;
import org.springframework.core.io.Resource;
import org.springframework.lang.Nullable;
import org.springframework.util.FileCopyUtils;

/**
* A representation of an uploaded file received in a multipart request.
@@ -127,4 +130,15 @@ default Resource getResource() {
*/
void transferTo(File dest) throws IOException, IllegalStateException;

/**
* Transfer the received file to the given destination file.
* <p>The default implementation simply copies the file input stream.
* @since 5.1
* @see #getInputStream()
* @see #transferTo(File)
*/
default void transferTo(Path dest) throws IOException, IllegalStateException {
FileCopyUtils.copy(getInputStream(), Files.newOutputStream(dest));
}

}
@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.web.multipart;

import java.io.IOException;
@@ -35,7 +36,7 @@


public MultipartFileResource(MultipartFile multipartFile) {
Assert.notNull(multipartFile, "MultipartFile must not be null.");
Assert.notNull(multipartFile, "MultipartFile must not be null");
this.multipartFile = multipartFile;
}

@@ -86,9 +87,8 @@ public String getDescription() {

@Override
public boolean equals(Object obj) {
return (obj == this ||
(obj instanceof MultipartFileResource &&
((MultipartFileResource) obj).multipartFile.equals(this.multipartFile)));
return (obj == this || (obj instanceof MultipartFileResource &&
((MultipartFileResource) obj).multipartFile.equals(this.multipartFile)));
}

@Override
Oops, something went wrong.

0 comments on commit 1e5f8cc

Please sign in to comment.
You can’t perform that action at this time.