Skip to content

Commit

Permalink
feat: implement LinkStatusController
Browse files Browse the repository at this point in the history
This is the first of many PRs implementing the new COSI network
configuration. This controller provides the low-level status of the network
interfaces (links), without touching on the addresses of the interfaces.

The information gathered resembles the output of the `ip link show` command.

Examples:

```
$ talosctl -n 172.20.0.2 get links
NODE         NAMESPACE   TYPE         ID             VERSION   TYPE       KIND     HW ADDR                                           OPER STATE   LINK STATE
172.20.0.2   net         LinkStatus   bond0          1         ether      bond     fe:c4:d6:4c:04:05                                 down         false
172.20.0.2   net         LinkStatus   cni0           5         ether      bridge   22:cc:25:7e:64:19                                 up           true
172.20.0.2   net         LinkStatus   dummy0         1         ether      dummy    0e:f6:f3:ef:53:29                                 down         false
172.20.0.2   net         LinkStatus   eth0           4         ether               ae:1b:9c:19:6b:47                                 up           true
172.20.0.2   net         LinkStatus   flannel.1      2         ether      vxlan    be:c5:4f:eb:da:5c                                 unknown      true
172.20.0.2   net         LinkStatus   ip6tnl0        1         tunnel6    ip6tnl   00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00   down         false
172.20.0.2   net         LinkStatus   lo             4         loopback            00:00:00:00:00:00                                 unknown      true
172.20.0.2   net         LinkStatus   sit0           1         sit        sit      00:00:00:00                                       down         false
172.20.0.2   net         LinkStatus   teql0          1         void                                                                  down         false
172.20.0.2   net         LinkStatus   tunl0          1         ipip       ipip     00:00:00:00                                       down         false
172.20.0.2   net         LinkStatus   veth1c1422df   2         ether      veth     6a:2d:68:be:8e:8f                                 up           true
172.20.0.2   net         LinkStatus   veth2ce7ce8d   1         ether      veth     52:fc:98:82:f7:29                                 up           true
```

```
$ talosctl -n 172.20.0.2 get links eth0 -o yaml
node: 172.20.0.2
metadata:
    namespace: net
    type: LinkStatuses.net.talos.dev
    id: eth0
    version: 4
    owner: network.LinkStatusController
    phase: running
spec:
    index: 4
    type: ether
    linkIndex: 0
    flags: UP,BROADCAST,RUNNING,MULTICAST,LOWER_UP
    hardwareAddr: ae:1b:9c:19:6b:47
    broadcastAddr: ff:ff:ff:ff:ff:ff
    mtu: 1500
    queueDisc: pfifo_fast
    operationalState: up
    kind: ""
    slaveKind: ""
    linkState: true
    speedMbit: 4294967295
    port: Other
    duplex: Unknown
```

Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
  • Loading branch information
smira authored and talos-bot committed May 7, 2021
1 parent 0e8de04 commit 3c12135
Show file tree
Hide file tree
Showing 21 changed files with 1,164 additions and 5 deletions.
1 change: 1 addition & 0 deletions .conform.yaml
Expand Up @@ -40,6 +40,7 @@ policies:
- .go
excludeSuffixes:
- .pb.go
- _string.go
header: |
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
Expand Down
12 changes: 7 additions & 5 deletions Dockerfile
Expand Up @@ -163,10 +163,10 @@ RUN gofumports -w -local github.com/talos-systems/talos /api/

# run docgen for machinery config
FROM build-go AS go-generate
COPY ./pkg/machinery /pkg/machinery
WORKDIR /pkg/machinery
RUN --mount=type=cache,target=/.cache go generate /pkg/machinery/config/types/v1alpha1/...
WORKDIR /
COPY ./pkg/machinery ./pkg/machinery
COPY ./pkg/resources/network ./pkg/resources/network
RUN --mount=type=cache,target=/.cache go generate ./pkg/machinery/config/types/v1alpha1/...
RUN --mount=type=cache,target=/.cache go generate ./pkg/resources/network/...

