Skip to content

Commit

Permalink
The rest interface for instance health web ui test success.
Browse files Browse the repository at this point in the history
  • Loading branch information
peng-yongsheng committed Aug 19, 2017
1 parent 1187136 commit c1c7d9b
Show file tree
Hide file tree
Showing 22 changed files with 193 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
request.getMetricsList().forEach(metric -> {
long time = TimeBucketUtils.INSTANCE.getSecondTimeBucket(metric.getTime());
senToInstanceHeartBeatPersistenceWorker(context, applicationInstanceId, time);
senToInstanceHeartBeatPersistenceWorker(context, applicationInstanceId, metric.getTime());
sendToCpuMetricPersistenceWorker(context, applicationInstanceId, time, metric.getCpu());
sendToMemoryMetricPersistenceWorker(context, applicationInstanceId, time, metric.getMemoryList());
sendToMemoryPoolMetricPersistenceWorker(context, applicationInstanceId, time, metric.getMemoryPoolList());
Expand All @@ -59,8 +59,8 @@ private void senToInstanceHeartBeatPersistenceWorker(StreamModuleContext context
long heartBeatTime) {
InstanceHeartBeatDataDefine.InstanceHeartBeat heartBeat = new InstanceHeartBeatDataDefine.InstanceHeartBeat();
heartBeat.setId(String.valueOf(applicationInstanceId));
heartBeat.setHeartbeatTime(heartBeatTime);
heartBeat.setApplicationInstanceId(applicationInstanceId);
heartBeat.setHeartBeatTime(heartBeatTime);
heartBeat.setInstanceId(applicationInstanceId);
try {
logger.debug("send to instance heart beat persistence worker, id: {}", heartBeat.getId());
context.getClusterWorkerContext().lookup(InstHeartBeatPersistenceWorker.WorkerRole.INSTANCE).tell(heartBeat.toData());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.skywalking.apm.collector.agentjvm.worker.heartbeat;

import org.skywalking.apm.collector.agentjvm.worker.heartbeat.dao.InstanceHeartBeatEsDAO;
import org.skywalking.apm.collector.agentjvm.worker.heartbeat.dao.IInstanceHeartBeatDAO;
import org.skywalking.apm.collector.agentjvm.worker.heartbeat.define.InstanceHeartBeatDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
Expand All @@ -27,11 +27,11 @@ public InstHeartBeatPersistenceWorker(Role role, ClusterWorkerContext clusterCon
}

@Override protected boolean needMergeDBData() {
return false;
return true;
}

@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)DAOContainer.INSTANCE.get(InstanceHeartBeatEsDAO.class.getName());
return (IPersistenceDAO)DAOContainer.INSTANCE.get(IInstanceHeartBeatDAO.class.getName());
}

public static class Factory extends AbstractLocalAsyncWorkerProvider<InstHeartBeatPersistenceWorker> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,47 @@

import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.core.framework.UnexpectedException;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.register.InstanceTable;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author pengys5
*/
public class InstanceHeartBeatEsDAO extends EsDAO implements IInstanceHeartBeatDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {

private final Logger logger = LoggerFactory.getLogger(InstanceHeartBeatEsDAO.class);

@Override public Data get(String id, DataDefine dataDefine) {
return null;
GetResponse getResponse = getClient().prepareGet(InstanceTable.TABLE, id).get();
if (getResponse.isExists()) {
Data data = dataDefine.build(id);
Map<String, Object> source = getResponse.getSource();
data.setDataInteger(0, (Integer)source.get(InstanceTable.COLUMN_INSTANCE_ID));
data.setDataLong(0, (Long)source.get(InstanceTable.COLUMN_HEARTBEAT_TIME));
logger.debug("id: {} is exists", id);
return data;
} else {
logger.debug("id: {} is not exists", id);
return null;
}
}

@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
return null;
throw new UnexpectedException("There is no need to merge stream data with database data.");
}

@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(InstanceTable.COLUMN_REGISTER_TIME, data.getDataLong(0));
source.put(InstanceTable.COLUMN_HEARTBEAT_TIME, data.getDataLong(0));
return getClient().prepareUpdate(InstanceTable.TABLE, data.getDataString(0)).setDoc(source);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.skywalking.apm.collector.agentjvm.worker.heartbeat.define;

import org.skywalking.apm.collector.core.framework.UnexpectedException;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.storage.table.register.InstanceTable;
import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
Expand All @@ -22,27 +21,35 @@ public class InstanceHeartBeatDataDefine extends DataDefine {

@Override protected void attributeDefine() {
addAttribute(0, new Attribute(InstanceTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(InstanceTable.COLUMN_INSTANCE_ID, AttributeType.INTEGER, new NonOperation()));
addAttribute(1, new Attribute(InstanceTable.COLUMN_INSTANCE_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(2, new Attribute(InstanceTable.COLUMN_HEARTBEAT_TIME, AttributeType.LONG, new CoverOperation()));
}

@Override public Object deserialize(RemoteData remoteData) {
throw new UnexpectedException("instance heart beat data did not need send to remote worker.");
String id = remoteData.getDataStrings(0);
int instanceId = remoteData.getDataIntegers(0);
long heartBeatTime = remoteData.getDataLongs(0);
return new InstanceHeartBeat(id, heartBeatTime, instanceId);
}

@Override public RemoteData serialize(Object object) {
throw new UnexpectedException("instance heart beat data did not need send to remote worker.");
InstanceHeartBeat instanceHeartBeat = (InstanceHeartBeat)object;
RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(instanceHeartBeat.getId());
builder.addDataIntegers(instanceHeartBeat.getInstanceId());
builder.addDataLongs(instanceHeartBeat.getHeartBeatTime());
return builder.build();
}

public static class InstanceHeartBeat implements Transform<InstanceHeartBeat> {
private String id;
private int applicationInstanceId;
private long heartbeatTime;
private long heartBeatTime;
private int instanceId;

public InstanceHeartBeat(String id, int applicationInstanceId, long heartbeatTime) {
public InstanceHeartBeat(String id, long heartBeatTime, int instanceId) {
this.id = id;
this.applicationInstanceId = applicationInstanceId;
this.heartbeatTime = heartbeatTime;
this.heartBeatTime = heartBeatTime;
this.instanceId = instanceId;
}

public InstanceHeartBeat() {
Expand All @@ -52,40 +59,40 @@ public InstanceHeartBeat() {
InstanceHeartBeatDataDefine define = new InstanceHeartBeatDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
data.setDataInteger(0, this.applicationInstanceId);
data.setDataLong(0, this.heartbeatTime);
data.setDataInteger(0, this.instanceId);
data.setDataLong(0, this.heartBeatTime);
return data;
}

@Override public InstanceHeartBeat toSelf(Data data) {
this.id = data.getDataString(0);
this.applicationInstanceId = data.getDataInteger(0);
this.heartbeatTime = data.getDataLong(0);
this.instanceId = data.getDataInteger(0);
this.heartBeatTime = data.getDataLong(0);
return this;
}

public void setId(String id) {
this.id = id;
public String getId() {
return id;
}

public void setApplicationInstanceId(int applicationInstanceId) {
this.applicationInstanceId = applicationInstanceId;
public void setId(String id) {
this.id = id;
}

public String getId() {
return id;
public long getHeartBeatTime() {
return heartBeatTime;
}

public int getApplicationInstanceId() {
return applicationInstanceId;
public void setHeartBeatTime(long heartBeatTime) {
this.heartBeatTime = heartBeatTime;
}

public long getHeartbeatTime() {
return heartbeatTime;
public int getInstanceId() {
return instanceId;
}

public void setHeartbeatTime(long heartbeatTime) {
this.heartbeatTime = heartbeatTime;
public void setInstanceId(int instanceId) {
this.instanceId = instanceId;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
org.skywalking.apm.collector.agentjvm.worker.cpu.dao.CpuMetricEsDAO
org.skywalking.apm.collector.agentjvm.worker.memory.dao.MemoryMetricEsDAO
org.skywalking.apm.collector.agentjvm.worker.memorypool.dao.MemoryPoolMetricEsDAO
org.skywalking.apm.collector.agentjvm.worker.gc.dao.GCMetricEsDAO
org.skywalking.apm.collector.agentjvm.worker.gc.dao.GCMetricEsDAO
org.skywalking.apm.collector.agentjvm.worker.heartbeat.dao.InstanceHeartBeatEsDAO
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
org.skywalking.apm.collector.agentjvm.worker.cpu.dao.CpuMetricH2DAO
org.skywalking.apm.collector.agentjvm.worker.memory.dao.MemoryMetricH2DAO
org.skywalking.apm.collector.agentjvm.worker.memorypool.dao.MemoryPoolMetricH2DAO
org.skywalking.apm.collector.agentjvm.worker.gc.dao.GCMetricH2DAO
org.skywalking.apm.collector.agentjvm.worker.gc.dao.GCMetricH2DAO
org.skywalking.apm.collector.agentjvm.worker.heartbeat.dao.InstanceHeartBeatH2DAO
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,25 @@ public class JVMMetricsServiceHandlerTestCase {

private final Logger logger = LoggerFactory.getLogger(JVMMetricsServiceHandlerTestCase.class);

private JVMMetricsServiceGrpc.JVMMetricsServiceBlockingStub stub;
private static JVMMetricsServiceGrpc.JVMMetricsServiceBlockingStub stub;

public void test() {
public static void main(String[] args) {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800).usePlaintext(true).build();
stub = JVMMetricsServiceGrpc.newBlockingStub(channel);

buildJvmMetric(2);
buildJvmMetric(3);

try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void buildJvmMetric(int instanceId) {
JVMMetrics.Builder jvmMetricsBuilder = JVMMetrics.newBuilder();
jvmMetricsBuilder.setApplicationInstanceId(1);
jvmMetricsBuilder.setApplicationInstanceId(instanceId);

JVMMetric.Builder jvmMetric = JVMMetric.newBuilder();
jvmMetric.setTime(System.currentTimeMillis());
Expand All @@ -41,13 +52,13 @@ public void test() {
stub.collect(jvmMetricsBuilder.build());
}

private void buildCpuMetric(JVMMetric.Builder jvmMetric) {
private static void buildCpuMetric(JVMMetric.Builder jvmMetric) {
CPU.Builder cpuBuilder = CPU.newBuilder();
cpuBuilder.setUsagePercent(70);
jvmMetric.setCpu(cpuBuilder);
}

private void buildMemoryMetric(JVMMetric.Builder jvmMetric) {
private static void buildMemoryMetric(JVMMetric.Builder jvmMetric) {
Memory.Builder builder_1 = Memory.newBuilder();
builder_1.setIsHeap(true);
builder_1.setInit(20);
Expand All @@ -65,7 +76,7 @@ private void buildMemoryMetric(JVMMetric.Builder jvmMetric) {
jvmMetric.addMemory(builder_2.build());
}

private void buildMemoryPoolMetric(JVMMetric.Builder jvmMetric) {
private static void buildMemoryPoolMetric(JVMMetric.Builder jvmMetric) {
MemoryPool.Builder builder_1 = MemoryPool.newBuilder();
builder_1.setType(PoolType.NEWGEN_USAGE);
builder_1.setIsHeap(true);
Expand All @@ -76,7 +87,7 @@ private void buildMemoryPoolMetric(JVMMetric.Builder jvmMetric) {
jvmMetric.addMemoryPool(builder_1.build());
}

private void buildGcMetric(JVMMetric.Builder jvmMetric) {
private static void buildGcMetric(JVMMetric.Builder jvmMetric) {
GC.Builder gcBuilder = GC.newBuilder();
gcBuilder.setPhrase(GCPhrase.NEW);
gcBuilder.setCount(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public int getOrCreate(int applicationId, String agentUUID, long registerTime) {

if (instanceId == 0) {
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
InstanceDataDefine.Instance instance = new InstanceDataDefine.Instance("0", applicationId, agentUUID, registerTime, 0);
InstanceDataDefine.Instance instance = new InstanceDataDefine.Instance("0", applicationId, agentUUID, registerTime, 0, registerTime);
try {
context.getClusterWorkerContext().lookup(ApplicationRegisterRemoteWorker.WorkerRole.INSTANCE).tell(instance);
} catch (WorkerNotFoundException | WorkerInvokeException e) {
Expand All @@ -46,7 +46,7 @@ public void recover(int instanceId, int applicationId, long registerTime) {
logger.debug("instance recover, instance id: {}, application id: {}, register time: {}", instanceId, applicationId, registerTime);
IInstanceDAO dao = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName());

InstanceDataDefine.Instance instance = new InstanceDataDefine.Instance(String.valueOf(instanceId), applicationId, "", registerTime, instanceId);
InstanceDataDefine.Instance instance = new InstanceDataDefine.Instance(String.valueOf(instanceId), applicationId, "", registerTime, instanceId, registerTime);
dao.save(instance);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.skywalking.apm.collector.agentstream.worker.instance.performance.define;

import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
import org.skywalking.apm.collector.storage.table.instance.InstPerformanceTable;

/**
* @author pengys5
*/
public class InstPerformanceEsTableDefine extends ElasticSearchTableDefine {

public InstPerformanceEsTableDefine() {
super(InstPerformanceTable.TABLE);
}

@Override public int refreshInterval() {
return 2;
}

@Override public int numberOfShards() {
return 2;
}

@Override public int numberOfReplicas() {
return 0;
}

@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(InstPerformanceTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(InstPerformanceTable.COLUMN_INSTANCE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(InstPerformanceTable.COLUMN_CALL_TIMES, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(InstPerformanceTable.COLUMN_COST_TOTAL, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(InstPerformanceTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package org.skywalking.apm.collector.agentstream.worker.instance.performance.define;

import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
import org.skywalking.apm.collector.storage.table.instance.InstPerformanceTable;

/**
* @author pengys5
*/
public class InstPerformanceH2TableDefine extends H2TableDefine {

public InstPerformanceH2TableDefine() {
super(InstPerformanceTable.TABLE);
}

@Override public void initialize() {
addColumn(new H2ColumnDefine(InstPerformanceTable.COLUMN_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(InstPerformanceTable.COLUMN_APPLICATION_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(InstPerformanceTable.COLUMN_INSTANCE_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(InstPerformanceTable.COLUMN_CALL_TIMES, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(InstPerformanceTable.COLUMN_COST_TOTAL, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(InstPerformanceTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public ApplicationEsTableDefine() {
}

@Override public int refreshInterval() {
return 0;
return 2;
}

@Override public int numberOfShards() {
Expand Down
Loading

0 comments on commit c1c7d9b

Please sign in to comment.