Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Usage:

Flags:
--config string config file (default is $HOME/.flowlogs2metrics)
--health.port string Health server port (default "8080")
-h, --help help for flowlogs2metrics
--log-level string Log level: debug, info, warning, error (default "error")
--pipeline.decode.aws string aws fields
Expand Down
16 changes: 12 additions & 4 deletions cmd/flowlogs2metrics/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
jsoniter "github.com/json-iterator/go"
"github.com/netobserv/flowlogs2metrics/pkg/config"
"github.com/netobserv/flowlogs2metrics/pkg/health"
"github.com/netobserv/flowlogs2metrics/pkg/pipeline"
"github.com/netobserv/flowlogs2metrics/pkg/pipeline/utils"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -128,6 +129,7 @@ func initFlags() {
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", fmt.Sprintf("config file (default is $HOME/%s)", defaultLogFileName))
rootCmd.PersistentFlags().StringVar(&logLevel, "log-level", "error", "Log level: debug, info, warning, error")
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Ingest.Type, "pipeline.ingest.type", "", "Ingest type: file, collector,file_loop (required)")
rootCmd.PersistentFlags().StringVar(&config.Opt.Health.Port, "health.port", "8080", "Health server port")
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Ingest.File.Filename, "pipeline.ingest.file.filename", "", "Ingest filename (file)")
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Ingest.Collector, "pipeline.ingest.collector", "", "Ingest collector API")
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Ingest.Kafka, "pipeline.ingest.kafka", "", "Ingest Kafka API")
Expand Down Expand Up @@ -160,23 +162,29 @@ func run() {
mainPipeline *pipeline.Pipeline
)

// Starting log message
// Initial log message
fmt.Printf("%s starting - version [%s]\n\n", filepath.Base(os.Args[0]), Version)
// Dump the configuration

// Dump configuration
dumpConfig()

// Setup (threads) exit manager
utils.SetupElegantExit()

// creating a new pipeline
// Create new flows pipeline
mainPipeline, err = pipeline.NewPipeline()
if err != nil {
log.Fatalf("failed to initialize pipeline %s", err)
os.Exit(1)
}

// Starts the flows pipeline
mainPipeline.Run()

// give all threads a chance to exit and then exit the process
// Start health report server
health.NewHealthServer(mainPipeline)

