Skip to content

Commit

Permalink
remove futures from Java filesystem interface
Browse files Browse the repository at this point in the history
  • Loading branch information
purplefox committed Feb 28, 2012
1 parent b238f56 commit 1c95927
Show file tree
Hide file tree
Showing 11 changed files with 443 additions and 395 deletions.
Expand Up @@ -16,6 +16,8 @@

package org.vertx.java.examples.upload;

import org.vertx.java.core.AsyncResult;
import org.vertx.java.core.AsyncResultHandler;
import org.vertx.java.core.CompletionHandler;
import org.vertx.java.core.Future;
import org.vertx.java.core.Handler;
Expand Down Expand Up @@ -53,22 +55,22 @@ public void handle(HttpClientResponse response) {
// For a chunked upload you don't need to specify size, just do:
// req.setChunked(true);

FileSystem.instance.open(filename).handler(new CompletionHandler<AsyncFile>() {
public void handle(Future<AsyncFile> completion) {
final AsyncFile file = completion.result();
FileSystem.instance.open(filename, new AsyncResultHandler<AsyncFile>() {
public void handle(AsyncResult<AsyncFile> ar) {
final AsyncFile file = ar.result;
Pump pump = new Pump(file.getReadStream(), req);
pump.start();

file.getReadStream().endHandler(new SimpleHandler() {
public void handle() {

file.close().handler(new CompletionHandler<Void>() {
public void handle(Future<Void> completion) {
if (completion.succeeded()) {
file.close(new AsyncResultHandler<Void>() {
public void handle(AsyncResult<Void> ar) {
if (ar.exception == null) {
req.end();
System.out.println("Sent request");
} else {
completion.exception().printStackTrace(System.err);
ar.exception.printStackTrace(System.err);
}
}
});
Expand Down
Expand Up @@ -16,6 +16,8 @@

package org.vertx.java.examples.upload;

import org.vertx.java.core.AsyncResult;
import org.vertx.java.core.AsyncResultHandler;
import org.vertx.java.core.CompletionHandler;
import org.vertx.java.core.Future;
import org.vertx.java.core.Handler;
Expand Down Expand Up @@ -43,21 +45,21 @@ public void handle(final HttpServerRequest req) {

final String filename = "upload/file-" + UUID.randomUUID().toString() + ".upload";

FileSystem.instance.open(filename).handler(new CompletionHandler<AsyncFile>() {
public void handle(Future<AsyncFile> deferred) {
final AsyncFile file = deferred.result();
FileSystem.instance.open(filename, new AsyncResultHandler<AsyncFile>() {
public void handle(AsyncResult<AsyncFile> ar) {
final AsyncFile file = ar.result;
final Pump pump = new Pump(req, file.getWriteStream());
final long start = System.currentTimeMillis();
req.endHandler(new SimpleHandler() {
public void handle() {
file.close().handler(new CompletionHandler<Void>() {
public void handle(Future<Void> deferred) {
if (deferred.succeeded()) {
file.close(new AsyncResultHandler<Void>() {
public void handle(AsyncResult<Void> ar) {
if (ar.exception == null) {
req.response.end();
long end = System.currentTimeMillis();
System.out.println("Uploaded " + pump.getBytesPumped() + " bytes to " + filename + " in " + (end - start) + " ms");
} else {
deferred.exception().printStackTrace(System.err);
ar.exception.printStackTrace(System.err);
}
}
});
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/org/vertx/java/core/AsyncResult.java
Expand Up @@ -21,16 +21,16 @@
*/
public class AsyncResult<T> {

final T result;
final Exception e;
public final T result;
public final Exception exception;

public AsyncResult(T result) {
this.result = result;
this.e = null;
this.exception = null;
}

public AsyncResult(Exception e) {
this.e = e;
public AsyncResult(Exception exception) {
this.exception = exception;
this.result = null;
}
}
23 changes: 23 additions & 0 deletions src/main/java/org/vertx/java/core/AsyncResultHandler.java
@@ -0,0 +1,23 @@
/*
* Copyright 2011-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.vertx.java.core;

/**
* @author <a href="http://tfox.org">Tim Fox</a>
*/
public interface AsyncResultHandler<T> extends Handler<AsyncResult<T>> {
}
30 changes: 30 additions & 0 deletions src/main/java/org/vertx/java/core/SimpleAsyncResultHandler.java
@@ -0,0 +1,30 @@
/*
* Copyright 2011-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.vertx.java.core;

/**
* @author <a href="http://tfox.org">Tim Fox</a>
*/
public abstract class SimpleAsyncResultHandler implements AsyncResultHandler<Void> {

@Override
public void handle(AsyncResult<Void> event) {
//To change body of implemented methods use File | Settings | File Templates.
}


}
49 changes: 33 additions & 16 deletions src/main/java/org/vertx/java/core/file/AsyncFile.java
Expand Up @@ -16,6 +16,8 @@

package org.vertx.java.core.file;

import org.vertx.java.core.AsyncResult;
import org.vertx.java.core.AsyncResultHandler;
import org.vertx.java.core.BlockingAction;
import org.vertx.java.core.CompletionHandler;
import org.vertx.java.core.Context;
Expand Down Expand Up @@ -89,7 +91,7 @@ public class AsyncFile {
* is called on the Deferred instance returned by this method.
* @return a Deferred representing the as-yet unexecuted action.
*/
public Deferred<Void> closeDeferred() {
private Deferred<Void> closeDeferred() {
check();

closed = true;
Expand Down Expand Up @@ -122,21 +124,38 @@ public void run() {
return deferred;
}

private <T> void wrapHandler(Future<T> fut, final AsyncResultHandler<T> handler) {
fut.handler(new CompletionHandler<T>() {
public void handle(Future<T> event) {
if (event.succeeded()) {
handler.handle(new AsyncResult<T>(event.result()));
} else {
handler.handle(new AsyncResult<T>(event.exception()));
}
}
});
}


/**
* Close the file asynchronously.<p>
* This method must be called using the same event loop the file was opened from.
* @return a Future representing the future result of closing the file.
*/
public Future<Void> close() {
return closeDeferred().execute();
public void close() {
closeDeferred().execute();
}

public void close(AsyncResultHandler handler) {
wrapHandler(closeDeferred().execute(), handler);
}

/**
* The same as {@link #write} but the write does not start until the {@link Deferred#execute} method
* is called on the Deferred instance returned by this method.
* @return a Deferred representing the as-yet unexecuted action.
*/
public Deferred<Void> writeDeferred(Buffer buffer, int position) {
private Deferred<Void> writeDeferred(Buffer buffer, int position) {
check();
ByteBuffer bb = buffer.getChannelBuffer().toByteBuffer();
return doWrite(bb, position);
Expand All @@ -151,16 +170,16 @@ public Deferred<Void> writeDeferred(Buffer buffer, int position) {
* This method must be called using the same event loop the file was opened from.
* @return a Future representing the future result of the write.
*/
public Future<Void> write(Buffer buffer, int position) {
return writeDeferred(buffer, position).execute();
public void write(Buffer buffer, int position, AsyncResultHandler handler) {
wrapHandler(writeDeferred(buffer, position).execute(), handler);
}

/**
* The same as {@link #read} but the read does not start until the {@link Deferred#execute} method
* is called on the Deferred instance returned by this method.
* @return a Deferred representing the as-yet unexecuted action.
*/
public Deferred<Buffer> readDeferred(Buffer buffer, int offset, int position, int length) {
private Deferred<Buffer> readDeferred(Buffer buffer, int offset, int position, int length) {
check();
ByteBuffer bb = ByteBuffer.allocate(length);
return doRead(buffer, offset, bb, position);
Expand All @@ -176,8 +195,8 @@ public Deferred<Buffer> readDeferred(Buffer buffer, int offset, int position, in
* This method must be called using the same event loop the file was opened from.
* @return a Future representing the future result of the write.
*/
public Future<Buffer> read(Buffer buffer, int offset, int position, int length) {
return readDeferred(buffer, offset, position, length).execute();
public void read(Buffer buffer, int offset, int position, int length, AsyncResultHandler handler) {
wrapHandler(readDeferred(buffer, offset, position, length).execute(), handler);
}

/**
Expand Down Expand Up @@ -279,14 +298,12 @@ void doRead() {
if (!readInProgress) {
readInProgress = true;
Buffer buff = Buffer.create(BUFFER_SIZE);
Future<Buffer> deferred = read(buff, 0, pos, BUFFER_SIZE);

deferred.handler(new CompletionHandler<Buffer>() {
read(buff, 0, pos, BUFFER_SIZE, new AsyncResultHandler<Buffer>() {

public void handle(Future<Buffer> deferred) {
if (deferred.succeeded()) {
public void handle(AsyncResult<Buffer> ar) {
if (ar.exception == null) {
readInProgress = false;
Buffer buffer = deferred.result();
Buffer buffer = ar.result;
if (buffer.length() == 0) {
// Empty buffer represents end of file
handleEnd();
Expand All @@ -298,7 +315,7 @@ public void handle(Future<Buffer> deferred) {
}
}
} else {
handleException(deferred.exception());
handleException(ar.exception);
}
}
});
Expand Down

0 comments on commit 1c95927

Please sign in to comment.