Skip to content

Commit

Permalink
Merge pull request #398 from malafeev/pulsar-functions
Browse files Browse the repository at this point in the history
add support of Apache Pulsar Functions
  • Loading branch information
safris committed Feb 7, 2020
2 parents e6d211c + a8b7ff9 commit 4913a9c
Show file tree
Hide file tree
Showing 9 changed files with 362 additions and 0 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ Direction for development of [<ins>Instrumentation Rules</ins>](#45-instrumentat
| Play Framework | [`play`][play] | 2.6.0 | 2.7.3 |
| Play WS | [`play:ws`][play-ws] | 2.0.0 | 2.0.7 |
| Pulsar Client | [`pulsar:client`][pulsar-client] | 2.2.0 | 2.4.2 |
| Pulsar Functions | [`pulsar:functions`][pulsar-functions] | 2.2.0 | 2.4.2 |
| [RabbitMQ Client](https://github.com/opentracing-contrib/java-rabbitmq-client) | [`rabbitmq:client`][rabbitmq-client] | 5.0.0 | 5.7.3 |
| [Reactor](https://github.com/opentracing-contrib/java-reactor) | [`reactor`][reactor] | 3.2.3.RELEASE | **3.2.3.RELEASE** |
| [Redisson](https://github.com/opentracing-contrib/java-redis-client/tree/master/opentracing-redis-redisson) | [`redisson`][redisson] | 3.11.0 | 3.11.5 |
Expand Down Expand Up @@ -653,6 +654,7 @@ This project is licensed under the Apache 2 License - see the [LICENSE.txt](LICE
[play-ws]: https://github.com/opentracing-contrib/java-specialagent/tree/master/rule/play-ws
[play]: https://github.com/opentracing-contrib/java-specialagent/tree/master/rule/play
[pulsar-client]: https://github.com/opentracing-contrib/java-specialagent/tree/master/rule/pulsar-client
[pulsar-functions]: https://github.com/opentracing-contrib/java-specialagent/tree/master/rule/pulsar-functions
[rabbitmq-client]: https://github.com/opentracing-contrib/java-specialagent/tree/master/rule/rabbitmq-client
[reactor]: https://github.com/opentracing-contrib/java-specialagent/tree/master/rule/reactor
[redisson]: https://github.com/opentracing-contrib/java-specialagent/tree/master/rule/redisson
Expand Down
6 changes: 6 additions & 0 deletions opentracing-specialagent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,12 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.opentracing.contrib.specialagent.rule</groupId>
<artifactId>pulsar-functions</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.opentracing.contrib.specialagent.rule</groupId>
<artifactId>dynamic</artifactId>
Expand Down
1 change: 1 addition & 0 deletions rule/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
<module>neo4j-driver</module>
<module>netty</module>
<module>pulsar-client</module>
<module>pulsar-functions</module>
<module>dynamic</module>
</modules>
<build>
Expand Down
44 changes: 44 additions & 0 deletions rule/pulsar-functions/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<!--
Copyright 2020 The OpenTracing Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.opentracing.contrib.specialagent.rule</groupId>
<artifactId>rule</artifactId>
<version>1.5.8-SNAPSHOT</version>
</parent>
<artifactId>pulsar-functions</artifactId>
<name>SpecialAgent Rule for Apache Pulsar Functions</name>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<version.pulsar>2.2.0</version.pulsar>
<sa.plugin.name>pulsar:functions</sa.plugin.name>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-functions-instance</artifactId>
<version>${version.pulsar}</version>
<optional>true</optional>
<scope>provided</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/* Copyright 2020 The OpenTracing Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opentracing.contrib.specialagent.rule.pulsar.functions;

import io.opentracing.References;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.Tracer.SpanBuilder;
import io.opentracing.propagation.Format.Builtin;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.util.HashMap;
import java.util.Map;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.JavaExecutionResult;

public class PulsarFunctionsAgentIntercept {
private static final ThreadLocal<Context> contextHolder = new ThreadLocal<>();
static final String COMPONENT_NAME = "java-pulsar-functions";

private static class Context {
private Scope scope;
private Span span;
}

public static void handleMessageEnter(Object function, Object contextArg, Object arg0) {
final Tracer tracer = GlobalTracer.get();
final SpanBuilder spanBuilder = tracer
.buildSpan(getFunctionName(function, contextArg))
.withTag(Tags.COMPONENT, COMPONENT_NAME)
.withTag(Tags.SPAN_KIND, Tags.SPAN_KIND_SERVER);

if (arg0 != null) {
Record<?> record = (Record<?>) arg0;
final SpanContext spanContext = tracer
.extract(Builtin.TEXT_MAP, new TextMapExtractAdapter(record.getProperties()));
if (spanContext != null) {
spanBuilder.addReference(References.FOLLOWS_FROM, spanContext);
}
}

final Span span = spanBuilder.start();
final Scope scope = tracer.activateSpan(span);

final Context context = new Context();
contextHolder.set(context);
context.scope = scope;
context.span = span;
}

private static String getFunctionName(Object function, Object contextArg) {
if (contextArg != null) {
final org.apache.pulsar.functions.api.Context contextImpl = (org.apache.pulsar.functions.api.Context) contextArg;
if (contextImpl.getFunctionName() != null) {
return contextImpl.getFunctionName();
}
}
final String simpleName = function.getClass().getSimpleName();
if(simpleName.length() == 0) {
return function.getClass().getName();
}
return simpleName;
}

public static void handleMessageEnd(Object returned, Throwable thrown) {
final Context context = contextHolder.get();
if (context == null) {
return;
}

context.scope.close();
final Span span = context.span;
contextHolder.remove();

if (thrown != null) {
onError(thrown, span);
span.finish();
return;
}

JavaExecutionResult result = (JavaExecutionResult) returned;
if (result.getSystemException() != null) {
onError(result.getSystemException(), span);
} else if (result.getUserException() != null) {
onError(result.getUserException(), span);
}

span.finish();
}

private static void onError(final Throwable t, final Span span) {
Tags.ERROR.set(span, Boolean.TRUE);
if (t != null) {
span.log(errorLogs(t));
}
}

private static Map<String, Object> errorLogs(final Throwable t) {
final Map<String, Object> errorLogs = new HashMap<>(2);
errorLogs.put("event", Tags.ERROR.getKey());
errorLogs.put("error.object", t);
return errorLogs;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/* Copyright 2020 The OpenTracing Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opentracing.contrib.specialagent.rule.pulsar.functions;

import static net.bytebuddy.matcher.ElementMatchers.named;

import io.opentracing.contrib.specialagent.AgentRule;
import java.util.Arrays;
import net.bytebuddy.agent.builder.AgentBuilder;
import net.bytebuddy.agent.builder.AgentBuilder.Transformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.dynamic.DynamicType.Builder;
import net.bytebuddy.implementation.bytecode.assign.Assigner.Typing;
import net.bytebuddy.utility.JavaModule;

public class PulsarFunctionsAgentRule extends AgentRule {
@Override
public Iterable<? extends AgentBuilder> buildAgent(final AgentBuilder builder) {
return Arrays.asList(builder
.type(named("org.apache.pulsar.functions.instance.JavaInstance"))
.transform(new Transformer() {
@Override
public Builder<?> transform(final Builder<?> builder, final TypeDescription typeDescription, final ClassLoader classLoader, final JavaModule module) {
return builder.visit(Advice.to(PulsarFunctionsAgentRule.class).on(named("handleMessage")));
}}));
}

@Advice.OnMethodEnter
public static void enter(final @Advice.Origin String origin, final @Advice.Argument(value = 0, typing = Typing.DYNAMIC) Object arg0, final @Advice.FieldValue(value = "function") Object function, final @Advice.FieldValue(value = "javaUtilFunction") Object javaUtilFunction, final @Advice.FieldValue(value = "context") Object context) {
if (isEnabled("PulsarFunctionsAgentRule", origin))
PulsarFunctionsAgentIntercept.handleMessageEnter(function != null ? function : javaUtilFunction, context, arg0);
}

@Advice.OnMethodExit(onThrowable = Throwable.class)
public static void exit(final @Advice.Origin String origin, @Advice.Return(typing = Typing.DYNAMIC) Object returned, final @Advice.Thrown Throwable thrown) {
if (isEnabled("PulsarFunctionsAgentRule", origin))
PulsarFunctionsAgentIntercept.handleMessageEnd(returned, thrown);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/* Copyright 2020 The OpenTracing Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opentracing.contrib.specialagent.rule.pulsar.functions;

import io.opentracing.propagation.TextMap;
import java.util.Iterator;
import java.util.Map;


public final class TextMapExtractAdapter implements TextMap {
final Map<String,String> map;

public TextMapExtractAdapter(final Map<String,String> map) {
this.map = map;
}

@Override
public Iterator<Map.Entry<String,String>> iterator() {
return map.entrySet().iterator();
}

@Override
public void put(final String key, final String value) {
throw new UnsupportedOperationException("This class should be used only with Tracer.inject()!");
}
}
15 changes: 15 additions & 0 deletions rule/pulsar-functions/src/main/resources/otarules.mf
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Copyright 2020 The OpenTracing Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

io.opentracing.contrib.specialagent.rule.pulsar.functions.PulsarFunctionsAgentRule

0 comments on commit 4913a9c

Please sign in to comment.