Async futures #953

Closed · wants to merge 5 commits

Changes from all commits
164 changes: 112 additions & 52 deletions src/main/java/spark/http/matching/MatcherFilter.java
@@ -17,7 +17,10 @@
 package spark.http.matching;
 
 import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 
+import javax.servlet.AsyncContext;
 import javax.servlet.Filter;
 import javax.servlet.FilterChain;
 import javax.servlet.FilterConfig;
@@ -123,72 +126,129 @@ public void doFilter(ServletRequest servletRequest,
                     .withResponse(response)
                     .withHttpMethod(httpMethod);
 
+            Optional<CompletableFuture> future = Optional.empty();
+
             try {
-                try {
 
-                    BeforeFilters.execute(context);
-                    Routes.execute(context);
-                    AfterFilters.execute(context);
+                BeforeFilters.execute(context);
+                future = Routes.execute(context);
 
-                } catch (HaltException halt) {
+            } catch (HaltException halt) {
 
-                    Halt.modify(httpResponse, body, halt);
+                Halt.modify(httpResponse, body, halt);
 
-                } catch (Exception generalException) {
+            } catch (Exception generalException) {
 
-                    GeneralError.modify(
-                            httpRequest,
-                            httpResponse,
-                            body,
-                            requestWrapper,
-                            responseWrapper,
-                            generalException);
-                }
+                GeneralError.modify(
+                        httpRequest,
+                        httpResponse,
+                        body,
+                        requestWrapper,
+                        responseWrapper,
+                        generalException);
+            }
 
+            if(future.isPresent()){
+                AsyncContext ac = httpRequest.startAsync();
+                future.get().exceptionally(ex -> {
+                    System.out.println("ex: "+ex);
+                    handleRouteException(context, (Exception) ex);
+                    processRouteResult(context, chain);
+                    ac.complete();
+                    return ex;
+                });
+                future.get().thenAccept(result -> {
+                    processRouteResult(context, chain);
+                    ac.complete();
+                });
+            } else {
+                processRouteResult(context, chain);
+            }
+        }
 
-            // If redirected and content is null set to empty string to not throw NotConsumedException
-            if (body.notSet() && responseWrapper.isRedirected()) {
-                body.set("");
-            }
+        private void handleRouteException(RouteContext context, Exception ex) {
+            if(ex instanceof HaltException){
+                Halt.modify(context.response().raw(), context.body(), (HaltException) ex);
+            } else {
+                GeneralError.modify(
+                        context.httpRequest(),
+                        context.response().raw(),
+                        context.body(),
+                        context.requestWrapper(),
+                        context.responseWrapper(),
+                        ex);
+            }
+        }
 
-            if (body.notSet() && hasOtherHandlers) {
-                if (servletRequest instanceof HttpRequestWrapper) {
-                    ((HttpRequestWrapper) servletRequest).notConsumed(true);
-                    return;
-                }
-            }
+        private void processRouteResult(RouteContext context, FilterChain chain){
+            afterFilter(context);
+            checkBodyResult(context);
+            afterAfterFilter(context);
+            try {
+                bodySerialize(context, chain);
+            } catch (IOException | ServletException e) {
+                e.printStackTrace();
+            }
+        }
 
-            if (body.notSet()) {
-                LOG.info("The requested route [{}] has not been mapped in Spark for {}: [{}]",
-                         uri, ACCEPT_TYPE_REQUEST_MIME_HEADER, acceptType);
-                httpResponse.setStatus(HttpServletResponse.SC_NOT_FOUND);
-
-                if (CustomErrorPages.existsFor(404)) {
-                    requestWrapper.setDelegate(RequestResponseFactory.create(httpRequest));
-                    responseWrapper.setDelegate(RequestResponseFactory.create(httpResponse));
-                    body.set(CustomErrorPages.getFor(404, requestWrapper, responseWrapper));
-                } else {
-                    body.set(String.format(CustomErrorPages.NOT_FOUND));
-                }
-            }
+        private void afterFilter(RouteContext context){
+            try{
+                AfterFilters.execute(context);
+
+            } catch (Exception ex) {
+                handleRouteException(context, ex);
+
+            }
+        }
+
+        private void bodySerialize(RouteContext context, FilterChain chain) throws IOException, ServletException {
+            if (context.body().isSet()) {
+                context.body().serializeTo(context.response().raw(), serializerChain, context.httpRequest());
+            } else if (chain != null) {
+                chain.doFilter(context.httpRequest(), context.response().raw());
+            }
+        }
+
+        private void checkBodyResult(RouteContext context) {
+
+            // If redirected and content is null set to empty string to not throw NotConsumedException
+            if (context.body().notSet() && context.responseWrapper().isRedirected()) {
+                context.body().set("");
+            }
+
+            if (context.body().notSet() && hasOtherHandlers) {
+                if (context.httpRequest() instanceof HttpRequestWrapper) {
+                    ((HttpRequestWrapper) context.httpRequest()).notConsumed(true);
+                    return;
+                }
+            }
-        } finally {
-            try {
-                AfterAfterFilters.execute(context);
-            } catch (Exception generalException) {
-                GeneralError.modify(
-                        httpRequest,
-                        httpResponse,
-                        body,
-                        requestWrapper,
-                        responseWrapper,
-                        generalException);
-            }
-        }
 
+            if (context.body().notSet()) {
+                LOG.info("The requested route [{}] has not been mapped in Spark for {}: [{}]",
+                         context.uri(), ACCEPT_TYPE_REQUEST_MIME_HEADER, context.acceptType());
+                context.response().raw().setStatus(HttpServletResponse.SC_NOT_FOUND);
+
+                if (CustomErrorPages.existsFor(404)) {
+                    context.requestWrapper().setDelegate(RequestResponseFactory.create(context.httpRequest()));
+                    context.responseWrapper().setDelegate(RequestResponseFactory.create(context.response().raw()));
+                    context.body().set(CustomErrorPages.getFor(404, context.requestWrapper(), context.responseWrapper()));
+                } else {
+                    context.body().set(String.format(CustomErrorPages.NOT_FOUND));
+                }
+            }
+        }
 
-        if (body.isSet()) {
-            body.serializeTo(httpResponse, serializerChain, httpRequest);
-        } else if (chain != null) {
-            chain.doFilter(httpRequest, httpResponse);
+        private void afterAfterFilter(RouteContext context) {
+            try {
+                AfterAfterFilters.execute(context);
+            } catch (Exception generalException) {
+                GeneralError.modify(
+                        context.httpRequest(),
+                        context.response().raw(),
+                        context.body(),
+                        context.requestWrapper(),
+                        context.responseWrapper(),
+                        generalException);
+            }
+        }
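When Routes.execute hands back a CompletableFuture, the new doFilter switches the exchange into servlet async mode and defers the whole response tail (after-filters, body checks, serialization) into the future's callbacks. The following minimal sketch illustrates that dispatch pattern in isolation; it is not code from this PR, and dispatch and renderResponse are hypothetical placeholder names.

    import java.util.Optional;
    import java.util.concurrent.CompletableFuture;

    import javax.servlet.AsyncContext;
    import javax.servlet.http.HttpServletRequest;

    public class AsyncDispatchSketch {

        // Placeholder for the synchronous tail of request processing
        // (after-filters, body checks, serialization).
        private void renderResponse() { /* ... */ }

        void dispatch(HttpServletRequest request, Optional<CompletableFuture> future) {
            if (future.isPresent()) {
                // Release the container thread while the future is still running.
                AsyncContext ac = request.startAsync();
                future.get().exceptionally(ex -> {
                    // Failure path: map the exception to an error response,
                    // then render and close the async cycle.
                    renderResponse();
                    ac.complete();
                    return ex;
                });
                future.get().thenAccept(result -> {
                    // Success path: render on whichever thread completed the future.
                    renderResponse();
                    ac.complete();
                });
            } else {
                // Synchronous route: render immediately, as before this PR.
                renderResponse();
            }
        }
    }

Exactly one of the two registered stages runs its callback for a given request: exceptionally fires only on failure, and thenAccept's consumer runs only on success.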
80 changes: 51 additions & 29 deletions src/main/java/spark/http/matching/Routes.java
@@ -22,50 +22,68 @@
 import spark.route.HttpMethod;
 import spark.routematch.RouteMatch;
 
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
 /**
  * Created by Per Wendel on 2016-01-28.
  */
 final class Routes {
 
-    static void execute(RouteContext context) throws Exception {
-
-        Object content = context.body().get();
+    static Optional<CompletableFuture> execute(RouteContext context) throws Exception {
 
         RouteMatch match = context.routeMatcher().find(context.httpMethod(), context.uri(), context.acceptType());
 
-        Object target = null;
-        if (match != null) {
-            target = match.getTarget();
+        if(match != null){
+            return handleRouteMatch(context, match);
         } else if (context.httpMethod() == HttpMethod.head && context.body().notSet()) {
             // See if get is mapped to provide default head mapping
-            content =
-                    context.routeMatcher().find(HttpMethod.get, context.uri(), context.acceptType())
-                            != null ? "" : null;
+            context.body().set(defaultHeadMapping(context));
         }
+        return Optional.empty();
+    }
 
-        if (target != null) {
-            Object result = null;
-
-            if (target instanceof RouteImpl) {
-                RouteImpl route = ((RouteImpl) target);
-
-                if (context.requestWrapper().getDelegate() == null) {
-                    Request request = RequestResponseFactory.create(match, context.httpRequest());
-                    context.requestWrapper().setDelegate(request);
-                } else {
-                    context.requestWrapper().changeMatch(match);
-                }
+    private static Optional<CompletableFuture> handleRouteMatch(RouteContext context, RouteMatch match) throws Exception {
+        Object target = match.getTarget();
 
+        if (target instanceof RouteImpl) {
+            RouteImpl route = ((RouteImpl) target);
+            handleDelegate(context, match);
 
+            Object element = route.handle(context.requestWrapper(), context.responseWrapper());
+            if(element instanceof CompletableFuture){
+                CompletableFuture future = (CompletableFuture)element;
+                return Optional.of(future.thenAccept(futureElement -> {
+                    System.out.println("future el: "+futureElement);
+                    try {
+                        setBodyFromResult(context, route, futureElement);
+                    } catch (Exception e) {
+                        future.completeExceptionally(e);
+                    }
+                }));
+            } else {
+                setBodyFromResult(context, route, element);
+            }
+        }
+        return Optional.empty();
+    }
 
-            context.responseWrapper().setDelegate(context.response());
+    private static void handleDelegate(RouteContext context, RouteMatch match) {
+        if (context.requestWrapper().getDelegate() == null) {
+            Request request = RequestResponseFactory.create(match, context.httpRequest());
+            context.requestWrapper().setDelegate(request);
+        } else {
+            context.requestWrapper().changeMatch(match);
+        }
 
-            Object element = route.handle(context.requestWrapper(), context.responseWrapper());
-            if (!context.responseWrapper().isRedirected()) {
-                result = route.render(element);
-            }
-        }
+        context.responseWrapper().setDelegate(context.response());
+    }
 
+    private static void setBodyFromResult(RouteContext context, RouteImpl route, Object element) throws Exception {
+        if (!context.responseWrapper().isRedirected() && element!=null) {
+            Object result = route.render(element);
             if (result != null) {
-                content = result;
+                Object content = result;
 
                 if (content instanceof String) {
                     String contentStr = (String) content;
@@ -74,10 +92,14 @@ static void execute(RouteContext context) throws Exception {
                     if (!context.responseWrapper().isRedirected()) {
                         context.responseWrapper().body(contentStr);
                     }
                 }
+                context.body().set(content);
             }
         }
     }
 
-        context.body().set(content);
+    private static Object defaultHeadMapping(RouteContext context) {
+        return context.routeMatcher().find(HttpMethod.get, context.uri(), context.acceptType())
+                != null ? "" : null;
+    }
 
 }
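A detail worth noting in handleRouteMatch: it does not return the route's own future but the stage produced by thenAccept, which completes only after setBodyFromResult has run. That is what lets MatcherFilter serialize the response knowing the body is already set. Below is a minimal self-contained sketch of this composition; inspect and body are hypothetical stand-ins, not PR code.

    import java.util.Optional;
    import java.util.concurrent.CompletableFuture;

    public class RouteFutureSketch {

        // Hypothetical stand-in for context.body().set(...)
        static volatile Object body;

        static Optional<CompletableFuture> inspect(Object element) {
            if (element instanceof CompletableFuture) {
                CompletableFuture future = (CompletableFuture) element;
                // Return the derived stage: it completes only after the body
                // has been set, so the caller can safely serialize afterwards.
                return Optional.of(future.thenAccept(value -> body = value));
            }
            // Plain value: nothing asynchronous for the caller to wait on.
            body = element;
            return Optional.empty();
        }

        public static void main(String[] args) {
            CompletableFuture<String> async = CompletableFuture.supplyAsync(() -> "hello");
            inspect(async).ifPresent(CompletableFuture::join); // block only for the demo
            System.out.println(body);                          // prints: hello
        }
    }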
64 changes: 64 additions & 0 deletions src/test/java/spark/servlet/MyApp.java
@@ -1,7 +1,19 @@
 package spark.servlet;
 
 import spark.HaltException;
 
+import java.io.File;
+import java.io.FileWriter;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousFileChannel;
+import java.nio.channels.AsynchronousSocketChannel;
+import java.nio.channels.CompletionHandler;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
 
 import static spark.Spark.after;
 import static spark.Spark.before;
@@ -40,6 +52,58 @@ public synchronized void init() {
             return "Hello World!";
         });
 
+        get("/async", (request, response) -> {
+            Path path = Paths.get(ClassLoader.getSystemResource("public/page.html").toURI());
+            AsynchronousFileChannel fileChannel =
+                    AsynchronousFileChannel.open(path, StandardOpenOption.READ);
+            ByteBuffer buffer = ByteBuffer.allocate(100);
+            CompletableFuture future = new CompletableFuture<>();
+            fileChannel.read(buffer, 0, null, new CompletionHandler<Integer, Object>() {
+                @Override
+                public void completed(Integer result, Object attachment) {
+                    try {
+                        // just so the future doesn't return too quickly
+                        Thread.sleep(500);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                    future.complete("Hello Async!");
+                }
+
+                @Override
+                public void failed(Throwable exc, Object attachment) {
+                    future.completeExceptionally(exc);
+                }
+            });
+            return future;
+        });
+
+        get("/async-exception", (request, response) -> {
+            Path path = Paths.get(ClassLoader.getSystemResource("public/page.html").toURI());
+            AsynchronousFileChannel fileChannel =
+                    AsynchronousFileChannel.open(path, StandardOpenOption.READ);
+            ByteBuffer buffer = ByteBuffer.allocate(100);
+            CompletableFuture future = new CompletableFuture<>();
+            fileChannel.read(buffer, 0, null, new CompletionHandler<Integer, Object>() {
+                @Override
+                public void completed(Integer result, Object attachment) {
+                    try {
+                        // just so the future doesn't return too quickly
+                        Thread.sleep(500);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                    future.completeExceptionally(new Exception("Async Exception!"));
+                }
+
+                @Override
+                public void failed(Throwable exc, Object attachment) {
+                    future.completeExceptionally(exc);
+                }
+            });
+            return future;
+        });
+
         get("/:param", (request, response) -> {
             return "echo: " + request.params(":param");
         });