Skip to content

Commit

Permalink
fix telemetry and linting
Browse files Browse the repository at this point in the history
Signed-off-by: Matthias Bertschy <matthias.bertschy@gmail.com>
  • Loading branch information
matthyx committed Apr 27, 2023
1 parent a0c8a23 commit 68b73f2
Show file tree
Hide file tree
Showing 23 changed files with 111 additions and 119 deletions.
2 changes: 1 addition & 1 deletion internal/validator/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"sniffer/pkg/config"
"syscall"

logger "github.com/kubescape/go-logger"
"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
)

Expand Down
8 changes: 4 additions & 4 deletions internal/validator/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestInt8ToStr(t *testing.T) {
}

// Test with empty input
input2 := []int8{}
var input2 []int8
expected2 := ""
output2 := int8ToStr(input2)
if output2 != expected2 {
Expand Down Expand Up @@ -58,12 +58,12 @@ func TestCheckPrerequisites(t *testing.T) {
t.Fatalf("failed to set env %s with err %v", config.ConfigEnvVar, err)
}

config := config.GetConfigurationConfigContext()
configData, err := config.GetConfigurationReader()
cfg := config.GetConfigurationConfigContext()
configData, err := cfg.GetConfigurationReader()
if err != nil {
t.Errorf("GetConfigurationReader failed with err %v", err)
}
err = config.ParseConfiguration(v1.CreateConfigData(), configData)
err = cfg.ParseConfiguration(v1.CreateConfigData(), configData)
if err != nil {
t.Fatalf("ParseConfiguration failed with err %v", err)
}
Expand Down
13 changes: 9 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,19 @@ func main() {
cfg := config.GetConfigurationConfigContext()
configData, err := cfg.GetConfigurationReader()
if err != nil {
logger.L().Ctx(context.GetBackgroundContext()).Fatal("error during getting configuration data", helpers.Error(err))
logger.L().Fatal("error during getting configuration data", helpers.Error(err))
}
err = cfg.ParseConfiguration(v1.CreateConfigData(), configData)
if err != nil {
logger.L().Ctx(context.GetBackgroundContext()).Fatal("error during parsing configuration", helpers.Error(err))
logger.L().Fatal("error during parsing configuration", helpers.Error(err))
}
err = validator.CheckPrerequisites()
if err != nil {
logger.L().Ctx(context.GetBackgroundContext()).Fatal("error during validation", helpers.Error(err))
logger.L().Fatal("error during validation", helpers.Error(err))
}

context.SetBackgroundContext()
// after this line we can use logger.L().Ctx() to attach events to spans

accumulatorChannelError := make(chan error, 10)
acc := accumulator.GetAccumulator()
Expand All @@ -49,5 +50,9 @@ func main() {
if err != nil {
logger.L().Ctx(context.GetBackgroundContext()).Fatal("error during create the main container handler", helpers.Error(err))
}
mainHandler.StartMainHandler()

err = mainHandler.StartMainHandler()
if err != nil {
logger.L().Ctx(context.GetBackgroundContext()).Fatal("error during start the main container handler", helpers.Error(err))
}
}
8 changes: 4 additions & 4 deletions pkg/config/v1/config_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func TestConfigData_GetClusterName(t *testing.T) {

func TestConfigData_SetNodeName(t *testing.T) {
expectedName := "node-1"
os.Setenv(nodeNameEnvVar, expectedName)
_ = os.Setenv(nodeNameEnvVar, expectedName)
c := &ConfigData{}
c.SetNodeName()
if c.NodeData.Name != expectedName {
Expand All @@ -142,7 +142,7 @@ func TestConfigData_SetNodeName(t *testing.T) {

func TestConfigData_SetNamespace(t *testing.T) {
expectedName := "namespace-1"
os.Setenv(NamespaceEnvVar, expectedName)
_ = os.Setenv(NamespaceEnvVar, expectedName)
c := &ConfigData{}
c.SetNamespace()
if c.Namespace != expectedName {
Expand All @@ -152,7 +152,7 @@ func TestConfigData_SetNamespace(t *testing.T) {

func TestConfigData_SetContainerName(t *testing.T) {
expectedName := "cont-1"
os.Setenv(ContainerNameEnvVar, expectedName)
_ = os.Setenv(ContainerNameEnvVar, expectedName)
c := &ConfigData{}
c.SetContainerName()
if c.ContainerName != expectedName {
Expand All @@ -162,7 +162,7 @@ func TestConfigData_SetContainerName(t *testing.T) {

func TestConfigData_SetBackgroundContextURL(t *testing.T) {
expectedName := "URL-1"
os.Setenv("OTEL_COLLECTOR_SVC", expectedName)
_ = os.Setenv("OTEL_COLLECTOR_SVC", expectedName)
c := &ConfigData{}
c.SetBackgroundContextURL()
if c.telemetryURL != expectedName {
Expand Down
2 changes: 1 addition & 1 deletion pkg/conthandler/container_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (aggregator *Aggregator) collectDataFromContainerAccumulator(errChan chan e
for {
newEvent := <-aggregator.aggregationDataChan
if newEvent.GetEventCMD() == accumulator.DropEventOccurred {
// aggregator.StopAggregate()
_ = aggregator.StopAggregate()
errChan <- fmt.Errorf(newEvent.GetEventCMD())
break
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/conthandler/container_aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ import (

accumulator "sniffer/pkg/event_data_storage"

instanceidhandler "github.com/kubescape/k8s-interface/instanceidhandler/v1"
"github.com/kubescape/k8s-interface/instanceidhandler/v1"
)

const (
RedisPodName = "redis-64bd97b5fc-kvh7r"
RedisImageID = "docker-pullable://redis@sha256:6a59f1cbb8d28ac484176d52c473494859a512ddba3ea62a547258cf16c9b3ae"
RedisContainerID = "16248df36c67"
RedisWLID = "wlid://cluster-test/namespace-any/deployment/redis"
RedisInstanceID = "apiVersion-v1/namespace-any/kind-deployment/name-redis/containerName-redis"
RedisPodName = "redis-64bd97b5fc-kvh7r"
RedisImageID = "docker-pullable://redis@sha256:6a59f1cbb8d28ac484176d52c473494859a512ddba3ea62a547258cf16c9b3ae"
RedisContainerID = "16248df36c67"
RedisWLID = "wlid://cluster-test/namespace-any/deployment/redis"

NumberOfRedisEventInTheMockAfterFilterDuplicated = 73
)

Expand Down
20 changes: 12 additions & 8 deletions pkg/conthandler/container_main_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ func (ch *ContainerHandler) afterTimerActions() error {

for {
ctx, span := otel.Tracer("").Start(context.GetBackgroundContext(), "afterTimerActions")
defer span.End()
afterTimerActionsData := <-ch.afterTimerActionsChannel
containerDataInterface, exist := ch.watchedContainers.Load(afterTimerActionsData.containerID)
if !exist {
logger.L().Ctx(context.GetBackgroundContext()).Warning("afterTimerActions: failed to get container data of container ID", []helpers.IDetails{helpers.String("container ID", afterTimerActionsData.containerID)}...)
span.End()
continue
}
containerData := containerDataInterface.(watchedContainerData)
Expand All @@ -93,6 +93,7 @@ func (ch *ContainerHandler) afterTimerActions() error {

if err = <-containerData.syncChannel[StepGetSBOM]; err != nil {
logger.L().Debug("failed to get SBOM", []helpers.IDetails{helpers.String("container ID", afterTimerActionsData.containerID), helpers.String("container name", containerData.event.GetContainerName()), helpers.String("k8s resource ", containerData.event.GetK8SWorkloadID()), helpers.Error(err)}...)
span.End()
continue
}
if err = containerData.sbomClient.FilterSBOM(fileList); err != nil {
Expand All @@ -104,9 +105,11 @@ func (ch *ContainerHandler) afterTimerActions() error {
if !errors.Is(err, sbom.IsAlreadyExist()) {
logger.L().Ctx(ctx).Error("failed to store filtered SBOM", []helpers.IDetails{helpers.String("container ID", afterTimerActionsData.containerID), helpers.String("k8s resource", containerData.event.GetK8SWorkloadID()), helpers.Error(err)}...)
}
span.End()
continue
}
logger.L().Info("filtered SBOM has been stored successfully", []helpers.IDetails{helpers.String("containerID", afterTimerActionsData.containerID), helpers.String("k8s resource", containerData.event.GetK8SWorkloadID())}...)
span.End() // defer in infinite loop never runs
}
}
}
Expand Down Expand Up @@ -138,7 +141,7 @@ func createTicker() *time.Ticker {

func (ch *ContainerHandler) deleteResources(watchedContainer watchedContainerData, contEvent v1.ContainerEventData) {
watchedContainer.snifferTicker.Stop()
watchedContainer.containerAggregator.StopAggregate()
_ = watchedContainer.containerAggregator.StopAggregate()
watchedContainer.sbomClient.CleanResources()
ch.watchedContainers.Delete(contEvent.GetContainerID())
}
Expand Down Expand Up @@ -167,15 +170,13 @@ func (ch *ContainerHandler) startRelevancyProcess(contEvent v1.ContainerEventDat
err = ch.startTimer(watchedContainer, contEvent.GetContainerID())
if err != nil {
if errors.Is(err, droppedEventsError) {
ctx, span := otel.Tracer("").Start(ctx, "dropped events.", trace.WithAttributes(attribute.String("containerID", contEvent.GetContainerID()), attribute.String("container workload", contEvent.GetK8SWorkloadID())))
logger.L().Ctx(ctx).Warning("container monitoring got drop events - we may miss some realtime data", helpers.String("container ID", contEvent.GetContainerID()), helpers.String("container name", contEvent.GetContainerName()), helpers.String("k8s resources", contEvent.GetK8SWorkloadID()), helpers.Error(err))
span.End()
}
} else if errors.Is(err, containerHasTerminatedError) {
break
}
}
logger.L().Ctx(ctx).Info("stop monitor on container - after monitoring time", helpers.String("container ID", contEvent.GetContainerID()), helpers.String("container name", contEvent.GetContainerName()), helpers.String("k8s resources", contEvent.GetK8SWorkloadID()), helpers.Error(err))
logger.L().Info("stop monitor on container - after monitoring time", helpers.String("container ID", contEvent.GetContainerID()), helpers.String("container name", contEvent.GetContainerName()), helpers.String("k8s resources", contEvent.GetK8SWorkloadID()), helpers.Error(err))
ch.deleteResources(watchedContainer, contEvent)
}

Expand Down Expand Up @@ -203,7 +204,6 @@ func (ch *ContainerHandler) handleContainerRunningEvent(contEvent v1.ContainerEv
if exist {
return containerAlreadyExistError
}

logger.L().Info("new container has loaded - start monitor it", []helpers.IDetails{helpers.String("ContainerID", contEvent.GetContainerID()), helpers.String("Container name", contEvent.GetContainerID()), helpers.String("k8s workload", contEvent.GetK8SWorkloadID())}...)
newWatchedContainer := watchedContainerData{
containerAggregator: CreateAggregator(getShortContainerID(contEvent.GetContainerID())),
Expand Down Expand Up @@ -244,8 +244,12 @@ func (ch *ContainerHandler) handleNewContainerEvent(contEvent v1.ContainerEventD
}

func (ch *ContainerHandler) StartMainHandler() error {
go ch.afterTimerActions()
go ch.containerWatcher.StartWatchedOnContainers(ch.containersEventChan)
go func() {
_ = ch.afterTimerActions()
}()
go func() {
_ = ch.containerWatcher.StartWatchedOnContainers(ch.containersEventChan)
}()

for {
ctx, span := otel.Tracer("").Start(context.GetBackgroundContext(), "mainContainerHandler")
Expand Down
19 changes: 4 additions & 15 deletions pkg/conthandler/container_main_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,13 @@ import (
"testing"
"time"

instanceidhandler "github.com/kubescape/k8s-interface/instanceidhandler/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/fake"
"github.com/kubescape/k8s-interface/instanceidhandler/v1"
)

const (
RedisContainerIDContHandler = "docker://16248df36c67807ca5c429e6f021fe092e14a27aab89cbde00ba801de0f05266"
)

var watcherMainHandler *watch.FakeWatcher

type k8sFakeClientMainHandler struct {
Clientset *fake.Clientset
}

func (client *k8sFakeClientMainHandler) GetWatcher() (watch.Interface, error) {
watcherMainHandler = watch.NewFake()
return watcherMainHandler, nil
}

func TestContMainHandler(t *testing.T) {
configPath := path.Join(utils.CurrentDir(), "..", "..", "configuration", "ConfigurationFile.json")
err := os.Setenv(config.ConfigEnvVar, configPath)
Expand Down Expand Up @@ -61,7 +48,9 @@ func TestContMainHandler(t *testing.T) {
if err != nil {
t.Fatalf("CreateContainerHandler failed with err %v", err)
}
go contHandler.afterTimerActions()
go func() {
_ = contHandler.afterTimerActions()
}()
go func() {
RedisInstanceID := instanceidhandler.InstanceID{}
RedisInstanceID.SetAPIVersion("apps/v1")
Expand Down
6 changes: 3 additions & 3 deletions pkg/conthandler/container_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ import (

"sniffer/pkg/context"

wlid "github.com/armosec/utils-k8s-go/wlid"
"github.com/armosec/utils-k8s-go/wlid"
"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
instanceidhandler "github.com/kubescape/k8s-interface/instanceidhandler"
"github.com/kubescape/k8s-interface/instanceidhandler"
instanceidhandlerV1 "github.com/kubescape/k8s-interface/instanceidhandler/v1"
k8sinterface "github.com/kubescape/k8s-interface/k8sinterface"
"github.com/kubescape/k8s-interface/k8sinterface"
"github.com/kubescape/k8s-interface/workloadinterface"
core "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down
14 changes: 8 additions & 6 deletions pkg/conthandler/container_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,23 @@ type k8sFakeClient struct {
Clientset *fake.Clientset
}

func (client *k8sFakeClient) GetApiVersion(workload any) string {
func (client *k8sFakeClient) GetApiVersion(_ any) string {
return "v1"
}

func (client *k8sFakeClient) GetResourceVersion(workload any) string {
func (client *k8sFakeClient) GetResourceVersion(_ any) string {
return "1234wat"
}

func (client *k8sFakeClient) CalculateWorkloadParentRecursive(workload any) (string, string, error) {
func (client *k8sFakeClient) CalculateWorkloadParentRecursive(_ any) (string, string, error) {
return "deployment", "nginx", nil
}

func (client *k8sFakeClient) GetWorkload(namespace, kind, name string) (any, error) {
func (client *k8sFakeClient) GetWorkload(_, _, _ string) (any, error) {
return "", nil
}

func (client *k8sFakeClient) GenerateWLID(workload any, clusterName string) string {
func (client *k8sFakeClient) GenerateWLID(_ any, clusterName string) string {
return "wlid://cluster-" + clusterName + "/namespace-any" + "/deployment-nginx"
}

Expand Down Expand Up @@ -73,7 +73,9 @@ func TestContWatcher(t *testing.T) {

containersEventChan := make(chan conthadlerV1.ContainerEventData, 50)
go func() {
go contWatcher.StartWatchedOnContainers(containersEventChan)
go func() {
_ = contWatcher.StartWatchedOnContainers(containersEventChan)
}()
time.Sleep(1 * time.Second)
go func() {
pod := &v1.Pod{
Expand Down
2 changes: 1 addition & 1 deletion pkg/conthandler/v1/container_handler_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"fmt"
"strings"

instanceidhandler "github.com/kubescape/k8s-interface/instanceidhandler"
"github.com/kubescape/k8s-interface/instanceidhandler"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion pkg/ebpfeng/ebpf_engine_interface.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package ebpfeng

import ebpfev "sniffer/pkg/ebpfev/v1"
import "sniffer/pkg/ebpfev/v1"

type EbpfEngineClient interface {
StartEbpfEngine() error
Expand Down
4 changes: 2 additions & 2 deletions pkg/ebpfeng/falco_sniffer_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (

"sniffer/pkg/config"
"sniffer/pkg/context"
ebpfev "sniffer/pkg/ebpfev/v1"
"sniffer/pkg/ebpfev/v1"

logger "github.com/kubescape/go-logger"
"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
)

Expand Down
12 changes: 6 additions & 6 deletions pkg/ebpfeng/falco_sniffer_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,12 @@ func TestCreateFalcoEbpfEngine(t *testing.T) {
t.Fatalf("failed to set env %s with err %v", config.ConfigEnvVar, err)
}

config := config.GetConfigurationConfigContext()
configData, err := config.GetConfigurationReader()
cfg := config.GetConfigurationConfigContext()
configData, err := cfg.GetConfigurationReader()
if err != nil {
t.Errorf("GetConfigurationReader failed with err %v", err)
}
err = config.ParseConfiguration(v1.CreateConfigData(), configData)
err = cfg.ParseConfiguration(v1.CreateConfigData(), configData)
if err != nil {
t.Fatalf("ParseConfiguration failed with err %v", err)
}
Expand All @@ -159,12 +159,12 @@ func TestEbpfEngineCMDWithParams(t *testing.T) {
t.Fatalf("failed to set env %s with err %v", config.ConfigEnvVar, err)
}

config := config.GetConfigurationConfigContext()
configData, err := config.GetConfigurationReader()
cfg := config.GetConfigurationConfigContext()
configData, err := cfg.GetConfigurationReader()
if err != nil {
t.Errorf("GetConfigurationReader failed with err %v", err)
}
err = config.ParseConfiguration(v1.CreateConfigData(), configData)
err = cfg.ParseConfiguration(v1.CreateConfigData(), configData)
if err != nil {
t.Fatalf("ParseConfiguration failed with err %v", err)
}
Expand Down
Loading

0 comments on commit 68b73f2

Please sign in to comment.