Skip to content

Commit

Permalink
feat: implement new logging architecture v0 (#1071)
Browse files Browse the repository at this point in the history
## Description:
This PR finalizes v0 of centralized logging over Docker. Specifically,
it makes the following changes:

- Removes all `LogsDatabase` code (including loki implementations)
- "Re-activates" fluentbit `LogsCollectors` running per enclave
- Adds `LogsAggregator` component
- Implements a `LogsAggregator` using [Vector](https://vector.dev/) 
- Configures fluentbit logs collectors to send logs to vector logs
aggregators (as opposed to loki logs database)
- Configures the `APIContainer` to send logs to its respective enclave
fluentbit logs collector

## Is this change user facing?
NO

---------

Co-authored-by: preetrawal <15133250+Peeeekay@users.noreply.github.com>
  • Loading branch information
tedim52 and Peeeekay committed Aug 11, 2023
1 parent 64f0c20 commit c66c148
Show file tree
Hide file tree
Showing 64 changed files with 921 additions and 3,188 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Expand Up @@ -492,7 +492,7 @@ jobs:
- when:
condition:
and:
- equal: [ "kubernetes", << parameters.cli-cluster-backend >> ]
- equal: [ "kubernetes", << parameters.cli-cluster-backend >> ]
<<: *steps_prepare_testing_k8s_k3s

- when:
Expand Down
5 changes: 3 additions & 2 deletions cli/cli/helpers/engine_manager/engine_existence_guarantor.go
Expand Up @@ -18,7 +18,9 @@ import (

const (
// If set to empty, then we'll use whichever default version the launcher provides
defaultEngineImageVersionTag = ""
defaultEngineImageVersionTag = ""
shouldForceLogsComponentsContainersRestartWhenEngineContainerIsStopped = true
shouldForceLogsComponentsContainersRestartWhenEngineContainerIsRunning = false
)

var engineRestartCmd = fmt.Sprintf(
Expand Down Expand Up @@ -194,7 +196,6 @@ func (guarantor *engineExistenceGuarantor) VisitContainerRunningButServerNotResp
}

func (guarantor *engineExistenceGuarantor) VisitRunning() error {

guarantor.postVisitingHostMachineIpAndPort = guarantor.preVisitingMaybeHostMachineIpAndPort
runningEngineSemver, cliEngineSemver, err := guarantor.getRunningAndCLIEngineVersions()
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion cli/cli/helpers/engine_manager/engine_manager.go
Expand Up @@ -281,7 +281,6 @@ func (manager *EngineManager) StopEngineIdempotently(ctx context.Context) error
}

logrus.Debugf("Destroyed signal sent to engines %v", successfulDestroyedEngineGuids)

return nil
}

Expand Down
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"github.com/docker/docker/client"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_kurtosis_backend"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_kurtosis_backend/logs_collector_functions"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_manager"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/object_attributes_provider/label_key_consts"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/object_attributes_provider/label_value_consts"
Expand Down Expand Up @@ -196,10 +197,16 @@ func getDockerKurtosisBackend(
networkIp := network.GetIpAndMask().IP
apiContainerIp := optionalApiContainerModeArgs.APIContainerIP

logsCollectorObj, err := logs_collector_functions.GetLogsCollectorForEnclave(ctx, enclaveUuid, dockerManager)
if err != nil {
return nil, stacktrace.Propagate(err, "An error occurred while getting the logs collector object for enclave '%v'; This is a bug in Kurtosis", enclaveUuid)
}

alreadyTakenIps := map[string]bool{
networkIp.String(): true,
network.GetGatewayIp(): true,
apiContainerIp.String(): true,
logsCollectorObj.GetEnclaveNetworkIpAddress().String(): true,
}

freeIpAddrProvider, err := free_ip_addr_tracker.GetOrCreateNewFreeIpAddrTracker(
Expand Down
Expand Up @@ -3,10 +3,10 @@ package docker_kurtosis_backend
import (
"context"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_kurtosis_backend/engine_functions"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_kurtosis_backend/logs_aggregator_functions"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_kurtosis_backend/logs_aggregator_functions/implementations/vector"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_kurtosis_backend/logs_collector_functions"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_kurtosis_backend/logs_collector_functions/implementations/fluentbit"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_kurtosis_backend/logs_database_functions"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_kurtosis_backend/logs_database_functions/implementations/loki"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_kurtosis_backend/user_services_functions"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_manager"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_manager/types"
Expand All @@ -19,8 +19,8 @@ import (
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/enclave"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/engine"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/exec_result"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/logs_aggregator"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/logs_collector"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/logs_database"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/service"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/database_accessors/enclave_db/free_ip_addr_tracker"
"github.com/kurtosis-tech/stacktrace"
Expand Down Expand Up @@ -218,11 +218,28 @@ func (backend *DockerKurtosisBackend) StartRegisteredUserServices(ctx context.Co
)
}

logsCollector, err := backend.GetLogsCollectorForEnclave(ctx, enclaveUuid)
if err != nil {
return nil, nil, stacktrace.Propagate(err, "An error occurred getting the logs collector")
}
if logsCollector == nil || logsCollector.GetStatus() != container_status.ContainerStatus_Running {
return nil, nil, stacktrace.NewError("The user services can't be started because no logs collector is running for it to send logs to.")
}

logsCollectorIpAddressInEnclaveNetwork := logsCollector.GetEnclaveNetworkIpAddress()
if logsCollectorIpAddressInEnclaveNetwork == nil {
return nil, nil, stacktrace.NewError("Expected the logs collector to have an ip address in the enclave network but it does not.")
}

logsCollectorAvailabilityChecker := fluentbit.NewFluentbitAvailabilityChecker(logsCollectorIpAddressInEnclaveNetwork, logsCollector.GetPrivateHttpPort().GetNumber())

successfullyStartedService, failedService, err := user_service_functions.StartRegisteredUserServices(
ctx,
enclaveUuid,
services,
serviceRegistrationsForEnclave,
logsCollector,
logsCollectorAvailabilityChecker,
backend.objAttrsProvider,
freeIpAddrProviderForEnclave,
backend.dockerManager)
Expand Down Expand Up @@ -375,57 +392,36 @@ func (backend *DockerKurtosisBackend) DestroyUserServices(
return successfullyDestroyedServices, failedServices, nil
}

func (backend *DockerKurtosisBackend) CreateLogsDatabase(
ctx context.Context,
logsDatabaseHttpPortNumber uint16,
) (
*logs_database.LogsDatabase,
error,
) {

//Declaring the implementation
logsDatabaseContainer := loki.NewLokiLogDatabaseContainer()
func (backend *DockerKurtosisBackend) CreateLogsAggregator(ctx context.Context) (*logs_aggregator.LogsAggregator, error) {
logsAggregatorContainer := vector.NewVectorLogsAggregatorContainer() //Declaring the implementation

logsDatabase, err := logs_database_functions.CreateLogsDatabase(
logsAggregator, _, err := logs_aggregator_functions.CreateLogsAggregator(
ctx,
logsDatabaseHttpPortNumber,
logsDatabaseContainer,
logsAggregatorContainer,
backend.dockerManager,
backend.objAttrsProvider,
)
if err != nil {
return nil, stacktrace.Propagate(err, "An error occurred creating the logs database using the logs database container '%+v' and the HTTP port number '%v'", logsDatabaseContainer, logsDatabaseHttpPortNumber)
return nil, stacktrace.Propagate(err, "An error occurred creating the logs aggregator using the logs aggregator container '%+v'.", logsAggregatorContainer)
}
return logsDatabase, nil
return logsAggregator, nil
}

// If nothing is found returns nil
func (backend *DockerKurtosisBackend) GetLogsDatabase(
ctx context.Context,
) (
resultMaybeLogsDatabase *logs_database.LogsDatabase,
resultErr error,
) {
maybeLogsDatabase, err := logs_database_functions.GetLogsDatabase(
func (backend *DockerKurtosisBackend) GetLogsAggregator(ctx context.Context) (*logs_aggregator.LogsAggregator, error) {
maybeLogsAggregator, err := logs_aggregator_functions.GetLogsAggregator(
ctx,
backend.dockerManager,
)
if err != nil {
return nil, stacktrace.Propagate(err, "An error occurred getting the logs database")
return nil, stacktrace.Propagate(err, "An error occurred getting the logs aggregator")
}

return maybeLogsDatabase, nil
return maybeLogsAggregator, nil
}

func (backend *DockerKurtosisBackend) DestroyLogsDatabase(
ctx context.Context,
) error {

if err := logs_database_functions.DestroyLogsDatabase(
ctx,
backend.dockerManager,
); err != nil {
return stacktrace.Propagate(err, "An error occurred destroying the logs database")
func (backend *DockerKurtosisBackend) DestroyLogsAggregator(ctx context.Context) error {
if err := logs_aggregator_functions.DestroyLogsAggregator(ctx, backend.dockerManager); err != nil {
return stacktrace.Propagate(err, "An error occurred destroying the logs aggregator")
}

return nil
Expand All @@ -440,15 +436,13 @@ func (backend *DockerKurtosisBackend) CreateLogsCollectorForEnclave(
*logs_collector.LogsCollector,
error,
) {

//TODO we we'd have to replace this part if we ever wanted to send to an external source
logsDatabase, err := backend.GetLogsDatabase(ctx)
logsAggregator, err := backend.GetLogsAggregator(ctx)
if err != nil {
return nil, stacktrace.Propagate(err, "An error occurred getting the logs database; the logs collector cannot be run without a logs database")
return nil, stacktrace.Propagate(err, "An error occurred getting the logs aggregator; the logs collector cannot be run without a logs aggregator")
}

if logsDatabase == nil || logsDatabase.GetStatus() != container_status.ContainerStatus_Running {
return nil, stacktrace.NewError("The logs database is not running; the logs collector cannot be run without a running logs database")
if logsAggregator == nil || logsAggregator.GetStatus() != container_status.ContainerStatus_Running {
return nil, stacktrace.NewError("The logs aggregator is not running; the logs collector cannot be run without a running logs aggregator")
}

//Declaring the implementation
Expand All @@ -460,7 +454,7 @@ func (backend *DockerKurtosisBackend) CreateLogsCollectorForEnclave(
logsCollectorTcpPortNumber,
logsCollectorHttpPortNumber,
logsCollectorContainer,
logsDatabase,
logsAggregator,
backend.dockerManager,
backend.objAttrsProvider,
)
Expand Down
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"github.com/docker/go-connections/nat"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_kurtosis_backend/consts"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_kurtosis_backend/logs_collector_functions"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_kurtosis_backend/shared_helpers"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_manager"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_manager/types"
Expand Down Expand Up @@ -68,10 +69,16 @@ func (backend *DockerKurtosisBackend) CreateAPIContainer(
return nil, stacktrace.Propagate(err, "An error occurred getting enclave network by enclave UUID '%v'", enclaveUuid)
}

enclaveLogsCollector, err := backend.GetLogsCollectorForEnclave(ctx, enclaveUuid)
if err != nil {
return nil, stacktrace.Propagate(err, "An error occurred while getting the logs collector for enclave '%v; This is a bug in Kurtosis'", enclaveUuid)
}

networkCidr := enclaveNetwork.GetIpAndMask()
alreadyTakenIps := map[string]bool{
networkCidr.IP.String(): true,
enclaveNetwork.GetGatewayIp(): true,
networkCidr.IP.String(): true,
enclaveNetwork.GetGatewayIp(): true,
enclaveLogsCollector.GetEnclaveNetworkIpAddress().String(): true,
}

ipAddr, err := network_helpers.GetFreeIpAddrFromSubnet(alreadyTakenIps, networkCidr)
Expand Down Expand Up @@ -141,6 +148,16 @@ func (backend *DockerKurtosisBackend) CreateAPIContainer(
labelStrs[labelKey.GetString()] = labelValue.GetString()
}

//The APIContainer will be configured to send the logs to the Fluentbit logs collector server
logCollectorAddr, err := enclaveLogsCollector.GetEnclaveNetworkAddressString()
if err != nil {
return nil, stacktrace.Propagate(err, "An error occurred retrieving the log collector address.")
}
fluentdLoggingDriverCnfg := docker_manager.NewFluentdLoggingDriver(
logCollectorAddr,
logs_collector_functions.GetKurtosisTrackedLogsCollectorLabels(),
)

createAndStartArgs := docker_manager.NewCreateAndStartContainerArgsBuilder(
image,
apiContainerAttrs.GetName().GetString(),
Expand All @@ -157,6 +174,8 @@ func (backend *DockerKurtosisBackend) CreateAPIContainer(
ipAddr,
).WithLabels(
labelStrs,
).WithLoggingDriver(
fluentdLoggingDriverCnfg,
).Build()

if err = backend.dockerManager.FetchImage(ctx, image); err != nil {
Expand Down
Expand Up @@ -21,6 +21,9 @@ const (
shouldFetchStoppedContainersWhenGettingEnclaveStatus = true

shouldFetchStoppedContainersWhenDumpingEnclave = true

defaultHttpLogsCollectorPortNum = uint16(9712)
defaultTcpLogsCollectorPortNum = uint16(9713)
)

// TODO: MIGRATE THIS FOLDER TO USE STRUCTURE OF USER_SERVICE_FUNCTIONS MODULE
Expand Down Expand Up @@ -142,6 +145,21 @@ func (backend *DockerKurtosisBackend) CreateEnclave(ctx context.Context, enclave

newEnclave := enclave.NewEnclave(enclaveUuid, enclaveName, enclave.EnclaveStatus_Empty, &creationTime)

// TODO the logs collector has a random private ip address in the enclave network that must be tracked
if _, err := backend.CreateLogsCollectorForEnclave(ctx, enclaveUuid, defaultTcpLogsCollectorPortNum, defaultHttpLogsCollectorPortNum); err != nil {
return nil, stacktrace.Propagate(err, "An error occurred creating the logs collector with TCP port number '%v' and HTTP port number '%v'", defaultTcpLogsCollectorPortNum, defaultHttpLogsCollectorPortNum)
}
shouldDeleteLogsCollector := true
defer func() {
if shouldDeleteLogsCollector {
err = backend.DestroyLogsCollectorForEnclave(ctx, enclaveUuid)
if err != nil {
logrus.Errorf("Couldn't cleanup logs collector for enclave '%v' as the following error was thrown:\n%v", enclaveUuid, err)
}
}
}()

shouldDeleteLogsCollector = false
shouldDeleteNetwork = false
shouldDeleteVolume = false
return newEnclave, nil
Expand Down
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"github.com/docker/go-connections/nat"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_kurtosis_backend/consts"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_kurtosis_backend/logs_aggregator_functions"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_kurtosis_backend/logs_aggregator_functions/implementations/vector"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_kurtosis_backend/shared_helpers"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_manager"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_manager/types"
Expand Down Expand Up @@ -63,6 +65,31 @@ func CreateEngine(
)
}

engineNetwork, err := shared_helpers.GetEngineAndLogsComponentsNetwork(ctx, dockerManager)
if err != nil {
return nil, stacktrace.Propagate(err, "An error occurred getting the engine network")
}
targetNetworkId := engineNetwork.GetId()

logrus.Infof("Starting the centralized logs components...")
logsAggregatorContainer := vector.NewVectorLogsAggregatorContainer() // Declaring implementation
_, removeLogsAggregatorFunc, err := logs_aggregator_functions.CreateLogsAggregator(
ctx,
logsAggregatorContainer,
dockerManager,
objAttrsProvider)
if err != nil {
return nil, stacktrace.Propagate(err,
"An error occurred attempting to create logging components for engine with GUID '%v' in Docker network with network id '%v'.", engineGuidStr, targetNetworkId)
}
shouldRemoveCentralizedLogComponents := true
defer func() {
if shouldRemoveCentralizedLogComponents {
removeLogsAggregatorFunc()
}
}()
logrus.Infof("Centralized logs components started.")

httpPortSpec, err := port_spec.NewPortSpec(uint16(frontendPortSpec), consts.EngineTransportProtocol, consts.HttpApplicationProtocol, defaultWait)
if err != nil {
return nil, stacktrace.Propagate(
Expand Down Expand Up @@ -123,12 +150,6 @@ func CreateEngine(
labelStrs[labelKey.GetString()] = labelValue.GetString()
}

engineNetwork, err := shared_helpers.GetEngineAndLogsComponentsNetwork(ctx, dockerManager)
if err != nil {
return nil, stacktrace.Propagate(err, "An error occurred getting the engine network")
}
targetNetworkId := engineNetwork.GetId()

createAndStartArgs := docker_manager.NewCreateAndStartContainerArgsBuilder(
containerImageAndTag,
engineAttrs.GetName().GetString(),
Expand Down Expand Up @@ -185,6 +206,7 @@ func CreateEngine(
return nil, stacktrace.Propagate(err, "An error occurred creating an engine object from container with GUID '%v'", containerId)
}

shouldRemoveCentralizedLogComponents = false
shouldKillEngineContainer = false
return result, nil
}
Expand Up @@ -2,6 +2,7 @@ package engine_functions

import (
"context"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_kurtosis_backend/logs_aggregator_functions"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_manager"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_operation_parallelizer"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/engine"
Expand Down Expand Up @@ -61,5 +62,13 @@ func DestroyEngines(
)
}

// This is a small hack so that the log aggregator isn't cleaned while trying to remove stopped engines (eg. kurtosis clean -a)
shouldRemoveLogComponents := len(matchingEnginesByContainerId) == 0
if shouldRemoveLogComponents {
if err := logs_aggregator_functions.DestroyLogsAggregator(ctx, dockerManager); err != nil {
return nil, nil, stacktrace.Propagate(err, "An error occurred removing the logging components.")
}
}

return successfulGuids, erroredGuids, nil
}
@@ -0,0 +1,5 @@
package logs_aggregator_functions

const (
defaultLogsListeningPortNum = uint16(9714)
)

0 comments on commit c66c148

Please sign in to comment.