Skip to content

Commit

Permalink
feat: implement Talos kernel log receiver
Browse files Browse the repository at this point in the history
Fixes #527

Talos logs (see siderolabs/talos#4600) are
delivered to Sidero over the SideroLink tunnel.

Logs can be seen with:

```
$ kubectl logs -n sidero-system deployment/sidero-controller-manager -c serverlogs -f
{"clock":67194673,"cluster":"management-cluster","facility":"user","machine":"default/management-cluster-cp-4j8f4","metal_machine":"default/management-cluster-cp-hbq57","msg":"[talos] phase bootloader (19/19): done, 176.795226ms\n","priority":"warning","seq":768,"server_uuid":"5b72932a-c482-4aa5-b00e-4b8773d3ac48","talos-level":"warn","talos-time":"2021-11-26T19:34:42.444342392Z"}
```

Logs are annotated on the fly with the information about `Server`,
`MetalMachine`, `Machine` and `Cluster`.

Signed-off-by: Andrey Smirnov <andrey.smirnov@talos-systems.com>
  • Loading branch information
smira committed Nov 30, 2021
1 parent 5bf7c21 commit ab12b81
Show file tree
Hide file tree
Showing 16 changed files with 449 additions and 166 deletions.
9 changes: 9 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,14 @@ ARG GO_LDFLAGS
RUN --mount=type=cache,target=/.cache GOOS=linux GOARCH=${TARGETARCH} go build ${GO_BUILDFLAGS} -ldflags "${GO_LDFLAGS} -X main.TalosRelease=${TALOS_RELEASE}" -o /siderolink-manager ./app/sidero-controller-manager/cmd/siderolink-manager
RUN chmod +x /siderolink-manager

# Build stage for the log-receiver binary: compiles
# ./app/sidero-controller-manager/cmd/log-receiver for linux/${TARGETARCH}
# and writes the executable to /log-receiver.
FROM base AS build-log-receiver
ARG TALOS_RELEASE
ARG TARGETARCH
ARG GO_BUILDFLAGS
ARG GO_LDFLAGS
RUN --mount=type=cache,target=/.cache GOOS=linux GOARCH=${TARGETARCH} go build ${GO_BUILDFLAGS} -ldflags "${GO_LDFLAGS} -X main.TalosRelease=${TALOS_RELEASE}" -o /log-receiver ./app/sidero-controller-manager/cmd/log-receiver
RUN chmod +x /log-receiver

FROM base AS agent-build-amd64
ARG GO_BUILDFLAGS
ARG GO_LDFLAGS
Expand Down Expand Up @@ -194,6 +202,7 @@ COPY --from=pkg-kernel-amd64 /boot/vmlinuz /var/lib/sidero/env/agent-amd64/vmlin
COPY --from=pkg-kernel-arm64 /boot/vmlinuz /var/lib/sidero/env/agent-arm64/vmlinuz
COPY --from=build-sidero-controller-manager /manager /manager
COPY --from=build-siderolink-manager /siderolink-manager /siderolink-manager
COPY --from=build-log-receiver /log-receiver /log-receiver

FROM sidero-controller-manager-image AS sidero-controller-manager
LABEL org.opencontainers.image.source https://github.com/talos-systems/sidero
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ MODULE := $(shell head -1 go.mod | cut -d' ' -f2)

ARTIFACTS := _out
TEST_PKGS ?= ./...
TALOS_RELEASE ?= v0.13.3
TALOS_RELEASE ?= v0.14.0-alpha.2
DEFAULT_K8S_VERSION ?= v1.22.3

TOOLS ?= ghcr.io/talos-systems/tools:v0.8.0
Expand Down
138 changes: 138 additions & 0 deletions app/sidero-controller-manager/cmd/log-receiver/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package main

import (
"context"
"encoding/json"
"fmt"
"os"
"strings"
"sync"
"time"

"go.uber.org/zap"
"inet.af/netaddr"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha4"
runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"

"github.com/talos-systems/siderolink/pkg/logreceiver"

sidero "github.com/talos-systems/sidero/app/caps-controller-manager/api/v1alpha3"
)

// sourceAnnotation holds the identifying information attached to every log
// message received from a given source address.
//
// Empty fields are not added to the log message.
type sourceAnnotation struct {
// ServerUUID is the name of the matching ServerBinding.
ServerUUID string
// MetalMachineName is "<namespace>/<name>" of the MetalMachine referenced
// by the ServerBinding.
MetalMachineName string
// MachineName is "<namespace>/<name>" of the Machine which owns the
// MetalMachine.
MachineName string
// ClusterName is the value of the cluster name label on the ServerBinding.
ClusterName string
}

// sourceMap caches the result of fetchSourceAnnotation per log source:
// keys are netaddr.IP source addresses, values are sourceAnnotation.
var sourceMap sync.Map

// fetchSourceAnnotation resolves the source address of a log stream to the
// Sidero resources it belongs to.
//
// It lists all ServerBindings, picks the first one whose SideroLink node
// address is srcAddr, and then follows the references to the MetalMachine and
// the Machine owning it.
//
// If no ServerBinding matches, an empty annotation and a nil error are
// returned. If a later lookup fails, the annotation filled in so far is
// returned together with the error.
func fetchSourceAnnotation(ctx context.Context, metalClient runtimeclient.Client, srcAddr netaddr.IP) (sourceAnnotation, error) {
	var (
		annotation     sourceAnnotation
		serverbindings sidero.ServerBindingList
	)

	if err := metalClient.List(ctx, &serverbindings); err != nil {
		return annotation, fmt.Errorf("error getting server bindings: %w", err)
	}

	srcAddress := srcAddr.String()

	var serverBinding *sidero.ServerBinding

	// Index the slice instead of ranging by value, so that no copy of the
	// item is needed to take its address.
	for i := range serverbindings.Items {
		if nodeAddressMatches(serverbindings.Items[i].Spec.SideroLink.NodeAddress, srcAddress) {
			serverBinding = &serverbindings.Items[i]

			break
		}
	}

	if serverBinding == nil {
		// no matching server binding, leave things as is
		return annotation, nil
	}

	metalMachineRef := serverBinding.Spec.MetalMachineRef

	annotation.ServerUUID = serverBinding.Name
	annotation.MetalMachineName = fmt.Sprintf("%s/%s", metalMachineRef.Namespace, metalMachineRef.Name)
	annotation.ClusterName = serverBinding.Labels[clusterv1.ClusterLabelName]

	var metalMachine sidero.MetalMachine

	if err := metalClient.Get(ctx,
		types.NamespacedName{
			Namespace: metalMachineRef.Namespace,
			Name:      metalMachineRef.Name,
		},
		&metalMachine); err != nil {
		return annotation, fmt.Errorf("error getting metal machine: %w", err)
	}

	for _, ref := range metalMachine.OwnerReferences {
		gv, err := schema.ParseGroupVersion(ref.APIVersion)
		if err != nil {
			// skip owner references with a malformed API version
			continue
		}

		// only the Cluster API Machine owner is of interest
		if ref.Kind == "Machine" && gv.Group == clusterv1.GroupVersion.Group {
			annotation.MachineName = fmt.Sprintf("%s/%s", metalMachine.Namespace, ref.Name)

			break
		}
	}

	return annotation, nil
}

// nodeAddressMatches reports whether nodeAddress refers to the IP srcAddress.
//
// nodeAddress is accepted either as the bare IP or as the IP followed by a
// "/<bits>" suffix (presumably the stored form is CIDR — confirm against the
// code which writes ServerBinding.Spec.SideroLink.NodeAddress).
//
// A plain prefix comparison is not sufficient: "fd00::1" is a textual prefix
// of "fd00::1a/64", which is a different address.
func nodeAddressMatches(nodeAddress, srcAddress string) bool {
	if nodeAddress == srcAddress {
		return true
	}

	return strings.HasPrefix(nodeAddress, srcAddress+"/")
}

// logHandler builds the handler invoked by the log receiver for each message.
//
// The handler annotates msg in place with the non-empty fields of the
// sourceAnnotation for srcAddr and writes the message to stdout as a single
// JSON document. Encoding failures are reported via logger.
func logHandler(metalClient runtimeclient.Client, logger *zap.Logger) logreceiver.Handler {
	// resolve returns the annotation for the source address, consulting the
	// cache first. The fetched annotation is cached even if the lookup
	// failed, so the lookup is attempted once per cache miss.
	resolve := func(srcAddr netaddr.IP) sourceAnnotation {
		if cached, found := sourceMap.Load(srcAddr); found {
			return cached.(sourceAnnotation)
		}

		// bound the time spent on Kubernetes API calls for a single message
		ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
		defer cancel()

		fetched, err := fetchSourceAnnotation(ctx, metalClient, srcAddr)
		if err != nil {
			logger.Error("error fetching server information", zap.Error(err), zap.Stringer("source_addr", srcAddr))
		}

		sourceMap.Store(srcAddr, fetched)

		return fetched
	}

	return func(srcAddr netaddr.IP, msg map[string]interface{}) {
		info := resolve(srcAddr)

		extra := []struct {
			key   string
			value string
		}{
			{"server_uuid", info.ServerUUID},
			{"cluster", info.ClusterName},
			{"metal_machine", info.MetalMachineName},
			{"machine", info.MachineName},
		}

		for _, field := range extra {
			if field.value == "" {
				// unknown values are left out of the message
				continue
			}

			msg[field.key] = field.value
		}

		encoder := json.NewEncoder(os.Stdout)

		if err := encoder.Encode(msg); err != nil {
			logger.Error("error printing log message", zap.Error(err))
		}
	}
}
33 changes: 33 additions & 0 deletions app/sidero-controller-manager/cmd/log-receiver/kubernetes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package main

import (
"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"

sidero "github.com/talos-systems/sidero/app/caps-controller-manager/api/v1alpha3"
)

// getMetalClient builds a controller-runtime client whose scheme contains the
// client-go built-in types and the Sidero types registered by
// sidero.AddToScheme.
//
// It returns the client together with the REST config it was created from.
// A failure to load the REST config is returned as an error rather than
// terminating the process, so that the caller decides how to report it.
func getMetalClient() (runtimeclient.Client, *rest.Config, error) {
	kubeconfig, err := ctrl.GetConfig()
	if err != nil {
		return nil, nil, err
	}

	scheme := runtime.NewScheme()

	if err = clientgoscheme.AddToScheme(scheme); err != nil {
		return nil, nil, err
	}

	if err = sidero.AddToScheme(scheme); err != nil {
		return nil, nil, err
	}

	client, err := runtimeclient.New(kubeconfig, runtimeclient.Options{Scheme: scheme})

	return client, kubeconfig, err
}
76 changes: 76 additions & 0 deletions app/sidero-controller-manager/cmd/log-receiver/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package main

import (
"context"
"errors"
"fmt"
"net"
"os"
"os/signal"
"syscall"

"go.uber.org/zap"
"golang.org/x/sync/errgroup"

"github.com/talos-systems/sidero/app/sidero-controller-manager/internal/siderolink"
"github.com/talos-systems/siderolink/pkg/logreceiver"
)

// main runs the log receiver and exits with status 1 if it fails.
func main() {
	if err := run(); err != nil {
		// terminate the message with a newline so that it does not run into
		// whatever is printed to the terminal next
		fmt.Fprintf(os.Stderr, "error: %s\n", err)
		os.Exit(1)
	}
}

// run sets up logging and the Kubernetes client, then serves the log receiver
// on TCP port siderolink.LogReceiverPort until SIGINT/SIGTERM is received or
// the server fails.
//
// A context.Canceled error from the server group is treated as a clean
// shutdown; any other error is returned.
func run() error {
	logger, err := zap.NewProduction()
	if err != nil {
		return fmt.Errorf("failed to initialize logger: %w", err)
	}

	zap.ReplaceGlobals(logger)
	zap.RedirectStdLog(logger)

	metalClient, _, err := getMetalClient()
	if err != nil {
		return fmt.Errorf("error building runtime client: %w", err)
	}

	sigCtx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	group, groupCtx := errgroup.WithContext(sigCtx)

	listenAddr := fmt.Sprintf(":%d", siderolink.LogReceiverPort)

	lis, err := net.Listen("tcp", listenAddr)
	if err != nil {
		return fmt.Errorf("error listening for log endpoint: %w", err)
	}

	server, err := logreceiver.NewServer(logger, lis, logHandler(metalClient, logger))
	if err != nil {
		return fmt.Errorf("error initializing log receiver: %w", err)
	}

	// serve incoming log connections
	group.Go(func() error {
		return server.Serve()
	})

	// stop the server once the group context is done
	group.Go(func() error {
		<-groupCtx.Done()

		server.Stop()

		return nil
	})

	switch err = group.Wait(); {
	case err == nil, errors.Is(err, context.Canceled):
		return nil
	default:
		return err
	}
}
14 changes: 6 additions & 8 deletions app/sidero-controller-manager/cmd/siderolink-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,20 +96,18 @@ func run() error {
return fmt.Errorf("error listening for gRPC API: %w", err)
}

cfg := siderolink.Config{
WireguardEndpoint: fmt.Sprintf("%s:%d", wireguardEndpoint, wireguardPort),
}
siderolink.Cfg.WireguardEndpoint = fmt.Sprintf("%s:%d", wireguardEndpoint, wireguardPort)

if err = cfg.LoadOrCreate(ctx, metalclient); err != nil {
if err = siderolink.Cfg.LoadOrCreate(ctx, metalclient); err != nil {
return err
}

wireguardEndpoint, err := netaddr.ParseIPPort(cfg.WireguardEndpoint)
wireguardEndpoint, err := netaddr.ParseIPPort(siderolink.Cfg.WireguardEndpoint)
if err != nil {
return fmt.Errorf("invalid Wireguard endpoint: %w", err)
}

wgDevice, err := wireguard.NewDevice(cfg.ServerAddress, cfg.PrivateKey, wireguardEndpoint.Port())
wgDevice, err := wireguard.NewDevice(siderolink.Cfg.ServerAddress, siderolink.Cfg.PrivateKey, wireguardEndpoint.Port())
if err != nil {
return fmt.Errorf("error initializing wgDevice: %w", err)
}
Expand All @@ -133,7 +131,7 @@ func run() error {
),
}

srv := siderolink.NewServer(&cfg, metalclient)
srv := siderolink.NewServer(&siderolink.Cfg, metalclient)

peers := siderolink.NewPeerState(kubeconfig, logger)

Expand All @@ -160,7 +158,7 @@ func run() error {
return nil
})

