From 1a27edcf3045e1f539852ab7b09fdbdc21c43227 Mon Sep 17 00:00:00 2001 From: Eran Raichstein Date: Sun, 20 Feb 2022 14:32:42 +0200 Subject: [PATCH] add health server (http) --- README.md | 1 + cmd/flowlogs2metrics/main.go | 16 +++++++--- go.mod | 1 + go.sum | 2 ++ pkg/config/config.go | 5 +++ pkg/health/health.go | 61 ++++++++++++++++++++++++++++++++++++ pkg/health/health_test.go | 59 ++++++++++++++++++++++++++++++++++ pkg/pipeline/pipeline.go | 23 ++++++++++++++ 8 files changed, 164 insertions(+), 4 deletions(-) create mode 100644 pkg/health/health.go create mode 100644 pkg/health/health_test.go diff --git a/README.md b/README.md index 29208d200..9a5b9ec03 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,7 @@ Usage: Flags: --config string config file (default is $HOME/.flowlogs2metrics) + --health.port string Health server port (default "8080") -h, --help help for flowlogs2metrics --log-level string Log level: debug, info, warning, error (default "error") --pipeline.decode.aws string aws fields diff --git a/cmd/flowlogs2metrics/main.go b/cmd/flowlogs2metrics/main.go index 530f1b32b..b7d3a0cbb 100644 --- a/cmd/flowlogs2metrics/main.go +++ b/cmd/flowlogs2metrics/main.go @@ -22,6 +22,7 @@ import ( "fmt" jsoniter "github.com/json-iterator/go" "github.com/netobserv/flowlogs2metrics/pkg/config" + "github.com/netobserv/flowlogs2metrics/pkg/health" "github.com/netobserv/flowlogs2metrics/pkg/pipeline" "github.com/netobserv/flowlogs2metrics/pkg/pipeline/utils" log "github.com/sirupsen/logrus" @@ -128,6 +129,7 @@ func initFlags() { rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", fmt.Sprintf("config file (default is $HOME/%s)", defaultLogFileName)) rootCmd.PersistentFlags().StringVar(&logLevel, "log-level", "error", "Log level: debug, info, warning, error") rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Ingest.Type, "pipeline.ingest.type", "", "Ingest type: file, collector,file_loop (required)") + rootCmd.PersistentFlags().StringVar(&config.Opt.Health.Port, "health.port", "8080", "Health server port") rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Ingest.File.Filename, "pipeline.ingest.file.filename", "", "Ingest filename (file)") rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Ingest.Collector, "pipeline.ingest.collector", "", "Ingest collector API") rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Ingest.Kafka, "pipeline.ingest.kafka", "", "Ingest Kafka API") @@ -160,23 +162,29 @@ func run() { mainPipeline *pipeline.Pipeline ) - // Starting log message + // Initial log message fmt.Printf("%s starting - version [%s]\n\n", filepath.Base(os.Args[0]), Version) - // Dump the configuration + + // Dump configuration dumpConfig() + // Setup (threads) exit manager utils.SetupElegantExit() - // creating a new pipeline + // Create new flows pipeline mainPipeline, err = pipeline.NewPipeline() if err != nil { log.Fatalf("failed to initialize pipeline %s", err) os.Exit(1) } + // Starts the flows pipeline mainPipeline.Run() - // give all threads a chance to exit and then exit the process + // Start health report server + health.NewHealthServer(mainPipeline) + + // Give all threads a chance to exit and then exit the process time.Sleep(time.Second) log.Debugf("exiting main run") os.Exit(0) diff --git a/go.mod b/go.mod index 497588b41..efb91dde1 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.17 require ( github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible github.com/go-kit/kit v0.12.0 + github.com/heptiolabs/healthcheck v0.0.0-20211123025425-613501dd5deb github.com/ip2location/ip2location-go/v9 v9.2.0 github.com/json-iterator/go v1.1.12 github.com/mitchellh/mapstructure v1.4.3 diff --git a/go.sum b/go.sum index 525c3397e..c545a1f72 100644 --- a/go.sum +++ b/go.sum @@ -495,6 +495,8 @@ github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/J github.com/hashicorp/serf v0.9.3/go.mod h1:UWDWwZeL5cuWDJdl0C6wrvrUwEqtQ4ZKBKKENpqIUyk= github.com/hashicorp/serf v0.9.5/go.mod h1:UWDWwZeL5cuWDJdl0C6wrvrUwEqtQ4ZKBKKENpqIUyk= github.com/hashicorp/serf v0.9.6/go.mod h1:TXZNMjZQijwlDvp+r0b63xZ45H7JmCmgg4gpTwn9UV4= +github.com/heptiolabs/healthcheck v0.0.0-20211123025425-613501dd5deb h1:tsEKRC3PU9rMw18w/uAptoijhgG4EvlA5kfJPtwrMDk= +github.com/heptiolabs/healthcheck v0.0.0-20211123025425-613501dd5deb/go.mod h1:NtmN9h8vrTveVQRLHcX2HQ5wIPBDCsZ351TGbZWgg38= github.com/hetznercloud/hcloud-go v1.22.0/go.mod h1:xng8lbDUg+xM1dgc0yGHX5EeqbwIq7UYlMWMTx3SQVg= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg= diff --git a/pkg/config/config.go b/pkg/config/config.go index 12c98729c..fd35af85e 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -25,6 +25,11 @@ var ( type Options struct { PipeLine Pipeline + Health Health +} + +type Health struct { + Port string } type Pipeline struct { diff --git a/pkg/health/health.go b/pkg/health/health.go new file mode 100644 index 000000000..cae863371 --- /dev/null +++ b/pkg/health/health.go @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2022 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package health + +import ( + "github.com/heptiolabs/healthcheck" + "github.com/netobserv/flowlogs2metrics/pkg/config" + "github.com/netobserv/flowlogs2metrics/pkg/pipeline" + log "github.com/sirupsen/logrus" + "net" + "net/http" + "time" +) + +const defaultServerHost = "0.0.0.0" + +type Server struct { + handler healthcheck.Handler + address string +} + +func (hs *Server) Serve() { + for { + err := http.ListenAndServe(hs.address, hs.handler) + log.Errorf("http.ListenAndServe error %v", err) + time.Sleep(60 * time.Second) + } +} + +func NewHealthServer(pipeline *pipeline.Pipeline) *Server { + + handler := healthcheck.NewHandler() + address := net.JoinHostPort(defaultServerHost, config.Opt.Health.Port) + + handler.AddLivenessCheck("PipelineCheck", pipeline.IsAlive()) + handler.AddReadinessCheck("PipelineCheck", pipeline.IsReady()) + + server := &Server{ + handler: handler, + address: address, + } + + go server.Serve() + + return server +} diff --git a/pkg/health/health_test.go b/pkg/health/health_test.go new file mode 100644 index 000000000..96a35be57 --- /dev/null +++ b/pkg/health/health_test.go @@ -0,0 +1,59 @@ +package health + +import ( + "fmt" + "github.com/netobserv/flowlogs2metrics/pkg/config" + "github.com/netobserv/flowlogs2metrics/pkg/pipeline" + "github.com/stretchr/testify/require" + "net/http" + "net/url" + "testing" + "time" +) + +func TestNewHealthServer(t *testing.T) { + readyPath := "/ready" + livePath := "/live" + + type args struct { + pipeline pipeline.Pipeline + port string + } + type want struct { + statusCode int + } + + tests := []struct { + name string + args args + want want + }{ + {name: "pipeline running", args: args{pipeline: pipeline.Pipeline{IsRunning: true}, port: "7000"}, want: want{statusCode: 200}}, + {name: "pipeline not running", args: args{pipeline: pipeline.Pipeline{IsRunning: false}, port: "7001"}, want: want{statusCode: 503}}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + config.Opt.Health.Port = tt.args.port + expectedAddr := fmt.Sprintf("0.0.0.0:%s", config.Opt.Health.Port) + server := NewHealthServer(&tt.args.pipeline) + require.NotNil(t, server) + require.Equal(t, expectedAddr, server.address) + + client := &http.Client{} + + time.Sleep(time.Second) + readyURL := url.URL{Scheme: "http", Host: expectedAddr, Path: readyPath} + var resp, err = client.Get(readyURL.String()) + require.NoError(t, err) + require.Equal(t, tt.want.statusCode, resp.StatusCode) + + liveURL := url.URL{Scheme: "http", Host: expectedAddr, Path: livePath} + resp, err = client.Get(liveURL.String()) + require.NoError(t, err) + require.Equal(t, tt.want.statusCode, resp.StatusCode) + + }) + } +} diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index 5ff346064..8ae8bcbc1 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -18,6 +18,8 @@ package pipeline import ( + "fmt" + "github.com/heptiolabs/healthcheck" "github.com/netobserv/flowlogs2metrics/pkg/config" "github.com/netobserv/flowlogs2metrics/pkg/pipeline/decode" "github.com/netobserv/flowlogs2metrics/pkg/pipeline/encode" @@ -40,6 +42,7 @@ type Pipeline struct { Writer write.Writer Extractor extract.Extractor Encoder encode.Encoder + IsRunning bool } func getIngester() (ingest.Ingester, error) { @@ -150,7 +153,9 @@ func NewPipeline() (*Pipeline, error) { } func (p *Pipeline) Run() { + p.IsRunning = true p.Ingester.Ingest(p.Process) + p.IsRunning = false } // Process is called by the Ingester function @@ -170,3 +175,21 @@ func (p Pipeline) Process(entries []interface{}) { extracted := p.Extractor.Extract(transformed) _ = p.Encoder.Encode(extracted) } + +func (p *Pipeline) IsReady() healthcheck.Check { + return func() error { + if !p.IsRunning { + return fmt.Errorf("pipeline is not running") + } + return nil + } +} + +func (p *Pipeline) IsAlive() healthcheck.Check { + return func() error { + if !p.IsRunning { + return fmt.Errorf("pipeline is not running") + } + return nil + } +}