Skip to content
This repository has been archived by the owner on May 30, 2022. It is now read-only.

Project telemetry data #418

Merged
13 changes: 7 additions & 6 deletions agent/discovery/collector/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/spf13/afero"
"github.com/spf13/viper"
"github.com/stretchr/testify/suite"
_ "github.com/trento-project/trento/test"
"github.com/trento-project/trento/test/helpers"
)

Expand All @@ -35,9 +36,9 @@ func (suite *CollectorClientTestSuite) TestCollectorClient_NewClientWithTLS() {
EnablemTLS: true,
CollectorHost: "localhost",
CollectorPort: 8081,
Cert: "../../../test/certs/client-cert.pem",
Key: "../../../test/certs/client-key.pem",
CA: "../../../test/certs/ca-cert.pem",
Cert: "./test/certs/client-cert.pem",
nelsonkopliku marked this conversation as resolved.
Show resolved Hide resolved
Key: "./test/certs/client-key.pem",
CA: "./test/certs/ca-cert.pem",
})

suite.NoError(err)
Expand Down Expand Up @@ -69,9 +70,9 @@ func (suite *CollectorClientTestSuite) TestCollectorClient_PublishingSuccess() {
EnablemTLS: true,
CollectorHost: "localhost",
CollectorPort: 8081,
Cert: "../../../test/certs/client-cert.pem",
Key: "../../../test/certs/client-key.pem",
CA: "../../../test/certs/ca-cert.pem",
Cert: "./test/certs/client-cert.pem",
Key: "./test/certs/client-key.pem",
CA: "./test/certs/ca-cert.pem",
})

suite.NoError(err)
Expand Down
15 changes: 8 additions & 7 deletions agent/discovery/collector/publishing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/spf13/viper"
"github.com/stretchr/testify/suite"
"github.com/trento-project/trento/agent/discovery/mocks"
_ "github.com/trento-project/trento/test"
"github.com/trento-project/trento/test/helpers"
)

Expand All @@ -36,9 +37,9 @@ func (suite *PublishingTestSuite) SetupSuite() {
EnablemTLS: true,
CollectorHost: "localhost",
CollectorPort: 8443,
Cert: "../../../test/certs/client-cert.pem",
Key: "../../../test/certs/client-key.pem",
CA: "../../../test/certs/ca-cert.pem",
Cert: "./test/certs/client-cert.pem",
Key: "./test/certs/client-key.pem",
CA: "./test/certs/ca-cert.pem",
})
suite.NoError(err)

Expand All @@ -52,7 +53,7 @@ func (suite *PublishingTestSuite) TestCollectorClient_PublishingClusterDiscovery
discoveredCluster := mocks.NewDiscoveredClusterMock()

suite.runDiscoveryScenario(discoveryType, discoveredCluster, func(requestBodyAgainstCollector string) {
suite.assertJsonMatchesJsonFileContent("../../../test/fixtures/discovery/cluster/expected_published_cluster_discovery.json", requestBodyAgainstCollector)
suite.assertJsonMatchesJsonFileContent("./test/fixtures/discovery/cluster/expected_published_cluster_discovery.json", requestBodyAgainstCollector)
})
}

Expand All @@ -61,7 +62,7 @@ func (suite *PublishingTestSuite) TestCollectorClient_PublishingCloudDiscovery()
discoveredCloudInstance := mocks.NewDiscoveredCloudMock()

suite.runDiscoveryScenario(discoveryType, discoveredCloudInstance, func(requestBodyAgainstCollector string) {
suite.assertJsonMatchesJsonFileContent("../../../test/fixtures/discovery/azure/expected_published_cloud_discovery.json", requestBodyAgainstCollector)
suite.assertJsonMatchesJsonFileContent("./test/fixtures/discovery/azure/expected_published_cloud_discovery.json", requestBodyAgainstCollector)
})
}

Expand All @@ -70,7 +71,7 @@ func (suite *PublishingTestSuite) TestCollectorClient_PublishingHostDiscovery()
discoveredHost := mocks.NewDiscoveredHostMock()

suite.runDiscoveryScenario(discoveryType, discoveredHost, func(requestBodyAgainstCollector string) {
suite.assertJsonMatchesJsonFileContent("../../../test/fixtures/discovery/host/expected_published_host_discovery.json", requestBodyAgainstCollector)
suite.assertJsonMatchesJsonFileContent("./test/fixtures/discovery/host/expected_published_host_discovery.json", requestBodyAgainstCollector)
})
}

Expand All @@ -79,7 +80,7 @@ func (suite *PublishingTestSuite) TestCollectorClient_PublishingSubscriptionDisc
discoveredSubscriptions := mocks.NewDiscoveredSubscriptionsMock()