if err := eg.Wait(); err != nil && !errors.Is(err, grpc.ErrServerStopped) && errors.Is(err, context.Canceled) {
if err := eg.Wait(); err != nil && !errors.Is(err, grpc.ErrServerStopped) && !errors.Is(err, context.Canceled) {
return err
}

Expand Down
12 changes: 12 additions & 0 deletions app/sidero-controller-manager/config/manager/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,18 @@ spec:
volumeMounts:
- mountPath: /dev/net/tun
name: dev-tun
- command:
- /log-receiver
image: controller:latest
imagePullPolicy: Always
name: serverlogs
resources:
limits:
cpu: 256m
memory: 256Mi
requests:
cpu: 50m
memory: 128Mi
volumes:
- hostPath:
path: /dev/net/tun
Expand Down
8 changes: 8 additions & 0 deletions app/sidero-controller-manager/internal/ipxe/ipxe_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

infrav1 "github.com/talos-systems/sidero/app/caps-controller-manager/api/v1alpha3"
metalv1alpha1 "github.com/talos-systems/sidero/app/sidero-controller-manager/api/v1alpha1"
"github.com/talos-systems/sidero/app/sidero-controller-manager/internal/siderolink"
"github.com/talos-systems/sidero/app/sidero-controller-manager/pkg/constants"
)

