Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

when callback timeout, throw timeout exp as sync #359

Merged
merged 3 commits into from
Nov 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
*/
package com.alipay.sofa.rpc.message.bolt;

import com.alipay.remoting.rpc.exception.InvokeTimeoutException;
import com.alipay.sofa.rpc.client.ProviderInfo;
import com.alipay.sofa.rpc.config.ConsumerConfig;
import com.alipay.sofa.rpc.context.AsyncRuntime;
import com.alipay.sofa.rpc.context.RpcInternalContext;
import com.alipay.sofa.rpc.context.RpcInvokeContext;
import com.alipay.sofa.rpc.core.exception.RpcErrorType;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.core.exception.SofaTimeOutException;
import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback;
import com.alipay.sofa.rpc.core.request.SofaRequest;
import com.alipay.sofa.rpc.core.response.SofaResponse;
Expand Down Expand Up @@ -136,8 +138,14 @@ public void onException(Throwable e) {
EventBus.post(new ClientEndInvokeEvent(request, null, e));
}

SofaRpcException sofaRpcException = new SofaRpcException(
RpcErrorType.SERVER_UNDECLARED_ERROR, e.getMessage(), e);
//judge is timeout or others
SofaRpcException sofaRpcException = null;
if (e instanceof InvokeTimeoutException) {
sofaRpcException = new SofaTimeOutException(e);
} else {
sofaRpcException = new SofaRpcException(
RpcErrorType.SERVER_UNDECLARED_ERROR, e.getMessage(), e);
}
callback.onSofaException(sofaRpcException, request.getMethodName(), request);
} finally {
Thread.currentThread().setContextClassLoader(cl);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.alipay.sofa.rpc.config.ServerConfig;
import com.alipay.sofa.rpc.context.RpcInvokeContext;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.core.exception.SofaTimeOutException;
import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback;
import com.alipay.sofa.rpc.core.request.RequestBase;
import com.alipay.sofa.rpc.filter.Filter;
Expand All @@ -30,6 +31,7 @@
import com.alipay.sofa.rpc.test.ActivelyDestroyTest;
import com.alipay.sofa.rpc.test.HelloService;
import com.alipay.sofa.rpc.test.HelloServiceImpl;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

Expand All @@ -38,31 +40,33 @@
import java.util.concurrent.TimeUnit;

/**
*
*
* @author <a href="mailto:zhanggeng.zg@antfin.com">GengZhang</a>
*/
public class AsyncCallbackTest extends ActivelyDestroyTest {

private static final Logger LOGGER = LoggerFactory.getLogger(AsyncCallbackTest.class);
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncCallbackTest.class);

private ServerConfig serverConfig;
private ProviderConfig<HelloService> CProvider;
private ConsumerConfig<HelloService> BConsumer;

@Test
public void testAll() {

ServerConfig serverConfig2 = new ServerConfig()
serverConfig = new ServerConfig()
.setPort(22222)
.setDaemon(false);

// C服务的服务端
ProviderConfig<HelloService> CProvider = new ProviderConfig<HelloService>()
CProvider = new ProviderConfig<HelloService>()
.setInterfaceId(HelloService.class.getName())
.setRef(new HelloServiceImpl(1000))
.setServer(serverConfig2);
.setServer(serverConfig);
CProvider.export();

// B调C的客户端
Filter filter = new TestAsyncFilter();
ConsumerConfig<HelloService> BConsumer = new ConsumerConfig<HelloService>()
BConsumer = new ConsumerConfig<HelloService>()
.setInterfaceId(HelloService.class.getName())
.setInvokeType(RpcConstants.INVOKER_TYPE_CALLBACK)
.setTimeout(50000)
Expand Down Expand Up @@ -110,4 +114,151 @@ public void onSofaException(SofaRpcException sofaException, String methodName,

RpcInvokeContext.removeContext();
}

@Test
public void testTimeoutException() {

serverConfig = new ServerConfig()
.setPort(22222)
.setDaemon(false);

// C服务的服务端
CProvider = new ProviderConfig<HelloService>()
.setInterfaceId(HelloService.class.getName())
.setRef(new HelloServiceImpl(500))
.setServer(serverConfig);
CProvider.export();

// B调C的客户端
Filter filter = new TestAsyncFilter();
BConsumer = new ConsumerConfig<HelloService>()
.setInterfaceId(HelloService.class.getName())
.setInvokeType(RpcConstants.INVOKER_TYPE_CALLBACK)
.setTimeout(1)
.setFilterRef(Arrays.asList(filter))
// .setOnReturn() // 不设置 调用级别设置
.setDirectUrl("bolt://127.0.0.1:22222");
HelloService helloService = BConsumer.refer();

final CountDownLatch latch = new CountDownLatch(1);
final String[] ret = { null };

final boolean[] hasExp = { false };
RpcInvokeContext.getContext().setResponseCallback(new SofaResponseCallback() {
@Override
public void onAppResponse(Object appResponse, String methodName, RequestBase request) {
LOGGER.info("B get result: {}", appResponse);
latch.countDown();
}

@Override
public void onAppException(Throwable throwable, String methodName, RequestBase request) {
LOGGER.info("B get app exception: {}", throwable);
latch.countDown();
}

@Override
public void onSofaException(SofaRpcException sofaException, String methodName,
RequestBase request) {
LOGGER.info("B get sofa exception: {}", sofaException);

if (sofaException instanceof SofaTimeOutException) {
hasExp[0] = true;
}

latch.countDown();
}
});

String ret0 = helloService.sayHello("xxx", 22);
Assert.assertNull(ret0); // 第一次返回null

try {
latch.await(2000, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignore) {
}
// 一定是一个超时异常
Assert.assertTrue(hasExp[0]);

RpcInvokeContext.removeContext();
}

@Test
public void testNoProviderException() {
//use bolt, so callback will throw connection closed exception
serverConfig = new ServerConfig()
.setPort(22222)
.setDaemon(false)
.setProtocol("rest");

serverConfig.buildIfAbsent().start();

// B调C的客户端
Filter filter = new TestAsyncFilter();
BConsumer = new ConsumerConfig<HelloService>()
.setInterfaceId(HelloService.class.getName())
.setInvokeType(RpcConstants.INVOKER_TYPE_CALLBACK)
.setTimeout(1000)
.setFilterRef(Arrays.asList(filter))
// .setOnReturn() // 不设置 调用级别设置
.setDirectUrl("bolt://127.0.0.1:22222");
HelloService helloService = BConsumer.refer();

final CountDownLatch latch = new CountDownLatch(1);
final String[] ret = { null };

final boolean[] hasExp = { false };
RpcInvokeContext.getContext().setResponseCallback(new SofaResponseCallback() {
@Override
public void onAppResponse(Object appResponse, String methodName, RequestBase request) {
LOGGER.info("B get result: {}", appResponse);
latch.countDown();
}

@Override
public void onAppException(Throwable throwable, String methodName, RequestBase request) {
LOGGER.info("B get app exception: {}", throwable);
latch.countDown();
}

@Override
public void onSofaException(SofaRpcException sofaException, String methodName,
RequestBase request) {
LOGGER.info("B get sofa exception: {}", sofaException);

if ((sofaException instanceof SofaTimeOutException)) {
hasExp[0] = false;
} else {
hasExp[0] = true;
}

latch.countDown();
}
});

String ret0 = helloService.sayHello("xxx", 22);
Assert.assertNull(ret0); // 第一次返回null

try {
latch.await(1500, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignore) {
}
// 一定是一个超时异常
Assert.assertTrue(hasExp[0]);

RpcInvokeContext.removeContext();
}

@After
public void after() {
if (CProvider != null) {
CProvider.unExport();
}
if (BConsumer != null) {
BConsumer.unRefer();
}
if (serverConfig != null) {
serverConfig.destroy();
}
}
}