Skip to content

Commit

Permalink
Merge pull request #60 from kubescape/telemetry-correction
Browse files Browse the repository at this point in the history
telemetry correction
  • Loading branch information
dwertent committed May 1, 2023
2 parents 11f5880 + d36a47b commit 2bf665d
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 39 deletions.
13 changes: 13 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,18 @@ import (

"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
"go.opentelemetry.io/otel"
)

// waitOnEBPFEngineProcessErrorCode blocks until a value is received from
// cacheAccumulatorErrorChan. A nil value makes the function return without
// emitting any telemetry; a non-nil error is logged at Fatal level under a
// span named "EBPF engine process error".
//
// The span is started only after an error has actually been received, which
// matches how the other handlers in this change create their spans (inside the
// error branch). Starting it before the blocking receive would keep an
// "error" span open for as long as the goroutine idles and would also record
// one when no error occurred.
func waitOnEBPFEngineProcessErrorCode(cacheAccumulatorErrorChan chan error) {
	err := <-cacheAccumulatorErrorChan
	if err == nil {
		return
	}

	ctx, span := otel.Tracer("").Start(context.GetBackgroundContext(), "EBPF engine process error")
	// NOTE(review): if Fatal terminates the process, this deferred End will
	// not run — confirm against go-logger's Fatal semantics.
	defer span.End()
	logger.L().Ctx(ctx).Fatal("error", helpers.Error(err))
}

