Merge pull request #55 from kubescape/telemetry-correction
Telemetry correction
rcohencyberarmor committed Apr 30, 2023
2 parents 72a8982 + 68b73f2 commit 9dd48dd
Showing 24 changed files with 174 additions and 140 deletions.
2 changes: 1 addition & 1 deletion internal/validator/validator.go
@@ -5,7 +5,7 @@ import (
"sniffer/pkg/config"
"syscall"

logger "github.com/kubescape/go-logger"
"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
)

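The edit above drops a self-referential import alias: the package at github.com/kubescape/go-logger is already named logger, so the alias added nothing and import linters flag it. A minimal sketch of the equivalence, assuming only the go-logger calls already used elsewhere in this diff:

package main

import (
	// The removed form was: logger "github.com/kubescape/go-logger"
	// The alias repeats the package's own name, so the plain import
	// below binds the identical identifier.
	"github.com/kubescape/go-logger"
	"github.com/kubescape/go-logger/helpers"
)

func main() {
	// Usage is unchanged either way.
	logger.L().Info("validator ready", helpers.String("stage", "prerequisites"))
}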
8 changes: 4 additions & 4 deletions internal/validator/validator_test.go
@@ -27,7 +27,7 @@ func TestInt8ToStr(t *testing.T) {
}

// Test with empty input
input2 := []int8{}
var input2 []int8
expected2 := ""
output2 := int8ToStr(input2)
if output2 != expected2 {
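var input2 []int8 declares a nil slice, which Go style prefers over the empty literal []int8{}: it reads the same under len, range, and append, and is the form the standard Go code-review conventions recommend. A quick sketch:

package main

import "fmt"

func main() {
	var a []int8  // nil slice: the declaration style the diff adopts
	b := []int8{} // empty but non-nil: stylistically discouraged

	fmt.Println(len(a), len(b))     // 0 0
	fmt.Println(a == nil, b == nil) // true false

	// Both behave identically for the test's purposes:
	a = append(a, 1)
	b = append(b, 1)
	fmt.Println(a, b) // [1] [1]
}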
@@ -58,12 +58,12 @@ func TestCheckPrerequisites(t *testing.T) {
t.Fatalf("failed to set env %s with err %v", config.ConfigEnvVar, err)
}

config := config.GetConfigurationConfigContext()
configData, err := config.GetConfigurationReader()
cfg := config.GetConfigurationConfigContext()
configData, err := cfg.GetConfigurationReader()
if err != nil {
t.Errorf("GetConfigurationReader failed with err %v", err)
}
err = config.ParseConfiguration(v1.CreateConfigData(), configData)
err = cfg.ParseConfiguration(v1.CreateConfigData(), configData)
if err != nil {
t.Fatalf("ParseConfiguration failed with err %v", err)
}
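The second hunk renames the local config to cfg so it no longer shadows the imported sniffer/pkg/config package; with the old name, every later use of the package identifier in that scope would have resolved to the variable instead. A self-contained sketch of the hazard, using the standard sort package as a stand-in:

package main

import (
	"fmt"
	"sort"
)

// bad shows the hazard the rename removes: the local name hides the
// imported package for the rest of the scope.
func bad() {
	sort := []int{3, 1, 2} // "sort" is now a slice, not the package
	// sort.Ints(sort)     // would not compile: sort.Ints undefined
	fmt.Println(sort)
}

// good mirrors the diff: a distinct local name keeps the package reachable.
func good() {
	cfg := []int{3, 1, 2}
	sort.Ints(cfg)
	fmt.Println(cfg)
}

func main() {
	bad()
	good()
}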
16 changes: 12 additions & 4 deletions main.go
@@ -17,18 +17,19 @@ func main() {
cfg := config.GetConfigurationConfigContext()
configData, err := cfg.GetConfigurationReader()
if err != nil {
logger.L().Ctx(context.GetBackgroundContext()).Fatal("error during getting configuration data", helpers.Error(err))
logger.L().Fatal("error during getting configuration data", helpers.Error(err))
}
err = cfg.ParseConfiguration(v1.CreateConfigData(), configData)
if err != nil {
logger.L().Ctx(context.GetBackgroundContext()).Fatal("error during parsing configuration", helpers.Error(err))
logger.L().Fatal("error during parsing configuration", helpers.Error(err))
}
err = validator.CheckPrerequisites()
if err != nil {
logger.L().Ctx(context.GetBackgroundContext()).Fatal("error during validation", helpers.Error(err))
logger.L().Fatal("error during validation", helpers.Error(err))
}

context.SetBackgroundContext()
// after this line we can use logger.L().Ctx() to attach events to spans

accumulatorChannelError := make(chan error, 10)
acc := accumulator.GetAccumulator()
@@ -46,5 +47,12 @@ func main() {
logger.L().Ctx(context.GetBackgroundContext()).Fatal("error during create the storage client", helpers.Error(err))
}
mainHandler, err := conthandler.CreateContainerHandler(k8sAPIServerClient, storageClient)
mainHandler.StartMainHandler()
if err != nil {
logger.L().Ctx(context.GetBackgroundContext()).Fatal("error during create the main container handler", helpers.Error(err))
}

err = mainHandler.StartMainHandler()
if err != nil {
logger.L().Ctx(context.GetBackgroundContext()).Fatal("error during start the main container handler", helpers.Error(err))
}
}
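Two related fixes land in main.go: the early Fatal calls stop passing context.GetBackgroundContext(), since those failures occur before SetBackgroundContext() has created any span to attach to (the new comment marks the boundary), and the errors returned by CreateContainerHandler and StartMainHandler, previously dropped, are now checked and fatal. A self-contained sketch of the ordering, with stand-in names rather than the project's real API:

package main

import (
	"context"
	"fmt"
	"log"
)

type handler struct{}

func createContainerHandler() (*handler, error) { return &handler{}, nil }
func (h *handler) StartMainHandler() error      { return fmt.Errorf("watcher exited") }

var background context.Context

func main() {
	// 1. Config load and validation come first; failures here log
	//    without a context because no span exists yet.

	// 2. Only now is the background context (and its main span) set up.
	background = context.Background()
	// After this line, the real code can use logger.L().Ctx(...).

	// 3. Both returned errors are checked instead of silently dropped.
	h, err := createContainerHandler()
	if err != nil {
		log.Fatalf("error during create the main container handler: %v", err)
	}
	if err := h.StartMainHandler(); err != nil {
		log.Fatalf("error during start the main container handler: %v", err)
	}
}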
8 changes: 4 additions & 4 deletions pkg/config/v1/config_data_test.go
@@ -132,7 +132,7 @@ func TestConfigData_GetClusterName(t *testing.T) {

func TestConfigData_SetNodeName(t *testing.T) {
expectedName := "node-1"
os.Setenv(nodeNameEnvVar, expectedName)
_ = os.Setenv(nodeNameEnvVar, expectedName)
c := &ConfigData{}
c.SetNodeName()
if c.NodeData.Name != expectedName {
@@ -142,7 +142,7 @@ func TestConfigData_SetNodeName(t *testing.T) {

func TestConfigData_SetNamespace(t *testing.T) {
expectedName := "namespace-1"
os.Setenv(NamespaceEnvVar, expectedName)
_ = os.Setenv(NamespaceEnvVar, expectedName)
c := &ConfigData{}
c.SetNamespace()
if c.Namespace != expectedName {
@@ -152,7 +152,7 @@ func TestConfigData_SetNamespace(t *testing.T) {

func TestConfigData_SetContainerName(t *testing.T) {
expectedName := "cont-1"
os.Setenv(ContainerNameEnvVar, expectedName)
_ = os.Setenv(ContainerNameEnvVar, expectedName)
c := &ConfigData{}
c.SetContainerName()
if c.ContainerName != expectedName {
@@ -162,7 +162,7 @@ func TestConfigData_SetContainerName(t *testing.T) {

func TestConfigData_SetBackgroundContextURL(t *testing.T) {
expectedName := "URL-1"
os.Setenv("OTEL_COLLECTOR_SVC", expectedName)
_ = os.Setenv("OTEL_COLLECTOR_SVC", expectedName)
c := &ConfigData{}
c.SetBackgroundContextURL()
if c.telemetryURL != expectedName {
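os.Setenv returns an error, and the _ = assignments make the discard explicit so errcheck-style linters stay quiet. In tests, the stricter options are to check the error or, since Go 1.17, to use t.Setenv, which fails the test on error and restores the variable afterwards; a short sketch (hypothetical test, not from this repo):

package config_test

import (
	"os"
	"testing"
)

func TestNodeNameFromEnv(t *testing.T) {
	// As in the diff: explicit, lint-clean discard.
	_ = os.Setenv("NODE_NAME", "node-1")

	// Stricter: fail fast if the environment cannot be prepared.
	if err := os.Setenv("NODE_NAME", "node-1"); err != nil {
		t.Fatalf("failed to set env: %v", err)
	}

	// Go 1.17+: checks the error and undoes the change at test end.
	t.Setenv("NODE_NAME", "node-1")

	if got := os.Getenv("NODE_NAME"); got != "node-1" {
		t.Errorf("unexpected node name %q", got)
	}
}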
17 changes: 15 additions & 2 deletions pkg/context/context.go
@@ -7,14 +7,17 @@ import (
"sniffer/pkg/config"

"github.com/kubescape/go-logger"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)

const (
releaseBuildTagEnvironmentVariable = "RELEASE"
)

type BackgroundContext struct {
ctx context.Context
ctx context.Context
span trace.Span
}

var backgroundContext BackgroundContext
@@ -26,14 +29,24 @@ func init() {
}

func SetBackgroundContext() {
ctx := logger.InitOtel("node agent",
ctx := logger.InitOtel("nodeagent",
os.Getenv(releaseBuildTagEnvironmentVariable),
config.GetConfigurationConfigContext().GetAccountID(),
config.GetConfigurationConfigContext().GetClusterName(),
url.URL{Host: config.GetConfigurationConfigContext().GetBackgroundContextURL()})
setMainSpan(ctx)
}

func setMainSpan(context context.Context) {
ctx, span := otel.Tracer("").Start(context, "mainSpan")
backgroundContext.ctx = ctx
backgroundContext.span = span
}

func GetBackgroundContext() context.Context {
return backgroundContext.ctx
}

func GetMainSpan() trace.Span {
return backgroundContext.span
}
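The new setMainSpan/GetMainSpan pair gives the process one long-lived root span: logger.InitOtel builds the instrumented context (the service name also loses its space, becoming "nodeagent"), a "mainSpan" is started on it once, and both handles are stashed at package level so other code can parent child spans on them. A minimal sketch of the same pattern; with no SDK installed the global tracer is a no-op, so this runs as-is:

package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/trace"
)

// Package-level holder, mirroring the diff's BackgroundContext struct.
var (
	backgroundCtx context.Context
	mainSpan      trace.Span
)

func setBackgroundContext() {
	// In the real code, logger.InitOtel(...) returns the instrumented
	// context; context.Background() stands in for it here.
	backgroundCtx, mainSpan = otel.Tracer("").Start(context.Background(), "mainSpan")
}

func main() {
	setBackgroundContext()
	defer mainSpan.End()

	// Child spans parent themselves on the stored context.
	_, child := otel.Tracer("").Start(backgroundCtx, "childWork")
	child.End()
	fmt.Println("spans recorded (no-op tracer unless an SDK is installed)")
}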
2 changes: 1 addition & 1 deletion pkg/conthandler/container_aggregator.go
@@ -26,7 +26,7 @@ func (aggregator *Aggregator) collectDataFromContainerAccumulator(errChan chan e
for {
newEvent := <-aggregator.aggregationDataChan
if newEvent.GetEventCMD() == accumulator.DropEventOccurred {
aggregator.StopAggregate()
_ = aggregator.StopAggregate()
errChan <- fmt.Errorf(newEvent.GetEventCMD())
break
}
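The only change here is spelling out the discarded StopAggregate error, but the loop is worth a gloss: a sentinel event value on the data channel signals that events were dropped, and the consumer stops aggregating and converts the sentinel into an error for its caller. A stripped-down sketch of the pattern:

package main

import (
	"errors"
	"fmt"
)

const dropEventOccurred = "drop event occurred" // sentinel, as in the accumulator

func collect(events <-chan string, errChan chan<- error, stop func() error) {
	for ev := range events {
		if ev == dropEventOccurred {
			_ = stop() // best-effort shutdown; error deliberately discarded
			errChan <- errors.New(dropEventOccurred)
			return
		}
		fmt.Println("aggregated:", ev)
	}
}

func main() {
	events := make(chan string, 2)
	errChan := make(chan error, 1)
	events <- "open /etc/passwd"
	events <- dropEventOccurred
	go collect(events, errChan, func() error { return nil })
	fmt.Println("collector stopped:", <-errChan)
}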
12 changes: 6 additions & 6 deletions pkg/conthandler/container_aggregator_test.go
@@ -12,15 +12,15 @@ import (

accumulator "sniffer/pkg/event_data_storage"

instanceidhandler "github.com/kubescape/k8s-interface/instanceidhandler/v1"
"github.com/kubescape/k8s-interface/instanceidhandler/v1"
)

const (
RedisPodName = "redis-64bd97b5fc-kvh7r"
RedisImageID = "docker-pullable://redis@sha256:6a59f1cbb8d28ac484176d52c473494859a512ddba3ea62a547258cf16c9b3ae"
RedisContainerID = "16248df36c67"
RedisWLID = "wlid://cluster-test/namespace-any/deployment/redis"
RedisInstanceID = "apiVersion-v1/namespace-any/kind-deployment/name-redis/containerName-redis"
RedisPodName = "redis-64bd97b5fc-kvh7r"
RedisImageID = "docker-pullable://redis@sha256:6a59f1cbb8d28ac484176d52c473494859a512ddba3ea62a547258cf16c9b3ae"
RedisContainerID = "16248df36c67"
RedisWLID = "wlid://cluster-test/namespace-any/deployment/redis"

NumberOfRedisEventInTheMockAfterFilterDuplicated = 73
)

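Every constant in this block shows as changed only because deleting the unused RedisInstanceID let gofmt pull the = alignment column back in; the remaining values are untouched. The instanceidhandler import alias drop looks like the same self-alias cleanup as in validator.go. A tiny illustration of the alignment churn:

package main

import "fmt"

// gofmt aligns '=' across a const block to the longest name, so removing
// (or adding) one long identifier re-indents every sibling line and the
// whole block shows up in the diff even though no value changed.
const (
	redisPodName     = "redis-64bd97b5fc-kvh7r"
	redisContainerID = "16248df36c67"
)

func main() {
	fmt.Println(redisPodName, redisContainerID)
}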
61 changes: 40 additions & 21 deletions pkg/conthandler/container_main_handler.go
@@ -1,6 +1,7 @@
package conthandler

import (
gcontext "context"
"errors"
"fmt"
"sniffer/pkg/config"
@@ -15,6 +16,9 @@ import (

"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

const (
@@ -62,10 +66,9 @@ func CreateContainerHandler(contClient ContainerClient, storageClient storagecli
}

return &ContainerHandler{
containersEventChan: make(chan v1.ContainerEventData, 50),
containerWatcher: contWatcher,
watchedContainers: sync.Map{},
// syncWatchedContainersMap: &sync.RWMutex{},
containersEventChan: make(chan v1.ContainerEventData, 50),
containerWatcher: contWatcher,
watchedContainers: sync.Map{},
afterTimerActionsChannel: make(chan afterTimerActionsData, 50),
storageClient: storageClient,
}, nil
@@ -75,10 +78,12 @@ func (ch *ContainerHandler) afterTimerActions() error {
var err error

for {
ctx, span := otel.Tracer("").Start(context.GetBackgroundContext(), "afterTimerActions")
afterTimerActionsData := <-ch.afterTimerActionsChannel
containerDataInterface, exist := ch.watchedContainers.Load(afterTimerActionsData.containerID)
if !exist {
logger.L().Ctx(context.GetBackgroundContext()).Warning("afterTimerActions: failed to get container data of container ID", []helpers.IDetails{helpers.String("container ID", afterTimerActionsData.containerID)}...)
span.End()
continue
}
containerData := containerDataInterface.(watchedContainerData)
Expand All @@ -87,20 +92,24 @@ func (ch *ContainerHandler) afterTimerActions() error {
fileList := containerData.containerAggregator.GetContainerRealtimeFileList()

if err = <-containerData.syncChannel[StepGetSBOM]; err != nil {
logger.L().Ctx(context.GetBackgroundContext()).Debug("failed to get SBOM", []helpers.IDetails{helpers.String("container ID", afterTimerActionsData.containerID), helpers.String("container name", containerData.event.GetContainerName()), helpers.String("k8s resource ", containerData.event.GetK8SWorkloadID()), helpers.Error(err)}...)
logger.L().Debug("failed to get SBOM", []helpers.IDetails{helpers.String("container ID", afterTimerActionsData.containerID), helpers.String("container name", containerData.event.GetContainerName()), helpers.String("k8s resource ", containerData.event.GetK8SWorkloadID()), helpers.Error(err)}...)
span.End()
continue
}
if err = containerData.sbomClient.FilterSBOM(fileList); err != nil {
logger.L().Ctx(context.GetBackgroundContext()).Warning("failed to filter SBOM", []helpers.IDetails{helpers.String("container ID", afterTimerActionsData.containerID), helpers.String("container name", containerData.event.GetContainerName()), helpers.String("k8s resource", containerData.event.GetK8SWorkloadID()), helpers.Error(err)}...)
logger.L().Ctx(ctx).Warning("failed to filter SBOM", []helpers.IDetails{helpers.String("container ID", afterTimerActionsData.containerID), helpers.String("container name", containerData.event.GetContainerName()), helpers.String("k8s resource", containerData.event.GetK8SWorkloadID()), helpers.Error(err)}...)
span.End()
continue
}
if err = containerData.sbomClient.StoreFilterSBOM(containerData.event.GetInstanceIDHash()); err != nil {
if !errors.Is(err, sbom.IsAlreadyExist()) {
logger.L().Ctx(context.GetBackgroundContext()).Error("failed to store filtered SBOM", []helpers.IDetails{helpers.String("container ID", afterTimerActionsData.containerID), helpers.String("k8s resource", containerData.event.GetK8SWorkloadID()), helpers.Error(err)}...)
logger.L().Ctx(ctx).Error("failed to store filtered SBOM", []helpers.IDetails{helpers.String("container ID", afterTimerActionsData.containerID), helpers.String("k8s resource", containerData.event.GetK8SWorkloadID()), helpers.Error(err)}...)
}
span.End()
continue
}
logger.L().Info("filtered SBOM has been stored successfully", []helpers.IDetails{helpers.String("containerID", afterTimerActionsData.containerID), helpers.String("k8s resource", containerData.event.GetK8SWorkloadID())}...)
span.End() // defer in infinite loop never runs
}
}
}
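afterTimerActions starts a fresh span at the top of each iteration, and the new in-diff comment explains the discipline: a defer inside an infinite for loop only fires when the function returns, which here is never, so span.End() must be called explicitly on every continue path. A reduced sketch; an alternative design is to hoist the loop body into a helper function so a single defer covers all exits:

package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel"
)

// process starts one span per iteration and ends it on every exit path.
func process(jobs <-chan int) {
	for job := range jobs {
		_, span := otel.Tracer("").Start(context.Background(), "afterTimerActions")
		if job < 0 {
			span.End() // must End before skipping: defer would never run
			continue
		}
		fmt.Println("processed", job)
		span.End()
	}
}

func main() {
	jobs := make(chan int, 3)
	jobs <- 1
	jobs <- -1
	jobs <- 2
	close(jobs)
	process(jobs)
}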
@@ -117,11 +126,9 @@ func (ch *ContainerHandler) startTimer(watchedContainer watchedContainerData, co
}
case err = <-watchedContainer.syncChannel[StepEventAggregator]:
if err.Error() == accumulator.DropEventOccurred {
watchedContainer.snifferTicker.Stop()
err = droppedEventsError
} else if errors.Is(err, containerHasTerminatedError) {
watchedContainer.snifferTicker.Stop()
logger.L().Debug("container has terminated", helpers.String("container ID", watchedContainer.event.GetContainerID()), helpers.String("container name", watchedContainer.event.GetContainerName()), helpers.String("k8s resources", watchedContainer.event.GetK8SWorkloadID()))
}
}
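The two snifferTicker.Stop() calls disappear from startTimer's error branches because teardown is centralized in deleteResources (next hunk), which stops the ticker, halts aggregation, releases SBOM resources, and unregisters the container in one place. A sketch of the single-cleanup-path shape:

package main

import (
	"fmt"
	"time"
)

type watched struct {
	ticker *time.Ticker
	stop   func() error
}

// deleteResources is the one place teardown happens, instead of
// scattered Stop() calls on each error branch.
func deleteResources(containers map[string]watched, id string) {
	w := containers[id]
	w.ticker.Stop()
	_ = w.stop() // best-effort, mirroring the discarded StopAggregate error
	delete(containers, id)
}

func main() {
	containers := map[string]watched{
		"16248df36c67": {ticker: time.NewTicker(time.Second), stop: func() error { return nil }},
	}
	deleteResources(containers, "16248df36c67")
	fmt.Println("remaining watched containers:", len(containers))
}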

@@ -134,19 +141,22 @@ func createTicker() *time.Ticker {

func (ch *ContainerHandler) deleteResources(watchedContainer watchedContainerData, contEvent v1.ContainerEventData) {
watchedContainer.snifferTicker.Stop()
watchedContainer.containerAggregator.StopAggregate()
_ = watchedContainer.containerAggregator.StopAggregate()
watchedContainer.sbomClient.CleanResources()
ch.watchedContainers.Delete(contEvent.GetContainerID())
}

func (ch *ContainerHandler) startRelevancyProcess(contEvent v1.ContainerEventData) {
func (ch *ContainerHandler) startRelevancyProcess(contEvent v1.ContainerEventData, ctx gcontext.Context) {
containerDataInterface, exist := ch.watchedContainers.Load(contEvent.GetContainerID())
if !exist {
logger.L().Ctx(context.GetBackgroundContext()).Error("startRelevancyProcess: failed to get container data", helpers.String("container ID", contEvent.GetContainerID()), helpers.String("container name", contEvent.GetContainerName()), helpers.String("k8s resources", contEvent.GetK8SWorkloadID()))
logger.L().Ctx(ctx).Error("startRelevancyProcess: failed to get container data", helpers.String("container ID", contEvent.GetContainerID()), helpers.String("container name", contEvent.GetContainerName()), helpers.String("k8s resources", contEvent.GetK8SWorkloadID()))
return
}
watchedContainer := containerDataInterface.(watchedContainerData)

ctx, span := otel.Tracer("").Start(ctx, "container monitoring", trace.WithAttributes(attribute.String("containerID", contEvent.GetContainerID()), attribute.String("container workload", contEvent.GetK8SWorkloadID())))
defer span.End()

err := watchedContainer.containerAggregator.StartAggregate(watchedContainer.syncChannel[StepEventAggregator])
if err != nil {
return
Expand All @@ -160,10 +170,13 @@ func (ch *ContainerHandler) startRelevancyProcess(contEvent v1.ContainerEventDat
err = ch.startTimer(watchedContainer, contEvent.GetContainerID())
if err != nil {
if errors.Is(err, droppedEventsError) {
logger.L().Ctx(context.GetBackgroundContext()).Warning("container monitoring got drop events - we may miss some realtime data", helpers.String("container ID", contEvent.GetContainerID()), helpers.String("container name", contEvent.GetContainerName()), helpers.String("k8s resources", contEvent.GetK8SWorkloadID()), helpers.Error(err))
logger.L().Ctx(ctx).Warning("container monitoring got drop events - we may miss some realtime data", helpers.String("container ID", contEvent.GetContainerID()), helpers.String("container name", contEvent.GetContainerName()), helpers.String("k8s resources", contEvent.GetK8SWorkloadID()), helpers.Error(err))
}
} else if errors.Is(err, containerHasTerminatedError) {
break
}
}
logger.L().Info("stop monitor on container - after monitoring time", helpers.String("container ID", contEvent.GetContainerID()), helpers.String("container name", contEvent.GetContainerName()), helpers.String("k8s resources", contEvent.GetK8SWorkloadID()), helpers.Error(err))
ch.deleteResources(watchedContainer, contEvent)
}
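startRelevancyProcess now takes the caller's context — note the gcontext "context" alias in the imports, needed because sniffer/pkg/context already owns the bare name — and wraps the whole monitoring lifetime in a "container monitoring" span tagged with the container ID and workload. Unlike the loop above, defer span.End() is safe here because the function does return. A sketch of that shape:

package main

import (
	gcontext "context" // aliased as in the diff, freeing the name "context"
	"fmt"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

// monitorContainer mirrors the new signature: context as a parameter,
// one bounded span around the work.
func monitorContainer(ctx gcontext.Context, containerID, workload string) {
	ctx, span := otel.Tracer("").Start(ctx, "container monitoring",
		trace.WithAttributes(
			attribute.String("containerID", containerID),
			attribute.String("container workload", workload)))
	defer span.End() // safe: this function returns when monitoring ends

	_ = ctx // child operations would derive from ctx
	fmt.Println("monitoring", containerID, "of", workload)
}

func main() {
	monitorContainer(gcontext.Background(), "16248df36c67",
		"wlid://cluster-test/namespace-any/deployment/redis")
}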

@@ -186,7 +199,7 @@ func (ch *ContainerHandler) getSBOM(contEvent v1.ContainerEventData) {
watchedContainer.syncChannel[StepGetSBOM] <- err
}

func (ch *ContainerHandler) handleContainerRunningEvent(contEvent v1.ContainerEventData) error {
func (ch *ContainerHandler) handleContainerRunningEvent(contEvent v1.ContainerEventData, ctx gcontext.Context) error {
_, exist := ch.watchedContainers.Load(contEvent.GetContainerID())
if exist {
return containerAlreadyExistError
@@ -203,7 +216,7 @@
},
}
ch.watchedContainers.Store(contEvent.GetContainerID(), newWatchedContainer)
go ch.startRelevancyProcess(contEvent)
go ch.startRelevancyProcess(contEvent, ctx)
return nil
}

@@ -219,10 +232,10 @@ func (ch *ContainerHandler) handleContainerTerminatedEvent(contEvent v1.Containe
return nil
}

func (ch *ContainerHandler) handleNewContainerEvent(contEvent v1.ContainerEventData) error {
func (ch *ContainerHandler) handleNewContainerEvent(contEvent v1.ContainerEventData, ctx gcontext.Context) error {
switch contEvent.GetContainerEventType() {
case v1.ContainerRunning:
return ch.handleContainerRunningEvent(contEvent)
return ch.handleContainerRunningEvent(contEvent, ctx)

case v1.ContainerDeleted:
return ch.handleContainerTerminatedEvent(contEvent)
@@ -231,16 +244,22 @@ func (ch *ContainerHandler) handleNewContainerEventD
}

func (ch *ContainerHandler) StartMainHandler() error {
go ch.afterTimerActions()
go ch.containerWatcher.StartWatchedOnContainers(ch.containersEventChan)
go func() {
_ = ch.afterTimerActions()
}()
go func() {
_ = ch.containerWatcher.StartWatchedOnContainers(ch.containersEventChan)
}()

for {
ctx, span := otel.Tracer("").Start(context.GetBackgroundContext(), "mainContainerHandler")
contEvent := <-ch.containersEventChan
err := ch.handleNewContainerEvent(contEvent)
err := ch.handleNewContainerEvent(contEvent, ctx)
if err != nil {
if !errors.Is(err, containerAlreadyExistError) {
logger.L().Ctx(context.GetBackgroundContext()).Warning("fail to handle new container", helpers.String("ContainerID", contEvent.GetContainerID()), helpers.String("Container name", contEvent.GetContainerID()), helpers.String("k8s workload", contEvent.GetK8SWorkloadID()), helpers.Error(err))
logger.L().Ctx(ctx).Warning("fail to handle new container", helpers.String("ContainerID", contEvent.GetContainerID()), helpers.String("Container name", contEvent.GetContainerID()), helpers.String("k8s workload", contEvent.GetK8SWorkloadID()), helpers.Error(err))
}
}
span.End()
}
}
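StartMainHandler's two background calls move into go func() { _ = ... }() closures: a bare go f() discards f's error silently, while the blank assignment records the discard and satisfies errcheck. The handler also starts a span per received event, ended explicitly at the bottom of the loop for the same defer-in-infinite-loop reason noted earlier. A compact sketch of the goroutine form, plus the channel alternative for when the error actually matters:

package main

import (
	"errors"
	"fmt"
	"time"
)

func watchContainers() error { return errors.New("watch stream closed") }

func main() {
	// Explicit, lint-clean discard of a goroutine's error return.
	go func() {
		_ = watchContainers()
	}()

	// If the error matters, forward it on a channel instead:
	errCh := make(chan error, 1)
	go func() { errCh <- watchContainers() }()

	select {
	case err := <-errCh:
		fmt.Println("watcher failed:", err)
	case <-time.After(time.Second):
		fmt.Println("watcher still running")
	}
}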
