Skip to content

Commit

Permalink
add support for tracing using skywalking
Browse files Browse the repository at this point in the history
Signed-off-by: wanjunlei <wanjunlei@kubesphere.io>
  • Loading branch information
wanjunlei committed Aug 17, 2023
1 parent c722869 commit 2569fc5
Show file tree
Hide file tree
Showing 12 changed files with 163 additions and 43 deletions.
2 changes: 1 addition & 1 deletion functions-framework-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

<groupId>dev.openfunction.functions</groupId>
<artifactId>functions-framework-api</artifactId>
<version>1.2.0-SNAPSHOT</version>
<version>1.3.0-SNAPSHOT</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,5 +121,5 @@ public interface Context {
*/
DaprClient getDaprClient();

CloudEvent packageAsCloudevent(String payload);
byte[] packageAsCloudevent(String payload);
}
11 changes: 9 additions & 2 deletions functions-framework-invoker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

<groupId>dev.openfunction.functions</groupId>
<artifactId>functions-framework-invoker</artifactId>
<version>1.2.0-SNAPSHOT</version>
<version>1.3.0-SNAPSHOT</version>

<properties>
<maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version>
Expand All @@ -20,6 +20,7 @@
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<cloudevents.sdk.version>2.4.2</cloudevents.sdk.version>
<skywalking.version>8.16.0</skywalking.version>
</properties>

<licenses>
Expand Down Expand Up @@ -60,7 +61,7 @@
<dependency>
<groupId>dev.openfunction.functions</groupId>
<artifactId>functions-framework-api</artifactId>
<version>1.2.0-SNAPSHOT</version>
<version>1.3.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
Expand Down Expand Up @@ -179,6 +180,12 @@
<artifactId>jackson-databind</artifactId>
<version>2.14.2</version>
</dependency>

<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-trace</artifactId>
<version>${skywalking.version}</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,16 +128,6 @@ public void setPluginsTracing(TracingConfig pluginsTracing) {
this.pluginsTracing = pluginsTracing;
}

public boolean isTracingEnabled() {
if (tracing != null ) {
return tracing.isEnabled();
} else if (pluginsTracing != null) {
return pluginsTracing.isEnabled();
} else {
return false;
}
}

