-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ingester Main #952
Ingester Main #952
Conversation
Codecov Report
@@ Coverage Diff @@
## master #952 +/- ##
=====================================
Coverage 100% 100%
=====================================
Files 138 139 +1
Lines 6373 6400 +27
=====================================
+ Hits 6373 6400 +27
Continue to review full report at Codecov.
|
85a2a48
to
30117f3
Compare
cmd/ingester/app/options.go
Outdated
|
||
const ( | ||
configPrefix = "ingester" | ||
suffixBrokers = ".brokers" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmmm, I'm not sure about this. These configs shouldn't belong on the ingester itself, but should be encapsulated inside the storage factory. Do we have to do this like this since we haven't come up with a better story for the storage factory and flags?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We talked in person, but I'll recap for those who may be curious. Early in the design process there was a discussion about wether the Ingester should operate on an abstract queue or for Kafka. We decided it will be designed for kafka, but be independent of the write storage. Therefore there isn't an issue with having kafka specific config on the Ingester as long as the write storage is handled similarly to the other jaeger applications.
cmd/ingester/main.go
Outdated
if err != nil { | ||
logger.Fatal("Failed to create span writer", zap.Error(err)) | ||
} | ||
unmarshaller := app.UnmarshallerFromType(options.Encoding) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can see us copy pasting a lot of this kafka initialization logic when we OSS the indexer (is that part of the plan btw). Do you think it'll make sense to create something like https://github.com/jaegertracing/jaeger/blob/master/pkg/cassandra/config/config.go to handle all the kafka bootstrapping rather than dumping it all here in main?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm, let me rephrase a bit, internally, we don't want to write this same main logic if we can help it. Ideally, we create a builder for the ingester similar to the agent builder https://github.com/jaegertracing/jaeger/blob/master/cmd/agent/app/builder.go and internally, we can just run builder.build() instead of copy pasting this same code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
e22c22c
to
60fc837
Compare
cmd/ingester/app/options.go
Outdated
encodingProto = "protobuf" | ||
|
||
defaultBroker = "127.0.0.1:9092" | ||
defaultTopic = "jaeger-ingester-spans" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this the default topic used by the jaeger-collector
's kafka writers to emit spans?
cmd/ingester/main.go
Outdated
case <-signalsChannel: | ||
err := kafkaConsumer.Close() | ||
if err != nil { | ||
logger.Error("Failed to close span writer", zap.Error(err)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does closing the consumer also close the span writer?
cmd/ingester/main.go
Outdated
logger.Error("Failed to close span writer", zap.Error(err)) | ||
} | ||
} | ||
logger.Info("Jaeger Ingester is finishing") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I'd also add a log line on L101 stating "Beginning shutdown of Jaeger Ingester" or something similar.
Redoing using a builder to allow for easy initialization of an ingester |
Signed-off-by: Davit Yeghshatyan <davo@uber.com>
cmd/ingester/app/builder/builder.go
Outdated
// DefaultParallelism is the default parallelism for the span processor | ||
DefaultParallelism = 1000 | ||
// DefaultEncoding is the default span encoding | ||
DefaultEncoding = EncodingJSON |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm curious, is this what we emit from the collectors? (Not protobuf?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My mistake, default should be protobuf
cmd/ingester/app/builder/builder.go
Outdated
var unmarshaller kafka.Unmarshaller | ||
if b.Encoding == EncodingJSON { | ||
unmarshaller = kafka.NewJSONUnmarshaller() | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not a fan of this approach because if someone passes in an arbitrary encoding, they would get a protobuf marshaller. I recommend checking for the type and returning an error if nothing matches.
Signed-off-by: Davit Yeghshatyan <davo@uber.com>
Signed-off-by: Davit Yeghshatyan <davo@uber.com>
Which problem is this PR solving?
Short description of the changes