Merge branch 'master' into zhangxin/feature/support-rest-template
wu-sheng committed Aug 11, 2017
2 parents 78bbfd0 + 70a59c5 commit 933b258
Showing 84 changed files with 1,676 additions and 71 deletions.
apm-collector/apm-collector-agentjvm/pom.xml (7 changes: 6 additions & 1 deletion)
@@ -13,6 +13,11 @@
<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-stream</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-cluster</artifactId>
@@ -25,7 +30,7 @@
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-network</artifactId>
<artifactId>apm-collector-storage</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
@@ -1,17 +1,133 @@
package org.skywalking.apm.collector.agentjvm.grpc.handler;

import io.grpc.stub.StreamObserver;
import java.util.List;
import org.skywalking.apm.collector.agentjvm.worker.cpu.CpuMetricPersistenceWorker;
import org.skywalking.apm.collector.agentjvm.worker.cpu.define.CpuMetricDataDefine;
import org.skywalking.apm.collector.agentjvm.worker.gc.GCMetricPersistenceWorker;
import org.skywalking.apm.collector.agentjvm.worker.gc.define.GCMetricDataDefine;
import org.skywalking.apm.collector.agentjvm.worker.memory.MemoryMetricPersistenceWorker;
import org.skywalking.apm.collector.agentjvm.worker.memory.define.MemoryMetricDataDefine;
import org.skywalking.apm.collector.agentjvm.worker.memorypool.MemoryPoolMetricPersistenceWorker;
import org.skywalking.apm.collector.agentjvm.worker.memorypool.define.MemoryPoolMetricDataDefine;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.collector.stream.worker.util.Const;
import org.skywalking.apm.collector.stream.worker.util.TimeBucketUtils;
import org.skywalking.apm.network.proto.CPU;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.GC;
import org.skywalking.apm.network.proto.JVMMetrics;
import org.skywalking.apm.network.proto.JVMMetricsServiceGrpc;
import org.skywalking.apm.network.proto.Memory;
import org.skywalking.apm.network.proto.MemoryPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author pengys5
*/
public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsServiceImplBase implements GRPCHandler {

private final Logger logger = LoggerFactory.getLogger(JVMMetricsServiceHandler.class);

@Override public void collect(JVMMetrics request, StreamObserver<Downstream> responseObserver) {
int applicationInstanceId = request.getApplicationInstanceId();
logger.debug("receive the jvm metric from application instance, id: {}", applicationInstanceId);

StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
request.getMetricsList().forEach(metric -> {
long time = TimeBucketUtils.INSTANCE.getSecondTimeBucket(metric.getTime());
sendToCpuMetricPersistenceWorker(context, applicationInstanceId, time, metric.getCpu());
sendToMemoryMetricPersistenceWorker(context, applicationInstanceId, time, metric.getMemoryList());
sendToMemoryPoolMetricPersistenceWorker(context, applicationInstanceId, time, metric.getMemoryPoolList());
sendToGCMetricPersistenceWorker(context, applicationInstanceId, time, metric.getGcList());
});

responseObserver.onNext(Downstream.newBuilder().build());
responseObserver.onCompleted();
}

private void sendToCpuMetricPersistenceWorker(StreamModuleContext context, int applicationInstanceId,
long timeBucket, CPU cpu) {
CpuMetricDataDefine.CpuMetric cpuMetric = new CpuMetricDataDefine.CpuMetric();
cpuMetric.setId(timeBucket + Const.ID_SPLIT + applicationInstanceId);
cpuMetric.setApplicationInstanceId(applicationInstanceId);
cpuMetric.setUsagePercent(cpu.getUsagePercent());
cpuMetric.setTimeBucket(timeBucket);
try {
logger.debug("send to cpu metric persistence worker, id: {}", cpuMetric.getId());
context.getClusterWorkerContext().lookup(CpuMetricPersistenceWorker.WorkerRole.INSTANCE).tell(cpuMetric.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
}

private void sendToMemoryMetricPersistenceWorker(StreamModuleContext context, int applicationInstanceId,
long timeBucket, List<Memory> memories) {

memories.forEach(memory -> {
MemoryMetricDataDefine.MemoryMetric memoryMetric = new MemoryMetricDataDefine.MemoryMetric();
memoryMetric.setId(timeBucket + Const.ID_SPLIT + applicationInstanceId + Const.ID_SPLIT + String.valueOf(memory.getIsHeap()));
memoryMetric.setApplicationInstanceId(applicationInstanceId);
memoryMetric.setHeap(memory.getIsHeap());
memoryMetric.setInit(memory.getInit());
memoryMetric.setMax(memory.getMax());
memoryMetric.setUsed(memory.getUsed());
memoryMetric.setCommitted(memory.getCommitted());
memoryMetric.setTimeBucket(timeBucket);
try {
logger.debug("send to memory metric persistence worker, id: {}", memoryMetric.getId());
context.getClusterWorkerContext().lookup(MemoryMetricPersistenceWorker.WorkerRole.INSTANCE).tell(memoryMetric.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
});
}

private void sendToMemoryPoolMetricPersistenceWorker(StreamModuleContext context, int applicationInstanceId,
long timeBucket, List<MemoryPool> memoryPools) {

memoryPools.forEach(memoryPool -> {
MemoryPoolMetricDataDefine.MemoryPoolMetric memoryPoolMetric = new MemoryPoolMetricDataDefine.MemoryPoolMetric();
memoryPoolMetric.setId(timeBucket + Const.ID_SPLIT + applicationInstanceId + Const.ID_SPLIT + String.valueOf(memoryPool.getType().getNumber()));
memoryPoolMetric.setApplicationInstanceId(applicationInstanceId);
memoryPoolMetric.setPoolType(memoryPool.getType().getNumber());
memoryPoolMetric.setHeap(memoryPool.getIsHeap());
memoryPoolMetric.setInit(memoryPool.getInit());
memoryPoolMetric.setMax(memoryPool.getMax());
memoryPoolMetric.setUsed(memoryPool.getUsed());
memoryPoolMetric.setCommitted(memoryPool.getCommited());
memoryPoolMetric.setTimeBucket(timeBucket);
try {
logger.debug("send to memory pool metric persistence worker, id: {}", memoryPoolMetric.getId());
context.getClusterWorkerContext().lookup(MemoryPoolMetricPersistenceWorker.WorkerRole.INSTANCE).tell(memoryPoolMetric.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
});
}

private void sendToGCMetricPersistenceWorker(StreamModuleContext context, int applicationInstanceId,
long timeBucket, List<GC> gcs) {
gcs.forEach(gc -> {
GCMetricDataDefine.GCMetric gcMetric = new GCMetricDataDefine.GCMetric();
gcMetric.setId(timeBucket + Const.ID_SPLIT + applicationInstanceId + Const.ID_SPLIT + String.valueOf(gc.getPhraseValue()));
gcMetric.setApplicationInstanceId(applicationInstanceId);
gcMetric.setPhrase(gc.getPhraseValue());
gcMetric.setCount(gc.getCount());
gcMetric.setTime(gc.getTime());
gcMetric.setTimeBucket(timeBucket);
try {
logger.debug("send to gc metric persistence worker, id: {}", gcMetric.getId());
context.getClusterWorkerContext().lookup(GCMetricPersistenceWorker.WorkerRole.INSTANCE).tell(gcMetric.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
});
}
}
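
For context, a minimal client-side sketch of driving the handler above through the generated stubs. It assumes a unary collect RPC, a JVMMetric element type behind getMetricsList(), and a collector gRPC endpoint at localhost:11800; those names and the port are assumptions, not part of this commit.

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.skywalking.apm.network.proto.CPU;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.JVMMetric;
import org.skywalking.apm.network.proto.JVMMetrics;
import org.skywalking.apm.network.proto.JVMMetricsServiceGrpc;

public class JvmMetricsClientSketch {
    public static void main(String[] args) {
        // Port 11800 is an assumption; point this at your collector's gRPC endpoint.
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800)
            .usePlaintext(true) // grpc-java of this era; newer releases use usePlaintext()
            .build();
        JVMMetricsServiceGrpc.JVMMetricsServiceBlockingStub stub =
            JVMMetricsServiceGrpc.newBlockingStub(channel);

        // One CPU sample for application instance 1; values are made up.
        JVMMetrics request = JVMMetrics.newBuilder()
            .setApplicationInstanceId(1)
            .addMetrics(JVMMetric.newBuilder() // element type of getMetricsList(), name assumed
                .setTime(System.currentTimeMillis())
                .setCpu(CPU.newBuilder().setUsagePercent(12.5d))
                .build())
            .build();

        Downstream downstream = stub.collect(request); // routed to JVMMetricsServiceHandler#collect
        channel.shutdownNow();
    }
}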
@@ -0,0 +1,71 @@
package org.skywalking.apm.collector.agentjvm.worker.cpu;

import org.skywalking.apm.collector.agentjvm.worker.cpu.dao.ICpuMetricDAO;
import org.skywalking.apm.collector.agentjvm.worker.cpu.define.CpuMetricDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.RollingSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;

/**
* @author pengys5
*/
public class CpuMetricPersistenceWorker extends PersistenceWorker {

public CpuMetricPersistenceWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}

@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}

@Override protected boolean needMergeDBData() {
return false;
}

@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)DAOContainer.INSTANCE.get(ICpuMetricDAO.class.getName());
}

public static class Factory extends AbstractLocalAsyncWorkerProvider<CpuMetricPersistenceWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}

@Override
public CpuMetricPersistenceWorker workerInstance(ClusterWorkerContext clusterContext) {
return new CpuMetricPersistenceWorker(role(), clusterContext);
}

@Override
public int queueSize() {
return 1024;
}
}

public enum WorkerRole implements Role {
INSTANCE;

@Override
public String roleName() {
return CpuMetricPersistenceWorker.class.getSimpleName();
}

@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}

@Override public DataDefine dataDefine() {
return new CpuMetricDataDefine();
}
}
}
@@ -0,0 +1,34 @@
package org.skywalking.apm.collector.agentjvm.worker.cpu.dao;

import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.agentjvm.worker.cpu.define.CpuMetricTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;

/**
* @author pengys5
*/
public class CpuMetricEsDAO extends EsDAO implements ICpuMetricDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {

@Override public Data get(String id, DataDefine dataDefine) {
return null;
}

@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(CpuMetricTable.COLUMN_APPLICATION_INSTANCE_ID, data.getDataInteger(0));
source.put(CpuMetricTable.COLUMN_USAGE_PERCENT, data.getDataDouble(0));
source.put(CpuMetricTable.COLUMN_TIME_BUCKET, data.getDataLong(0));

return getClient().prepareIndex(CpuMetricTable.TABLE, data.getDataString(0)).setSource(source);
}

@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
return null;
}
}
@@ -0,0 +1,9 @@
package org.skywalking.apm.collector.agentjvm.worker.cpu.dao;

import org.skywalking.apm.collector.storage.h2.dao.H2DAO;

/**
* @author pengys5
*/
public class CpuMetricH2DAO extends H2DAO implements ICpuMetricDAO {
}
@@ -0,0 +1,7 @@
package org.skywalking.apm.collector.agentjvm.worker.cpu.dao;

/**
* @author pengys5
*/
public interface ICpuMetricDAO {
}
@@ -0,0 +1,103 @@
package org.skywalking.apm.collector.agentjvm.worker.cpu.define;

import org.skywalking.apm.collector.core.framework.UnexpectedException;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.Transform;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;

/**
* @author pengys5
*/
public class CpuMetricDataDefine extends DataDefine {

@Override protected int initialCapacity() {
return 4;
}

@Override protected void attributeDefine() {
addAttribute(0, new Attribute(CpuMetricTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(CpuMetricTable.COLUMN_APPLICATION_INSTANCE_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(2, new Attribute(CpuMetricTable.COLUMN_USAGE_PERCENT, AttributeType.DOUBLE, new CoverOperation()));
addAttribute(3, new Attribute(CpuMetricTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
}

@Override public Object deserialize(RemoteData remoteData) {
throw new UnexpectedException("cpu metric data did not need send to remote worker.");
}

@Override public RemoteData serialize(Object object) {
throw new UnexpectedException("cpu metric data did not need send to remote worker.");
}

public static class CpuMetric implements Transform<CpuMetric> {
private String id;
private int applicationInstanceId;
private double usagePercent;
private long timeBucket;

public CpuMetric(String id, int applicationInstanceId, double usagePercent, long timeBucket) {
this.id = id;
this.applicationInstanceId = applicationInstanceId;
this.usagePercent = usagePercent;
this.timeBucket = timeBucket;
}

public CpuMetric() {
}

@Override public Data toData() {
CpuMetricDataDefine define = new CpuMetricDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
data.setDataInteger(0, this.applicationInstanceId);
data.setDataDouble(0, this.usagePercent);
data.setDataLong(0, this.timeBucket);
return data;
}

@Override public CpuMetric toSelf(Data data) {
this.id = data.getDataString(0);
this.applicationInstanceId = data.getDataInteger(0);
this.usagePercent = data.getDataDouble(0);
this.timeBucket = data.getDataLong(0);
return this;
}

public void setId(String id) {
this.id = id;
}

public void setApplicationInstanceId(int applicationInstanceId) {
this.applicationInstanceId = applicationInstanceId;
}

public void setUsagePercent(double usagePercent) {
this.usagePercent = usagePercent;
}

public void setTimeBucket(long timeBucket) {
this.timeBucket = timeBucket;
}

public String getId() {
return id;
}

public int getApplicationInstanceId() {
return applicationInstanceId;
}

public double getUsagePercent() {
return usagePercent;
}

public long getTimeBucket() {
return timeBucket;
}
}
}
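
A short round-trip sketch of the CpuMetric transform defined above, using only the setters, toData() and toSelf() shown in this file; the id and metric values are hypothetical, and Const and Data are the same classes already imported by JVMMetricsServiceHandler.

// Hypothetical sample values. Each AttributeType gets its own zero-based column,
// so the string, integer, double and long all land in column 0 of their type.
CpuMetricDataDefine.CpuMetric metric = new CpuMetricDataDefine.CpuMetric();
metric.setId(201708110000L + Const.ID_SPLIT + 1); // same id scheme as the handler: timeBucket + split + instance id
metric.setApplicationInstanceId(1);
metric.setUsagePercent(12.5d);
metric.setTimeBucket(201708110000L);

Data data = metric.toData();
CpuMetricDataDefine.CpuMetric copy = new CpuMetricDataDefine.CpuMetric().toSelf(data);
// copy.getId(), copy.getApplicationInstanceId(), copy.getUsagePercent() and
// copy.getTimeBucket() now carry the same values as the original metric.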