func main() {
cfg := config.GetConfigurationConfigContext()
configData, err := cfg.GetConfigurationReader()
Expand All @@ -35,6 +45,9 @@ func main() {
if err != nil {
logger.L().Ctx(context.GetBackgroundContext()).Fatal("error during start accumulator", helpers.Error(err))
}
go func() {
waitOnEBPFEngineProcessErrorCode(accumulatorChannelError)
}()

k8sAPIServerClient, err := conthandler.CreateContainerClientK8SAPIServer()
if err != nil {
Expand Down
15 changes: 1 addition & 14 deletions pkg/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"sniffer/pkg/config"

"github.com/kubescape/go-logger"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)

const (
Expand All @@ -17,7 +15,6 @@ const (

type BackgroundContext struct {
ctx context.Context
span trace.Span
}

var backgroundContext BackgroundContext
Expand All @@ -34,19 +31,9 @@ func SetBackgroundContext() {
config.GetConfigurationConfigContext().GetAccountID(),
config.GetConfigurationConfigContext().GetClusterName(),
url.URL{Host: config.GetConfigurationConfigContext().GetBackgroundContextURL()})
setMainSpan(ctx)
}

// setMainSpan starts a span named "mainSpan" on the given parent context and
// stores both the derived context and the span in the package-level
// backgroundContext.
func setMainSpan(context context.Context) {
	backgroundContext.ctx, backgroundContext.span = otel.Tracer("").Start(context, "mainSpan")
}

// GetBackgroundContext returns the context currently stored in the
// package-level backgroundContext.
func GetBackgroundContext() context.Context {
return backgroundContext.ctx
}

// GetMainSpan returns the span currently stored in the package-level
// backgroundContext.
func GetMainSpan() trace.Span {
return backgroundContext.span
}
}
39 changes: 20 additions & 19 deletions pkg/conthandler/container_main_handler.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package conthandler

import (
gcontext "context"
"errors"
"fmt"
"sniffer/pkg/config"
Expand Down Expand Up @@ -78,11 +77,11 @@ func (ch *ContainerHandler) afterTimerActions() error {
var err error

for {
ctx, span := otel.Tracer("").Start(context.GetBackgroundContext(), "afterTimerActions")
afterTimerActionsData := <-ch.afterTimerActionsChannel
containerDataInterface, exist := ch.watchedContainers.Load(afterTimerActionsData.containerID)
if !exist {
logger.L().Ctx(context.GetBackgroundContext()).Warning("afterTimerActions: failed to get container data of container ID", []helpers.IDetails{helpers.String("container ID", afterTimerActionsData.containerID)}...)
ctx, span := otel.Tracer("").Start(context.GetBackgroundContext(), "afterTimerActions")
logger.L().Ctx(ctx).Warning("afterTimerActions: failed to get container data of container ID", []helpers.IDetails{helpers.String("container ID", afterTimerActionsData.containerID)}...)
span.End()
continue
}
Expand All @@ -93,23 +92,23 @@ func (ch *ContainerHandler) afterTimerActions() error {

if err = <-containerData.syncChannel[StepGetSBOM]; err != nil {
logger.L().Debug("failed to get SBOM", []helpers.IDetails{helpers.String("container ID", afterTimerActionsData.containerID), helpers.String("container name", containerData.event.GetContainerName()), helpers.String("k8s resource ", containerData.event.GetK8SWorkloadID()), helpers.Error(err)}...)
span.End()
continue
}
if err = containerData.sbomClient.FilterSBOM(fileList); err != nil {
ctx, span := otel.Tracer("").Start(context.GetBackgroundContext(), "afterTimerActions")
logger.L().Ctx(ctx).Warning("failed to filter SBOM", []helpers.IDetails{helpers.String("container ID", afterTimerActionsData.containerID), helpers.String("container name", containerData.event.GetContainerName()), helpers.String("k8s resource", containerData.event.GetK8SWorkloadID()), helpers.Error(err)}...)
span.End()
continue
}
if err = containerData.sbomClient.StoreFilterSBOM(containerData.event.GetInstanceIDHash()); err != nil {
if !errors.Is(err, sbom.IsAlreadyExist()) {
ctx, span := otel.Tracer("").Start(context.GetBackgroundContext(), "afterTimerActions")
logger.L().Ctx(ctx).Error("failed to store filtered SBOM", []helpers.IDetails{helpers.String("container ID", afterTimerActionsData.containerID), helpers.String("k8s resource", containerData.event.GetK8SWorkloadID()), helpers.Error(err)}...)
span.End()
}
span.End()
continue
}
logger.L().Info("filtered SBOM has been stored successfully", []helpers.IDetails{helpers.String("containerID", afterTimerActionsData.containerID), helpers.String("k8s resource", containerData.event.GetK8SWorkloadID())}...)
span.End() // defer in infinite loop never runs
}
}
}
Expand All @@ -129,6 +128,7 @@ func (ch *ContainerHandler) startTimer(watchedContainer watchedContainerData, co
err = droppedEventsError
} else if errors.Is(err, containerHasTerminatedError) {
watchedContainer.snifferTicker.Stop()
err = containerHasTerminatedError
}
}

Expand All @@ -146,17 +146,16 @@ func (ch *ContainerHandler) deleteResources(watchedContainer watchedContainerDat
ch.watchedContainers.Delete(contEvent.GetContainerID())
}

func (ch *ContainerHandler) startRelevancyProcess(contEvent v1.ContainerEventData, ctx gcontext.Context) {
func (ch *ContainerHandler) startRelevancyProcess(contEvent v1.ContainerEventData) {
containerDataInterface, exist := ch.watchedContainers.Load(contEvent.GetContainerID())
if !exist {
ctx, span := otel.Tracer("").Start(context.GetBackgroundContext(), "container monitoring", trace.WithAttributes(attribute.String("containerID", contEvent.GetContainerID()), attribute.String("container workload", contEvent.GetK8SWorkloadID())))
defer span.End()
logger.L().Ctx(ctx).Error("startRelevancyProcess: failed to get container data", helpers.String("container ID", contEvent.GetContainerID()), helpers.String("container name", contEvent.GetContainerName()), helpers.String("k8s resources", contEvent.GetK8SWorkloadID()))
return
}
watchedContainer := containerDataInterface.(watchedContainerData)

ctx, span := otel.Tracer("").Start(ctx, "container monitoring", trace.WithAttributes(attribute.String("containerID", contEvent.GetContainerID()), attribute.String("container workload", contEvent.GetK8SWorkloadID())))
defer span.End()

err := watchedContainer.containerAggregator.StartAggregate(watchedContainer.syncChannel[StepEventAggregator])
if err != nil {
return
Expand All @@ -167,14 +166,16 @@ func (ch *ContainerHandler) startRelevancyProcess(contEvent v1.ContainerEventDat
stopSniffingTime := now.Add(configStopTime)
for start := time.Now(); start.Before(stopSniffingTime); {
go ch.getSBOM(contEvent)
ctx, span := otel.Tracer("").Start(context.GetBackgroundContext(), "container monitoring", trace.WithAttributes(attribute.String("containerID", contEvent.GetContainerID()), attribute.String("container workload", contEvent.GetK8SWorkloadID())))
err = ch.startTimer(watchedContainer, contEvent.GetContainerID())
if err != nil {
if errors.Is(err, droppedEventsError) {
logger.L().Ctx(ctx).Warning("container monitoring got drop events - we may miss some realtime data", helpers.String("container ID", contEvent.GetContainerID()), helpers.String("container name", contEvent.GetContainerName()), helpers.String("k8s resources", contEvent.GetK8SWorkloadID()), helpers.Error(err))
} else if errors.Is(err, containerHasTerminatedError) {
break
}
} else if errors.Is(err, containerHasTerminatedError) {
break
}
span.End()
}
logger.L().Info("stop monitor on container - after monitoring time", helpers.String("container ID", contEvent.GetContainerID()), helpers.String("container name", contEvent.GetContainerName()), helpers.String("k8s resources", contEvent.GetK8SWorkloadID()), helpers.Error(err))
ch.deleteResources(watchedContainer, contEvent)
Expand All @@ -199,7 +200,7 @@ func (ch *ContainerHandler) getSBOM(contEvent v1.ContainerEventData) {
watchedContainer.syncChannel[StepGetSBOM] <- err
}

func (ch *ContainerHandler) handleContainerRunningEvent(contEvent v1.ContainerEventData, ctx gcontext.Context) error {
func (ch *ContainerHandler) handleContainerRunningEvent(contEvent v1.ContainerEventData) error {
_, exist := ch.watchedContainers.Load(contEvent.GetContainerID())
if exist {
return containerAlreadyExistError
Expand All @@ -216,7 +217,7 @@ func (ch *ContainerHandler) handleContainerRunningEvent(contEvent v1.ContainerEv
},
}
ch.watchedContainers.Store(contEvent.GetContainerID(), newWatchedContainer)
go ch.startRelevancyProcess(contEvent, ctx)
go ch.startRelevancyProcess(contEvent)
return nil
}

Expand All @@ -232,10 +233,10 @@ func (ch *ContainerHandler) handleContainerTerminatedEvent(contEvent v1.Containe
return nil
}

func (ch *ContainerHandler) handleNewContainerEvent(contEvent v1.ContainerEventData, ctx gcontext.Context) error {
func (ch *ContainerHandler) handleNewContainerEvent(contEvent v1.ContainerEventData) error {
switch contEvent.GetContainerEventType() {
case v1.ContainerRunning:
return ch.handleContainerRunningEvent(contEvent, ctx)
return ch.handleContainerRunningEvent(contEvent)

case v1.ContainerDeleted:
return ch.handleContainerTerminatedEvent(contEvent)
Expand All @@ -252,14 +253,14 @@ func (ch *ContainerHandler) StartMainHandler() error {
}()

for {
ctx, span := otel.Tracer("").Start(context.GetBackgroundContext(), "mainContainerHandler")
contEvent := <-ch.containersEventChan
err := ch.handleNewContainerEvent(contEvent, ctx)
err := ch.handleNewContainerEvent(contEvent)
if err != nil {
ctx, span := otel.Tracer("").Start(context.GetBackgroundContext(), "mainContainerHandler")
if !errors.Is(err, containerAlreadyExistError) {
logger.L().Ctx(ctx).Warning("fail to handle new container", helpers.String("ContainerID", contEvent.GetContainerID()), helpers.String("Container name", contEvent.GetContainerID()), helpers.String("k8s workload", contEvent.GetK8SWorkloadID()), helpers.Error(err))
}
span.End()
}
span.End()
}
}
3 changes: 1 addition & 2 deletions pkg/conthandler/container_main_handler_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package conthandler

import (
"context"
"path"
"sniffer/pkg/config"
configV1 "sniffer/pkg/config/v1"
Expand Down Expand Up @@ -65,7 +64,7 @@ func TestContMainHandler(t *testing.T) {
t.Fatalf("container ID is wrong, get: %s expected: %s", event.GetContainerID(), RedisContainerIDContHandler)
}
time.Sleep(12 * time.Second)
err = contHandler.handleNewContainerEvent(event, context.Background())
err = contHandler.handleNewContainerEvent(event)
if err != nil {
t.Fatalf("handleNewContainerEvent failed with error %v", err)
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/event_data_storage/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ func (acc *Accumulator) streamEventToRegisterContainer(event *evData.EventData)

func (acc *Accumulator) accumulateEbpfEngineData() {
for {
ctx, span := otel.Tracer("").Start(context.GetBackgroundContext(), "accumulator.accumulateEbpfEngineData")
event := <-acc.eventChannel
if nodeAgentContainerID != "" && strings.Contains(event.GetEventContainerID(), nodeAgentContainerID) {
continue
Expand All @@ -180,8 +179,8 @@ func (acc *Accumulator) accumulateEbpfEngineData() {
} else {
index, newSlotIsNeeded, err := acc.findIndexByTimestamp(event)
if err != nil {
logger.L().Ctx(ctx).Warning("findIndexByTimestamp fail to find the index to insert the event", helpers.Error(err))
logger.L().Ctx(ctx).Warning("event that didn't store", helpers.String("event", fmt.Sprintf("%v", event)))
logger.L().Ctx(context.GetBackgroundContext()).Warning("findIndexByTimestamp fail to find the index to insert the event", helpers.Error(err))
logger.L().Ctx(context.GetBackgroundContext()).Warning("event that didn't store", helpers.String("event", fmt.Sprintf("%v", event)))
continue
}
if newSlotIsNeeded {
Expand All @@ -191,7 +190,6 @@ func (acc *Accumulator) accumulateEbpfEngineData() {
acc.streamEventToRegisterContainer(event)
}
}
span.End()
}
}

Expand Down

0 comments on commit 2bf665d

Please sign in to comment.