diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index aba9434b5..659e720e7 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -82,7 +82,7 @@ type encodeProm struct { expiryTime int64 mList *list.List mCache metricCache - exitChan chan bool + exitChan chan struct{} } var metricsProcessed = operationalMetrics.NewCounter(prometheus.CounterOpts{ @@ -336,7 +336,7 @@ func NewEncodeProm(params config.StageParam) (Encoder, error) { } } - ch := make(chan bool, 1) + ch := make(chan struct{}) utils.RegisterExitChannel(ch) log.Debugf("metrics = %v", metrics) diff --git a/pkg/pipeline/extract/aggregate/aggregates.go b/pkg/pipeline/extract/aggregate/aggregates.go index 33865336b..d83179d05 100644 --- a/pkg/pipeline/extract/aggregate/aggregates.go +++ b/pkg/pipeline/extract/aggregate/aggregates.go @@ -75,7 +75,7 @@ func (aggregates *Aggregates) AddAggregate(aggregateDefinition api.AggregateDefi func (aggregates *Aggregates) cleanupExpiredEntriesLoop() { ticker := time.NewTicker(time.Duration(aggregates.expiryTime) * time.Second) - done := make(chan bool) + done := make(chan struct{}) utils.RegisterExitChannel(done) go func() { for { diff --git a/pkg/pipeline/ingest/ingest_collector.go b/pkg/pipeline/ingest/ingest_collector.go index 43a5ab78d..7df418544 100644 --- a/pkg/pipeline/ingest/ingest_collector.go +++ b/pkg/pipeline/ingest/ingest_collector.go @@ -51,7 +51,7 @@ type ingestCollector struct { in chan map[string]interface{} batchFlushTime time.Duration batchMaxLength int - exitChan chan bool + exitChan chan struct{} } // TransportWrapper is an implementation of the goflow2 transport interface @@ -200,7 +200,7 @@ func NewIngestCollector(params config.StageParam) (Ingester, error) { log.Infof("hostname = %s", jsonIngestCollector.HostName) log.Infof("port = %d", jsonIngestCollector.Port) - ch := make(chan bool, 1) + ch := make(chan struct{}) pUtils.RegisterExitChannel(ch) bml := defaultBatchMaxLength diff --git a/pkg/pipeline/ingest/ingest_collector_test.go b/pkg/pipeline/ingest/ingest_collector_test.go index d8b16dc13..64189afc2 100644 --- a/pkg/pipeline/ingest/ingest_collector_test.go +++ b/pkg/pipeline/ingest/ingest_collector_test.go @@ -19,7 +19,7 @@ func TestIngest(t *testing.T) { hostname: "0.0.0.0", port: collectorPort, batchFlushTime: 10 * time.Millisecond, - exitChan: make(chan bool), + exitChan: make(chan struct{}), } forwarded := make(chan []interface{}) //defer close(forwarded) diff --git a/pkg/pipeline/ingest/ingest_file.go b/pkg/pipeline/ingest/ingest_file.go index a865c2d83..4dab07e92 100644 --- a/pkg/pipeline/ingest/ingest_file.go +++ b/pkg/pipeline/ingest/ingest_file.go @@ -30,7 +30,7 @@ import ( type IngestFile struct { params config.Ingest - exitChan chan bool + exitChan chan struct{} PrevRecords []interface{} TotalRecords int } @@ -104,7 +104,7 @@ func NewIngestFile(params config.StageParam) (Ingester, error) { log.Debugf("input file name = %s", params.Ingest.File.Filename) - ch := make(chan bool, 1) + ch := make(chan struct{}) utils.RegisterExitChannel(ch) return &IngestFile{ params: params.Ingest, diff --git a/pkg/pipeline/ingest/ingest_grpc.go b/pkg/pipeline/ingest/ingest_grpc.go index 62e037f17..2be7ea728 100644 --- a/pkg/pipeline/ingest/ingest_grpc.go +++ b/pkg/pipeline/ingest/ingest_grpc.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" "github.com/netobserv/netobserv-agent/pkg/grpc" "github.com/netobserv/netobserv-agent/pkg/pbflow" ) @@ -37,6 +38,13 @@ func NewGRPCProtobuf(params config.StageParam) (*GRPCProtobuf, error) { } func (no *GRPCProtobuf) Ingest(out chan<- []interface{}) { + exitCh := make(chan struct{}) + utils.RegisterExitChannel(exitCh) + go func() { + <-exitCh + close(no.flowPackets) + no.collector.Close() + }() for fp := range no.flowPackets { out <- []interface{}{fp} } diff --git a/pkg/pipeline/ingest/ingest_kafka.go b/pkg/pipeline/ingest/ingest_kafka.go index c7410b7ec..3a55d2f7f 100644 --- a/pkg/pipeline/ingest/ingest_kafka.go +++ b/pkg/pipeline/ingest/ingest_kafka.go @@ -38,7 +38,7 @@ type ingestKafka struct { kafkaParams api.IngestKafka kafkaReader kafkaReadMessage in chan string - exitChan chan bool + exitChan chan struct{} prevRecords []interface{} // copy of most recently sent records; for testing and debugging } @@ -153,7 +153,7 @@ func NewIngestKafka(params config.StageParam) (Ingester, error) { } log.Debugf("kafkaReader = %v", kafkaReader) - ch := make(chan bool, 1) + ch := make(chan struct{}) utils.RegisterExitChannel(ch) return &ingestKafka{ diff --git a/pkg/pipeline/ingest/ingest_kafka_test.go b/pkg/pipeline/ingest/ingest_kafka_test.go index a8d1f5b18..cba3e4ea1 100644 --- a/pkg/pipeline/ingest/ingest_kafka_test.go +++ b/pkg/pipeline/ingest/ingest_kafka_test.go @@ -126,7 +126,7 @@ func Test_IngestKafka(t *testing.T) { require.Equal(t, record3, receivedEntries[2]) // make the ingest thread exit - ingestKafka.exitChan <- true + close(ingestKafka.exitChan) time.Sleep(time.Second) } @@ -179,7 +179,7 @@ func Test_KafkaListener(t *testing.T) { require.Equal(t, string(fakeRecord), receivedEntries[0]) // make the ingest thread exit - ingestKafka.exitChan <- true + close(ingestKafka.exitChan) time.Sleep(time.Second) } diff --git a/pkg/pipeline/utils/exit.go b/pkg/pipeline/utils/exit.go index b700314ea..c0f96d304 100644 --- a/pkg/pipeline/utils/exit.go +++ b/pkg/pipeline/utils/exit.go @@ -27,11 +27,11 @@ import ( ) var ( - registeredChannels []chan bool + registeredChannels []chan struct{} chanMutex sync.Mutex ) -func RegisterExitChannel(ch chan bool) { +func RegisterExitChannel(ch chan struct{}) { chanMutex.Lock() defer chanMutex.Unlock() registeredChannels = append(registeredChannels, ch) @@ -40,7 +40,7 @@ func RegisterExitChannel(ch chan bool) { func SetupElegantExit() { log.Debugf("entering SetupElegantExit") // handle elegant exit; create support for channels of go routines that want to exit cleanly - registeredChannels = make([]chan bool, 0) + registeredChannels = make([]chan struct{}, 0) exitSigChan := make(chan os.Signal, 1) log.Debugf("registered exit signal channel") signal.Notify(exitSigChan, syscall.SIGINT, syscall.SIGTERM) @@ -52,7 +52,7 @@ func SetupElegantExit() { defer chanMutex.Unlock() // exit signal received; stop other go functions for _, ch := range registeredChannels { - ch <- true + close(ch) } log.Debugf("exiting SetupElegantExit go function") }() diff --git a/pkg/pipeline/utils/exit_test.go b/pkg/pipeline/utils/exit_test.go index 880be6a36..646a1f7f1 100644 --- a/pkg/pipeline/utils/exit_test.go +++ b/pkg/pipeline/utils/exit_test.go @@ -11,9 +11,9 @@ import ( func Test_SetupElegantExit(t *testing.T) { SetupElegantExit() require.Equal(t, 0, len(registeredChannels)) - ch1 := make(chan bool, 1) - ch2 := make(chan bool, 1) - ch3 := make(chan bool, 1) + ch1 := make(chan struct{}) + ch2 := make(chan struct{}) + ch3 := make(chan struct{}) RegisterExitChannel(ch1) require.Equal(t, 1, len(registeredChannels)) RegisterExitChannel(ch2) diff --git a/pkg/pipeline/write/write_loki.go b/pkg/pipeline/write/write_loki.go index 6b71505ee..a3d9ef0c6 100644 --- a/pkg/pipeline/write/write_loki.go +++ b/pkg/pipeline/write/write_loki.go @@ -56,7 +56,7 @@ type Loki struct { client emitter timeNow func() time.Time in chan config.GenericMap - exitChan chan bool + exitChan chan struct{} } var recordsWritten = operationalMetrics.NewCounter(prometheus.CounterOpts{ @@ -263,7 +263,7 @@ func NewWriteLoki(params config.StageParam) (*Loki, error) { return nil, err } - ch := make(chan bool, 1) + ch := make(chan struct{}) pUtils.RegisterExitChannel(ch) in := make(chan config.GenericMap, channelSize)