Skip to content

Commit

Permalink
refactor:优化服务调用监控上报逻辑,减少上报数据量 (#53)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun committed Mar 1, 2024
1 parent 4aff5b5 commit 7f6341a
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 86 deletions.
1 change: 1 addition & 0 deletions dubbo/dubbo-examples/dubbo-router-example/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>dubbo-router-example</artifactId>

<packaging>pom</packaging>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcExcept
if (serviceInfos.isEmpty() || Objects.isNull(operator)) {
return invoker.invoke(invocation);
}
// 如果是熔断,只处理第一个的请求
// 如果是熔断,只选择第一个 DubboServiceInfo 进行作为熔断信息
DubboServiceInfo firstService = serviceInfos.get(0);

InvokeContext.RequestContext context = new InvokeContext.RequestContext(createCalleeService(firstService), firstService.getDubboInterface());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.tencent.polaris.configuration.api.core.ConfigFile;
import com.tencent.polaris.configuration.api.core.ConfigFilePublishService;
import com.tencent.polaris.configuration.api.core.ConfigFileService;
import com.tencent.polaris.configuration.api.rpc.ConfigPublishRequest;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.config.configcenter.ConfigChangedEvent;
import org.apache.dubbo.common.config.configcenter.ConfigurationListener;
Expand Down Expand Up @@ -99,27 +100,49 @@ public Object getInternalProperty(String key) {
@Override
public boolean publishConfig(String key, String group, String content) throws UnsupportedOperationException {
try {
ConfigFileResponse response = filePublisher.createConfigFile(polarisConfig.getNamespace(), group, key, content);
if (response.getCode() == ServerCodes.EXISTED_RESOURCE) {
response = filePublisher.updateConfigFile(polarisConfig.getNamespace(), group, key, content);
}
ConfigPublishRequest request = new ConfigPublishRequest();
request.setNamespace(polarisConfig.getNamespace());
request.setGroup(group);
request.setFilename(key);
request.setContent(content);
ConfigFileResponse response = filePublisher.upsertAndPublish(request);
if (response.getCode() != ServerCodes.EXECUTE_SUCCESS) {
logger.error(
formatCode(response.getCode()),
response.getMessage(),
String.format("key(%s) group(%s)", key, group),
"upsert config fail"
"release config fail"
);
return false;
}
response = filePublisher.releaseConfigFile(polarisConfig.getNamespace(), group, key);
return true;
} catch (PolarisException e) {
logger.error(
formatCode(e.getCode()),
e.getMessage(),
String.format("key(%s) group(%s)", key, group),
"publish config fail");
return false;
}
}

@Override
public boolean publishConfigCas(String key, String group, String content, Object ticket) throws UnsupportedOperationException {
try {
ConfigPublishRequest request = new ConfigPublishRequest();
request.setNamespace(polarisConfig.getNamespace());
request.setGroup(group);
request.setFilename(key);
request.setContent(content);
request.setCasMd5(String.valueOf(ticket));
ConfigFileResponse response = filePublisher.upsertAndPublish(request);
if (response.getCode() != ServerCodes.EXECUTE_SUCCESS) {
logger.error(
formatCode(response.getCode()),
response.getMessage(),
response.getMessage(),
String.format("key(%s) group(%s)", key, group),
"release config fail"
);
);
return false;
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.ScopeModelAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -45,80 +44,79 @@
@Activate(group = CommonConstants.CONSUMER, order = Integer.MIN_VALUE)
public class ReportFilter extends PolarisOperatorDelegate implements Filter, Filter.Listener {

private static final String LABEL_START_TIME = "reporter_filter_start_time";
private static final String LABEL_START_TIME = "reporter_filter_start_time";

private static final Logger LOGGER = LoggerFactory.getLogger(ReportFilter.class);
private static final Logger LOGGER = LoggerFactory.getLogger(ReportFilter.class);

private ApplicationModel applicationModel;
private ApplicationModel applicationModel;

public ReportFilter() {
LOGGER.info("[POLARIS] init polaris reporter");
}
public ReportFilter() {
LOGGER.info("[POLARIS] init polaris reporter");
}

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
invocation.put(LABEL_START_TIME, System.currentTimeMillis());
return invoker.invoke(invocation);
}
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
invocation.put(LABEL_START_TIME, System.currentTimeMillis());
return invoker.invoke(invocation);
}

@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
PolarisOperator polarisOperator = getGovernancePolarisOperator();
if (null == polarisOperator) {
return;
}
Long startTimeMilli = (Long) invocation.get(LABEL_START_TIME);
RetStatus retStatus = RetStatus.RetSuccess;
int code = 0;
if (appResponse.hasException()) {
retStatus = RetStatus.RetFail;
code = -1;
}
URL url = invoker.getUrl();
long delay = System.currentTimeMillis() - startTimeMilli;
List<DubboServiceInfo> serviceInfos = DubboUtils.analyzeRemoteDubboServiceInfo(invoker, invocation);
for (DubboServiceInfo serviceInfo : serviceInfos) {
polarisOperator.reportInvokeResult(serviceInfo.getService(), serviceInfo.getReportMethodName(), url.getHost(),
url.getPort(), RpcContext.getServiceContext().getLocalHost(), delay, retStatus, code);
}
}
@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
PolarisOperator polarisOperator = getGovernancePolarisOperator();
if (null == polarisOperator) {
return;
}
Long startTimeMilli = (Long) invocation.get(LABEL_START_TIME);
RetStatus retStatus = RetStatus.RetSuccess;
int code = 0;
if (appResponse.hasException()) {
retStatus = RetStatus.RetFail;
code = -1;
}
URL providerUrl = invoker.getUrl();
// 判断是否是应用级注册
long delay = System.currentTimeMillis() - startTimeMilli;
List<DubboServiceInfo> serviceInfos = DubboUtils.analyzeRemoteDubboServiceInfo(invoker, invocation);
DubboServiceInfo dubboServiceInfo = serviceInfos.get(0);
polarisOperator.reportInvokeResult(dubboServiceInfo.getService(), dubboServiceInfo.getReportMethodName(), providerUrl.getHost(),
providerUrl.getPort(), RpcContext.getServiceContext().getLocalHost(), delay, retStatus, code);
}

@Override
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
PolarisOperator polarisOperator = getGovernancePolarisOperator();
if (null == polarisOperator) {
return;
}
Long startTimeMilli = (Long) invocation.get(LABEL_START_TIME);
RetStatus retStatus = RetStatus.RetFail;
int code = -1;
if (t instanceof RpcException) {
RpcException rpcException = (RpcException) t;
code = rpcException.getCode();
if (isFlowControl(rpcException)) {
retStatus = RetStatus.RetFlowControl;
}
if (rpcException.isTimeout()) {
retStatus = RetStatus.RetTimeout;
}
if (rpcException.getCause() instanceof CallAbortedException) {
retStatus = RetStatus.RetReject;
}
}
URL url = invoker.getUrl();
long delay = System.currentTimeMillis() - startTimeMilli;
List<DubboServiceInfo> serviceInfos = DubboUtils.analyzeRemoteDubboServiceInfo(invoker, invocation);
for (DubboServiceInfo serviceInfo : serviceInfos) {
polarisOperator.reportInvokeResult(serviceInfo.getService(), serviceInfo.getReportMethodName(), url.getHost(),
url.getPort(), RpcContext.getServiceContext().getLocalHost(), delay, retStatus, code);
}
}
@Override
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
PolarisOperator polarisOperator = getGovernancePolarisOperator();
if (null == polarisOperator) {
return;
}
Long startTimeMilli = (Long) invocation.get(LABEL_START_TIME);
RetStatus retStatus = RetStatus.RetFail;
int code = -1;
if (t instanceof RpcException) {
RpcException rpcException = (RpcException) t;
code = rpcException.getCode();
if (isFlowControl(rpcException)) {
retStatus = RetStatus.RetFlowControl;
}
if (rpcException.isTimeout()) {
retStatus = RetStatus.RetTimeout;
}
if (rpcException.getCause() instanceof CallAbortedException) {
retStatus = RetStatus.RetReject;
}
}
URL url = invoker.getUrl();
long delay = System.currentTimeMillis() - startTimeMilli;
List<DubboServiceInfo> serviceInfos = DubboUtils.analyzeRemoteDubboServiceInfo(invoker, invocation);
DubboServiceInfo dubboServiceInfo = serviceInfos.get(0);
polarisOperator.reportInvokeResult(dubboServiceInfo.getService(), dubboServiceInfo.getReportMethodName(), url.getHost(),
url.getPort(), RpcContext.getServiceContext().getLocalHost(), delay, retStatus, code);
}

