Skip to content

Commit

Permalink
Merge branch 'master' into wu-sheng-3.2-readme
Browse files Browse the repository at this point in the history
  • Loading branch information
wu-sheng committed Aug 31, 2017
2 parents 1b8db8c + 61b4a08 commit 0003f7a
Show file tree
Hide file tree
Showing 134 changed files with 2,794 additions and 1,891 deletions.
Expand Up @@ -59,7 +59,7 @@ private void senToInstanceHeartBeatPersistenceWorker(StreamModuleContext context
long heartBeatTime) {
InstanceHeartBeatDataDefine.InstanceHeartBeat heartBeat = new InstanceHeartBeatDataDefine.InstanceHeartBeat();
heartBeat.setId(String.valueOf(applicationInstanceId));
heartBeat.setHeartBeatTime(heartBeatTime);
heartBeat.setHeartBeatTime(TimeBucketUtils.INSTANCE.getSecondTimeBucket(heartBeatTime));
heartBeat.setInstanceId(applicationInstanceId);
try {
logger.debug("send to instance heart beat persistence worker, id: {}", heartBeat.getId());
Expand Down
Expand Up @@ -4,9 +4,9 @@
import com.google.gson.JsonObject;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.agentregister.instance.InstanceIDService;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.ApplicationInstance;
import org.skywalking.apm.network.proto.ApplicationInstanceHeartbeat;
import org.skywalking.apm.network.proto.ApplicationInstanceMapping;
import org.skywalking.apm.network.proto.ApplicationInstanceRecover;
import org.skywalking.apm.network.proto.Downstream;
Expand All @@ -26,23 +26,19 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp

@Override
public void register(ApplicationInstance request, StreamObserver<ApplicationInstanceMapping> responseObserver) {
int instanceId = instanceIDService.getOrCreate(request.getApplicationId(), request.getAgentUUID(), request.getRegisterTime(), buildOsInfo(request.getOsinfo()));
long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(request.getRegisterTime());
int instanceId = instanceIDService.getOrCreate(request.getApplicationId(), request.getAgentUUID(), timeBucket, buildOsInfo(request.getOsinfo()));
ApplicationInstanceMapping.Builder builder = ApplicationInstanceMapping.newBuilder();
builder.setApplicationId(request.getApplicationId());
builder.setApplicationInstanceId(instanceId);
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}

@Override public void heartbeat(ApplicationInstanceHeartbeat request, StreamObserver<Downstream> responseObserver) {
instanceIDService.heartBeat(request.getApplicationInstanceId(), request.getHeartbeatTime());
responseObserver.onNext(Downstream.newBuilder().build());
responseObserver.onCompleted();
}

@Override
public void registerRecover(ApplicationInstanceRecover request, StreamObserver<Downstream> responseObserver) {
instanceIDService.recover(request.getApplicationInstanceId(), request.getApplicationId(), request.getRegisterTime(), buildOsInfo(request.getOsinfo()));
long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(request.getRegisterTime());
instanceIDService.recover(request.getApplicationInstanceId(), request.getApplicationId(), timeBucket, buildOsInfo(request.getOsinfo()));
responseObserver.onNext(Downstream.newBuilder().build());
responseObserver.onCompleted();
}
Expand Down
@@ -1,10 +1,10 @@
package org.skywalking.apm.collector.agentregister.instance;

import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterRemoteWorker;
import org.skywalking.apm.collector.storage.define.register.InstanceDataDefine;
import org.skywalking.apm.collector.agentstream.worker.register.instance.dao.IInstanceDAO;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.storage.define.register.InstanceDataDefine;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
Expand Down Expand Up @@ -36,12 +36,6 @@ public int getOrCreate(int applicationId, String agentUUID, long registerTime, S
return applicationId;
}

public void heartBeat(int instanceId, long heartbeatTime) {
logger.debug("instance heart beat, instance id: {}, heartbeat time: {}", instanceId, heartbeatTime);
IInstanceDAO dao = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName());
dao.updateHeartbeatTime(instanceId, heartbeatTime);
}

public void recover(int instanceId, int applicationId, long registerTime, String osInfo) {
logger.debug("instance recover, instance id: {}, application id: {}, register time: {}", instanceId, applicationId, registerTime);
IInstanceDAO dao = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName());
Expand Down
Expand Up @@ -43,6 +43,7 @@ public class TraceSegmentServletHandler extends JettyHandler {

private void read(BufferedReader bufferedReader) throws IOException {
JsonReader reader = new JsonReader(bufferedReader);

reader.beginArray();
while (reader.hasNext()) {
SegmentParse segmentParse = new SegmentParse();
Expand Down
Expand Up @@ -2,25 +2,25 @@

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.IServiceNameDAO;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.storage.dao.DAOContainer;

/**
* @author pengys5
*/
public class ServiceNameCache {
public class ServiceCache {

private static Cache<String, Integer> CACHE = CacheBuilder.newBuilder().maximumSize(2000).build();
private static Cache<Integer, String> CACHE = CacheBuilder.newBuilder().maximumSize(10000).build();

public static int get(int applicationId, String serviceName) {
public static String getServiceName(int serviceId) {
try {
return CACHE.get(applicationId + Const.ID_SPLIT + serviceName, () -> {
return CACHE.get(serviceId, () -> {
IServiceNameDAO dao = (IServiceNameDAO)DAOContainer.INSTANCE.get(IServiceNameDAO.class.getName());
return dao.getServiceId(applicationId, serviceName);
return dao.getServiceName(serviceId);
});
} catch (Throwable e) {
return 0;
return Const.EMPTY_STRING;
}
}
}
Expand Up @@ -2,15 +2,13 @@

import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.storage.define.node.NodeComponentDataDefine;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.LocalSpanListener;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.define.node.NodeComponentDataDefine;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
Expand All @@ -22,48 +20,59 @@
/**
* @author pengys5
*/
public class NodeComponentSpanListener implements EntrySpanListener, ExitSpanListener, FirstSpanListener, LocalSpanListener {
public class NodeComponentSpanListener implements EntrySpanListener, ExitSpanListener, FirstSpanListener {

private final Logger logger = LoggerFactory.getLogger(NodeComponentSpanListener.class);

private List<String> nodeComponents = new ArrayList<>();
private List<NodeComponentDataDefine.NodeComponent> nodeComponents = new ArrayList<>();
private long timeBucket;

@Override
public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
String componentName = ExchangeMarkUtils.INSTANCE.buildMarkedID(spanObject.getComponentId());
NodeComponentDataDefine.NodeComponent nodeComponent = new NodeComponentDataDefine.NodeComponent();
nodeComponent.setComponentId(spanObject.getComponentId());

String id;
if (spanObject.getComponentId() == 0) {
componentName = spanObject.getComponent();
nodeComponent.setComponentName(spanObject.getComponent());
id = nodeComponent.getComponentName();
} else {
nodeComponent.setComponentName(Const.EMPTY_STRING);
id = String.valueOf(nodeComponent.getComponentId());
}
String peer = ExchangeMarkUtils.INSTANCE.buildMarkedID(spanObject.getPeerId());

nodeComponent.setPeerId(spanObject.getPeerId());
if (spanObject.getPeerId() == 0) {
peer = spanObject.getPeer();
nodeComponent.setPeer(spanObject.getPeer());
id = id + Const.ID_SPLIT + nodeComponent.getPeer();
} else {
nodeComponent.setPeer(Const.EMPTY_STRING);
id = id + Const.ID_SPLIT + nodeComponent.getPeerId();
}

String agg = peer + Const.ID_SPLIT + componentName;
nodeComponents.add(agg);
nodeComponent.setId(id);
nodeComponents.add(nodeComponent);
}

@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
buildEntryOrLocal(spanObject, applicationId);
}

@Override
public void parseLocal(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
buildEntryOrLocal(spanObject, applicationId);
}

private void buildEntryOrLocal(SpanObject spanObject, int applicationId) {
String componentName = ExchangeMarkUtils.INSTANCE.buildMarkedID(spanObject.getComponentId());
NodeComponentDataDefine.NodeComponent nodeComponent = new NodeComponentDataDefine.NodeComponent();
nodeComponent.setComponentId(spanObject.getComponentId());

String id;
if (spanObject.getComponentId() == 0) {
componentName = spanObject.getComponent();
nodeComponent.setComponentName(spanObject.getComponent());
id = nodeComponent.getComponentName();
} else {
id = String.valueOf(nodeComponent.getComponentId());
nodeComponent.setComponentName(Const.EMPTY_STRING);
}

String peer = ExchangeMarkUtils.INSTANCE.buildMarkedID(applicationId);
String agg = peer + Const.ID_SPLIT + componentName;
nodeComponents.add(agg);
nodeComponent.setPeerId(applicationId);
nodeComponent.setPeer(Const.EMPTY_STRING);
id = id + Const.ID_SPLIT + String.valueOf(applicationId);
nodeComponent.setId(id);

nodeComponents.add(nodeComponent);
}

@Override
Expand All @@ -74,10 +83,8 @@ public void parseFirst(SpanObject spanObject, int applicationId, int application
@Override public void build() {
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);

nodeComponents.forEach(agg -> {
NodeComponentDataDefine.NodeComponent nodeComponent = new NodeComponentDataDefine.NodeComponent();
nodeComponent.setId(timeBucket + Const.ID_SPLIT + agg);
nodeComponent.setAgg(agg);
nodeComponents.forEach(nodeComponent -> {
nodeComponent.setId(timeBucket + Const.ID_SPLIT + nodeComponent.getId());
nodeComponent.setTimeBucket(timeBucket);

try {
Expand Down
Expand Up @@ -5,11 +5,11 @@
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.storage.define.node.NodeComponentTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;

/**
* @author pengys5
Expand All @@ -21,7 +21,10 @@ public class NodeComponentEsDAO extends EsDAO implements INodeComponentDAO, IPer
if (getResponse.isExists()) {
Data data = dataDefine.build(id);
Map<String, Object> source = getResponse.getSource();
data.setDataString(1, (String)source.get(NodeComponentTable.COLUMN_AGG));
data.setDataInteger(0, ((Number)source.get(NodeComponentTable.COLUMN_COMPONENT_ID)).intValue());
data.setDataString(1, (String)source.get(NodeComponentTable.COLUMN_COMPONENT_NAME));
data.setDataInteger(1, ((Number)source.get(NodeComponentTable.COLUMN_PEER_ID)).intValue());
data.setDataString(2, (String)source.get(NodeComponentTable.COLUMN_PEER));
data.setDataLong(0, (Long)source.get(NodeComponentTable.COLUMN_TIME_BUCKET));
return data;
} else {
Expand All @@ -31,15 +34,21 @@ public class NodeComponentEsDAO extends EsDAO implements INodeComponentDAO, IPer

@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(NodeComponentTable.COLUMN_AGG, data.getDataString(1));
source.put(NodeComponentTable.COLUMN_COMPONENT_ID, data.getDataInteger(0));
source.put(NodeComponentTable.COLUMN_COMPONENT_NAME, data.getDataString(1));
source.put(NodeComponentTable.COLUMN_PEER_ID, data.getDataInteger(1));
source.put(NodeComponentTable.COLUMN_PEER, data.getDataString(2));
source.put(NodeComponentTable.COLUMN_TIME_BUCKET, data.getDataLong(0));

return getClient().prepareIndex(NodeComponentTable.TABLE, data.getDataString(0)).setSource(source);
}

@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(NodeComponentTable.COLUMN_AGG, data.getDataString(1));
source.put(NodeComponentTable.COLUMN_COMPONENT_ID, data.getDataInteger(0));
source.put(NodeComponentTable.COLUMN_COMPONENT_NAME, data.getDataString(1));
source.put(NodeComponentTable.COLUMN_PEER_ID, data.getDataInteger(1));
source.put(NodeComponentTable.COLUMN_PEER, data.getDataString(2));
source.put(NodeComponentTable.COLUMN_TIME_BUCKET, data.getDataLong(0));

return getClient().prepareUpdate(NodeComponentTable.TABLE, data.getDataString(0)).setDoc(source);
Expand Down
@@ -1,8 +1,8 @@
package org.skywalking.apm.collector.agentstream.worker.node.component.define;

import org.skywalking.apm.collector.storage.define.node.NodeComponentTable;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
import org.skywalking.apm.collector.storage.define.node.NodeComponentTable;

/**
* @author pengys5
Expand All @@ -26,7 +26,10 @@ public NodeComponentEsTableDefine() {
}

@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_AGG, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_COMPONENT_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_COMPONENT_NAME, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_PEER_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_PEER, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
}
}
@@ -1,8 +1,8 @@
package org.skywalking.apm.collector.agentstream.worker.node.component.define;

import org.skywalking.apm.collector.storage.define.node.NodeComponentTable;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
import org.skywalking.apm.collector.storage.define.node.NodeComponentTable;

/**
* @author pengys5
Expand All @@ -15,7 +15,10 @@ public NodeComponentH2TableDefine() {

@Override public void initialize() {
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_AGG, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_COMPONENT_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_COMPONENT_NAME, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_PEER_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_PEER, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
}
}

0 comments on commit 0003f7a

Please sign in to comment.