From 07700788aee0d41fb4c8fbd62f098f5fe8a32661 Mon Sep 17 00:00:00 2001 From: Raziel Cohen Date: Sun, 30 Apr 2023 16:49:47 +0300 Subject: [PATCH 1/3] telemetry correction Signed-off-by: Raziel Cohen --- main.go | 13 ++++++++ pkg/context/context.go | 15 +-------- pkg/conthandler/container_main_handler.go | 39 ++++++++++++----------- pkg/event_data_storage/accumulator.go | 6 ++-- 4 files changed, 36 insertions(+), 37 deletions(-) diff --git a/main.go b/main.go index 6f618939..dab9cf0d 100644 --- a/main.go +++ b/main.go @@ -11,8 +11,18 @@ import ( "github.com/kubescape/go-logger" "github.com/kubescape/go-logger/helpers" + "go.opentelemetry.io/otel" ) +func waitOnEBPFEngineProcessErrorCode(cacheAccumulatorErrorChan chan error) { + ctx, span := otel.Tracer("").Start(context.GetBackgroundContext(), "EBPF engine process error") + defer span.End() + err := <-cacheAccumulatorErrorChan + if err != nil { + logger.L().Ctx(ctx).Fatal("error", helpers.Error(err)) + } +} + func main() { cfg := config.GetConfigurationConfigContext() configData, err := cfg.GetConfigurationReader() @@ -35,6 +45,9 @@ func main() { if err != nil { logger.L().Ctx(context.GetBackgroundContext()).Fatal("error during start accumulator", helpers.Error(err)) } + go func() { + waitOnEBPFEngineProcessErrorCode(accumulatorChannelError) + }() k8sAPIServerClient, err := conthandler.CreateContainerClientK8SAPIServer() if err != nil { diff --git a/pkg/context/context.go b/pkg/context/context.go index 80d84455..3b665a29 100644 --- a/pkg/context/context.go +++ b/pkg/context/context.go @@ -7,8 +7,6 @@ import ( "sniffer/pkg/config" "github.com/kubescape/go-logger" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/trace" ) const ( @@ -17,7 +15,6 @@ const ( type BackgroundContext struct { ctx context.Context - span trace.Span } var backgroundContext BackgroundContext @@ -34,19 +31,9 @@ func SetBackgroundContext() { config.GetConfigurationConfigContext().GetAccountID(), config.GetConfigurationConfigContext().GetClusterName(), url.URL{Host: config.GetConfigurationConfigContext().GetBackgroundContextURL()}) - setMainSpan(ctx) -} - -func setMainSpan(context context.Context) { - ctx, span := otel.Tracer("").Start(context, "mainSpan") backgroundContext.ctx = ctx - backgroundContext.span = span } func GetBackgroundContext() context.Context { return backgroundContext.ctx -} - -func GetMainSpan() trace.Span { - return backgroundContext.span -} +} \ No newline at end of file diff --git a/pkg/conthandler/container_main_handler.go b/pkg/conthandler/container_main_handler.go index dffa7621..026e013b 100644 --- a/pkg/conthandler/container_main_handler.go +++ b/pkg/conthandler/container_main_handler.go @@ -1,7 +1,6 @@ package conthandler import ( - gcontext "context" "errors" "fmt" "sniffer/pkg/config" @@ -78,11 +77,11 @@ func (ch *ContainerHandler) afterTimerActions() error { var err error for { - ctx, span := otel.Tracer("").Start(context.GetBackgroundContext(), "afterTimerActions") afterTimerActionsData := <-ch.afterTimerActionsChannel containerDataInterface, exist := ch.watchedContainers.Load(afterTimerActionsData.containerID) if !exist { - logger.L().Ctx(context.GetBackgroundContext()).Warning("afterTimerActions: failed to get container data of container ID", []helpers.IDetails{helpers.String("container ID", afterTimerActionsData.containerID)}...) + ctx, span := otel.Tracer("").Start(context.GetBackgroundContext(), "afterTimerActions") + logger.L().Ctx(ctx).Warning("afterTimerActions: failed to get container data of container ID", []helpers.IDetails{helpers.String("container ID", afterTimerActionsData.containerID)}...) span.End() continue } @@ -93,23 +92,23 @@ func (ch *ContainerHandler) afterTimerActions() error { if err = <-containerData.syncChannel[StepGetSBOM]; err != nil { logger.L().Debug("failed to get SBOM", []helpers.IDetails{helpers.String("container ID", afterTimerActionsData.containerID), helpers.String("container name", containerData.event.GetContainerName()), helpers.String("k8s resource ", containerData.event.GetK8SWorkloadID()), helpers.Error(err)}...) - span.End() continue } if err = containerData.sbomClient.FilterSBOM(fileList); err != nil { + ctx, span := otel.Tracer("").Start(context.GetBackgroundContext(), "afterTimerActions") logger.L().Ctx(ctx).Warning("failed to filter SBOM", []helpers.IDetails{helpers.String("container ID", afterTimerActionsData.containerID), helpers.String("container name", containerData.event.GetContainerName()), helpers.String("k8s resource", containerData.event.GetK8SWorkloadID()), helpers.Error(err)}...) span.End() continue } if err = containerData.sbomClient.StoreFilterSBOM(containerData.event.GetInstanceIDHash()); err != nil { if !errors.Is(err, sbom.IsAlreadyExist()) { + ctx, span := otel.Tracer("").Start(context.GetBackgroundContext(), "afterTimerActions") logger.L().Ctx(ctx).Error("failed to store filtered SBOM", []helpers.IDetails{helpers.String("container ID", afterTimerActionsData.containerID), helpers.String("k8s resource", containerData.event.GetK8SWorkloadID()), helpers.Error(err)}...) + span.End() } - span.End() continue } logger.L().Info("filtered SBOM has been stored successfully", []helpers.IDetails{helpers.String("containerID", afterTimerActionsData.containerID), helpers.String("k8s resource", containerData.event.GetK8SWorkloadID())}...) - span.End() // defer in infinite loop never runs } } } @@ -129,6 +128,7 @@ func (ch *ContainerHandler) startTimer(watchedContainer watchedContainerData, co err = droppedEventsError } else if errors.Is(err, containerHasTerminatedError) { watchedContainer.snifferTicker.Stop() + err = containerHasTerminatedError } } @@ -146,17 +146,16 @@ func (ch *ContainerHandler) deleteResources(watchedContainer watchedContainerDat ch.watchedContainers.Delete(contEvent.GetContainerID()) } -func (ch *ContainerHandler) startRelevancyProcess(contEvent v1.ContainerEventData, ctx gcontext.Context) { +func (ch *ContainerHandler) startRelevancyProcess(contEvent v1.ContainerEventData) { containerDataInterface, exist := ch.watchedContainers.Load(contEvent.GetContainerID()) if !exist { + ctx, span := otel.Tracer("").Start(context.GetBackgroundContext(), "container monitoring", trace.WithAttributes(attribute.String("containerID", contEvent.GetContainerID()), attribute.String("container workload", contEvent.GetK8SWorkloadID()))) + defer span.End() logger.L().Ctx(ctx).Error("startRelevancyProcess: failed to get container data", helpers.String("container ID", contEvent.GetContainerID()), helpers.String("container name", contEvent.GetContainerName()), helpers.String("k8s resources", contEvent.GetK8SWorkloadID())) return } watchedContainer := containerDataInterface.(watchedContainerData) - ctx, span := otel.Tracer("").Start(ctx, "container monitoring", trace.WithAttributes(attribute.String("containerID", contEvent.GetContainerID()), attribute.String("container workload", contEvent.GetK8SWorkloadID()))) - defer span.End() - err := watchedContainer.containerAggregator.StartAggregate(watchedContainer.syncChannel[StepEventAggregator]) if err != nil { return @@ -167,14 +166,16 @@ func (ch *ContainerHandler) startRelevancyProcess(contEvent v1.ContainerEventDat stopSniffingTime := now.Add(configStopTime) for start := time.Now(); start.Before(stopSniffingTime); { go ch.getSBOM(contEvent) + ctx, span := otel.Tracer("").Start(context.GetBackgroundContext(), "container monitoring", trace.WithAttributes(attribute.String("containerID", contEvent.GetContainerID()), attribute.String("container workload", contEvent.GetK8SWorkloadID()))) err = ch.startTimer(watchedContainer, contEvent.GetContainerID()) if err != nil { if errors.Is(err, droppedEventsError) { logger.L().Ctx(ctx).Warning("container monitoring got drop events - we may miss some realtime data", helpers.String("container ID", contEvent.GetContainerID()), helpers.String("container name", contEvent.GetContainerName()), helpers.String("k8s resources", contEvent.GetK8SWorkloadID()), helpers.Error(err)) + } else if errors.Is(err, containerHasTerminatedError) { + break } - } else if errors.Is(err, containerHasTerminatedError) { - break } + span.End() } logger.L().Info("stop monitor on container - after monitoring time", helpers.String("container ID", contEvent.GetContainerID()), helpers.String("container name", contEvent.GetContainerName()), helpers.String("k8s resources", contEvent.GetK8SWorkloadID()), helpers.Error(err)) ch.deleteResources(watchedContainer, contEvent) @@ -199,7 +200,7 @@ func (ch *ContainerHandler) getSBOM(contEvent v1.ContainerEventData) { watchedContainer.syncChannel[StepGetSBOM] <- err } -func (ch *ContainerHandler) handleContainerRunningEvent(contEvent v1.ContainerEventData, ctx gcontext.Context) error { +func (ch *ContainerHandler) handleContainerRunningEvent(contEvent v1.ContainerEventData) error { _, exist := ch.watchedContainers.Load(contEvent.GetContainerID()) if exist { return containerAlreadyExistError @@ -216,7 +217,7 @@ func (ch *ContainerHandler) handleContainerRunningEvent(contEvent v1.ContainerEv }, } ch.watchedContainers.Store(contEvent.GetContainerID(), newWatchedContainer) - go ch.startRelevancyProcess(contEvent, ctx) + go ch.startRelevancyProcess(contEvent) return nil } @@ -232,10 +233,10 @@ func (ch *ContainerHandler) handleContainerTerminatedEvent(contEvent v1.Containe return nil } -func (ch *ContainerHandler) handleNewContainerEvent(contEvent v1.ContainerEventData, ctx gcontext.Context) error { +func (ch *ContainerHandler) handleNewContainerEvent(contEvent v1.ContainerEventData) error { switch contEvent.GetContainerEventType() { case v1.ContainerRunning: - return ch.handleContainerRunningEvent(contEvent, ctx) + return ch.handleContainerRunningEvent(contEvent) case v1.ContainerDeleted: return ch.handleContainerTerminatedEvent(contEvent) @@ -252,14 +253,14 @@ func (ch *ContainerHandler) StartMainHandler() error { }() for { - ctx, span := otel.Tracer("").Start(context.GetBackgroundContext(), "mainContainerHandler") contEvent := <-ch.containersEventChan - err := ch.handleNewContainerEvent(contEvent, ctx) + err := ch.handleNewContainerEvent(contEvent) if err != nil { + ctx, span := otel.Tracer("").Start(context.GetBackgroundContext(), "mainContainerHandler") if !errors.Is(err, containerAlreadyExistError) { logger.L().Ctx(ctx).Warning("fail to handle new container", helpers.String("ContainerID", contEvent.GetContainerID()), helpers.String("Container name", contEvent.GetContainerID()), helpers.String("k8s workload", contEvent.GetK8SWorkloadID()), helpers.Error(err)) } + defer span.End() } - span.End() } } diff --git a/pkg/event_data_storage/accumulator.go b/pkg/event_data_storage/accumulator.go index a6676633..9898d77f 100644 --- a/pkg/event_data_storage/accumulator.go +++ b/pkg/event_data_storage/accumulator.go @@ -169,7 +169,6 @@ func (acc *Accumulator) streamEventToRegisterContainer(event *evData.EventData) func (acc *Accumulator) accumulateEbpfEngineData() { for { - ctx, span := otel.Tracer("").Start(context.GetBackgroundContext(), "accumulator.accumulateEbpfEngineData") event := <-acc.eventChannel if nodeAgentContainerID != "" && strings.Contains(event.GetEventContainerID(), nodeAgentContainerID) { continue @@ -180,8 +179,8 @@ func (acc *Accumulator) accumulateEbpfEngineData() { } else { index, newSlotIsNeeded, err := acc.findIndexByTimestamp(event) if err != nil { - logger.L().Ctx(ctx).Warning("findIndexByTimestamp fail to find the index to insert the event", helpers.Error(err)) - logger.L().Ctx(ctx).Warning("event that didn't store", helpers.String("event", fmt.Sprintf("%v", event))) + logger.L().Ctx(context.GetBackgroundContext()).Warning("findIndexByTimestamp fail to find the index to insert the event", helpers.Error(err)) + logger.L().Ctx(context.GetBackgroundContext()).Warning("event that didn't store", helpers.String("event", fmt.Sprintf("%v", event))) continue } if newSlotIsNeeded { @@ -191,7 +190,6 @@ func (acc *Accumulator) accumulateEbpfEngineData() { acc.streamEventToRegisterContainer(event) } } - span.End() } } From cebbe3bc3f0ba57c5cfea37af031cdc97ea494c3 Mon Sep 17 00:00:00 2001 From: Raziel Cohen Date: Sun, 30 Apr 2023 17:00:33 +0300 Subject: [PATCH 2/3] test correction Signed-off-by: Raziel Cohen --- pkg/conthandler/container_main_handler_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/conthandler/container_main_handler_test.go b/pkg/conthandler/container_main_handler_test.go index 6adcdc39..3f927516 100644 --- a/pkg/conthandler/container_main_handler_test.go +++ b/pkg/conthandler/container_main_handler_test.go @@ -1,7 +1,6 @@ package conthandler import ( - "context" "os" "path" "sniffer/pkg/config" @@ -69,7 +68,7 @@ func TestContMainHandler(t *testing.T) { t.Fatalf("container ID is wrong, get: %s expected: %s", event.GetContainerID(), RedisContainerIDContHandler) } time.Sleep(12 * time.Second) - err = contHandler.handleNewContainerEvent(event, context.Background()) + err = contHandler.handleNewContainerEvent(event) if err != nil { t.Fatalf("handleNewContainerEvent failed with error %v", err) } From d4bc3dd12f01b7e8020b1286c83c83ad728398b3 Mon Sep 17 00:00:00 2001 From: Raziel Cohen Date: Mon, 1 May 2023 12:54:08 +0300 Subject: [PATCH 3/3] linter correction Signed-off-by: Raziel Cohen --- pkg/conthandler/container_main_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/conthandler/container_main_handler.go b/pkg/conthandler/container_main_handler.go index 026e013b..6a58daea 100644 --- a/pkg/conthandler/container_main_handler.go +++ b/pkg/conthandler/container_main_handler.go @@ -260,7 +260,7 @@ func (ch *ContainerHandler) StartMainHandler() error { if !errors.Is(err, containerAlreadyExistError) { logger.L().Ctx(ctx).Warning("fail to handle new container", helpers.String("ContainerID", contEvent.GetContainerID()), helpers.String("Container name", contEvent.GetContainerID()), helpers.String("k8s workload", contEvent.GetK8SWorkloadID()), helpers.Error(err)) } - defer span.End() + span.End() } } }