public Map<String, Component> getStates() {
return states;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,26 +66,41 @@ public RuntimeContext(String context, ClassLoader classLoader) throws Exception

loadHooks(classLoader);

if (functionContext.isTracingEnabled() && functionContext.getPluginsTracing().getProvider() != null) {
String provider = functionContext.getPluginsTracing().getProvider().getName();
TracingConfig tracingConfig = getTracingConfig();
if (tracingConfig != null && tracingConfig.isEnabled() && tracingConfig.getProvider() != null) {
String provider = tracingConfig.getProvider().getName();
if (!Objects.equals(provider, TracingSkywalking) && !Objects.equals(provider, TracingOpentelemetry)) {
throw new IllegalArgumentException("unsupported tracing provider " + provider);
}

switch (provider) {
case TracingSkywalking:
tracingProvider = new SkywalkingProvider();
tracingProvider = new SkywalkingProvider(tracingConfig,
functionContext.getName(),
System.getenv(RuntimeContext.PodNameEnvName),
System.getenv(RuntimeContext.PodNamespaceEnvName));
break;
case TracingOpentelemetry:
tracingProvider = new OpenTelemetryProvider(functionContext.getPluginsTracing(),
tracingProvider = new OpenTelemetryProvider(tracingConfig,
functionContext.getName(),
System.getenv(RuntimeContext.PodNameEnvName),
System.getenv(RuntimeContext.PodNamespaceEnvName));
break;
}
}

EventFormatProvider.getInstance().registerFormat(new JsonEventFormat());
}

private TracingConfig getTracingConfig() {
TracingConfig tracingConfig = functionContext.getTracing();
if (tracingConfig != null) {
return tracingConfig;
}

return functionContext.getPluginsTracing();
}

private void loadHooks(ClassLoader classLoader) {
String[] preHookNames = functionContext.getPreHooks();
if (ArrayUtils.isEmpty(preHookNames)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ public Error send(String outputName, String data) {
// binding must be in CloudEvent format, otherwise pubsub cannot parse the data.
byte[] payload = data.getBytes();
if (MiddlewaresCloudEventFormatRequired.contains(output.getComponentType())) {
CloudEvent event = packageAsCloudevent(data);
payload = new JsonEventFormat().serialize(event);
payload = packageAsCloudevent(data);
}

daprClient.invokeBinding(output.getComponentName(), output.getOperation(), payload).block();
Expand Down Expand Up @@ -169,14 +168,15 @@ public DaprClient getDaprClient() {
}

@Override
public CloudEvent packageAsCloudevent(String payload) {
return new CloudEventBuilder()
public byte[] packageAsCloudevent(String payload) {
CloudEvent event = new CloudEventBuilder()
.withId(UUID.randomUUID().toString())
.withType("dapr.invoke")
.withSource(URI.create("openfunction/invokeBinding"))
.withData(payload.getBytes())
.withDataContentType(JsonEventFormat.CONTENT_TYPE)
.build();
return new JsonEventFormat().serialize(event);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class OpenTelemetryProvider implements TracingProvider {
private static final String Protocol_HTTP = "http";

private final String functionName;
private final Map<String, String> tags;
private Map<String, String> tags;
private final Map<String, String> baggage;
private final TextMapGetter<Map<String, String>> getter;

Expand Down Expand Up @@ -107,6 +107,9 @@ public String get(Map<String, String> carrier, @NotNull String key) {
};

tags = config.getTags();
if (tags == null) {
tags = new HashMap<>();
}
if (!Objects.equals(pod, "")) {
tags.put("instance", pod);
}
Expand Down Expand Up @@ -238,7 +241,7 @@ public void executeWithTracing(UserContext ctx, Callback callback) throws Except
executeWithTracing(ctx.getFunctionClass().getSimpleName(), kind, tags, callback);
}

public void executeWithTracing(Map<String, String> carrier, Callback callback) throws Exception {
private void executeWithTracing(Map<String, String> carrier, Callback callback) throws Exception {
TextMapPropagator propagator = GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
Context parentContext = propagator.extract(Context.root(), carrier, getter);
Tracer tracer = GlobalOpenTelemetry.getTracer(OTEL_LIBRARY_NAME, OTEL_LIBRARY_VERSION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,61 +18,164 @@

import dev.openfunction.functions.*;
import dev.openfunction.invoker.Callback;
import dev.openfunction.invoker.context.TracingConfig;
import dev.openfunction.invoker.context.UserContext;
import io.cloudevents.CloudEvent;
import org.apache.skywalking.apm.toolkit.trace.*;

import java.util.Map;
import java.util.*;

public class SkywalkingProvider implements TracingProvider {
/**
* <a href="https://github.com/apache/skywalking/blob/master/oap-server/server-starter/src/main/resources/component-libraries.yml#L515">...</a>
*/
private static final int componentIDOpenFunction = 5013;

// private final Map<String, String> tags;
// private final Map<String, String> baggage;

public SkywalkingProvider() {

// private static final int componentIDOpenFunction = 5013;

private final String functionName;
private Map<String, String> tags;
private final Map<String, String> baggage;

public SkywalkingProvider(TracingConfig config, String functionName, String pod, String namespace) {
this.functionName = functionName;

tags = config.getTags();
tags = config.getTags();
if (tags == null) {
tags = new HashMap<>();
}
if (!Objects.equals(pod, "")) {
tags.put("instance", pod);
}
if (!Objects.equals(namespace, "")) {
tags.put("namespace", pod);
}
baggage = config.getBaggage();
}

@Override
public void executeWithTracing(HttpRequest httpRequest, Callback callback) throws Exception {

Map<String, String> carrier = new HashMap<>();
Map<String, List<String>> headers = httpRequest.getHeaders();
if (headers != null) {
for (String key : headers.keySet()) {
if (headers.get(key).size() > 0) {
carrier.put(key, headers.get(key).get(0));
}
}
}

HashMap<String, String> newTags = new HashMap<>(tags);
newTags.put("Method", httpRequest.getMethod());
newTags.put("URI", httpRequest.getUri());

executeWithTracing(carrier, newTags, callback);
}

@Override
public void executeWithTracing(CloudEvent event, Callback callback) throws Exception {

Map<String, String> carrier = new HashMap<>();
for (String key : event.getExtensionNames()) {
Object obj = event.getExtension(key);
carrier.put(key, obj == null ? "" : obj.toString());
}
executeWithTracing(carrier, tags, callback);
}

@Override
public void executeWithTracing(BindingEvent event, Callback callback) throws Exception {

executeWithTracing(new HashMap<>(), tags, callback);
}

@Override
public void executeWithTracing(TopicEvent event, Callback callback) throws Exception {
executeWithTracing(new HashMap<>(), tags, callback);
}

private void executeWithTracing(Map<String, String> carrier, Map<String, String> tags, Callback callback) throws Exception {
ContextCarrierRef contextCarrierRef = new ContextCarrierRef();
if (carrier != null) {
CarrierItemRef next = contextCarrierRef.items();
while (next.hasNext()) {
next = next.next();
if (carrier.get(next.getHeadKey()) != null) {
next.setHeadValue(carrier.get(next.getHeadKey()));
}
}
}

SpanRef span = Tracer.createEntrySpan(functionName, contextCarrierRef);
if (tags != null) {
for (String key : tags.keySet()) {
span.tag(key, tags.get(key));
}
}

if (baggage != null) {
for (String key : baggage.keySet()) {
TraceContext.putCorrelation(key, baggage.get(key));
}
}

Error err = callback.execute();
if (err != null) {
ActiveSpan.error(err);
}
Tracer.stopSpan();
}

@Override
public void executeWithTracing(Callback callback) throws Exception {

executeWithTracing("function", null, callback);
}

@Override
public void executeWithTracing(Plugin plugin, Callback callback) throws Exception {

Map<String, String> tags = new HashMap<>();
tags.put("kind", "Plugin");
tags.put("name", plugin.name());
tags.put("version", plugin.version());
if (plugin.tagsAddToTracing() != null) {
tags.putAll(plugin.tagsAddToTracing());
}

executeWithTracing(plugin.name(), tags, callback);
}

@Override
public void executeWithTracing(Hook hook, Callback callback) throws Exception {

Map<String, String> tags = new HashMap<>();
tags.put("kind", "Hook");
tags.put("name", hook.name());
tags.put("version", hook.version());
if (hook.tagsAddToTracing() != null) {
tags.putAll(hook.tagsAddToTracing());
}

executeWithTracing(hook.name(), tags, callback);
}

@Override
public void executeWithTracing(UserContext ctx, Callback callback) throws Exception {
Map<String, String> tags = new HashMap<>();
tags.put("function", ctx.getFunctionClass().getName());

executeWithTracing(ctx.getFunctionClass().getSimpleName(), tags, callback);
}

private void executeWithTracing(String name, Map<String, String> tags, Callback callback) throws Exception {
SpanRef span = Tracer.createLocalSpan(name);

if (tags != null) {
for (String key : tags.keySet()) {
span.tag(key, tags.get(key));
}
}

Error err = callback.execute();
if (err != null) {
ActiveSpan.error(err);
}

Tracer.stopSpan();
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/bin/bash

mvn clean compile dependency:copy-dependencies
mvn exec:exec -Dexec.executable="java" -Dexec.args="-classpath samples-1.0-SNAPSHOT.jar:target/classes/:target/dependency/* -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005 dev.openfunction.invoker.Runner"
mvn exec:exec -Dexec.executable="java" -Dexec.args="-javaagent:./skywalking-agent/skywalking-agent.jar -classpath samples-1.0-SNAPSHOT.jar:target/classes/:target/dependency/* -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005 dev.openfunction.invoker.Runner"
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/bin/bash

mvn clean compile dependency:copy-dependencies
mvn exec:exec -Dexec.executable="java" -Dexec.args="-classpath samples-1.0-SNAPSHOT.jar:target/classes/:target/dependency/* dev.openfunction.invoker.Runner"
mvn exec:exec -Dexec.executable="java" -Dexec.args="-javaagent:./skywalking-agent/skywalking-agent.jar -classpath samples-1.0-SNAPSHOT.jar:target/classes/:target/dependency/* dev.openfunction.invoker.Runner"
2 changes: 1 addition & 1 deletion samples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
<dependency>
<groupId>dev.openfunction.functions</groupId>
<artifactId>functions-framework-api</artifactId>
<version>1.2.0-SNAPSHOT</version>
<version>1.3.0-SNAPSHOT</version>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ public Out accept(Context context, String payload) throws Exception {
for (String key : context.getOutputs().keySet()) {
Component output = context.getOutputs().get(key);
if (output.isPubsub()) {
daprClient.publishEvent(output.getComponentName(), output.getTopic(), payload, output.getMetadata());
daprClient.publishEvent(output.getComponentName(), output.getTopic(), payload, output.getMetadata()).block();
System.out.println("send data to " + output.getComponentName());
} else if (output.isBinding()) {
// We recommend using CloudEvents to pass data between Dapr components.
daprClient.invokeBinding(output.getComponentName(), output.getOperation(), context.packageAsCloudevent(payload));
daprClient.invokeBinding(output.getComponentName(), output.getOperation(), context.packageAsCloudevent(payload), output.getMetadata()).block();
System.out.println("send data to " + output.getComponentName());
}
}
}
Expand Down

0 comments on commit 2569fc5

Please sign in to comment.