Skip to content

Commit

Permalink
Add agent instance register and heartbeat report
Browse files Browse the repository at this point in the history
  • Loading branch information
ascrutae committed Jun 11, 2017
1 parent f2e3359 commit b4ae1c6
Show file tree
Hide file tree
Showing 60 changed files with 1,568 additions and 247 deletions.
Expand Up @@ -22,6 +22,10 @@ public static class NodeMappingHourAgg {
public static class NodeMappingMinuteAgg {
public static int VALUE = 2;
}

public static class HeartBeatSave {
public static int VALUE = 2;
}
}

public static class NodeRef {
Expand Down Expand Up @@ -98,6 +102,10 @@ public static class NodeMappingHourAnalysis {
public static class NodeMappingMinuteAnalysis {
public static int SIZE = 1024;
}

public class HeartBeatAnalysis {
public static final int SIZE = 64;
}
}

public static class NodeRef {
Expand Down
Expand Up @@ -2,60 +2,46 @@

import com.google.gson.JsonObject;
import com.google.gson.stream.JsonReader;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.*;
import org.skywalking.apm.collector.worker.segment.entity.Segment;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.BufferedReader;
import java.io.IOException;
import org.skywalking.apm.collector.actor.AbstractLocalSyncWorker;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalSyncWorkerRef;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.WorkerRef;
import org.skywalking.apm.collector.worker.instance.entity.RegistryInfo;
import org.skywalking.apm.collector.worker.instance.entity.HeartBeat;
import org.skywalking.apm.collector.worker.segment.entity.Segment;

/**
* @author pengys5
*/

public abstract class AbstractPost extends AbstractLocalAsyncWorker {
public abstract class AbstractPost extends AbstractLocalSyncWorker {

public AbstractPost(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}

@Override
final public void onWork(Object message) throws Exception {
onReceive(message);
protected void onWork(Object request, Object response) throws Exception {
try {
onReceive(request, (JsonObject)response);
} catch (Exception e) {
((JsonObject)response).addProperty("isSuccess", false);
((JsonObject)response).addProperty("reason", e.getMessage());
}
}

protected abstract void onReceive(Object message) throws Exception;
protected abstract void onReceive(Object message, JsonObject response) throws Exception;

static class PostWithHttpServlet extends AbstractHttpServlet {
public static class SegmentPostWithHttpServlet extends AbstractPostWithHttpServlet {

private Logger logger = LogManager.getFormatterLogger(PostWithHttpServlet.class);

private final LocalAsyncWorkerRef ownerWorkerRef;

PostWithHttpServlet(LocalAsyncWorkerRef ownerWorkerRef) {
this.ownerWorkerRef = ownerWorkerRef;
public SegmentPostWithHttpServlet(WorkerRef ownerWorkerRef) {
super(ownerWorkerRef);
}

@Override
final protected void doPost(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
JsonObject resJson = new JsonObject();
try {
BufferedReader bufferedReader = request.getReader();
streamReader(bufferedReader);
reply(response, resJson, HttpServletResponse.SC_OK);
} catch (Exception e) {
logger.error(e);
resJson.addProperty("error", e.getMessage());
reply(response, resJson, HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
}
}

private void streamReader(BufferedReader bufferedReader) throws Exception {
protected void doWork(BufferedReader bufferedReader, JsonObject resJson) throws Exception {
try (JsonReader reader = new JsonReader(bufferedReader)) {
readSegmentArray(reader);
}
Expand All @@ -71,4 +57,42 @@ private void readSegmentArray(JsonReader reader) throws Exception {
reader.endArray();
}
}

public static class RegisterPostWithHttpServlet extends AbstractPostWithHttpServlet {

public RegisterPostWithHttpServlet(WorkerRef ownerWorkerRef) {
super(ownerWorkerRef);
}

@Override
protected void doWork(BufferedReader bufferedReader, JsonObject resJson) throws Exception {
JsonReader reader = new JsonReader(bufferedReader);
reader.beginObject();
if (reader.nextName().equals("ac")) {
RegistryInfo registryParam = new RegistryInfo(reader.nextString());
((LocalSyncWorkerRef)ownerWorkerRef).ask(registryParam, resJson);
}
reader.endObject();
}

}

public static class HeartBeatPostWithHttpServlet extends AbstractPostWithHttpServlet {

public HeartBeatPostWithHttpServlet(WorkerRef ownerWorkerRef) {
super(ownerWorkerRef);
}

@Override
protected void doWork(BufferedReader bufferedReader, JsonObject resJson) throws Exception {
JsonReader reader = new JsonReader(bufferedReader);
reader.beginObject();
if (reader.nextName().equals("ac")) {
HeartBeat registryParam = new HeartBeat(reader.nextString());
ownerWorkerRef.tell(registryParam);
}
}

}

}
Expand Up @@ -7,14 +7,15 @@
/**
* @author pengys5
*/
public abstract class AbstractPostProvider<T extends AbstractLocalAsyncWorker> extends AbstractLocalAsyncWorkerProvider<T> {
public abstract class AbstractPostProvider<T extends AbstractLocalSyncWorker> extends AbstractLocalSyncWorkerProvider<T> {

public abstract String servletPath();

final protected void create(
ServletContextHandler context) throws IllegalArgumentException, ProviderNotFoundException {
LocalAsyncWorkerRef workerRef = (LocalAsyncWorkerRef) super.create(AbstractWorker.noOwner());
AbstractPost.PostWithHttpServlet postWithHttpServlet = new AbstractPost.PostWithHttpServlet(workerRef);
context.addServlet(new ServletHolder(postWithHttpServlet), servletPath());
WorkerRef workerRef = super.create(AbstractWorker.noOwner());
context.addServlet(new ServletHolder(handleServlet(workerRef)), servletPath());
}

public abstract AbstractPostWithHttpServlet handleServlet(WorkerRef workerRef);
}
@@ -0,0 +1,38 @@
package org.skywalking.apm.collector.worker.httpserver;

import com.google.gson.JsonObject;
import java.io.BufferedReader;
import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.WorkerRef;

public abstract class AbstractPostWithHttpServlet extends AbstractHttpServlet {

private Logger logger = LogManager.getFormatterLogger(AbstractPostWithHttpServlet.class);
protected final WorkerRef ownerWorkerRef;

AbstractPostWithHttpServlet(WorkerRef ownerWorkerRef) {
this.ownerWorkerRef = ownerWorkerRef;
}

@Override final protected void doPost(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
JsonObject resJson = new JsonObject();
try {
BufferedReader bufferedReader = request.getReader();
doWork(bufferedReader, resJson);
reply(response, resJson, HttpServletResponse.SC_OK);
} catch (Exception e) {
logger.error(e);
resJson.addProperty("error", e.getMessage());
reply(response, resJson, HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
}
}

protected abstract void doWork(BufferedReader bufferedReader, JsonObject resJson) throws Exception;
}

@@ -0,0 +1,70 @@
package org.skywalking.apm.collector.worker.instance;

import com.google.gson.JsonObject;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.LocalWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.WorkerRef;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
import org.skywalking.apm.collector.worker.httpserver.AbstractPost;
import org.skywalking.apm.collector.worker.httpserver.AbstractPostProvider;
import org.skywalking.apm.collector.worker.httpserver.AbstractPostWithHttpServlet;
import org.skywalking.apm.collector.worker.instance.entity.HeartBeat;
import org.skywalking.apm.collector.worker.instance.heartbeat.HeartBeatAnalysis;

public class HeartBeatReportPost extends AbstractPost {

public HeartBeatReportPost(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}

@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(HeartBeatAnalysis.Role.INSTANCE).create(this);
}

@Override
protected void onReceive(Object message, JsonObject response) throws Exception {
if (message instanceof HeartBeat) {
getSelfContext().lookup(HeartBeatAnalysis.Role.INSTANCE).tell(message);
}
}

public static class Factory extends AbstractPostProvider<HeartBeatReportPost> {
@Override
public Role role() {
return HeartBeatReportPost.WorkerRole.INSTANCE;
}

@Override
public HeartBeatReportPost workerInstance(ClusterWorkerContext clusterContext) {
return new HeartBeatReportPost(role(), clusterContext, new LocalWorkerContext());
}

@Override
public String servletPath() {
return "/heartbeat";
}

@Override
public AbstractPostWithHttpServlet handleServlet(WorkerRef workerRef) {
return new HeartBeatPostWithHttpServlet(workerRef);
}
}

public enum WorkerRole implements Role {
INSTANCE;

@Override
public String roleName() {
return HeartBeatReportPost.class.getSimpleName();
}

@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
@@ -0,0 +1,57 @@
package org.skywalking.apm.collector.worker.instance;

class IdentificationCache {
private static final IdentificationCache INSTANCE = new IdentificationCache();

State state;
IdentificationSegment segment;

private IdentificationCache() {
state = State.NORMAL;
}

public synchronized long fetchInstanceId() {
if (state == State.ABNORMAL) {
return -1;
}

if (!segment.hasNext()) {
IdentificationSegmentFetcher.INSTANCE.fetchSegment(new IdentificationSegmentFetcher.Listener() {
@Override
public void failed() {
state = State.ABNORMAL;
IdentificationSegmentFetcher.INSTANCE.fetchSegmentInBackGround(INSTANCE);
}

@Override
public void success(IdentificationSegment idSegment) {
segment = idSegment;
}
});
}

return segment.nextInstanceId();
}

public static IdentificationCache initCache() {
IdentificationSegmentFetcher.INSTANCE.fetchSegment(new IdentificationSegmentFetcher.Listener() {
@Override
public void failed() {
INSTANCE.state = State.ABNORMAL;
IdentificationSegmentFetcher.INSTANCE.fetchSegmentInBackGround(INSTANCE);
}

@Override
public void success(IdentificationSegment idSegment) {
INSTANCE.segment = idSegment;
}
});

return INSTANCE;
}

enum State {
NORMAL,
ABNORMAL
}
}
@@ -0,0 +1,19 @@
package org.skywalking.apm.collector.worker.instance;

public class IdentificationSegment {
private long startInstanceId;
private long endInstanceId;

IdentificationSegment(long start, long end) {
this.startInstanceId = start;
this.endInstanceId = end;
}

public long nextInstanceId() {
return startInstanceId++;
}

public boolean hasNext() {
return startInstanceId + 1 >= endInstanceId;
}
}

0 comments on commit b4ae1c6

Please sign in to comment.