FROM --platform=${BUILDPLATFORM} scratch AS generate
COPY --from=generate-build /api/common/*.pb.go /pkg/machinery/api/common/
Expand All @@ -179,7 +179,8 @@ COPY --from=generate-build /api/cluster/*.pb.go /pkg/machinery/api/cluster/
COPY --from=generate-build /api/storage/*.pb.go /pkg/machinery/api/storage/
COPY --from=generate-build /api/resource/*.pb.go /pkg/machinery/api/resource/
COPY --from=generate-build /api/inspect/*.pb.go /pkg/machinery/api/inspect/
COPY --from=go-generate /pkg/machinery/config/types/v1alpha1/ /pkg/machinery/config/types/v1alpha1/
COPY --from=go-generate /src/pkg/machinery/config/types/v1alpha1/ /pkg/machinery/config/types/v1alpha1/
COPY --from=go-generate /src/pkg/resources/network/ /pkg/resources/network/

# The base target provides a container that can be used to build all Talos
# assets.
Expand All @@ -190,6 +191,7 @@ COPY ./pkg ./pkg
COPY ./internal ./internal
COPY --from=generate /pkg/machinery/api ./pkg/machinery/api
COPY --from=generate /pkg/machinery/config ./pkg/machinery/config
COPY --from=generate /pkg/resources/network ./pkg/resources/network
RUN --mount=type=cache,target=/.cache go list -mod=readonly all >/dev/null
RUN --mount=type=cache,target=/.cache ! go mod tidy -v 2>&1 | grep .
WORKDIR /src/pkg/machinery
Expand Down
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -56,6 +56,7 @@ require (
github.com/insomniacslk/dhcp v0.0.0-20210120172423-cc9239ac6294
github.com/jsimonetti/rtnetlink v0.0.0-20210226120601-1b79e63a70a0
github.com/mattn/go-isatty v0.0.12
github.com/mdlayher/ethtool v0.0.0-20210210192532-2b88debcdd43
github.com/mdlayher/genetlink v1.0.0
github.com/mdlayher/netlink v1.4.0
github.com/morikuni/aec v1.0.0 // indirect
Expand Down
285 changes: 285 additions & 0 deletions internal/app/machined/pkg/controllers/network/link_status.go
@@ -0,0 +1,285 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package network

import (
"context"
"errors"
"fmt"
"log"
"os"
"sync"

"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/jsimonetti/rtnetlink"
"github.com/mdlayher/ethtool"
"github.com/mdlayher/genetlink"
"github.com/mdlayher/netlink"
"golang.org/x/sys/unix"

"github.com/talos-systems/talos/pkg/resources/network"
"github.com/talos-systems/talos/pkg/resources/network/nethelpers"
)

// LinkStatusController publishes network.LinkStatus resources describing the network links reported by rtnetlink and ethtool.
type LinkStatusController struct{}

// Name implements controller.Controller interface.
//
// It returns the fixed name identifying this controller.
func (ctrl *LinkStatusController) Name() string {
	const controllerName = "network.LinkStatusController"

	return controllerName
}

// Inputs implements controller.Controller interface.
//
// The controller declares no inputs, so a nil slice is returned.
func (ctrl *LinkStatusController) Inputs() []controller.Input {
	var noInputs []controller.Input

	return noInputs
}

// Outputs implements controller.Controller interface.
//
// The controller declares a single exclusive output of type network.LinkStatusType.
func (ctrl *LinkStatusController) Outputs() []controller.Output {
	linkStatuses := controller.Output{
		Type: network.LinkStatusType,
		Kind: controller.OutputExclusive,
	}

	return []controller.Output{linkStatuses}
}

// Run implements controller.Controller interface.
//
// It opens two watch sockets (rtnetlink subscribed to RTMGRP_LINK, and genetlink joined to the
// ethtool monitor multicast group), turns every notification received on them into a reconcile
// trigger, and reconciles on each trigger or runtime event.
//
// Run returns nil once ctx is done, or the first error encountered while setting up the sockets
// or reconciling. On every return path the sockets are closed and the watch goroutines are
// stopped and waited for.
//
//nolint:gocyclo,cyclop
func (ctrl *LinkStatusController) Run(ctx context.Context, r controller.Runtime, logger *log.Logger) error {
	var wg sync.WaitGroup

	// deferred calls run in reverse order: this one is registered first so that it runs last,
	// after the watch sockets are closed and watchCtx is canceled
	defer wg.Wait()

	// watchCtx releases a watch goroutine blocked on sending to watchCh when Run returns
	// while ctx is still alive (e.g. reconcile failed); otherwise wg.Wait() would hang forever
	watchCtx, watchCancel := context.WithCancel(ctx)
	defer watchCancel()

	// create watch connections to rtnetlink and ethtool via genetlink
	// these connections are used only to join multicast groups and receive notifications on changes
	// other connections are used to send requests and receive responses, as we can't mix the notifications and request/responses
	watchRtnetlink, err := rtnetlink.Dial(&netlink.Config{
		Groups: unix.RTMGRP_LINK,
	})
	if err != nil {
		return fmt.Errorf("error dialing watch socket: %w", err)
	}

	// closing is deferred right after dialing so the socket is not leaked on the early returns below;
	// closing also aborts the goroutine blocked in Receive()
	defer watchRtnetlink.Close() //nolint:errcheck

	watchEthool, err := genetlink.Dial(nil)
	if err != nil {
		return fmt.Errorf("error dialing ethtool watch socket: %w", err)
	}

	defer watchEthool.Close() //nolint:errcheck

	ethFamily, err := watchEthool.GetFamily(unix.ETHTOOL_GENL_NAME)
	if err != nil {
		return fmt.Errorf("error getting family information for ethtool: %w", err)
	}

	var monitorID uint32

	for _, g := range ethFamily.Groups {
		if g.Name == unix.ETHTOOL_MCGRP_MONITOR_NAME {
			monitorID = g.ID

			break
		}
	}

	if monitorID == 0 {
		return errors.New("could not find monitor multicast group ID for ethtool")
	}

	if err = watchEthool.JoinGroup(monitorID); err != nil {
		return fmt.Errorf("error joining multicast group for ethtool: %w", err)
	}

	watchCh := make(chan struct{})

	// notify forwards every successful receive as a signal on watchCh,
	// it stops when receive fails (socket closed) or watchCtx is canceled
	notify := func(receive func() error) {
		defer wg.Done()

		for {
			if receive() != nil {
				return
			}

			select {
			case watchCh <- struct{}{}:
			case <-watchCtx.Done():
				return
			}
		}
	}

	wg.Add(2)

	go notify(func() error {
		_, _, watchErr := watchRtnetlink.Receive()

		return watchErr
	})

	go notify(func() error {
		_, _, watchErr := watchEthool.Receive()

		return watchErr
	})

	conn, err := rtnetlink.Dial(nil)
	if err != nil {
		return fmt.Errorf("error dialing rtnetlink socket: %w", err)
	}

	defer conn.Close() //nolint:errcheck

	ethClient, err := ethtool.New()
	if err != nil {
		return fmt.Errorf("error dialing ethtool socket: %w", err)
	}

	defer ethClient.Close() //nolint:errcheck

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-r.EventCh():
		case <-watchCh:
		}

		if err = ctrl.reconcile(ctx, r, conn, ethClient); err != nil {
			return err
		}
	}
}

// reconcile performs a single reconciliation pass.
//
// It lists the links via rtnetlink, queries ethtool for additional details of each link,
// writes the gathered data into the LinkStatus resource named after the link, and finally
// destroys the LinkStatus resources for which no link was listed in this pass.
//
// The first error from listing, querying, modifying or destroying aborts the pass and is returned.
//
//nolint:gocyclo,cyclop
func (ctrl *LinkStatusController) reconcile(ctx context.Context, r controller.Runtime, conn *rtnetlink.Conn, ethClient *ethtool.Client) error {
	// every existing LinkStatus starts out as stale; an ID is dropped from the set once its link is discovered via netlink
	existing, err := r.List(ctx, resource.NewMetadata(network.NamespaceName, network.LinkStatusType, "", resource.VersionUndefined))
	if err != nil {
		return fmt.Errorf("error listing resources: %w", err)
	}

	stale := make(map[resource.ID]struct{}, len(existing.Items))

	for _, item := range existing.Items {
		stale[item.Metadata().ID()] = struct{}{}
	}

	links, err := conn.Link.List()
	if err != nil {
		return fmt.Errorf("error listing links: %w", err)
	}

	for idx := range links {
		link := links[idx]
		iface := ethtool.Interface{Index: int(link.Index)}

		var (
			ethState *ethtool.LinkState
			ethInfo  *ethtool.LinkInfo
			ethMode  *ethtool.LinkMode
		)

		// the ethtool queries are chained: each one runs only if the previous one succeeded,
		// os.ErrNotExist (e.g. not supported) ends the chain quietly, any other error is fatal
		if ethState, err = ethClient.LinkState(iface); err != nil {
			if !errors.Is(err, os.ErrNotExist) {
				return fmt.Errorf("error querying ethtool link state for %q: %w", link.Attributes.Name, err)
			}
		} else if ethInfo, err = ethClient.LinkInfo(iface); err != nil {
			if !errors.Is(err, os.ErrNotExist) {
				return fmt.Errorf("error querying ethtool link info for %q: %w", link.Attributes.Name, err)
			}
		} else if ethMode, err = ethClient.LinkMode(iface); err != nil {
			if !errors.Is(err, os.ErrNotExist) {
				return fmt.Errorf("error querying ethtool link mode for %q: %w", link.Attributes.Name, err)
			}
		}

		err = r.Modify(ctx, network.NewLinkStatus(network.NamespaceName, link.Attributes.Name), func(res resource.Resource) error {
			status := res.(*network.LinkStatus).Status()
			attrs := link.Attributes

			// data from the rtnetlink message itself
			status.Index = link.Index
			status.Type = nethelpers.LinkType(link.Type)
			status.Flags = nethelpers.LinkFlags(link.Flags)

			// data from the rtnetlink attributes
			status.LinkIndex = attrs.Type
			status.HardwareAddr = nethelpers.HardwareAddr(attrs.Address)
			status.BroadcastAddr = nethelpers.HardwareAddr(attrs.Broadcast)
			status.MTU = attrs.MTU
			status.QueueDisc = attrs.QueueDisc
			status.OperationalState = nethelpers.OperationalState(attrs.OperationalState)

			// optional pieces: reset to the default first, override when the data is present
			status.MasterIndex = 0
			if attrs.Master != nil {
				status.MasterIndex = *attrs.Master
			}

			status.Kind = ""
			status.SlaveKind = ""

			if attrs.Info != nil {
				status.Kind = attrs.Info.Kind
				status.SlaveKind = attrs.Info.SlaveKind
			}

			status.LinkState = false
			if ethState != nil {
				status.LinkState = ethState.Link
			}

			status.Port = nethelpers.Port(ethtool.Other)
			if ethInfo != nil {
				status.Port = nethelpers.Port(ethInfo.Port)
			}

			status.SpeedMegabits = 0
			status.Duplex = nethelpers.Duplex(ethtool.Unknown)

			if ethMode != nil {
				status.SpeedMegabits = ethMode.SpeedMegabits
				status.Duplex = nethelpers.Duplex(ethMode.Duplex)
			}

			return nil
		})
		if err != nil {
			return fmt.Errorf("error modifying resource: %w", err)
		}

		// the link exists, so its resource must be kept
		delete(stale, link.Attributes.Name)
	}

	// whatever is still marked as stale has no matching link anymore
	for id := range stale {
		if err = r.Destroy(ctx, resource.NewMetadata(network.NamespaceName, network.LinkStatusType, id, resource.VersionUndefined)); err != nil {
			return fmt.Errorf("error deleting link status %q: %w", id, err)
		}
	}

	return nil
}

0 comments on commit 3c12135

Please sign in to comment.