/
kubeprism_endpoints.go
88 lines (73 loc) · 2.87 KB
/
kubeprism_endpoints.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package k8s
import (
"context"
"fmt"
"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/controller/generic/transform"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/siderolabs/gen/optional"
"go.uber.org/zap"
"github.com/siderolabs/talos/pkg/machinery/resources/cluster"
"github.com/siderolabs/talos/pkg/machinery/resources/config"
"github.com/siderolabs/talos/pkg/machinery/resources/k8s"
)
// KubePrismEndpointsController creates a list of API server endpoints.
type KubePrismEndpointsController = transform.Controller[*config.MachineConfig, *k8s.KubePrismEndpoints]
// NewKubePrismEndpointsController instanciates the controller.
//
//nolint:gocyclo
func NewKubePrismEndpointsController() *KubePrismEndpointsController {
return transform.NewController(
transform.Settings[*config.MachineConfig, *k8s.KubePrismEndpoints]{
Name: "k8s.KubePrismEndpointsController",
MapMetadataOptionalFunc: func(cfg *config.MachineConfig) optional.Optional[*k8s.KubePrismEndpoints] {
if cfg.Metadata().ID() != config.V1Alpha1ID {
return optional.None[*k8s.KubePrismEndpoints]()
}
if cfg.Config().Cluster() == nil || cfg.Config().Machine() == nil {
return optional.None[*k8s.KubePrismEndpoints]()
}
return optional.Some(k8s.NewKubePrismEndpoints(k8s.NamespaceName, k8s.KubePrismEndpointsID))
},
TransformFunc: func(ctx context.Context, r controller.Reader, logger *zap.Logger, machineConfig *config.MachineConfig, res *k8s.KubePrismEndpoints) error {
members, err := safe.ReaderListAll[*cluster.Member](ctx, r)
if err != nil {
return fmt.Errorf("error listing affiliates: %w", err)
}
var endpoints []k8s.KubePrismEndpoint
ce := machineConfig.Config().Cluster().Endpoint()
if ce != nil {
endpoints = append(endpoints, k8s.KubePrismEndpoint{
Host: ce.Hostname(),
Port: toPort(ce.Port()),
})
}
if machineConfig.Config().Machine().Type().IsControlPlane() {
endpoints = append(endpoints, k8s.KubePrismEndpoint{
Host: "localhost",
Port: uint32(machineConfig.Config().Cluster().LocalAPIServerPort()),
})
}
for it := members.Iterator(); it.Next(); {
memberSpec := it.Value().TypedSpec()
if len(memberSpec.Addresses) > 0 && memberSpec.ControlPlane != nil {
for _, addr := range memberSpec.Addresses {
endpoints = append(endpoints, k8s.KubePrismEndpoint{
Host: addr.String(),
Port: uint32(memberSpec.ControlPlane.APIServerPort),
})
}
}
}
res.TypedSpec().Endpoints = endpoints
return nil
},
},
transform.WithExtraInputs(
safe.Input[*cluster.Member](controller.InputWeak),
),
)
}