Skip to content

Commit

Permalink
Add Ingester Main, Builder, and Flags (jaegertracing#952)
Browse files Browse the repository at this point in the history
  • Loading branch information
davit-y authored and isaachier committed Sep 3, 2018
1 parent 15f1027 commit 53a99d5
Show file tree
Hide file tree
Showing 4 changed files with 360 additions and 0 deletions.
79 changes: 79 additions & 0 deletions cmd/ingester/app/builder/builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package builder

import (
"fmt"

"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/ingester/app"
"github.com/jaegertracing/jaeger/cmd/ingester/app/consumer"
"github.com/jaegertracing/jaeger/cmd/ingester/app/processor"
kafkaConsumer "github.com/jaegertracing/jaeger/pkg/kafka/consumer"
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

// CreateConsumer creates a new span consumer for the ingester
func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWriter spanstore.Writer, options app.Options) (*consumer.Consumer, error) {
var unmarshaller kafka.Unmarshaller
if options.Encoding == app.EncodingJSON {
unmarshaller = kafka.NewJSONUnmarshaller()
} else if options.Encoding == app.EncodingProto {
unmarshaller = kafka.NewProtobufUnmarshaller()
} else {
return nil, fmt.Errorf(`encoding '%s' not recognised, use one of ("%s" or "%s")`,
options.Encoding, app.EncodingProto, app.EncodingJSON)
}

spParams := processor.SpanProcessorParams{
Writer: spanWriter,
Unmarshaller: unmarshaller,
}
spanProcessor := processor.NewSpanProcessor(spParams)

consumerConfig := kafkaConsumer.Configuration{
Brokers: options.Brokers,
Topic: options.Topic,
GroupID: options.GroupID,
}
saramaConsumer, err := consumerConfig.NewConsumer()
if err != nil {
return nil, err
}

factoryParams := consumer.ProcessorFactoryParams{
Topic: options.Topic,
Parallelism: options.Parallelism,
SaramaConsumer: saramaConsumer,
BaseProcessor: spanProcessor,
Logger: logger,
Factory: metricsFactory,
}
processorFactory, err := consumer.NewProcessorFactory(factoryParams)
if err != nil {
return nil, err
}

consumerParams := consumer.Params{
InternalConsumer: saramaConsumer,
ProcessorFactory: *processorFactory,
Factory: metricsFactory,
Logger: logger,
}
return consumer.New(consumerParams)
}
97 changes: 97 additions & 0 deletions cmd/ingester/app/flags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"flag"
"fmt"
"strconv"
"strings"

"github.com/spf13/viper"

kafkaConsumer "github.com/jaegertracing/jaeger/pkg/kafka/consumer"
)

const (
// EncodingJSON indicates spans are encoded as a json byte array
EncodingJSON = "json"
// EncodingProto indicates spans are encoded as a protobuf byte array
EncodingProto = "protobuf"

// ConfigPrefix is a prefix fro the ingester flags
ConfigPrefix = "ingester"
// SuffixBrokers is a suffix for the brokers flag
SuffixBrokers = ".brokers"
// SuffixTopic is a suffix for the topic flag
SuffixTopic = ".topic"
// SuffixGroupID is a suffix for the group-id flag
SuffixGroupID = ".group-id"
// SuffixParallelism is a suffix for the parallelism flag
SuffixParallelism = ".parallelism"
// SuffixEncoding is a suffix for the encoding flag
SuffixEncoding = ".encoding"

// DefaultBroker is the default kafka broker
DefaultBroker = "127.0.0.1:9092"
// DefaultTopic is the default kafka topic
DefaultTopic = "jaeger-spans"
// DefaultGroupID is the default consumer Group ID
DefaultGroupID = "jaeger-ingester"
// DefaultParallelism is the default parallelism for the span processor
DefaultParallelism = 1000
// DefaultEncoding is the default span encoding
DefaultEncoding = EncodingProto
)

// Options stores the configuration options for the Ingester
type Options struct {
kafkaConsumer.Configuration
Parallelism int
Encoding string
}

// AddFlags adds flags for Builder
func AddFlags(flagSet *flag.FlagSet) {
flagSet.String(
ConfigPrefix+SuffixBrokers,
DefaultBroker,
"The comma-separated list of kafka brokers. i.e. '127.0.0.1:9092,0.0.0:1234'")
flagSet.String(
ConfigPrefix+SuffixTopic,
DefaultTopic,
"The name of the kafka topic to consume from")
flagSet.String(
ConfigPrefix+SuffixGroupID,
DefaultGroupID,
"The Consumer Group that ingester will be consuming on behalf of")
flagSet.String(
ConfigPrefix+SuffixParallelism,
strconv.Itoa(DefaultParallelism),
"The number of messages to process in parallel")
flagSet.String(
ConfigPrefix+SuffixEncoding,
DefaultEncoding,
fmt.Sprintf(`The encoding of spans ("%s" or "%s") consumed from kafka`, EncodingProto, EncodingJSON))
}