private boolean isFlowControl(RpcException rpcException) {
boolean a = StringUtils.isNotBlank(rpcException.getMessage()) && rpcException.getMessage()
.contains(PolarisBlockException.PREFIX);
boolean b = rpcException.isLimitExceed();
return a || b;
}
private boolean isFlowControl(RpcException rpcException) {
boolean a = StringUtils.isNotBlank(rpcException.getMessage()) && rpcException.getMessage()
.contains(PolarisBlockException.PREFIX);
boolean b = rpcException.isLimitExceed();
return a || b;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.tencent.polaris.api.core.ProviderAPI;
import com.tencent.polaris.api.exception.PolarisException;
import com.tencent.polaris.api.listener.ServiceListener;
import com.tencent.polaris.api.plugin.circuitbreaker.ResourceStat;
import com.tencent.polaris.api.plugin.circuitbreaker.entity.InstanceResource;
import com.tencent.polaris.api.plugin.circuitbreaker.entity.Resource;
import com.tencent.polaris.api.pojo.CircuitBreakerStatus;
Expand Down Expand Up @@ -277,13 +276,8 @@ public void reportInvokeResult(String service, String method, String host, int p
serviceCallResult.setRetCode(code);
serviceCallResult.setCallerIp(callerIp);
serviceCallResult.setCallerService(new ServiceKey(polarisConfig.getNamespace(), ""));

InstanceResource resource = new InstanceResource(new ServiceKey(polarisConfig.getNamespace(), service), host, port, null);
ResourceStat stat = new ResourceStat(resource, code, delay, retStatus);

try {
consumerAPI.updateServiceCallResult(serviceCallResult);
circuitBreakAPI.report(stat);
} catch (PolarisException e) {
DUBBO_LOGGER.error(formatCode(e.getCode()),
e.getMessage(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,8 @@ public static <T> List<DubboServiceInfo> analyzeRemoteDubboServiceInfo(Invoker<T
.build());
}

URL url = invoker.getUrl();
serviceInfos.add(DubboServiceInfo.builder()
.service(url.getServiceInterface())
.service(providerUrl.getServiceInterface())
.methodName(invocation.getMethodName())
.build());
return serviceInfos;
Expand Down

0 comments on commit 7f6341a

Please sign in to comment.