Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/pipeline/encode/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type encodeProm struct {
expiryTime int64
mList *list.List
mCache metricCache
exitChan chan bool
exitChan chan struct{}
}

var metricsProcessed = operationalMetrics.NewCounter(prometheus.CounterOpts{
Expand Down Expand Up @@ -336,7 +336,7 @@ func NewEncodeProm(params config.StageParam) (Encoder, error) {
}
}

ch := make(chan bool, 1)
ch := make(chan struct{})
utils.RegisterExitChannel(ch)

log.Debugf("metrics = %v", metrics)
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/extract/aggregate/aggregates.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (aggregates *Aggregates) AddAggregate(aggregateDefinition api.AggregateDefi
func (aggregates *Aggregates) cleanupExpiredEntriesLoop() {

ticker := time.NewTicker(time.Duration(aggregates.expiryTime) * time.Second)
done := make(chan bool)
done := make(chan struct{})
utils.RegisterExitChannel(done)
go func() {
for {
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/ingest/ingest_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type ingestCollector struct {
in chan map[string]interface{}
batchFlushTime time.Duration
batchMaxLength int
exitChan chan bool
exitChan chan struct{}
}

// TransportWrapper is an implementation of the goflow2 transport interface
Expand Down Expand Up @@ -200,7 +200,7 @@ func NewIngestCollector(params config.StageParam) (Ingester, error) {
log.Infof("hostname = %s", jsonIngestCollector.HostName)
log.Infof("port = %d", jsonIngestCollector.Port)

ch := make(chan bool, 1)
ch := make(chan struct{})
pUtils.RegisterExitChannel(ch)

bml := defaultBatchMaxLength
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/ingest/ingest_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestIngest(t *testing.T) {
hostname: "0.0.0.0",
port: collectorPort,
batchFlushTime: 10 * time.Millisecond,
exitChan: make(chan bool),
exitChan: make(chan struct{}),
}
forwarded := make(chan []interface{})
//defer close(forwarded)
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/ingest/ingest_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

type IngestFile struct {
params config.Ingest
exitChan chan bool
exitChan chan struct{}
PrevRecords []interface{}
TotalRecords int
}
Expand Down Expand Up @@ -104,7 +104,7 @@ func NewIngestFile(params config.StageParam) (Ingester, error) {

log.Debugf("input file name = %s", params.Ingest.File.Filename)

ch := make(chan bool, 1)
ch := make(chan struct{})
utils.RegisterExitChannel(ch)
return &IngestFile{
params: params.Ingest,
Expand Down
8 changes: 8 additions & 0 deletions pkg/pipeline/ingest/ingest_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
"github.com/netobserv/netobserv-agent/pkg/grpc"
"github.com/netobserv/netobserv-agent/pkg/pbflow"
)
Expand Down Expand Up @@ -37,6 +38,13 @@ func NewGRPCProtobuf(params config.StageParam) (*GRPCProtobuf, error) {
}

func (no *GRPCProtobuf) Ingest(out chan<- []interface{}) {
exitCh := make(chan struct{})
utils.RegisterExitChannel(exitCh)
go func() {
<-exitCh
close(no.flowPackets)
no.collector.Close()
}()
for fp := range no.flowPackets {
out <- []interface{}{fp}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/ingest/ingest_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type ingestKafka struct {
kafkaParams api.IngestKafka
kafkaReader kafkaReadMessage
in chan string
exitChan chan bool
exitChan chan struct{}
prevRecords []interface{} // copy of most recently sent records; for testing and debugging
}

Expand Down Expand Up @@ -153,7 +153,7 @@ func NewIngestKafka(params config.StageParam) (Ingester, error) {
}
log.Debugf("kafkaReader = %v", kafkaReader)

ch := make(chan bool, 1)
ch := make(chan struct{})
utils.RegisterExitChannel(ch)

return &ingestKafka{
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/ingest/ingest_kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func Test_IngestKafka(t *testing.T) {
require.Equal(t, record3, receivedEntries[2])

// make the ingest thread exit
ingestKafka.exitChan <- true
close(ingestKafka.exitChan)
time.Sleep(time.Second)
}

Expand Down Expand Up @@ -179,7 +179,7 @@ func Test_KafkaListener(t *testing.T) {
require.Equal(t, string(fakeRecord), receivedEntries[0])

// make the ingest thread exit
ingestKafka.exitChan <- true
close(ingestKafka.exitChan)
time.Sleep(time.Second)

}
8 changes: 4 additions & 4 deletions pkg/pipeline/utils/exit.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ import (
)

var (
registeredChannels []chan bool
registeredChannels []chan struct{}
chanMutex sync.Mutex
)

func RegisterExitChannel(ch chan bool) {
func RegisterExitChannel(ch chan struct{}) {
chanMutex.Lock()
defer chanMutex.Unlock()
registeredChannels = append(registeredChannels, ch)
Expand All @@ -40,7 +40,7 @@ func RegisterExitChannel(ch chan bool) {
func SetupElegantExit() {
log.Debugf("entering SetupElegantExit")
// handle elegant exit; create support for channels of go routines that want to exit cleanly
registeredChannels = make([]chan bool, 0)
registeredChannels = make([]chan struct{}, 0)
exitSigChan := make(chan os.Signal, 1)
log.Debugf("registered exit signal channel")
signal.Notify(exitSigChan, syscall.SIGINT, syscall.SIGTERM)
Expand All @@ -52,7 +52,7 @@ func SetupElegantExit() {
defer chanMutex.Unlock()
// exit signal received; stop other go functions
for _, ch := range registeredChannels {
ch <- true
close(ch)
}
log.Debugf("exiting SetupElegantExit go function")
}()
Expand Down
6 changes: 3 additions & 3 deletions pkg/pipeline/utils/exit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
func Test_SetupElegantExit(t *testing.T) {
SetupElegantExit()
require.Equal(t, 0, len(registeredChannels))
ch1 := make(chan bool, 1)
ch2 := make(chan bool, 1)
ch3 := make(chan bool, 1)
ch1 := make(chan struct{})
ch2 := make(chan struct{})
ch3 := make(chan struct{})
RegisterExitChannel(ch1)
require.Equal(t, 1, len(registeredChannels))
RegisterExitChannel(ch2)
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/write/write_loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type Loki struct {
client emitter
timeNow func() time.Time
in chan config.GenericMap
exitChan chan bool
exitChan chan struct{}
}

var recordsWritten = operationalMetrics.NewCounter(prometheus.CounterOpts{
Expand Down Expand Up @@ -263,7 +263,7 @@ func NewWriteLoki(params config.StageParam) (*Loki, error) {
return nil, err
}

ch := make(chan bool, 1)
ch := make(chan struct{})
pUtils.RegisterExitChannel(ch)

in := make(chan config.GenericMap, channelSize)
Expand Down