// InitFromViper initializes Builder with properties from viper
func (o *Options) InitFromViper(v *viper.Viper) {
o.Brokers = strings.Split(v.GetString(ConfigPrefix+SuffixBrokers), ",")
o.Topic = v.GetString(ConfigPrefix + SuffixTopic)
o.GroupID = v.GetString(ConfigPrefix + SuffixGroupID)
o.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism)
o.Encoding = v.GetString(ConfigPrefix + SuffixEncoding)
}
54 changes: 54 additions & 0 deletions cmd/ingester/app/flags_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/jaegertracing/jaeger/pkg/config"
)

func TestOptionsWithFlags(t *testing.T) {
o := &Options{}
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{
"--ingester.topic=topic1",
"--ingester.brokers=127.0.0.1:9092,0.0.0:1234",
"--ingester.group-id=group1",
"--ingester.parallelism=5",
"--ingester.encoding=json"})
o.InitFromViper(v)

assert.Equal(t, "topic1", o.Topic)
assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, o.Brokers)
assert.Equal(t, "group1", o.GroupID)
assert.Equal(t, 5, o.Parallelism)
assert.Equal(t, EncodingJSON, o.Encoding)
}

func TestFlagDefaults(t *testing.T) {
o := &Options{}
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{})
o.InitFromViper(v)

assert.Equal(t, DefaultTopic, o.Topic)
assert.Equal(t, []string{DefaultBroker}, o.Brokers)
assert.Equal(t, DefaultGroupID, o.GroupID)
assert.Equal(t, DefaultParallelism, o.Parallelism)
assert.Equal(t, DefaultEncoding, o.Encoding)
}
130 changes: 130 additions & 0 deletions cmd/ingester/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"fmt"
"io"
"log"
"os"
"os/signal"
"syscall"

"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/env"
"github.com/jaegertracing/jaeger/cmd/flags"
"github.com/jaegertracing/jaeger/cmd/ingester/app"
"github.com/jaegertracing/jaeger/cmd/ingester/app/builder"
"github.com/jaegertracing/jaeger/pkg/config"
pMetrics "github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/version"
"github.com/jaegertracing/jaeger/plugin/storage"
)

func main() {
var signalsChannel = make(chan os.Signal, 0)
signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM)

storageFactory, err := storage.NewFactory(storage.FactoryConfigFromEnvAndCLI(os.Args, os.Stderr))
if err != nil {
log.Fatalf("Cannot initialize storage factory: %v", err)
}

v := viper.New()
command := &cobra.Command{
Use: "jaeger-ingester",
Short: "Jaeger ingester consumes from Kafka and writes to storage",
Long: `Jaeger ingester consumes spans from a particular Kafka topic and writes them to all configured storage types.`,
RunE: func(cmd *cobra.Command, args []string) error {
err := flags.TryLoadConfigFile(v)
if err != nil {
return err
}

sFlags := new(flags.SharedFlags).InitFromViper(v)
logger, err := sFlags.NewLogger(zap.NewProductionConfig())
if err != nil {
return err
}
hc, err := sFlags.NewHealthCheck(logger)
if err != nil {
logger.Fatal("Could not start the health check server.", zap.Error(err))
}

mBldr := new(pMetrics.Builder).InitFromViper(v)
baseFactory, err := mBldr.CreateMetricsFactory("jaeger")
if err != nil {
logger.Fatal("Cannot create metrics factory.", zap.Error(err))
}
metricsFactory := baseFactory.Namespace("ingester", nil)

storageFactory.InitFromViper(v)
if err := storageFactory.Initialize(baseFactory, logger); err != nil {
logger.Fatal("Failed to init storage factory", zap.Error(err))
}
spanWriter, err := storageFactory.CreateSpanWriter()
if err != nil {
logger.Fatal("Failed to create span writer", zap.Error(err))
}

options := app.Options{}
options.InitFromViper(v)
consumer, err := builder.CreateConsumer(logger, metricsFactory, spanWriter, options)
if err != nil {
logger.Fatal("Unable to create consumer", zap.Error(err))
}
consumer.Start()

hc.Ready()
select {
case <-signalsChannel:
logger.Info("Jaeger Ingester is starting to close")
err := consumer.Close()
if err != nil {
logger.Error("Failed to close consumer", zap.Error(err))
}
if closer, ok := spanWriter.(io.Closer); ok {
err := closer.Close()
if err != nil {
logger.Error("Failed to close span writer", zap.Error(err))
}
}
logger.Info("Jaeger Ingester has finished closing")
}
return nil
},
}

command.AddCommand(version.Command())
command.AddCommand(env.Command())

config.AddFlags(
v,
command,
flags.AddConfigFileFlag,
flags.AddFlags,
storageFactory.AddFlags,
pMetrics.AddFlags,
app.AddFlags,
)

if err := command.Execute(); err != nil {
fmt.Println(err.Error())
os.Exit(1)
}
}

0 comments on commit 53a99d5

Please sign in to comment.