Skip to content

Commit

Permalink
Ingester: add default healthcheck port, add a configurable metrics ht…
Browse files Browse the repository at this point in the history
…tp handler and make Kafka flags the same as in the Collector

Signed-off-by: Louis-Etienne Dorval <louis-etienne.dorval@ticketmaster.com>
  • Loading branch information
Louis-Etienne Dorval committed Sep 28, 2018
1 parent d73db1e commit fecd429
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 13 deletions.
42 changes: 29 additions & 13 deletions cmd/ingester/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,22 @@ const (
// EncodingProto indicates spans are encoded as a protobuf byte array
EncodingProto = "protobuf"

// ConfigPrefix is a prefix fro the ingester flags
// ConfigPrefix is a prefix for the ingester flags
ConfigPrefix = "ingester"
// KafkaConfigPrefix is a prefix for the ingester flags
KafkaConfigPrefix = "kafka"
// SuffixBrokers is a suffix for the brokers flag
SuffixBrokers = ".brokers"
// SuffixTopic is a suffix for the topic flag
SuffixTopic = ".topic"
// SuffixGroupID is a suffix for the group-id flag
SuffixGroupID = ".group-id"
// SuffixParallelism is a suffix for the parallelism flag
SuffixParallelism = ".parallelism"
// SuffixEncoding is a suffix for the encoding flag
SuffixEncoding = ".encoding"
// SuffixParallelism is a suffix for the parallelism flag
SuffixParallelism = ".parallelism"
// SuffixHTTPPort is a suffix for the HTTP port
SuffixHTTPPort = ".http-port"

// DefaultBroker is the default kafka broker
DefaultBroker = "127.0.0.1:9092"
Expand All @@ -54,44 +58,56 @@ const (
DefaultParallelism = 1000
// DefaultEncoding is the default span encoding
DefaultEncoding = EncodingProto
// DefaultHTTPPort is the default HTTP port (e.g. for /metrics)
DefaultHTTPPort = 14271
// IngesterDefaultHealthCheckHTTPPort is the default HTTP Port for health check
IngesterDefaultHealthCheckHTTPPort = 14270
)

// Options stores the configuration options for the Ingester
type Options struct {
kafkaConsumer.Configuration
Parallelism int
Encoding string
// IngesterHTTPPort is the port that the ingester service listens in on for http requests
IngesterHTTPPort int
}

// AddFlags adds flags for Builder
func AddFlags(flagSet *flag.FlagSet) {
flagSet.String(
ConfigPrefix+SuffixBrokers,
KafkaConfigPrefix+SuffixBrokers,
DefaultBroker,
"The comma-separated list of kafka brokers. i.e. '127.0.0.1:9092,0.0.0:1234'")
flagSet.String(
ConfigPrefix+SuffixTopic,
KafkaConfigPrefix+SuffixTopic,
DefaultTopic,
"The name of the kafka topic to consume from")
flagSet.String(
ConfigPrefix+SuffixGroupID,
KafkaConfigPrefix+SuffixGroupID,
DefaultGroupID,
"The Consumer Group that ingester will be consuming on behalf of")
flagSet.String(
KafkaConfigPrefix+SuffixEncoding,
DefaultEncoding,
fmt.Sprintf(`The encoding of spans ("%s" or "%s") consumed from kafka`, EncodingProto, EncodingJSON))
flagSet.String(
ConfigPrefix+SuffixParallelism,
strconv.Itoa(DefaultParallelism),
"The number of messages to process in parallel")
flagSet.String(
ConfigPrefix+SuffixEncoding,
DefaultEncoding,
fmt.Sprintf(`The encoding of spans ("%s" or "%s") consumed from kafka`, EncodingProto, EncodingJSON))
flagSet.Int(
ConfigPrefix+SuffixHTTPPort,
DefaultHTTPPort,
"The http port for the ingester service")

}

// InitFromViper initializes Builder with properties from viper
func (o *Options) InitFromViper(v *viper.Viper) {
o.Brokers = strings.Split(v.GetString(ConfigPrefix+SuffixBrokers), ",")
o.Topic = v.GetString(ConfigPrefix + SuffixTopic)
o.GroupID = v.GetString(ConfigPrefix + SuffixGroupID)
o.Brokers = strings.Split(v.GetString(KafkaConfigPrefix+SuffixBrokers), ",")
o.Topic = v.GetString(KafkaConfigPrefix + SuffixTopic)
o.GroupID = v.GetString(KafkaConfigPrefix + SuffixGroupID)
o.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism)
o.Encoding = v.GetString(ConfigPrefix + SuffixEncoding)
o.IngesterHTTPPort = v.GetInt(ConfigPrefix + SuffixHTTPPort)
}
24 changes: 24 additions & 0 deletions cmd/ingester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ import (
"fmt"
"io"
"log"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"

"github.com/gorilla/mux"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.uber.org/zap"
Expand All @@ -31,7 +34,9 @@ import (
"github.com/jaegertracing/jaeger/cmd/ingester/app"
"github.com/jaegertracing/jaeger/cmd/ingester/app/builder"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
pMetrics "github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/recoveryhandler"
"github.com/jaegertracing/jaeger/pkg/version"
"github.com/jaegertracing/jaeger/plugin/storage"
)
Expand Down Expand Up @@ -90,6 +95,23 @@ func main() {
}
consumer.Start()

r := mux.NewRouter()
if h := mBldr.Handler(); h != nil {
logger.Info("Registering metrics handler with HTTP server", zap.String("route", mBldr.HTTPRoute))
r.Handle(mBldr.HTTPRoute, h)
}
httpPortStr := ":" + strconv.Itoa(options.IngesterHTTPPort)
recoveryHandler := recoveryhandler.NewRecoveryHandler(logger, true)

logger.Info("Starting HTTP server", zap.Int("http-port", options.IngesterHTTPPort))

go func() {
if err := http.ListenAndServe(httpPortStr, recoveryHandler(r)); err != nil {
logger.Fatal("Could not launch service", zap.Error(err))
}
hc.Set(healthcheck.Unavailable)
}()

hc.Ready()
<-signalsChannel
logger.Info("Shutting down")
Expand All @@ -111,6 +133,8 @@ func main() {
command.AddCommand(version.Command())
command.AddCommand(env.Command())

flags.SetDefaultHealthCheckPort(app.IngesterDefaultHealthCheckHTTPPort)

config.AddFlags(
v,
command,
Expand Down

0 comments on commit fecd429

Please sign in to comment.