// Give all threads a chance to exit and then exit the process
time.Sleep(time.Second)
log.Debugf("exiting main run")
os.Exit(0)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.17
require (
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible
github.com/go-kit/kit v0.12.0
github.com/heptiolabs/healthcheck v0.0.0-20211123025425-613501dd5deb
github.com/ip2location/ip2location-go/v9 v9.2.0
github.com/json-iterator/go v1.1.12
github.com/mitchellh/mapstructure v1.4.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,8 @@ github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/J
github.com/hashicorp/serf v0.9.3/go.mod h1:UWDWwZeL5cuWDJdl0C6wrvrUwEqtQ4ZKBKKENpqIUyk=
github.com/hashicorp/serf v0.9.5/go.mod h1:UWDWwZeL5cuWDJdl0C6wrvrUwEqtQ4ZKBKKENpqIUyk=
github.com/hashicorp/serf v0.9.6/go.mod h1:TXZNMjZQijwlDvp+r0b63xZ45H7JmCmgg4gpTwn9UV4=
github.com/heptiolabs/healthcheck v0.0.0-20211123025425-613501dd5deb h1:tsEKRC3PU9rMw18w/uAptoijhgG4EvlA5kfJPtwrMDk=
github.com/heptiolabs/healthcheck v0.0.0-20211123025425-613501dd5deb/go.mod h1:NtmN9h8vrTveVQRLHcX2HQ5wIPBDCsZ351TGbZWgg38=
github.com/hetznercloud/hcloud-go v1.22.0/go.mod h1:xng8lbDUg+xM1dgc0yGHX5EeqbwIq7UYlMWMTx3SQVg=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg=
Expand Down
5 changes: 5 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ var (

type Options struct {
PipeLine Pipeline
Health Health
}

type Health struct {
Port string
}

type Pipeline struct {
Expand Down
61 changes: 61 additions & 0 deletions pkg/health/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (C) 2022 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package health

import (
"github.com/heptiolabs/healthcheck"
"github.com/netobserv/flowlogs2metrics/pkg/config"
"github.com/netobserv/flowlogs2metrics/pkg/pipeline"
log "github.com/sirupsen/logrus"
"net"
"net/http"
"time"
)

const defaultServerHost = "0.0.0.0"

type Server struct {
handler healthcheck.Handler
address string
}

func (hs *Server) Serve() {
for {
err := http.ListenAndServe(hs.address, hs.handler)
log.Errorf("http.ListenAndServe error %v", err)
time.Sleep(60 * time.Second)
}
}

func NewHealthServer(pipeline *pipeline.Pipeline) *Server {

handler := healthcheck.NewHandler()
address := net.JoinHostPort(defaultServerHost, config.Opt.Health.Port)

handler.AddLivenessCheck("PipelineCheck", pipeline.IsAlive())
handler.AddReadinessCheck("PipelineCheck", pipeline.IsReady())

server := &Server{
handler: handler,
address: address,
}

go server.Serve()

return server
}
59 changes: 59 additions & 0 deletions pkg/health/health_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package health

import (
"fmt"
"github.com/netobserv/flowlogs2metrics/pkg/config"
"github.com/netobserv/flowlogs2metrics/pkg/pipeline"
"github.com/stretchr/testify/require"
"net/http"
"net/url"
"testing"
"time"
)

func TestNewHealthServer(t *testing.T) {
readyPath := "/ready"
livePath := "/live"

type args struct {
pipeline pipeline.Pipeline
port string
}
type want struct {
statusCode int
}

tests := []struct {
name string
args args
want want
}{
{name: "pipeline running", args: args{pipeline: pipeline.Pipeline{IsRunning: true}, port: "7000"}, want: want{statusCode: 200}},
{name: "pipeline not running", args: args{pipeline: pipeline.Pipeline{IsRunning: false}, port: "7001"}, want: want{statusCode: 503}},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

config.Opt.Health.Port = tt.args.port
expectedAddr := fmt.Sprintf("0.0.0.0:%s", config.Opt.Health.Port)
server := NewHealthServer(&tt.args.pipeline)
require.NotNil(t, server)
require.Equal(t, expectedAddr, server.address)

client := &http.Client{}

time.Sleep(time.Second)
readyURL := url.URL{Scheme: "http", Host: expectedAddr, Path: readyPath}
var resp, err = client.Get(readyURL.String())
require.NoError(t, err)
require.Equal(t, tt.want.statusCode, resp.StatusCode)

liveURL := url.URL{Scheme: "http", Host: expectedAddr, Path: livePath}
resp, err = client.Get(liveURL.String())
require.NoError(t, err)
require.Equal(t, tt.want.statusCode, resp.StatusCode)

})
}
}
23 changes: 23 additions & 0 deletions pkg/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package pipeline

import (
"fmt"
"github.com/heptiolabs/healthcheck"
"github.com/netobserv/flowlogs2metrics/pkg/config"
"github.com/netobserv/flowlogs2metrics/pkg/pipeline/decode"
"github.com/netobserv/flowlogs2metrics/pkg/pipeline/encode"
Expand All @@ -40,6 +42,7 @@ type Pipeline struct {
Writer write.Writer
Extractor extract.Extractor
Encoder encode.Encoder
IsRunning bool
}

func getIngester() (ingest.Ingester, error) {
Expand Down Expand Up @@ -150,7 +153,9 @@ func NewPipeline() (*Pipeline, error) {
}

func (p *Pipeline) Run() {
p.IsRunning = true
p.Ingester.Ingest(p.Process)
p.IsRunning = false
}

// Process is called by the Ingester function
Expand All @@ -170,3 +175,21 @@ func (p Pipeline) Process(entries []interface{}) {
extracted := p.Extractor.Extract(transformed)
_ = p.Encoder.Encode(extracted)
}

func (p *Pipeline) IsReady() healthcheck.Check {
return func() error {
if !p.IsRunning {
return fmt.Errorf("pipeline is not running")
}
return nil
}
}

func (p *Pipeline) IsAlive() healthcheck.Check {
return func() error {
if !p.IsRunning {
return fmt.Errorf("pipeline is not running")
}
return nil
}
}