Skip to content

Commit

Permalink
upgrade opentracing api to 0.32.0
Browse files Browse the repository at this point in the history
Signed-off-by: Sergei Malafeev <sergeymalafeev@gmail.com>
  • Loading branch information
malafeev committed Apr 1, 2019
1 parent bcb5637 commit c16aceb
Show file tree
Hide file tree
Showing 24 changed files with 180 additions and 140 deletions.
24 changes: 12 additions & 12 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ dist: trusty

language: java
jdk:
- oraclejdk8
- oraclejdk8

cache:
directories:
- $HOME/.m2/repository
- $HOME/.m2/repository

before_install:
# allocate commits to CI, not the owner of the deploy key
Expand All @@ -34,13 +34,13 @@ after_success:

env:
global:
# Ex. travis encrypt BINTRAY_USER=your_github_account
- secure: "eng7HMp7Zd1X4o59WzsyfTSjvkUGsYLPNKR2RIdZiDXKd/eqTXzrxtQX5BldKtOxA8ZfLF8UWMBE7k9hJq+1eN9fYF6FgLClwZw6OfuLJ6C/kBUne9MA2h5pE+cuV8p6zZsF0AmUM5EKnsP++FmgLijMdMNG8Hx4yTvZeMtCyvF5GZ8jQFXgCnnn8K1h0rwKI7mQH5HPo3rOWd6OCZpRYic0/XIW4CLlY2g/7uUl9IDqBNnFKVbU6sdVrK+O648l5spjGHS9agm/5f1JpNMuVMQ37QlWr+T7HC+X6dhLv9dlqPtXSJp7VWV0AnRgzqc0M5T492+05Ng5nXnX6Fo+lIc+08JCfjq5tDvwuzhKjIepZUBR9jaZwUQxSgeNWv+xwjBJLQuMY16Nyu0f3QUmqnL2toZ+Co6+P0MUlqRTkkCoFKbL2Z/lpTj78Nct/QJnzuafUvCOgyZLYSwniVVP/cqwqJajnhAr2kXP77QiBTKrwxLCISgDwrnnHKwnIBZ6TLpCyoJ/v84c6qagzh/ltcfvJSfDj8LdrJP5sIvOB8tuRqUXWKkZDr5igXPEfUotv+VR82VqPCS8IZgT4Str87UAIU/J5v4NM7gWF207ri0lILjbU4WcVD80AmJHxWpCJHwk0jNnXpHTxSZACoRwNeju4NBkHeKN8McOdU3DOR8="
# Ex. travis encrypt BINTRAY_KEY=xxx-https://bintray.com/profile/edit-xxx --add
- secure: "FzmEYmwU8RnH5y394X8pFkKjEtvrnMvas4DgoJJRum+8IMMkFcuiHTe1gw+kVQTXtj9q/YQ0rcdhu80A+EllcIKwc7wxf3tysnckF/l3lfSpcCHyiZ/e58yjpXg1XB/yAiVbRKgjubBJHI0RN005/M86PUlEi0HtA01jRgQvr3aQpicPuyRjleIkVx490clyTdAN5HXAJNfalWXcsEIcIl4GmpqA4o0MJ20DBqWrQYp25fgWDP9WYC+8oIQExZUtR/KWXdPWolYRE3tlBj6C6XSUTynaPzjcwveVGvfXAis6+NafowrRi1ohFzbdx3MbKhXxIdfVidw69BP1NMxqkPQaWWXtNEjpNbIMJvNQL/sFGkvsamYbEa1e7VALxMtjR19BYieZdYW4r33FeYH11zVAK3I3EjqpdnaLB6cTwgyWnblzDqIJ5WGWZQiomBE2MwqsYx5FOM0oXbukQlTWR/RxT/97f3nt8GsE5EqPk/uDWAiPY383R2wIY8FaehwA7iq9nKwbr7oQO2ZNtcR8VW0JeFc3b0r4F8uL4GDP7Le3JDe+o6Aoi+SFu+adjVQtRCX4IwH3CznhBP4wkfcTIBLxSVGzW2Fe3YHki6lulf6jBs1Of3VpzdPADADIzd0k1nOUVYZuUQU5IUlbtIdiIXPNhfNCkP5uMHBnIV8lc24="
# Ex. travis encrypt GH_TOKEN=XXX-https://github.com/settings/tokens-XXX --add
- secure: "ns5to8XeWdzLtfc47Jh4XIIH97SRNaC3Rx6dlvM+7ic+WnVJETMneOcXc1yFfklwUzVj77ImMDCbgt+dE8nsnrKVsXYBT3jhCoz/S/m171ESlSPElrKrWUXDXpbxnsPlu3Ch2rhlkYAlN/AYC0vTwaDQvmKkoc6q7Uhqza8h4i6PxTIDwmv5zgR9EK+DqxoF3gpTv169hlQNXa1eIv5XeskjVdLtLPENxrUyaOJsEHljCEGtF9QZSicm+udSj1dbdoa/Fd5T2cFXvsdHaNZUT5I4WZTvBSw25BqhvkpKePuLV7naDoeUDhqnrBdtb2ZPFiR2icClm8HTpNH6MdTJ6mPWf8gl27P97D7Za0qFgriz9XkiuEpQJ0ukwm62EQmCxsdABB3VvxOG8Y6TUy2aHxDPQZrTszm+2jKxEVRzTlqkUbKaVDG0dMr8OejCML84JSWyYwq1XoO8vZ9gQEkmhY1Gp37SdaHD8qneX/rLiXCw4M/N9+Lx9+NzvaCrq6LpE7YBUznu4TBJFoz+/lv6osS/iviLIIBZM9PRinBn6NxPoeCQ9QzzyWikhRR/OnNyPZUiXI+toYdIt8BwulBvB6oNL7JPyQzmPoUo6+/CeYhHiAq+fQh2kYSSc4F4sowxFKY8DuDz3gMGdyQVds4+LHu523IR20QFaRTJGA9AD0I="
# Ex. travis encrypt SONATYPE_USER=your_sonatype_account
- secure: "VA+eY7OL05AW3KyGiKret73Gz79W/DfQsB2rV68pTK5ioWVl0HyiTxftSZt1nfuaedPy1Tka+nL9hZSKi3K+Yweh3PsAsY2hGAw7brenOXT0c0pFhr4L7ryuCRnt9v+qDdZQY9m3RY97SsD80dytRYeLKoruCyzqJfU95a/5BJFqKpR7+AMrlMcyOP5qMgFwPVTz9w2JmkNPKdkGUsQPE+rW6p5JdOTlhNQi4nxmaCsYPJ42aV9gZUdBoaWfbei3PZLlrFl6Y0HU3ve4MzwUPZ22py/pp8zo310Pg+9oBjEnBtjDAKhJLn74NAIJbU1XqeyE6ybRXY9S7x1BLn+YXolTerCpZZUIc/kJNGfMM6N2qKELrZArE3cGLKOimeEzMVM2GCjRlJWtyTANpaTCH5Zenciiqst6/OPyGHC5zUXrdhzsRV8MHSKWwMTO6XI9dVTODUyJDfhKg9uhfV3wmPUw6pc9zifxiju4tqaGykwFewT8uwZW2uzPBBKYm4GndfeyMm/yR5XIFOADoXk62JmqczYM+yq8r1V/+yv2F4iYvl1YWOTRdDLzxkmj8gJR43+czFsOf4v54kFpLJB3JHWz9LAj0O/lH+JL6IKGcAyAWc47WrHgib/lEp43ERKhI8P7kF+L1Qpifi2uZ1QM0DdvfigbVhJfbMyCFjxvtuw="
# Ex. travis encrypt SONATYPE_PASSWORD=your_sonatype_password
- secure: "VEV7do1Q4G5LEPwEicnAGMDg6kZKWrE6+HpWOBN8h//cuwRyXu+UcUvPhy5YQx2SSWYJtqem+PC32/3zgVrCpQWx0JPry8h+5V+IAyCtNv3Oi9udvggbY03T+DlpelXZzsUV0Fnbthc3m2uQgnkN6O7vswVLFXN7lO7GJESdEkapmWETp/v/IROsA9cwCBAGCkSgth4AEFUdihToCVd34PwiEPZV6ZmHYF9pZIWJZ+K+b23YiurGibyhdfChTvy6Y5bhG6VSrwofPXigVV5s/jy6QnBmdSYNjrKY9g1/ahFeNVhd/NTJU3SnyaEOJERiGtfw1xvvSFbCO8Hpo5QPx8fkMTXs2xpT9Cc0FuQkBF0kXeFy/lWklgW0vXcvbMo9R9ePTYul2oQtjxCrXApaLm2x9PsOpL7IkNT1acjkcOpBS3AIj2BjS2bSrPinf9eOj3bu8a9mhmG6LowczsqRHxY90JGfWoVGOZC5AvntvPteBRMpW46YY/z8atSNR24Fxp6cXNe6BOY32do+ZRVpHT/hu0jzaxHpYqyriUjWjtPq0rqfWEwrn/dtasBB7c/Xm0Clo1k7ePLK86QVIKGtYXqAE3S8yWD1f690nmLlMly8W1P2AGFhurydda86Oaz5mltsY39t1hLfpE2VGbsoLVJva/6Plt5HwbKI4jFYmrw="
# Ex. travis encrypt BINTRAY_USER=your_github_account
- secure: "eng7HMp7Zd1X4o59WzsyfTSjvkUGsYLPNKR2RIdZiDXKd/eqTXzrxtQX5BldKtOxA8ZfLF8UWMBE7k9hJq+1eN9fYF6FgLClwZw6OfuLJ6C/kBUne9MA2h5pE+cuV8p6zZsF0AmUM5EKnsP++FmgLijMdMNG8Hx4yTvZeMtCyvF5GZ8jQFXgCnnn8K1h0rwKI7mQH5HPo3rOWd6OCZpRYic0/XIW4CLlY2g/7uUl9IDqBNnFKVbU6sdVrK+O648l5spjGHS9agm/5f1JpNMuVMQ37QlWr+T7HC+X6dhLv9dlqPtXSJp7VWV0AnRgzqc0M5T492+05Ng5nXnX6Fo+lIc+08JCfjq5tDvwuzhKjIepZUBR9jaZwUQxSgeNWv+xwjBJLQuMY16Nyu0f3QUmqnL2toZ+Co6+P0MUlqRTkkCoFKbL2Z/lpTj78Nct/QJnzuafUvCOgyZLYSwniVVP/cqwqJajnhAr2kXP77QiBTKrwxLCISgDwrnnHKwnIBZ6TLpCyoJ/v84c6qagzh/ltcfvJSfDj8LdrJP5sIvOB8tuRqUXWKkZDr5igXPEfUotv+VR82VqPCS8IZgT4Str87UAIU/J5v4NM7gWF207ri0lILjbU4WcVD80AmJHxWpCJHwk0jNnXpHTxSZACoRwNeju4NBkHeKN8McOdU3DOR8="
# Ex. travis encrypt BINTRAY_KEY=xxx-https://bintray.com/profile/edit-xxx --add
- secure: "FzmEYmwU8RnH5y394X8pFkKjEtvrnMvas4DgoJJRum+8IMMkFcuiHTe1gw+kVQTXtj9q/YQ0rcdhu80A+EllcIKwc7wxf3tysnckF/l3lfSpcCHyiZ/e58yjpXg1XB/yAiVbRKgjubBJHI0RN005/M86PUlEi0HtA01jRgQvr3aQpicPuyRjleIkVx490clyTdAN5HXAJNfalWXcsEIcIl4GmpqA4o0MJ20DBqWrQYp25fgWDP9WYC+8oIQExZUtR/KWXdPWolYRE3tlBj6C6XSUTynaPzjcwveVGvfXAis6+NafowrRi1ohFzbdx3MbKhXxIdfVidw69BP1NMxqkPQaWWXtNEjpNbIMJvNQL/sFGkvsamYbEa1e7VALxMtjR19BYieZdYW4r33FeYH11zVAK3I3EjqpdnaLB6cTwgyWnblzDqIJ5WGWZQiomBE2MwqsYx5FOM0oXbukQlTWR/RxT/97f3nt8GsE5EqPk/uDWAiPY383R2wIY8FaehwA7iq9nKwbr7oQO2ZNtcR8VW0JeFc3b0r4F8uL4GDP7Le3JDe+o6Aoi+SFu+adjVQtRCX4IwH3CznhBP4wkfcTIBLxSVGzW2Fe3YHki6lulf6jBs1Of3VpzdPADADIzd0k1nOUVYZuUQU5IUlbtIdiIXPNhfNCkP5uMHBnIV8lc24="
# Ex. travis encrypt GH_TOKEN=XXX-https://github.com/settings/tokens-XXX --add
- secure: "ns5to8XeWdzLtfc47Jh4XIIH97SRNaC3Rx6dlvM+7ic+WnVJETMneOcXc1yFfklwUzVj77ImMDCbgt+dE8nsnrKVsXYBT3jhCoz/S/m171ESlSPElrKrWUXDXpbxnsPlu3Ch2rhlkYAlN/AYC0vTwaDQvmKkoc6q7Uhqza8h4i6PxTIDwmv5zgR9EK+DqxoF3gpTv169hlQNXa1eIv5XeskjVdLtLPENxrUyaOJsEHljCEGtF9QZSicm+udSj1dbdoa/Fd5T2cFXvsdHaNZUT5I4WZTvBSw25BqhvkpKePuLV7naDoeUDhqnrBdtb2ZPFiR2icClm8HTpNH6MdTJ6mPWf8gl27P97D7Za0qFgriz9XkiuEpQJ0ukwm62EQmCxsdABB3VvxOG8Y6TUy2aHxDPQZrTszm+2jKxEVRzTlqkUbKaVDG0dMr8OejCML84JSWyYwq1XoO8vZ9gQEkmhY1Gp37SdaHD8qneX/rLiXCw4M/N9+Lx9+NzvaCrq6LpE7YBUznu4TBJFoz+/lv6osS/iviLIIBZM9PRinBn6NxPoeCQ9QzzyWikhRR/OnNyPZUiXI+toYdIt8BwulBvB6oNL7JPyQzmPoUo6+/CeYhHiAq+fQh2kYSSc4F4sowxFKY8DuDz3gMGdyQVds4+LHu523IR20QFaRTJGA9AD0I="
# Ex. travis encrypt SONATYPE_USER=your_sonatype_account
- secure: "VA+eY7OL05AW3KyGiKret73Gz79W/DfQsB2rV68pTK5ioWVl0HyiTxftSZt1nfuaedPy1Tka+nL9hZSKi3K+Yweh3PsAsY2hGAw7brenOXT0c0pFhr4L7ryuCRnt9v+qDdZQY9m3RY97SsD80dytRYeLKoruCyzqJfU95a/5BJFqKpR7+AMrlMcyOP5qMgFwPVTz9w2JmkNPKdkGUsQPE+rW6p5JdOTlhNQi4nxmaCsYPJ42aV9gZUdBoaWfbei3PZLlrFl6Y0HU3ve4MzwUPZ22py/pp8zo310Pg+9oBjEnBtjDAKhJLn74NAIJbU1XqeyE6ybRXY9S7x1BLn+YXolTerCpZZUIc/kJNGfMM6N2qKELrZArE3cGLKOimeEzMVM2GCjRlJWtyTANpaTCH5Zenciiqst6/OPyGHC5zUXrdhzsRV8MHSKWwMTO6XI9dVTODUyJDfhKg9uhfV3wmPUw6pc9zifxiju4tqaGykwFewT8uwZW2uzPBBKYm4GndfeyMm/yR5XIFOADoXk62JmqczYM+yq8r1V/+yv2F4iYvl1YWOTRdDLzxkmj8gJR43+czFsOf4v54kFpLJB3JHWz9LAj0O/lH+JL6IKGcAyAWc47WrHgib/lEp43ERKhI8P7kF+L1Qpifi2uZ1QM0DdvfigbVhJfbMyCFjxvtuw="
# Ex. travis encrypt SONATYPE_PASSWORD=your_sonatype_password
- secure: "VEV7do1Q4G5LEPwEicnAGMDg6kZKWrE6+HpWOBN8h//cuwRyXu+UcUvPhy5YQx2SSWYJtqem+PC32/3zgVrCpQWx0JPry8h+5V+IAyCtNv3Oi9udvggbY03T+DlpelXZzsUV0Fnbthc3m2uQgnkN6O7vswVLFXN7lO7GJESdEkapmWETp/v/IROsA9cwCBAGCkSgth4AEFUdihToCVd34PwiEPZV6ZmHYF9pZIWJZ+K+b23YiurGibyhdfChTvy6Y5bhG6VSrwofPXigVV5s/jy6QnBmdSYNjrKY9g1/ahFeNVhd/NTJU3SnyaEOJERiGtfw1xvvSFbCO8Hpo5QPx8fkMTXs2xpT9Cc0FuQkBF0kXeFy/lWklgW0vXcvbMo9R9ePTYul2oQtjxCrXApaLm2x9PsOpL7IkNT1acjkcOpBS3AIj2BjS2bSrPinf9eOj3bu8a9mhmG6LowczsqRHxY90JGfWoVGOZC5AvntvPteBRMpW46YY/z8atSNR24Fxp6cXNe6BOY32do+ZRVpHT/hu0jzaxHpYqyriUjWjtPq0rqfWEwrn/dtasBB7c/Xm0Clo1k7ePLK86QVIKGtYXqAE3S8yWD1f690nmLlMly8W1P2AGFhurydda86Oaz5mltsY39t1hLfpE2VGbsoLVJva/6Plt5HwbKI4jFYmrw="
6 changes: 4 additions & 2 deletions opentracing-kafka-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>opentracing-kafka-parent</artifactId>
<groupId>io.opentracing.contrib</groupId>
<version>0.0.21-SNAPSHOT</version>
<version>0.1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@

