Skip to content

Commit

Permalink
Fix #368
Browse files Browse the repository at this point in the history
Introduce the @async annotation

Signed-off-by: Clement Escoffier <clement.escoffier@gmail.com>
  • Loading branch information
cescoffier committed Oct 30, 2014
1 parent 622506b commit 6de0e1b
Show file tree
Hide file tree
Showing 3 changed files with 405 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* #%L
* Wisdom-Framework
* %%
* Copyright (C) 2013 - 2014 Wisdom Framework
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package org.wisdom.akka.impl;

import akka.dispatch.Futures;
import akka.dispatch.OnSuccess;
import com.google.common.collect.ImmutableList;
import org.apache.felix.ipojo.annotations.Component;
import org.apache.felix.ipojo.annotations.Instantiate;
import org.apache.felix.ipojo.annotations.Provides;
import org.apache.felix.ipojo.annotations.Requires;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wisdom.akka.AkkaSystemService;
import org.wisdom.api.annotations.scheduler.Async;
import org.wisdom.api.exceptions.HttpException;
import org.wisdom.api.http.AsyncResult;
import org.wisdom.api.http.Result;
import org.wisdom.api.interception.Interceptor;
import org.wisdom.api.interception.RequestContext;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;

import java.util.concurrent.Callable;

@Component
@Provides(specifications = Interceptor.class)
@Instantiate
public class AsyncInterceptor extends Interceptor<Async> {

private static final Logger LOGGER = LoggerFactory.getLogger(AsyncInterceptor.class);

@Requires
protected AkkaSystemService akka;

private static class ResultRetriever implements Callable<Result> {

private RequestContext context;
private volatile Thread currentThread;

public ResultRetriever(RequestContext context) {
this.context = context;
}

@Override
public Result call() throws Exception {
this.currentThread = Thread.currentThread();
return context.proceed();
}

public Thread getThread() {
return currentThread;
}
}

/**
* Wrap the action method as an asynchronous method. The result is computed asynchronously and returned to the
* client once computed. Optionally a timeout can be set to return an error if the result takes too much time to
* be computed.
*
* @param configuration the interception configuration
* @param context the interception context
* @return an async result wrapping the action method invocation.
*/
@Override
public Result call(final Async configuration, final RequestContext context) {
return new AsyncResult(new Callable<Result>() {
@Override
public Result call() throws Exception {
try {
if (configuration.timeout() > 0) {
final ResultRetriever resultRetriever = new ResultRetriever(context);
final Future<Result> resultFuture = Futures.future(resultRetriever, akka.fromThread());

Future<Result> timeoutFuture = Futures.future(new Callable<Result>() {
@Override
public Result call() throws Exception {
configuration.timeUnit().sleep(configuration.timeout());

This comment has been minimized.

Copy link
@magnet

magnet Nov 3, 2014

Contributor

I'm a bit late to the party, but did you check the other implementation of the AsyncInterceptor as well? This sleep() here is ugly, because it blocks a thread for each query. The other version uses a ScheduledExecutor, which is more efficient because it can trigger the timeout events using much fewer threads in the case of many concurrent requests.
I let the Akka version for information / tracking purposes, but there is no way, as it is now, to cancel a Future (or a promise) in stock Akka. There are ways to do it, but they're pretty painful from Java, and it's a bit overkill to implement it.

Since I didn't bother with making an Interruptible Scala Future, I went for the easy route using "firstCompletedOf" but it's probably resource inefficient. The other version addresses that problem.

throw new HttpException(Result.GATEWAY_TIMEOUT, "Request timeout");
}
}, akka.fromThread());

timeoutFuture.onSuccess(new OnSuccess<Result>() {
@Override
public final void onSuccess(Result r) {
if (resultFuture.isCompleted()) {
return;
}
Thread t = resultRetriever.getThread();
if (t != null) {
try {
t.interrupt();
} catch (SecurityException se) {
LOGGER.debug("Could not interrupt thread because of SecurityException", se);
throw se;
} catch (Throwable throwable) {
// We don't need this thread anymore
}

}
}
}, akka.fromThread());

Future<Result> firstCompleted = Futures.firstCompletedOf(
ImmutableList.of(timeoutFuture, resultFuture), akka.fromThread());
return Await.result(firstCompleted, Duration.Inf());
}
return context.proceed();
} catch (InterruptedException ie) {
throw new HttpException(Result.RESET_CONTENT, "Interrupted process", ie);
} catch (Exception t) {
throw new HttpException(Result.INTERNAL_SERVER_ERROR, "Computation error", t);
}
}
});
}

/**
* Gets the annotation class configuring the current interceptor.
*
* @return the annotation
*/
@Override
public Class<Async> annotation() {
return Async.class;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
/*
* #%L
* Wisdom-Framework
* %%
* Copyright (C) 2013 - 2014 Wisdom Framework
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package org.wisdom.akka.impl;

import akka.dispatch.OnComplete;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
import org.wisdom.api.annotations.scheduler.Async;
import org.wisdom.api.annotations.scheduler.Every;
import org.wisdom.api.exceptions.HttpException;
import org.wisdom.api.http.*;
import org.wisdom.api.interception.RequestContext;
import org.wisdom.api.scheduler.Scheduled;
import scala.concurrent.Future;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.*;

/**
* Checks the Async Interceptor.
*/
public class AsyncInterceptorTest {

AkkaScheduler scheduler = new AkkaScheduler();
AsyncInterceptor interceptor = new AsyncInterceptor();
private AkkaBootstrap akka;

@Before
public void setUp() throws ClassNotFoundException {
BundleContext context = mock(BundleContext.class);
Bundle bundle = mock(Bundle.class);
when(bundle.getBundleId()).thenReturn(1L);
when(context.getBundle()).thenReturn(bundle);
doAnswer(
new Answer<Class>() {
@Override
public Class answer(InvocationOnMock invocation) throws Throwable {
return AsyncInterceptorTest.class.getClassLoader().loadClass((String) invocation.getArguments()[0]);
}
}
).when(bundle).loadClass(anyString());
akka = new AkkaBootstrap(context);
akka.start();
scheduler.akka = akka;
interceptor.akka = akka;
}

@After
public void tearDown() {
((AkkaBootstrap) scheduler.akka).stop();
}

@Test
public void testWithoutTimeout() throws Exception {
RequestContext rc = mock(RequestContext.class);
when(rc.proceed()).thenReturn(new Result(Status.OK));

Async async = mock(Async.class);
when(async.timeout()).thenReturn(0l);
when(async.timeUnit()).thenReturn(TimeUnit.SECONDS);

Result result = interceptor.call(async, rc);
assertThat(result).isInstanceOf(AsyncResult.class);

Future<Result> r = akka.dispatch(((AsyncResult) result).callable(), akka.fromThread());
final int[] code = {0};
r.onComplete(new OnComplete<Result>() {

@Override
public void onComplete(Throwable failure, Result success) throws Throwable {
code[0] = success.getStatusCode();
}
}, akka.fromThread());

Thread.sleep(100);
assertThat(code[0]).isEqualTo(200);
}

@Test
public void testWithTimeout() throws Exception {
RequestContext rc = mock(RequestContext.class);
when(rc.proceed()).thenReturn(new Result(Status.OK));

Async async = mock(Async.class);
when(async.timeout()).thenReturn(1l);
when(async.timeUnit()).thenReturn(TimeUnit.SECONDS);

Result result = interceptor.call(async, rc);
assertThat(result).isInstanceOf(AsyncResult.class);

Future<Result> r = akka.dispatch(((AsyncResult) result).callable(), akka.fromThread());
final int[] code = {0};
r.onComplete(new OnComplete<Result>() {

@Override
public void onComplete(Throwable failure, Result success) throws Throwable {
code[0] = success.getStatusCode();
}
}, akka.fromThread());

Thread.sleep(100);
assertThat(code[0]).isEqualTo(200);
}

@Test
public void testWithFunctionalError() throws Exception {
RequestContext rc = mock(RequestContext.class);
doAnswer(new Answer<Result>() {

@Override
public Result answer(InvocationOnMock invocation) throws Throwable {
throw new IllegalAccessException("Bad, but expected");
}
}).when(rc).proceed();

Async async = mock(Async.class);
when(async.timeout()).thenReturn(10l);
when(async.timeUnit()).thenReturn(TimeUnit.SECONDS);

Result result = interceptor.call(async, rc);
assertThat(result).isInstanceOf(AsyncResult.class);

Future<Result> r = akka.dispatch(((AsyncResult) result).callable(), akka.fromThread());
final Result[] retrieved = {null};
final Throwable[] errors = {null};
r.onComplete(new OnComplete<Result>() {

@Override
public void onComplete(Throwable failure, Result success) throws Throwable {
retrieved[0] = success;
errors[0] = failure;
}
}, akka.fromThread());

Thread.sleep(100);
assertThat(retrieved[0]).isNull();
assertThat(errors[0]).isNotNull().isInstanceOf(HttpException.class);
assertThat(errors[0].getCause().getMessage())
.contains("Bad, but expected");
}

@Test
public void testWithTimeoutReached() throws Exception {
RequestContext rc = mock(RequestContext.class);
doAnswer(new Answer<Result>() {

@Override
public Result answer(InvocationOnMock invocation) throws Throwable {
Thread.sleep(1000);
return Results.ok("Done");
}
}).when(rc).proceed();

Async async = mock(Async.class);
// Must be below the thread.sleep from the action.
when(async.timeout()).thenReturn(10l);
when(async.timeUnit()).thenReturn(TimeUnit.MILLISECONDS);

Result result = interceptor.call(async, rc);
assertThat(result).isInstanceOf(AsyncResult.class);

Future<Result> r = akka.dispatch(((AsyncResult) result).callable(), akka.fromThread());
final Result[] retrieved = {null};
final Throwable[] errors = {null};
r.onComplete(new OnComplete<Result>() {

@Override
public void onComplete(Throwable failure, Result success) throws Throwable {
retrieved[0] = success;
errors[0] = failure;
}
}, akka.fromThread());

Thread.sleep(100);
assertThat(retrieved[0]).isNull();
assertThat(errors[0]).isNotNull().isInstanceOf(HttpException.class);
assertThat(errors[0].getCause().getMessage())
.contains("Request timeout");
}


}
Loading

0 comments on commit 6de0e1b

Please sign in to comment.