Add Kafka as a Storage Plugin #862

Merged: black-adder merged 18 commits into jaegertracing:master from davit-y:add-kafka-writer on Jun 14, 2018

5 participants
davit-y (Contributor) commented Jun 8, 2018

Which problem is this PR solving?

Short description of the changes

  • Allows spans to be produced to Kafka
  • Toggled by changing storage type to Kafka

thriftSpan.Write(h.protocol)

return h.memBuffer.Bytes(), nil

yurishkuro Jun 8, 2018

Member
  • does membuffer need to be reset between writes?
  • can this be called concurrently?
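Both questions can be illustrated with a minimal sketch. It assumes the memory buffer behaves like a `bytes.Buffer`; the `bufferedMarshaller` type and its string payload are hypothetical stand-ins for the thrift marshaller under review, not its actual code.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// bufferedMarshaller is a hypothetical stand-in for the marshaller under
// review: it serializes into one shared, reusable buffer.
type bufferedMarshaller struct {
	mu  sync.Mutex   // serializes access to buf across goroutines
	buf bytes.Buffer // reused between calls to avoid reallocating
}

// Marshal writes payload into the shared buffer and returns a private copy.
func (m *bufferedMarshaller) Marshal(payload string) []byte {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.buf.Reset() // without this, bytes from earlier calls would accumulate
	m.buf.WriteString(payload)

	// Copy before returning: Bytes() aliases the buffer's memory, which the
	// next call overwrites.
	out := make([]byte, m.buf.Len())
	copy(out, m.buf.Bytes())
	return out
}

func main() {
	m := &bufferedMarshaller{}
	first := m.Marshal("span-1")
	second := m.Marshal("span-2")
	fmt.Printf("%s %s\n", first, second)
}
```

Without the `Reset` call the second result would contain both payloads, and without the mutex two goroutines could interleave writes into the same buffer.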
"github.com/jaegertracing/jaeger/storage/spanstore"
)

const configPrefix = "kafka"

yurishkuro Jun 8, 2018

Member

why is it defined here, but used in options.go?

// AddFlags adds flags for Options
func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
	flagSet.String(
		configPrefix+"-"+optionBrokers,

yurishkuro Jun 8, 2018

Member

why not add dash to the prefix constant for better readability?


// NewSpanWriter initiates and returns a new kafka spanwriter
func NewSpanWriter(producer sarama.AsyncProducer, topic string, factory metrics.Factory) *SpanWriter {
	writeMetrics := spanWriterMetrics{

yurishkuro Jun 8, 2018

Member

let's use reflection-based initializer
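The reviewer presumably means a helper from the project's metrics library; that helper is not reproduced here. As a rough illustration of the idea only, the sketch below uses the standard `reflect` package to fill counter fields from struct tags. The `Counter` type, the `initMetrics` function, and the tag names are all assumptions made for this example.

```go
package main

import (
	"fmt"
	"reflect"
)

// Counter is a toy counter; a real metrics library supplies its own type.
type Counter struct {
	Name string
	Tags string
	n    int64
}

// Inc adds delta to the counter.
func (c *Counter) Inc(delta int64) { c.n += delta }

// spanWriterMetrics declares its metrics through struct tags
// (the tag keys are assumptions chosen for this sketch).
type spanWriterMetrics struct {
	SpansWrittenSuccess *Counter `metric:"kafka_spans_written" tags:"status=success"`
	SpansWrittenFailure *Counter `metric:"kafka_spans_written" tags:"status=failure"`
}

// initMetrics walks the fields of the struct that target points to and
// fills every settable *Counter field from its struct tags.
func initMetrics(target interface{}) error {
	v := reflect.ValueOf(target)
	if v.Kind() != reflect.Ptr || v.Elem().Kind() != reflect.Struct {
		return fmt.Errorf("initMetrics: want pointer to struct, got %T", target)
	}
	v = v.Elem()
	t := v.Type()
	counterType := reflect.TypeOf((*Counter)(nil))
	for i := 0; i < t.NumField(); i++ {
		field := t.Field(i)
		if field.Type != counterType || !v.Field(i).CanSet() {
			continue // skip other types and unexported fields
		}
		name := field.Tag.Get("metric")
		if name == "" {
			return fmt.Errorf("initMetrics: field %s has no metric tag", field.Name)
		}
		c := &Counter{Name: name, Tags: field.Tag.Get("tags")}
		v.Field(i).Set(reflect.ValueOf(c))
	}
	return nil
}

func main() {
	var m spanWriterMetrics
	if err := initMetrics(&m); err != nil {
		panic(err)
	}
	m.SpansWrittenSuccess.Inc(1)
	fmt.Println(m.SpansWrittenSuccess.Name, m.SpansWrittenSuccess.Tags, m.SpansWrittenFailure.Tags)
}
```

The appeal of this style is that adding a metric becomes a one-line struct field instead of another hand-written constructor call.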

}

// NewSpanWriter initiates and returns a new kafka spanwriter
func NewSpanWriter(producer sarama.AsyncProducer, topic string, factory metrics.Factory) *SpanWriter {

yurishkuro Jun 8, 2018

Member

shouldn't marshaller be passed to the constructor?

// SpanWriter writes spans to kafka. Implements spanstore.Writer
type SpanWriter struct {
	metrics  spanWriterMetrics
	producer sarama.AsyncProducer

yurishkuro Jun 8, 2018

Member

is this an interface or struct? we should depend on interface, otherwise you cannot test this file

return nil
}

// Close closes SpanWriter by closing producer

yurishkuro Jun 8, 2018

Member

// Close shuts down SpanWriter by closing the Kafka producer, which closes its channels causing go-routines to exit.

There may be a race condition here if the producer closes the input channel but someone calls WriteSpan after that. We may need to keep an atomic flag to stop writes after Close().
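One way to guard writes after Close is sketched below. It is a variation on the suggested atomic flag: a bare flag still leaves a window between checking the flag and sending, so this sketch holds a read lock across both. The `guardedWriter` type and its plain channel are hypothetical stand-ins for the span writer and the producer's input channel.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errWriterClosed = errors.New("span writer is closed")

// guardedWriter refuses writes once Close has run.
type guardedWriter struct {
	mu     sync.RWMutex
	closed bool
	input  chan []byte // stands in for the producer's input channel
}

func newGuardedWriter(buffer int) *guardedWriter {
	return &guardedWriter{input: make(chan []byte, buffer)}
}

// WriteSpan enqueues a message unless Close has already run.
func (w *guardedWriter) WriteSpan(msg []byte) error {
	w.mu.RLock()
	defer w.mu.RUnlock()
	if w.closed {
		return errWriterClosed
	}
	// Close needs the write lock, so it cannot close the channel while
	// this send is in flight.
	w.input <- msg
	return nil
}

// Close marks the writer closed and closes the channel exactly once.
func (w *guardedWriter) Close() error {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.closed {
		return nil
	}
	w.closed = true
	close(w.input)
	return nil
}

func main() {
	w := newGuardedWriter(1)
	fmt.Println(w.WriteSpan([]byte("span-1")))
	fmt.Println(w.Close())
	fmt.Println(w.WriteSpan([]byte("span-2")))
}
```

A caveat of this design: if the channel is full and nothing drains it, a blocked WriteSpan keeps the read lock and Close waits behind it, so a real implementation would likely need a timeout or a select on a done channel.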

vprithvi Jun 8, 2018

Member

If the input channel is closed, shouldn't the send to the closed channel panic?
I think we should revisit how releasing resources works, and do something similar to the following:

  • Set healthcheck to unavailable
  • Close tchannel and http servers
  • Close span writers
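On the question about panicking: in Go, a send on a closed channel does panic, which is why the order of shutdown steps matters. A small self-contained demonstration, using a hypothetical `trySend` helper that recovers from the panic so the program can report it:

```go
package main

import "fmt"

// trySend attempts a send and reports whether the send panicked.
func trySend(ch chan int, v int) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	ch <- v
	return false
}

func main() {
	ch := make(chan int, 1)
	fmt.Println("open channel panicked:", trySend(ch, 1))
	close(ch)
	fmt.Println("closed channel panicked:", trySend(ch, 2))
}
```

Stopping the servers that accept spans before closing the span writers, as the list above proposes, means nothing is left to attempt such a send.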

black-adder Jun 13, 2018

Collaborator

I'm gonna allow this to land and punt on this. This rarely is a concern given how we do deploys internally anyway. And who does graceful shutdowns anyway.

flagSet.String(
	configPrefix+suffixBrokers,
	"127.0.0.1:9092",
	"The comma-separated list of kafka brokers. i.e. 127.0.0.1:9092")

black-adder Jun 8, 2018

Collaborator

nit: provide an actual comma-separated list as an example
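A minimal sketch of what the nit asks for, using only the standard `flag` package. The `suffixBrokers` value, the help text, and the `parseBrokers` helper are assumptions for illustration; the PR's final constants may differ.

```go
package main

import (
	"flag"
	"fmt"
	"strings"
)

const (
	configPrefix   = "kafka"
	suffixBrokers  = ".brokers" // assumed value: separator kept in the suffix
	defaultBrokers = "127.0.0.1:9092"
)

// addFlags registers the brokers flag with a help text that shows a real
// comma-separated list.
func addFlags(fs *flag.FlagSet) *string {
	return fs.String(
		configPrefix+suffixBrokers,
		defaultBrokers,
		"The comma-separated list of kafka brokers, e.g. '127.0.0.1:9092,0.0.0.0:1234'")
}

// parseBrokers splits the raw flag value and drops blanks and whitespace.
func parseBrokers(raw string) []string {
	var brokers []string
	for _, b := range strings.Split(raw, ",") {
		if b = strings.TrimSpace(b); b != "" {
			brokers = append(brokers, b)
		}
	}
	return brokers
}

func main() {
	fs := flag.NewFlagSet("collector", flag.ContinueOnError)
	raw := addFlags(fs)
	if err := fs.Parse([]string{"--kafka.brokers=10.0.0.1:9092, 10.0.0.2:9092"}); err != nil {
		panic(err)
	}
	fmt.Println(parseBrokers(*raw))
}
```

Keeping the separator inside the constant, as suggested in the earlier comment on the prefix, leaves the flag name expression as a plain concatenation of two constants.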

davit-y force-pushed the davit-y:add-kafka-writer branch from 9f99d7e to 0d07df6 on Jun 8, 2018

davit-y (Contributor, Author) commented Jun 8, 2018

My local git config was misconfigured, so I had to squash my commits to pass the DCO sign-off checker.

yurishkuro (Member) commented Jun 8, 2018

  • find a commit X from before your changes started
  • git reset --soft X
  • all your changes are now pending and staged
  • git commit -s
  • git push --force

davit-y (Contributor, Author) commented Jun 8, 2018

Thanks Yuri, that's actually exactly what I ended up doing since my fork was behind origin/master.

davit-y force-pushed the davit-y:add-kafka-writer branch from 270d1a4 to 4532ee5 on Jun 11, 2018

davit-y changed the title from "[WIP] Add Kafka as a Storage Plugin" to "Add Kafka as a Storage Plugin" on Jun 12, 2018

glide.yaml Outdated
@@ -46,7 +46,7 @@ import:
 - package: github.com/spf13/cobra
   version: 0.0.1
 - package: github.com/spf13/viper
-  version: ^1
+  version: 1.0.0

black-adder Jun 12, 2018

Collaborator

why was this required?

davit-y Jun 12, 2018

Author Contributor

Fixes dependency issues with updated version of viper

}

// NewFactory creates a new Factory.
func NewFactory() *Factory {

black-adder Jun 12, 2018

Collaborator

try to have the constructor right beneath the definition of the struct

return err
}
f.producer = p

black-adder Jun 12, 2018

Collaborator

not a big fan of random empty lines


// AddFlags implements plugin.Configurable
func (f *Factory) AddFlags(flagSet *flag.FlagSet) {
flagSet.String(

black-adder Jun 12, 2018

Collaborator

Technically this is fine, but do you mind splitting this file up into factory and options? That would be more consistent with the other storage plugins.

)

// Checks that Kafka Factory conforms to storage.Factory API
var _ storage.Factory = new(Factory)

black-adder Jun 12, 2018

Collaborator

👍

model.Bool("bool", true),
model.Binary("binary", []byte("Omar")),
}
span := &model.Span{

black-adder Jun 12, 2018

Collaborator

this is a pretty elaborate span you've built only to confirm that the bytes aren't nil after marshaling. Maybe just use a basic span instead? Or is there an unmarshaler that we can use to convert the span back and compare it to the original?
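The round-trip check suggested here could look roughly like the sketch below. It uses `encoding/json` and a simplified, hypothetical `Span` struct as stand-ins, since the real marshaller and `model.Span` are not reproduced in this thread.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// Span is a simplified, hypothetical stand-in for the domain span.
type Span struct {
	TraceID       string            `json:"traceId"`
	OperationName string            `json:"operationName"`
	Tags          map[string]string `json:"tags,omitempty"`
}

// roundTrip marshals a span and unmarshals the bytes into a fresh value.
func roundTrip(in *Span) (*Span, error) {
	data, err := json.Marshal(in)
	if err != nil {
		return nil, fmt.Errorf("marshal: %w", err)
	}
	out := &Span{}
	if err := json.Unmarshal(data, out); err != nil {
		return nil, fmt.Errorf("unmarshal: %w", err)
	}
	return out, nil
}

func main() {
	in := &Span{
		TraceID:       "abc123",
		OperationName: "get",
		Tags:          map[string]string{"http.method": "GET"},
	}
	out, err := roundTrip(in)
	if err != nil {
		panic(err)
	}
	// A test would assert on this comparison instead of on "bytes != nil".
	fmt.Println("round trip preserved span:", reflect.DeepEqual(in, out))
}
```

Comparing the decoded span with the original justifies building an elaborate span, because every populated field then contributes to the assertion.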

// NewSpanWriter initiates and returns a new kafka spanwriter
func NewSpanWriter(producer sarama.AsyncProducer, marshaller Marshaller, topic string, factory metrics.Factory) *SpanWriter {
	writeMetrics := spanWriterMetrics{
		SpansWrittenSuccess: factory.Counter("kafka_spans_written", map[string]string{"status": "success"}),

black-adder Jun 12, 2018

Collaborator

i'll get back to you on this but I think we've generally used: "error": "false" and "error": "true" for these metrics. Is there somewhere else you've seen success vs failure status tags?

marshaller := &mocks.Marshaller{}
marshaller.On("Marshal", mock.AnythingOfType("*model.Span")).Return([]byte{}, nil)

tags := model.KeyValues{

black-adder Jun 12, 2018

Collaborator

move the tags under the writerTest

writer: NewSpanWriter(producer, marshaller, "someTopic", serviceMetrics),
}

span := &model.Span{

black-adder Jun 12, 2018

Collaborator

is there anything special about this span? Can we move it out into a var at the top of the file and share it with the marshaller test?

func TestKafkaWriter(t *testing.T) {
	withSpanWriter(t, func(span *model.Span, w *spanWriterTest) {

		w.producer.ExpectInputAndSucceed()

black-adder Jun 12, 2018

Collaborator

mocks done right

codecov commented Jun 12, 2018

Codecov Report

Merging #862 into master will not change coverage.
The diff coverage is 100%.

@@          Coverage Diff          @@
##           master   #862   +/-   ##
=====================================
  Coverage     100%   100%           
=====================================
  Files         121    125    +4     
  Lines        5959   6047   +88     
=====================================
+ Hits         5959   6047   +88
Impacted Files                        Coverage Δ
plugin/storage/factory_config.go      100% <ø> (ø) ⬆️
plugin/storage/kafka/factory.go       100% <100%> (ø)
plugin/storage/kafka/writer.go        100% <100%> (ø)
plugin/storage/kafka/marshaller.go    100% <100%> (ø)
plugin/storage/kafka/options.go       100% <100%> (ø)
plugin/storage/factory.go             100% <100%> (ø) ⬆️

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update e9eed5a...ccf965f.

// Initialize implements storage.Factory
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
	f.metricsFactory, f.logger = metricsFactory, logger
	p, err := f.config.NewProducer()

jpkrohling Jun 13, 2018

Member

Could you log the contents of f.config? Like:

logger.Info("Kafka storage configuration", zap.Any("configuration", f.config))

glide.yaml
@@ -55,3 +55,7 @@ import:
 - package: github.com/go-openapi/validate
 - package: github.com/go-openapi/loads
 - package: github.com/elazarl/go-bindata-assetfs
+- package: github.com/Shopify/sarama
+  version: ^1.16.0
+- package: golang.org/x/sys

black-adder Jun 13, 2018

Collaborator

There was some dependency strangeness that I debugged with Davit but couldn't figure out so we had to pin this. I'm gonna land this to unblock and I'll address this separately.

black-adder (Collaborator) commented Jun 13, 2018

Looks like things are broken: golang/go#25872

yurishkuro (Member) commented Jun 13, 2018

Someone posted a workaround - web-platform-tests/wpt.fyi@cf5af40, which probably can be simpler in our case if we simply pin x/tools to some version.


// Marshal encodes a span as a thrift byte array
func (h *thriftMarshaller) Marshal(span *model.Span) ([]byte, error) {
	thriftSpan := jaeger.FromDomainSpan(span)

black-adder Jun 13, 2018

Collaborator

@yurishkuro @vprithvi: @davit-y brought up a very good point: the thrift model doesn't include the process in the span definition (https://github.com/jaegertracing/jaeger-idl/blob/master/thrift/jaeger.thrift#L51), whereas our internal kafka model does. We have a few options:

  1. Add process to the thrift model
  2. Pass a batch with 1 span instead
  3. Write a json marshaller for now

I've told Davit to go ahead with option 3 for now until we come to a consensus on how to move forward.
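Option 3 might look roughly like the sketch below. The `Span` and `Process` structs are simplified, hypothetical stand-ins for the domain model; the point is only that a JSON encoding of a span that embeds its process carries the process along with no schema change.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Process is a simplified, hypothetical stand-in for the domain process.
type Process struct {
	ServiceName string `json:"serviceName"`
}

// Span embeds its process, unlike the thrift span discussed above.
type Span struct {
	OperationName string   `json:"operationName"`
	Process       *Process `json:"process"`
}

// Marshaller lets the encoding be swapped later (for example for protobuf)
// without touching the writer.
type Marshaller interface {
	Marshal(span *Span) ([]byte, error)
}

// jsonMarshaller implements Marshaller with encoding/json.
type jsonMarshaller struct{}

func (jsonMarshaller) Marshal(span *Span) ([]byte, error) {
	return json.Marshal(span)
}

func main() {
	var m Marshaller = jsonMarshaller{}
	data, err := m.Marshal(&Span{
		OperationName: "get",
		Process:       &Process{ServiceName: "frontend"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data))
}
```

Hiding the encoding behind an interface means that switching to another format later is a change of implementation rather than a change to the writer.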

yurishkuro Jun 13, 2018

Member

my preference is to land my proto PR and use that. You can marshal domain model directly into []byte.

yurishkuro Jun 14, 2018

Member

proto model was merged

yurishkuro Jun 14, 2018

Member

model/ids_test.go shows example of to/from []byte marshaling

davit-y force-pushed the davit-y:add-kafka-writer branch 3 times, most recently from 54c286b to 7fad27a, on Jun 13, 2018


// CreateDependencyReader implements storage.Factory
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
	return nil, errors.New("kafka storage is write-only")

jpkrohling Jun 14, 2018

Member

Pardon my ignorance here, but does it mean that we won't be seeing any traces on the UI when Kafka is the storage being used? Shouldn't it then be a "secondary writer", instead of "main storage"?

black-adder Jun 14, 2018

Collaborator

Yes I think that's fair. Internally we're currently using kafka as the primary storage and we have another downstream ingestor service which takes the kafka spans and inserts them into cassandra and another separate service which handles the indexing in cassandra. This way, if cassandra is overburdened, the indexer will skip indexing to ensure that the ingestor can at least write the span. We don't expect all users of Jaeger to be doing this (the load might not require this elaborate setup) so they should be able to dual write from the collector to cassandra and kafka (for the data pipeline).

Currently, we don't have a good way of configuring multiple storages via flags (I guess you could pass in a comma-separated list instead and we'd generate two storages for you). That being said, I think it can all be addressed in a separate PR.
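The dual-write idea could be sketched as a composite writer that fans each span out to several backends. The `Writer` interface, `compositeWriter`, and the in-memory doubles below are hypothetical and use strings in place of spans; none of this is part of the PR.

```go
package main

import (
	"errors"
	"fmt"
)

// Writer is a simplified, hypothetical span writer interface.
type Writer interface {
	WriteSpan(span string) error
}

// compositeWriter forwards every span to all configured writers.
type compositeWriter struct {
	writers []Writer
}

// WriteSpan tries every writer, even if an earlier one fails, and reports
// how many failed.
func (c *compositeWriter) WriteSpan(span string) error {
	failed := 0
	var last error
	for _, w := range c.writers {
		if err := w.WriteSpan(span); err != nil {
			failed++
			last = err
		}
	}
	if failed > 0 {
		return fmt.Errorf("%d of %d writers failed, last error: %w", failed, len(c.writers), last)
	}
	return nil
}

// memoryWriter stands in for a working backend such as cassandra or kafka.
type memoryWriter struct{ spans []string }

func (m *memoryWriter) WriteSpan(span string) error {
	m.spans = append(m.spans, span)
	return nil
}

// brokenWriter stands in for a backend that is currently unavailable.
type brokenWriter struct{}

func (brokenWriter) WriteSpan(string) error { return errors.New("backend unavailable") }

func main() {
	cassandra, kafka := &memoryWriter{}, &memoryWriter{}
	both := &compositeWriter{writers: []Writer{cassandra, kafka}}
	fmt.Println(both.WriteSpan("span-1"), cassandra.spans, kafka.spans)

	degraded := &compositeWriter{writers: []Writer{brokenWriter{}, kafka}}
	fmt.Println(degraded.WriteSpan("span-2"), kafka.spans)
}
```

Whether one failing backend should fail the whole write is a policy choice; this sketch keeps writing to the remaining backends and surfaces the failure to the caller.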

davit-y added some commits Jun 8, 2018

  • Add kafka as a new storage plugin
    Signed-off-by: Davit Yeghshatyan <davo@uber.com>
  • Replace Options with Configuration and ProducerBuilder
    Options was previously incorrectly used as Kafka will not have an
    archiver. Adding the Configuration and ProducerBuilder structs makes
    code structure of kafka factory consistent with ES and Cassandra
    which allows for easier unit testing.

    Signed-off-by: Davit Yeghshatyan <davo@uber.com>
  • Add factory unit test
    Signed-off-by: Davit Yeghshatyan <davo@uber.com>
  • Add API conformance check
    Signed-off-by: Davit Yeghshatyan <davo@uber.com>
  • Add SpanWriter unit test
    Signed-off-by: Davit Yeghshatyan <davo@uber.com>
  • Add .nocover for config
    Signed-off-by: Davit Yeghshatyan <davo@uber.com>
  • Add Marshaller unit test
    Signed-off-by: Davit Yeghshatyan <davo@uber.com>
  • Use common sample span
    Signed-off-by: Davit Yeghshatyan <davo@uber.com>
  • Format factory
    Signed-off-by: Davit Yeghshatyan <davo@uber.com>
  • Undo viper version pin
    Signed-off-by: Davit Yeghshatyan <davo@uber.com>
  • Fix dependency issues
    Signed-off-by: Davit Yeghshatyan <davo@uber.com>
  • Dependency fix test
    Signed-off-by: Davit Yeghshatyan <davo@uber.com>
  • Add options file
    Signed-off-by: Davit Yeghshatyan <davo@uber.com>
  • Add options unit test
    Signed-off-by: Davit Yeghshatyan <davo@uber.com>
  • Improve storage.factory coverage
    Signed-off-by: Davit Yeghshatyan <davo@uber.com>
  • Log configuration
    Signed-off-by: Davit Yeghshatyan <davo@uber.com>
  • Add JSON marshaller
    Signed-off-by: Davit Yeghshatyan <davo@uber.com>

davit-y force-pushed the davit-y:add-kafka-writer branch from 7fad27a to 3819fff on Jun 14, 2018

Update glide after rebase
Signed-off-by: Davit Yeghshatyan <davo@uber.com>

davit-y force-pushed the davit-y:add-kafka-writer branch from 3819fff to ccf965f on Jun 14, 2018

black-adder merged commit 8576606 into jaegertracing:master on Jun 14, 2018

5 checks passed

  • DCO: All commits have a DCO sign-off from the author
  • WIP: ready for review
  • codecov/patch: 100% of diff hit (target 100%)
  • codecov/project: 100% (+0%) compared to e9eed5a
  • continuous-integration/travis-ci/pr: The Travis CI build passed

davit-y referenced this pull request on Jun 14, 2018 in "Use protobuf marshaller #868" (merged)
