New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Initial Cassandra and Elasticserch spark job #1
Conversation
@yurishkuro could you please review? The beauty of this is that it does not require any other service e.g. (kafka). Later some parts could be reused for new data pipeline processing. (e.g. the job will be the same but it will acquire data from other sources) |
microsUpper); | ||
|
||
JavaSparkContext sc = new JavaSparkContext(conf); | ||
List<Dependency> dependencies = javaFunctions(sc).cassandraTable(keyspace, "traces", mapRowTo(Span.class)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Jiri-Kremser could you please have a look at these lines?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you please have a look at these lines?
nice lines :)
perhaps
.flatMapValues(foo)
.values()
.mapToPair(bar)
could be all expressed in the first flatMapValues
.
RDDs are fun to write, but can be slow sometimes, if you want optimization for free, consider using the DataFrame abstraction in should be in the C*-spark connector.
On my todo list |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic looks good.
README.md
Outdated
and store them for later presentation in the UI. | ||
|
||
This job parses all traces in the current day in UTC time. This means you should schedule it to run | ||
just prior to midnight UTC. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure I follow this. The job we run internally is also based on UTC, but it runs after UTC midnight and processes the previous day.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps it would be more accurate to have something like:
This job parses all traces on a given day, based on UTC. By default, it processes the current day, but other days can be explicitly specified.
README.md
Outdated
@@ -0,0 +1,73 @@ | |||
# Jaeger Spark dependencies | |||
|
|||
This is a Spark job that will collect spans from your datastore, analyze links between services, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this emphasize that only Cassandra is currently supported?
Also, I would make it clear that this will only work with data collected via Jaeger client libraries using the default model of single-host spans, i.e. it will not work with spans collected from most Zipkin or jaeger-configured-as-zipkin libraries that share the span ID between client and server.
microsUpper); | ||
|
||
JavaSparkContext sc = new JavaSparkContext(conf); | ||
List<Dependency> dependencies = javaFunctions(sc).cassandraTable(keyspace, "traces", mapRowTo(Span.class)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mapRowTo(Span.class)
Ha, cheating! Nice! Was a lot more painful in Go.
@jpkrohling @objectiser could you please review? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm only half way through, but I wanted to share my comments before leaving for the weekend :)
README.md
Outdated
and store them for later presentation in the UI. | ||
|
||
This job parses all traces in the current day in UTC time. This means you should schedule it to run | ||
just prior to midnight UTC. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps it would be more accurate to have something like:
This job parses all traces on a given day, based on UTC. By default, it processes the current day, but other days can be explicitly specified.
README.md
Outdated
Cassandra is used when `STORAGE_TYPE=cassandra`. | ||
|
||
* `CASSANDRA_KEYSPACE`: The keyspace to use. Defaults to "zipkin". | ||
* `CASSANDRA_CONTACT_POINTS`: Comma separated list of hosts / ip addresses part of Cassandra cluster. Defaults to localhost |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/CASSANDRA_CONTACT_POINTS/CASSANDRA_HOSTS/
? The _HOSTS
version seems to be the "standard".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These terms are similar, however not interchangeable. Contact points
has slightly different meaning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the person using this job supposed to know the difference? Is this something common in Cassandra world?
README.md
Outdated
Example usage: | ||
|
||
```bash | ||
$ STORAGE_TYPE=cassandra CASSANDRA_USERNAME=user CASSANDRA_PASSWORD=pass java -jar jaeager-dependencies.jar |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/jaeager/jaeger/
README.md
Outdated
$ STORAGE_TYPE=cassandra CASSANDRA_USERNAME=user CASSANDRA_PASSWORD=pass java -jar jaeager-dependencies.jar | ||
``` | ||
### Elasticsearch | ||
Elasticsearch is used when `STORAGE_TYPE=cassandra`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wait, what? Did you mean STORAGE_TYPE=elasticsearch
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a little puzzle to keep readers in focus 😆 .
String[] jars; | ||
|
||
// By default the job only works on traces whose first timestamp is today | ||
long day = Utils.midnightUTC(System.currentTimeMillis()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You have no excuse on this project :) You can use Clock
instead of System.current.....
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What method do you mean exactly? This https://docs.oracle.com/javase/8/docs/api/java/time/Clock.html#millis--?
I think for this use case call system.millis is fine. Maybe a simple refactor with passing a Clock/Date(I am not sure here) to Job would simplify something.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea is that your code shouldn't care if it's UTC or not. You'd only get a clock, and whoever created the clock would set the timezone. This code here would only call the method you mentioned (#millis()
), if that's what you need.
public Builder hosts(String hosts) { | ||
Utils.checkNoTNull(hosts, "hosts"); | ||
this.hosts = hosts; | ||
sparkProperties.put("es.nodes.wan.only", "true"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this be on the builder?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure, I took the builder from zipkin. I suppose if it was needed it would be in the builder.
JavaEsSpark.saveJsonToEs(javaSparkContext.parallelize(Collections.singletonList(json)), resource); | ||
} | ||
|
||
static String parseHosts(String hosts) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you share the motivation for this ? Looks like the only thing it's doing is add the port to the host, in case it's missing. Am I right? In any case, examples of the input and output would be sufficient.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's taken from zipkin. It's a little bit magical, I am wondering if we should to keep it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove it and see if it breaks :) Code that does nothing shouldn't exist.
|
||
public String getTimestamp() { | ||
// Jaeger ES dependency storage uses RFC3339Nano for timestamp | ||
return new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssXXX") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't there a constant for this format already?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I couldn't find any.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again, Java 8 could have helped:
https://docs.oracle.com/javase/8/docs/api/java/time/OffsetDateTime.html
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(btw, I just noticed now that the method name is misleading: this is not returning a timestamp, but a date as string).
|
||
@Override | ||
protected void waitBetweenTraces() throws InterruptedException { | ||
// TODO otherwise elastic drops some spans |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a real TODO?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
well yes, I think we could do it completely without waits but it's more complicated therefore a would like to leave a comment here for further improvements.
protected void deriveDependencies() { | ||
ElasticsearchDependenciesJob.builder() | ||
.hosts("http://localhost:" + elasticsearch.getMappedPort(9200)) | ||
.day(System.currentTimeMillis()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You know what I'm going to comment here, don't you? :) (Clock vs. System.currentTimeMillis)
.groupByKey(); | ||
|
||
// TODO remove for debug purposes | ||
traces.foreach(stringIterableTuple2 -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, then put something like "Remove before merging (or after 2017-10-01)", so that you signal to the next maintainer that this can be safely removed if you forget to remove "now".
JavaEsSpark.saveJsonToEs(javaSparkContext.parallelize(Collections.singletonList(json)), resource); | ||
} | ||
|
||
static String parseHosts(String hosts) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove it and see if it breaks :) Code that does nothing shouldn't exist.
|
||
public String getTimestamp() { | ||
// Jaeger ES dependency storage uses RFC3339Nano for timestamp | ||
return new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssXXX") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again, Java 8 could have helped:
https://docs.oracle.com/javase/8/docs/api/java/time/OffsetDateTime.html
|
||
public String getTimestamp() { | ||
// Jaeger ES dependency storage uses RFC3339Nano for timestamp | ||
return new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssXXX") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(btw, I just noticed now that the method name is misleading: this is not returning a timestamp, but a date as string).
*/ | ||
public class TracersGenerator { | ||
|
||
public static class Tuple<A, B> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you do have a Tuple
class somewhere else, don't you?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in scala, This module does not depend on it.
return; | ||
} | ||
// +1 to assure that we generate all exact number of nodes | ||
int numOfDescendants = descendantsRandom.nextInt(maxNumberOfDescendants) + 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why a random number of descendants? Wouldn't a fixed number do it? Adding randomness to a test should be done only when it adds real value, as it's hard to debug a test failure when something is random.
See https://stackoverflow.com/questions/36877897/detected-guava-issue-1635-which-indicates-that-a-version-of-guava-less-than-16 | ||
TODO: When cassandra-driver-core 4.x is out, revisit this as it no longer uses guava | ||
--> | ||
<relocations> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this module supposed to be used as a library to a downstream/client module? If so, then it might be a good idea to relocate everything.
<execution> | ||
<phase>package</phase> | ||
<goals> | ||
<goal>shade</goal> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this build publishing both the shaded and no-shaded versions?
|
||
String storage = System.getenv("STORAGE"); | ||
if (storage == null) { | ||
throw new IllegalArgumentException("Missing environmental variable STORAGE"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At the this level, it's better to just print out the message and exit with System.exit(rc)
than throwing an exception.
Forgot to mention on the review itself, so, I'll leave it here as comment: some of those files are based on Zipkin ones, so, it would be polite to mention this on the readme. Besides, it would also be nice to either not add your |
@jpkrohling thanks, done in the pavolloffay@8e48ca4 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some comments to consider. LGTM.
README.md
Outdated
### Cassandra | ||
Cassandra is used when `STORAGE_TYPE=cassandra`. | ||
|
||
* `CASSANDRA_KEYSPACE`: The keyspace to use. Defaults to "jaeger_v1_test". |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is the keyspace default _test
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's the default what create.sh
creates. Maybe jaeger_v1_dc1
would be better (this creates out k8s deployment)
Elasticsearch is used when `STORAGE_TYPE=elasticsearch`. | ||
|
||
* `ES_INDEX`: The index prefix to use when generating daily index names. Defaults to jaeger. | ||
The final index look like jaeger-span-yyyy-DD-mm. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does this require a separate index per day - can't the data be stored in a single index with timestamps to distinguish the values per day?
How do the indexes get cleaned up?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is how jaeger ES storage implementation works
|
||
@Override | ||
protected void deriveDependencies() throws Exception { | ||
// flush all date to the storage |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you mean data
?
/** | ||
* @author Pavol Loffay | ||
*/ | ||
public class Process implements Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wondering whether it would be better to have the java model is some independent package, as it could be generally useful for Java tooling wanting to read data from the jaeger storage?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agree, I would like to raise some issues about generic tests and reusable parts in java.
@Override | ||
public void createChildSpan(TracingWrapper<JaegerWrapper> parent) { | ||
io.opentracing.Tracer.SpanBuilder spanBuilder = tracer.buildSpan(UUID.randomUUID().toString().replace("-", "")) | ||
.ignoreActiveSpan(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is the ignoreActiveSpan
required when you are setting the parent and starting manual?
} | ||
} | ||
|
||
static String pathToUberJar() throws UnsupportedEncodingException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this need to be UberJar
? Could it be JaegerJar
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's more uber jar, no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May be I should have asked what is the Uber jar?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a fat jar with all dependencies.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah ok - overloaded use of the term uber
:)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ROLF now I get it why did you ask... 😆
Node<ZipkinWrapper> root = treeGenerator.generateTree(150, 3); | ||
Traversals.inorder(root, (node, parent) -> node.getTracingWrapper().get().getSpan().finish()); | ||
waitBetweenTraces(); | ||
treeGenerator.getTracers().forEach(tracer -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could be moved onto the TreeGenerator
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
jaeger tracer does not implement closeable
I will create a PR and add TODO here.
@jpkrohling I have updated it to use java 8 date API pavolloffay@f918317 |
The usage of the Java 8 date/time API could be better, but it's a strong first version. One future improvement could have been about setting the timezone on the main class, and let everything else be agnostic of the timezone. But that's not needed/required for this PR. I think it's very close to a LGTM, missing only the removal of the |
The current functionality is what OpenZipkin uses, and is probably enough. If you have a specific requirement feel free to open issue on this repo right away. About the relocation, see todo. There is a problem with guava. Based on the maturity of the original project I suspect it's there for a reason. |
I will merge soon and transform the repo to the jaegertracing. If you want a different repo name please comment. cc @yurishkuro |
@yurishkuro @Jiri-Kremser @objectiser @jpkrohling thanks. |
Upgrade jackson
This is dependencies spark job ported from https://github.com/openzipkin/zipkin-dependencies to use Jaeger model.