suite.runDiscoveryScenario(discoveryType, discoveredSubscriptions, func(requestBodyAgainstCollector string) {
suite.assertJsonMatchesJsonFileContent("../../../test/fixtures/discovery/subscriptions/expected_published_subscriptions_discovery.json", requestBodyAgainstCollector)
suite.assertJsonMatchesJsonFileContent("./test/fixtures/discovery/subscriptions/expected_published_subscriptions_discovery.json", requestBodyAgainstCollector)
})
}

Expand Down
4 changes: 2 additions & 2 deletions agent/discovery/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

log "github.com/sirupsen/logrus"
"github.com/trento-project/trento/agent/discovery/collector"
"github.com/trento-project/trento/agent/discovery/models"
"github.com/trento-project/trento/internal/consul"
"github.com/trento-project/trento/internal/hosts"
"github.com/trento-project/trento/version"
Expand Down Expand Up @@ -50,7 +49,8 @@ func (h HostDiscovery) Discover() (string, error) {
var si sysinfo.SysInfo
si.GetSysInfo()

host := models.DiscoveredHost{
host := hosts.DiscoveredHost{
OSVersion: si.OS.Version,
HostIpAddresses: ipAddresses,
HostName: h.discovery.host,
CPUCount: int(si.CPU.Cpus) * int(si.CPU.Cores),
Expand Down
2 changes: 1 addition & 1 deletion agent/discovery/mocks/discovered_cloud_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
func NewDiscoveredCloudMock() cloud.CloudInstance {
metadata := &cloud.AzureMetadata{}

jsonFile, err := os.Open("../../../test/fixtures/discovery/azure/azure_discovery.json")
jsonFile, err := os.Open("./test/fixtures/discovery/azure/azure_discovery.json")
if err != nil {
panic(err)
}
Expand Down
10 changes: 5 additions & 5 deletions agent/discovery/mocks/discovered_cluster_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import "github.com/trento-project/trento/internal/cluster"

func NewDiscoveredClusterMock() cluster.Cluster {
cluster, _ := cluster.NewClusterWithDiscoveryTools(&cluster.DiscoveryTools{
CibAdmPath: "../../../test/fake_cibadmin.sh",
CrmmonAdmPath: "../../../test/fake_crm_mon.sh",
CorosyncKeyPath: "../../../test/authkey",
SBDPath: "../../../test/fake_sbd.sh",
SBDConfigPath: "../../../test/sbd_config",
CibAdmPath: "./test/fake_cibadmin.sh",
CrmmonAdmPath: "./test/fake_crm_mon.sh",
CorosyncKeyPath: "./test/authkey",
SBDPath: "./test/fake_sbd.sh",
SBDConfigPath: "./test/sbd_config",
})

return cluster
Expand Down
9 changes: 4 additions & 5 deletions agent/discovery/mocks/discovered_host_mock.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package mocks

import (
"github.com/trento-project/trento/agent/discovery/models"
)
import "github.com/trento-project/trento/internal/hosts"

func NewDiscoveredHostMock() models.DiscoveredHost {
return models.DiscoveredHost{
func NewDiscoveredHostMock() hosts.DiscoveredHost {
return hosts.DiscoveredHost{
OSVersion: "15.3",
HostIpAddresses: []string{"10.1.1.4", "10.1.1.5", "10.1.1.6"},
HostName: "thehostnamewherethediscoveryhappened",
CPUCount: 64,
Expand Down
2 changes: 1 addition & 1 deletion agent/discovery/mocks/discovered_subscription_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
func NewDiscoveredSubscriptionsMock() subscription.Subscriptions {
var subs subscription.Subscriptions

jsonFile, err := os.Open("../../../test/fixtures/discovery/subscriptions/subscriptions_discovery.json")
jsonFile, err := os.Open("./test/fixtures/discovery/subscriptions/subscriptions_discovery.json")
if err != nil {
panic(err)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package models
package hosts

type DiscoveredHost struct {
OSVersion string `json:"os_version"`
HostIpAddresses []string `json:"ip_addresses"`
HostName string `json:"hostname"`
CPUCount int `json:"cpu_count"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"agent_id": "the-machine-id",
"discovery_type": "host_discovery",
"payload": {
"os_version": "15.3",
"ip_addresses": [
"10.1.1.4",
"10.1.1.5",
Expand Down
17 changes: 17 additions & 0 deletions test/testing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package test

import (
"os"
"path"
"runtime"
)

// importing _ "github.com/trento-project/trento/test" in tests would set the cwd to the root of the repo
func init() {
nelsonkopliku marked this conversation as resolved.
Show resolved Hide resolved
_, filename, _, _ := runtime.Caller(0)
dir := path.Join(path.Dir(filename), "..")
err := os.Chdir(dir)
if err != nil {
panic(err)
}
}
3 changes: 2 additions & 1 deletion web/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ func DefaultDependencies(config *Config) Dependencies {
func MigrateDB(db *gorm.DB) error {
err := db.AutoMigrate(
models.Tag{}, models.SelectedChecks{}, models.ConnectionSettings{}, models.CheckRaw{},
models.Cluster{}, datapipeline.DataCollectedEvent{}, datapipeline.Subscription{})
models.Cluster{}, datapipeline.DataCollectedEvent{}, datapipeline.Subscription{}, models.HostTelemetry{})

if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions web/datapipeline/clusters_projector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestClustersProjector_ClusterDiscoveryHandler(t *testing.T) {
ClusterType: models.ClusterTypeUnknown,
})

jsonFile, err := os.Open("../../test/fixtures/cluster_discovery_hana_scale_up.json")
jsonFile, err := os.Open("./test/fixtures/cluster_discovery_hana_scale_up.json")
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -54,7 +54,7 @@ func TestClustersProjector_ClusterDiscoveryHandler(t *testing.T) {
}

func TestTransformClusterListData_HANAScaleUp(t *testing.T) {
jsonFile, err := os.Open("../../test/fixtures/cluster_discovery_hana_scale_up.json")
jsonFile, err := os.Open("./test/fixtures/cluster_discovery_hana_scale_up.json")
if err != nil {
panic(err)
}
Expand All @@ -76,7 +76,7 @@ func TestTransformClusterListData_HANAScaleUp(t *testing.T) {
}

func TestTransformClusterListData_Unknown(t *testing.T) {
jsonFile, err := os.Open("../../test/fixtures/cluster_discovery_unknown.json")
jsonFile, err := os.Open("./test/fixtures/cluster_discovery_unknown.json")
if err != nil {
panic(err)
}
Expand Down
7 changes: 5 additions & 2 deletions web/datapipeline/data_collected_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ import (
)

const (
ClusterDiscovery = "ha_cluster_discovery"
SAPsystemDiscovery = "sap_system_discovery"
ClusterDiscovery = "ha_cluster_discovery"
SAPsystemDiscovery = "sap_system_discovery"
HostDiscovery = "host_discovery"
SubscriptionDiscovery = "subscription_discovery"
CloudDiscovery = "cloud_discovery"
)

type DataCollectedEvent struct {
Expand Down
84 changes: 84 additions & 0 deletions web/datapipeline/host_telemetry_projector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package datapipeline

import (
"bytes"
"encoding/json"

log "github.com/sirupsen/logrus"
"github.com/trento-project/trento/internal/cloud"
"github.com/trento-project/trento/internal/hosts"
"github.com/trento-project/trento/web/models"
"gorm.io/datatypes"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

func NewHostTelemetryProjector(db *gorm.DB) *projector {
telemetryProjector := NewProjector("host_telemetry", db)

telemetryProjector.AddHandler(HostDiscovery, hostTelemetryProjector_HostDiscoveryHandler)
telemetryProjector.AddHandler(CloudDiscovery, hostTelemetryProjector_CloudDiscoveryHandler)

return telemetryProjector
}

func hostTelemetryProjector_HostDiscoveryHandler(dataCollectedEvent *DataCollectedEvent, db *gorm.DB) error {
decoder := payloadDecoder(dataCollectedEvent.Payload)

var discoveredHost hosts.DiscoveredHost
if err := decoder.Decode(&discoveredHost); err != nil {
log.Errorf("can't decode data: %s", err)
return err
}

telemetryReadModel := models.HostTelemetry{
AgentID: dataCollectedEvent.AgentID,
SLESVersion: discoveredHost.OSVersion,
HostName: discoveredHost.HostName,
CPUCount: discoveredHost.CPUCount,
SocketCount: discoveredHost.SocketCount,
TotalMemoryMB: discoveredHost.TotalMemoryMB,
}

return storeHostTelemetry(db, telemetryReadModel,
"sles_version",
"host_name",
"cpu_count",
"socket_count",
"total_memory_mb",
)
}

func hostTelemetryProjector_CloudDiscoveryHandler(dataCollectedEvent *DataCollectedEvent, db *gorm.DB) error {
decoder := payloadDecoder(dataCollectedEvent.Payload)

var discoveredCloud cloud.CloudInstance
if err := decoder.Decode(&discoveredCloud); err != nil {
log.Errorf("can't decode data: %s", err)
return err
}

telemetryReadModel := models.HostTelemetry{
AgentID: dataCollectedEvent.AgentID,
CloudProvider: discoveredCloud.Provider,
}

return storeHostTelemetry(db, telemetryReadModel, "cloud_provider")
}

func payloadDecoder(payload datatypes.JSON) *json.Decoder {
data, _ := payload.MarshalJSON()
decoder := json.NewDecoder(bytes.NewReader(data))
decoder.DisallowUnknownFields()

return decoder
}

func storeHostTelemetry(db *gorm.DB, telemetryReadModel models.HostTelemetry, updateColumns ...string) error {
return db.Clauses(clause.OnConflict{
Columns: []clause.Column{
{Name: "agent_id"},
},
DoUpdates: clause.AssignmentColumns(updateColumns),
}).Create(&telemetryReadModel).Error
}
Loading