package io.opentracing.contrib.kafka;

import java.util.function.BiFunction;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.function.BiFunction;

/**
* @author Jordan J Lopez
* Returns a string to be used as the name of the spans, based on
* the operation preformed and the record the span is based off of.
* @author Jordan J Lopez Returns a string to be used as the name of the spans, based on the
* operation preformed and the record the span is based off of.
*/
public class ClientSpanNameProvider {

Expand All @@ -32,11 +30,14 @@ public class ClientSpanNameProvider {
public static BiFunction<String, ProducerRecord, String> PRODUCER_OPERATION_NAME =
(operationName, producerRecord) -> replaceIfNull(operationName, "unknown");

public static BiFunction<String, ConsumerRecord, String> CONSUMER_PREFIXED_OPERATION_NAME(final String prefix) {
public static BiFunction<String, ConsumerRecord, String> CONSUMER_PREFIXED_OPERATION_NAME(
final String prefix) {
return (operationName, consumerRecord) -> replaceIfNull(prefix, "")
+ replaceIfNull(operationName, "unknown");
}
public static BiFunction<String, ProducerRecord, String> PRODUCER_PREFIXED_OPERATION_NAME(final String prefix) {

public static BiFunction<String, ProducerRecord, String> PRODUCER_PREFIXED_OPERATION_NAME(
final String prefix) {
return (operationName, producerRecord) -> replaceIfNull(prefix, "")
+ replaceIfNull(operationName, "unknown");
}
Expand All @@ -47,29 +48,35 @@ public static BiFunction<String, ProducerRecord, String> PRODUCER_PREFIXED_OPERA
public static BiFunction<String, ProducerRecord, String> PRODUCER_TOPIC =
(operationName, producerRecord) -> replaceIfNull(producerRecord, "unknown");

public static BiFunction<String, ConsumerRecord, String> CONSUMER_PREFIXED_TOPIC(final String prefix) {
public static BiFunction<String, ConsumerRecord, String> CONSUMER_PREFIXED_TOPIC(
final String prefix) {
return (operationName, consumerRecord) -> replaceIfNull(prefix, "")
+ replaceIfNull(consumerRecord, "unknown");
}
public static BiFunction<String, ProducerRecord, String> PRODUCER_PREFIXED_TOPIC(final String prefix) {

public static BiFunction<String, ProducerRecord, String> PRODUCER_PREFIXED_TOPIC(
final String prefix) {
return (operationName, producerRecord) -> replaceIfNull(prefix, "")
+ replaceIfNull(producerRecord, "unknown");
}

// Operation Name and Topic as Span Name
public static BiFunction<String, ConsumerRecord, String> CONSUMER_OPERATION_NAME_TOPIC =
(operationName, consumerRecord) -> replaceIfNull(operationName, "unknown")
+ " - " + replaceIfNull(consumerRecord, "unknown");
+ " - " + replaceIfNull(consumerRecord, "unknown");
public static BiFunction<String, ProducerRecord, String> PRODUCER_OPERATION_NAME_TOPIC =
(operationName, producerRecord) -> replaceIfNull(operationName, "unknown")
+ " - " + replaceIfNull(producerRecord, "unknown");
+ " - " + replaceIfNull(producerRecord, "unknown");

public static BiFunction<String, ConsumerRecord, String> CONSUMER_PREFIXED_OPERATION_NAME_TOPIC(final String prefix) {
public static BiFunction<String, ConsumerRecord, String> CONSUMER_PREFIXED_OPERATION_NAME_TOPIC(
final String prefix) {
return (operationName, consumerRecord) -> replaceIfNull(prefix, "")
+ replaceIfNull(operationName, "unknown")
+ " - " + replaceIfNull(consumerRecord, "unknown");
}
public static BiFunction<String, ProducerRecord, String> PRODUCER_PREFIXED_OPERATION_NAME_TOPIC(final String prefix) {

public static BiFunction<String, ProducerRecord, String> PRODUCER_PREFIXED_OPERATION_NAME_TOPIC(
final String prefix) {
return (operationName, producerRecord) -> replaceIfNull(prefix, "")
+ replaceIfNull(operationName, "unknown")
+ " - " + replaceIfNull(producerRecord, "unknown");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public HeadersMapExtractAdapter(Headers headers, boolean second) {
}
} else {
byte[] headerValue = header.value();
map.put(header.key(), headerValue == null ? null : new String(headerValue, StandardCharsets.UTF_8));
map.put(header.key(),
headerValue == null ? null : new String(headerValue, StandardCharsets.UTF_8));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ private static Map<String, Object> errorLogs(Throwable throwable) {
return errorLogs;
}

private static void setCommonTags(Span span){
Tags.COMPONENT.set(span, COMPONENT_NAME);
Tags.PEER_SERVICE.set(span, KAFKA_SERVICE);
private static void setCommonTags(Span span) {
Tags.COMPONENT.set(span, COMPONENT_NAME);
Tags.PEER_SERVICE.set(span, KAFKA_SERVICE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
SpanDecorator.onError(exception, span);
}

try (Scope ignored = tracer.scopeManager().activate(span, true)) {
try (Scope ignored = tracer.scopeManager().activate(span)) {
if (callback != null) {
callback.onCompletion(metadata, exception);
}
} finally {
span.finish();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public void assign(Collection<TopicPartition> partitions) {
}

@Override
@Deprecated
public ConsumerRecords<K, V> poll(long timeout) {
ConsumerRecords<K, V> records = consumer.poll(timeout);

Expand Down Expand Up @@ -286,6 +287,7 @@ public void close() {
}

@Override
@Deprecated
public void close(long l, TimeUnit timeUnit) {
consumer.close(l, timeUnit);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@


import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.util.GlobalTracer;
import java.util.List;
Expand Down Expand Up @@ -112,9 +113,9 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callbac
record.headers());
*/

try (Scope scope = TracingKafkaUtils
.buildAndInjectSpan(record, tracer, producerSpanNameProvider)) {
Callback wrappedCallback = new TracingCallback(callback, scope.span(), tracer);
Span span = TracingKafkaUtils.buildAndInjectSpan(record, tracer, producerSpanNameProvider);
try (Scope ignored = tracer.activateSpan(span)) {
Callback wrappedCallback = new TracingCallback(callback, span, tracer);
return producer.send(record, wrappedCallback);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package io.opentracing.contrib.kafka;

import io.opentracing.References;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
Expand Down Expand Up @@ -79,11 +78,11 @@ static void injectSecond(SpanContext spanContext, Headers headers,
new HeadersMapInjectAdapter(headers, true));
}

public static <K, V> Scope buildAndInjectSpan(ProducerRecord<K, V> record, Tracer tracer) {
public static <K, V> Span buildAndInjectSpan(ProducerRecord<K, V> record, Tracer tracer) {
return buildAndInjectSpan(record, tracer, ClientSpanNameProvider.PRODUCER_OPERATION_NAME);
}

public static <K, V> Scope buildAndInjectSpan(ProducerRecord<K, V> record, Tracer tracer,
public static <K, V> Span buildAndInjectSpan(ProducerRecord<K, V> record, Tracer tracer,
BiFunction<String, ProducerRecord, String> producerSpanNameProvider) {

String producerOper =
Expand All @@ -98,17 +97,17 @@ public static <K, V> Scope buildAndInjectSpan(ProducerRecord<K, V> record, Trace
spanBuilder.asChildOf(spanContext);
}

Scope scope = spanBuilder.startActive(false);
SpanDecorator.onSend(record, scope.span());
Span span = spanBuilder.start();
SpanDecorator.onSend(record, span);

try {
TracingKafkaUtils.inject(scope.span().context(), record.headers(), tracer);
TracingKafkaUtils.inject(span.context(), record.headers(), tracer);
} catch (Exception e) {
// it can happen if headers are read only (when record is sent second time)
logger.error("failed to inject span context. sending record second time?", e);
}

return scope;
return span;
}

public static <K, V> void buildAndFinishChildSpan(ConsumerRecord<K, V> record, Tracer tracer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package io.opentracing.contrib.kafka;

import io.opentracing.Scope;
import io.opentracing.util.GlobalTracer;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
Expand All @@ -24,9 +23,7 @@ public class TracingProducerInterceptor<K, V> implements ProducerInterceptor<K,

@Override
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> producerRecord) {
try (Scope scope = TracingKafkaUtils.buildAndInjectSpan(producerRecord, GlobalTracer.get())) {
scope.span().finish();
}
TracingKafkaUtils.buildAndInjectSpan(producerRecord, GlobalTracer.get()).finish();
return producerRecord;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,24 @@
*/
package io.opentracing.contrib.kafka;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

import java.util.Map.Entry;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.junit.Test;

import java.util.Map.Entry;

import static org.junit.Assert.*;


public class HeadersMapExtractAdapterTest {

@Test
public void verifyNullHeaderHandled() {
Headers headers = new RecordHeaders();
headers.add("test_null_header", null);
HeadersMapExtractAdapter headersMapExtractAdapter = new HeadersMapExtractAdapter(headers, false);
HeadersMapExtractAdapter headersMapExtractAdapter = new HeadersMapExtractAdapter(headers,
false);
Entry<String, String> header = headersMapExtractAdapter.iterator().next();
assertNotNull(header);
assertEquals(header.getKey(), "test_null_header");
Expand Down
Loading

0 comments on commit c16aceb

Please sign in to comment.