Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for es-tags-as-fields through env #105

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,17 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import io.jaegertracing.spark.dependencies.Utils;
import io.jaegertracing.spark.dependencies.model.KeyValue;
import io.jaegertracing.spark.dependencies.model.Process;
import io.jaegertracing.spark.dependencies.model.Reference;
import io.jaegertracing.spark.dependencies.model.Span;
import org.apache.parquet.Files;

import java.io.File;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand All @@ -38,9 +43,28 @@ public class SpanDeserializer extends StdDeserializer<Span> {

// TODO Spark incorrectly serializes object mapper, therefore reinitializing here
private ObjectMapper objectMapper = JsonHelper.configure(new ObjectMapper());
private final boolean tagsAsFieldsAll;
private final List<String> tagsAsFields;
private final String tagsAsFieldsDotReplacement;

public SpanDeserializer() {
public SpanDeserializer() throws IOException {
super(Span.class);

tagsAsFieldsAll = Boolean.parseBoolean(Utils.getEnv("ES_TAGS_AS_FIELDS_ALL", "false"));
tagsAsFieldsDotReplacement = Utils.getEnv("ES_TAGS_AS_FIELDS_DOT_REPLACEMENT", "@");
tagsAsFields = new ArrayList<>();

if (!tagsAsFieldsAll) {
String configFile = Utils.getEnv("ES_TAGS_AS_FIELDS_CONFIG_FILE", null);
if (configFile != null) {
tagsAsFields.addAll(Files.readAllLines(new File(configFile), Charset.defaultCharset()));
}

String include = Utils.getEnv("ES_TAGS_AS_FIELDS_INCLUDE", null);
if (include != null) {
tagsAsFields.addAll(Arrays.asList(include.split(",")));
}
}
}

@Override
Expand Down Expand Up @@ -78,14 +102,26 @@ private List<KeyValue> addTagFields(List<KeyValue> tags, Map<String, Object> tag
result.addAll(tags);
List<KeyValue> collect = tagFields.entrySet().stream().map(stringObjectEntry -> {
KeyValue kv = new KeyValue();
kv.setKey(stringObjectEntry.getKey());
kv.setKey(mapTag(stringObjectEntry.getKey()));
kv.setValueString(stringObjectEntry.getValue().toString());
return kv;
}).collect(Collectors.toList());
result.addAll(collect);
return result;
}

private String mapTag(String value) {
if (tagsAsFieldsAll) {
return value.replaceAll(tagsAsFieldsDotReplacement, ".");
}

if (tagsAsFields.contains(value)) {
return value.replaceAll(tagsAsFieldsDotReplacement, ".");
}

return value;
}

private List<Reference> deserializeReferences(JsonNode node) throws JsonProcessingException {
List<Reference> references = new ArrayList<>();
JsonNode parentSpanID = node.get("parentSpanID");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Copyright 2017 The Jaeger Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.jaegertracing.spark.dependencies.elastic;

import static org.junit.Assert.assertEquals;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.jaegertracing.spark.dependencies.DependenciesSparkHelper;
import io.jaegertracing.spark.dependencies.elastic.json.JsonHelper;
import io.jaegertracing.spark.dependencies.model.Dependency;
import io.jaegertracing.spark.dependencies.model.Span;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.Test;

public class TagsAsFieldsTest {
@Test
public void shouldMapAllTags() throws IOException, ReflectiveOperationException {
ObjectMapper objectMapper = JsonHelper.configure(new ObjectMapper());
updateEnv("ES_TAGS_AS_FIELDS_ALL", "true");

List<Span> spans = objectMapper.readValue(this.getClass().getClassLoader().getResource("spans.json"), new TypeReference<List<Span>>(){});

JavaSparkContext context = new JavaSparkContext("local[*]", "test");
JavaPairRDD<String, Iterable<Span>> traces = context.parallelize(spans)
.groupBy(Span::getTraceId);
List<Dependency> dependencyLinks = DependenciesSparkHelper.derive(traces, "placeholder");

context.stop();

assertEquals(10, dependencyLinks.size());
}

@SuppressWarnings("unchecked")
private static void updateEnv(String name, String val) throws ReflectiveOperationException {
Map<String, String> env = System.getenv();
Field field = env.getClass().getDeclaredField("m");
field.setAccessible(true);
((Map<String, String>) field.get(env)).put(name, val);
}
}