Skip to content

Commit

Permalink
Provide new data protocol for segment messages.
Browse files Browse the repository at this point in the history
  • Loading branch information
wu-sheng committed Jun 5, 2017
1 parent 7885b4b commit 951192d
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 53 deletions.
@@ -1,13 +1,6 @@
package org.skywalking.apm.trace;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.TypeAdapter;
import com.google.gson.annotations.JsonAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;

import java.io.IOException;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
Expand All @@ -18,7 +11,6 @@
*
* @author wusheng
*/
@JsonAdapter(SegmentsMessage.Serializer.class)
public class SegmentsMessage {
private List<TraceSegment> segments;

Expand All @@ -34,40 +26,23 @@ public List<TraceSegment> getSegments() {
return Collections.unmodifiableList(segments);
}

public static class Serializer extends TypeAdapter<SegmentsMessage> {

@Override
public void write(JsonWriter out, SegmentsMessage value) throws IOException {
Gson gson = new GsonBuilder()
.excludeFieldsWithoutExposeAnnotation()
.create();

out.beginArray();
try {
for (TraceSegment segment : value.segments) {
out.jsonValue(gson.toJson(segment));
}
} finally {
out.endArray();
}
}

@Override
public SegmentsMessage read(JsonReader in) throws IOException {
SegmentsMessage message = new SegmentsMessage();
in.beginArray();
Gson gson = new GsonBuilder()
.excludeFieldsWithoutExposeAnnotation()
.create();
try {
while (in.hasNext()) {
TraceSegment traceSegment = gson.fromJson(in, TraceSegment.class);
message.append(traceSegment);
}
} finally {
in.endArray();
}
return message;
/**
* This serialization mechanism started from 3.1, it is similar to network package.
* The data protocol is
*
* segment1.json.length + ' '(one blank space) + segment1.json
* + segment2.json.length + ' '(one blank space) + segment2.json
* + etc.
*
* @param gson the serializer for {@link TraceSegment}
* @return the string represents the <code>SegmentMessage</code>
*/
public String serialize(Gson gson) {
StringBuilder buffer = new StringBuilder();
for (TraceSegment segment : segments) {
String segmentJson = gson.toJson(segment);
buffer.append(segmentJson.length()).append(' ').append(segmentJson);
}
return buffer.toString();
}
}
Expand Up @@ -108,15 +108,18 @@ public void testSerialize() {

SegmentsMessage message = new SegmentsMessage();
message.append(segment);
String json = gson.toJson(message);

message = gson.fromJson(json, SegmentsMessage.class);
String jsonString = message.serialize(gson);
int length = Integer.parseInt(jsonString.substring(0, 4));

TraceSegment newSegment = message.getSegments().get(0);
String segmentJson = jsonString.substring(5);

Assert.assertEquals(segment.getSpans().size(), newSegment.getSpans().size());
Assert.assertEquals(segment.getRefs().get(0).getTraceSegmentId(), newSegment.getRefs().get(0).getTraceSegmentId());
Assert.assertEquals(Tags.SPAN_LAYER.get(segment.getSpans().get(1)), Tags.SPAN_LAYER.get(newSegment.getSpans().get(1)));
Assert.assertEquals(segment.getSpans().get(1).getLogs().get(0).getTime(), newSegment.getSpans().get(1).getLogs().get(0).getTime());
Assert.assertEquals(length, segmentJson.length());
TraceSegment recoverySegment = gson.fromJson(segmentJson, TraceSegment.class);

Assert.assertEquals(segment.getSpans().size(), recoverySegment.getSpans().size());
Assert.assertEquals(segment.getRefs().get(0).getTraceSegmentId(), recoverySegment.getRefs().get(0).getTraceSegmentId());
Assert.assertEquals(Tags.SPAN_LAYER.get(segment.getSpans().get(1)), Tags.SPAN_LAYER.get(recoverySegment.getSpans().get(1)));
Assert.assertEquals(segment.getSpans().get(1).getLogs().get(0).getTime(), recoverySegment.getSpans().get(1).getLogs().get(0).getTime());
}
}
Expand Up @@ -32,13 +32,17 @@ public class CollectorClient implements Runnable {
private static long SLEEP_TIME_MILLIS = 500;
private String[] serverList;
private volatile int selectedServer = -1;
private Gson serializer;

public CollectorClient() {
serverList = Config.Collector.SERVERS.split(",");
Random r = new Random();
if (serverList.length > 0) {
selectedServer = r.nextInt(serverList.length);
}
serializer = new GsonBuilder()
.excludeFieldsWithoutExposeAnnotation()
.create();
}

@Override
Expand Down Expand Up @@ -84,10 +88,7 @@ private void sendToCollector(SegmentsMessage message) throws RESTResponseStatusE
if (message == null) {
return;
}
Gson gson = new GsonBuilder()
.excludeFieldsWithoutExposeAnnotation()
.create();
String messageJson = gson.toJson(message);
String messageJson = message.serialize(serializer);
CloseableHttpClient httpClient = HttpClients.custom().build();
try {
HttpPost httpPost = ready2Send(messageJson);
Expand Down

0 comments on commit 951192d

Please sign in to comment.