Expand Down Expand Up @@ -409,10 +410,12 @@ func appendTalosArguments(env *metalv1alpha1.Environment) {

talosConfigPrefix := talosconstants.KernelParamConfig + "="
sideroLinkPrefix := talosconstants.KernelParamSideroLink + "="
logDeliveryPrefix := talosconstants.KernelParamLoggingKernel + "="

for _, prefix := range []string{
talosConfigPrefix,
sideroLinkPrefix,
logDeliveryPrefix,
} {
for _, arg := range args {
if strings.HasPrefix(arg, prefix) {
Expand All @@ -432,6 +435,11 @@ func appendTalosArguments(env *metalv1alpha1.Environment) {
env.Spec.Kernel.Args = append(env.Spec.Kernel.Args,
fmt.Sprintf("%s=%s:%d", talosconstants.KernelParamSideroLink, apiEndpoint, apiPort),
)
case logDeliveryPrefix:
// patch environment with the log receiver endpoint
env.Spec.Kernel.Args = append(env.Spec.Kernel.Args,
fmt.Sprintf("%s=tcp://[%s]:%d", talosconstants.KernelParamLoggingKernel, siderolink.Cfg.ServerAddress.IP(), siderolink.LogReceiverPort),
)
}
}
}
Expand Down
10 changes: 10 additions & 0 deletions app/sidero-controller-manager/internal/siderolink/siderolink.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,13 @@ package siderolink
//
// Secret holds private Sidero Wireguard key and installation ID.
const SecretName = "siderolink"

// LogReceiverPort is the port of the log receiver container.
//
// The log receiver is reachable on this port only over the Wireguard tunnel.
const LogReceiverPort = 4001

// Cfg is a default global instance of the SideroLink configuration.
//
// Cfg should be initialized first with `LoadOrCreate`.
var Cfg Config

0 comments on commit ab12b81

Please sign in to comment.