cmd/kube-apiserver: turn core (legacy) rest storage into standard RESTStorageProvider #119042

Merged
4 commits merged on Jul 12, 2023
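For context, this change moves the core ("legacy") API group onto the same provider pattern the other API groups already use for installing their REST storage. As a rough sketch (paraphrased, not copied from this PR; exact package paths and signatures may differ between releases), such a provider looks approximately like this:

```go
package corerest

import (
	"k8s.io/apiserver/pkg/registry/generic"
	genericapiserver "k8s.io/apiserver/pkg/server"
	serverstorage "k8s.io/apiserver/pkg/server/storage"
)

// RESTStorageProvider is, approximately, the per-API-group factory the
// kube-apiserver calls when installing REST storage; making the core group
// implement it lets the legacy storage be wired up like any other group.
type RESTStorageProvider interface {
	// GroupName returns the API group served by this provider (empty for core).
	GroupName() string
	// NewRESTStorage builds the group's storage, honoring which resources
	// are enabled and how the storage backend is configured.
	NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource,
		restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, error)
}
```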
145 changes: 34 additions & 111 deletions pkg/controlplane/controller/kubernetesservice/controller.go
@@ -28,21 +28,13 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

"k8s.io/kubernetes/pkg/controlplane/reconcilers"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
servicecontroller "k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller"
portallocatorcontroller "k8s.io/kubernetes/pkg/registry/core/service/portallocator/controller"
"k8s.io/kubernetes/pkg/util/async"
)

const (
@@ -54,27 +46,14 @@ const (
// provide the IP repair check on service IPs
type Controller struct {
Config
RangeRegistries

runner *async.Runner
}
client kubernetes.Interface

type RangeRegistries struct {
ServiceClusterIPRegistry rangeallocation.RangeRegistry
SecondaryServiceClusterIPRegistry rangeallocation.RangeRegistry
ServiceNodePortRegistry rangeallocation.RangeRegistry
lock sync.Mutex
stopCh chan struct{} // closed by Stop()
}

type Config struct {
Client kubernetes.Interface
Informers informers.SharedInformerFactory

KubernetesService
ClusterIP
NodePort
}

type KubernetesService struct {
PublicIP net.IP

EndpointReconciler reconcilers.EndpointReconciler
@@ -87,32 +66,18 @@ type KubernetesService struct {
KubernetesServiceNodePort int
}

type ClusterIP struct {
ServiceClusterIPRange net.IPNet
SecondaryServiceClusterIPRange net.IPNet
ServiceClusterIPInterval time.Duration
}

type NodePort struct {
ServiceNodePortInterval time.Duration
ServiceNodePortRange utilnet.PortRange
}

// New returns a controller for watching the kubernetes service endpoints.
func New(config Config, rangeRegistries RangeRegistries) (*Controller, error) {
func New(config Config, client kubernetes.Interface) *Controller {
return &Controller{
Config: config,
RangeRegistries: rangeRegistries,
}, nil
Config: config,
client: client,
stopCh: make(chan struct{}),
}
}

// Start begins the core controller loops that must exist for bootstrapping
// a cluster.
func (c *Controller) Start() {
if c.runner != nil {
return
}

func (c *Controller) Start(stopCh <-chan struct{}) {
// Reconcile during first run removing itself until server is ready.
endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https")
if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err == nil {
@@ -121,72 +86,30 @@ func (c *Controller) Start() {
klog.Errorf("Error removing old endpoints from kubernetes service: %v", err)
}

repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.Client.CoreV1(), c.Client.EventsV1(), c.ServiceNodePortRange, c.ServiceNodePortRegistry)

// We start both repairClusterIPs and repairNodePorts to ensure repair
// loops of ClusterIPs and NodePorts.
// We run both repair loops using RunUntil public interface.
// However, we want to fail liveness/readiness until the first
// successful repair loop, so we basically pass appropriate
// callbacks to RunUtil methods.
// Additionally, we ensure that we don't wait for it for longer
// than 1 minute for backward compatibility of failing the whole
// apiserver if we can't repair them.
wg := sync.WaitGroup{}
wg.Add(1)

runRepairNodePorts := func(stopCh chan struct{}) {
repairNodePorts.RunUntil(wg.Done, stopCh)
}

wg.Add(1)
var runRepairClusterIPs func(stopCh chan struct{})
if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval,
c.Client.CoreV1(),
c.Client.EventsV1(),
&c.ServiceClusterIPRange,
c.ServiceClusterIPRegistry,
&c.SecondaryServiceClusterIPRange,
c.SecondaryServiceClusterIPRegistry)
runRepairClusterIPs = func(stopCh chan struct{}) {
repairClusterIPs.RunUntil(wg.Done, stopCh)
}
} else {
repairClusterIPs := servicecontroller.NewRepairIPAddress(c.ServiceClusterIPInterval,
c.Client,
&c.ServiceClusterIPRange,
&c.SecondaryServiceClusterIPRange,
c.Informers.Core().V1().Services(),
c.Informers.Networking().V1alpha1().IPAddresses(),
)
runRepairClusterIPs = func(stopCh chan struct{}) {
repairClusterIPs.RunUntil(wg.Done, stopCh)
}
}
c.runner = async.NewRunner(c.RunKubernetesService, runRepairClusterIPs, runRepairNodePorts)
c.runner.Start()

// For backward compatibility, we ensure that if we never are able
// to repair clusterIPs and/or nodeports, we not only fail the liveness
// and/or readiness, but also explicitly fail.
done := make(chan struct{})
localStopCh := make(chan struct{})
go func() {
defer close(done)
wg.Wait()
defer close(localStopCh)
select {
case <-stopCh: // from Start
case <-c.stopCh: // from Stop
}
}()
select {
case <-done:
case <-time.After(time.Minute):
klog.Fatalf("Unable to perform initial IP and Port allocation check")
}

go c.Run(localStopCh)
}

// Stop cleans up this API Servers endpoint reconciliation leases so another master can take over more quickly.
func (c *Controller) Stop() {
if c.runner != nil {
c.runner.Stop()
Member commented:

The thing is that the Stop() here needs to be synchronized with the stopCh of the Start() method, or this can race creating and deleting the Service and its endpoints; Stop() has to run only after Run() has stopped.

Contributor (author) commented:

Won't this data-race on c.runner? This all looks very fragile.

Contributor (author) commented:

Double checked the runner semantics: it does not wait for the running process to terminate. It only closes the channel. So we need something equivalent. But it also means that the old code had a race here.

Contributor (author) commented:

Re-added the stop logic. I don't think we need it in our setup with hooks, but for the semantics of the controller alone it makes sense.
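To make the shutdown semantics discussed in this thread concrete, here is a minimal, self-contained sketch (illustrative names, not the PR's code) of the pattern the new Start()/Stop() pair converges on, and which the resumed diff below implements: an internal stop channel that either the caller's stopCh or an explicit Stop() can trigger, guarded by a mutex so Stop() is idempotent and safe to call concurrently.

```go
package main

import "sync"

// controller is a stripped-down stand-in for the real Controller, showing
// only the shutdown plumbing discussed in this thread.
type controller struct {
	lock   sync.Mutex
	stopCh chan struct{} // closed by Stop()
}

func newController() *controller {
	return &controller{stopCh: make(chan struct{})}
}

// Start wires an internal channel that closes when either the caller's
// stopCh or the controller's own stopCh fires, then runs the loop on it.
func (c *controller) Start(stopCh <-chan struct{}) {
	localStopCh := make(chan struct{})
	go func() {
		defer close(localStopCh)
		select {
		case <-stopCh: // shutdown signal owned by the caller
		case <-c.stopCh: // explicit Stop()
		}
	}()
	go c.run(localStopCh)
}

// Stop is idempotent: the select guards against closing stopCh twice, and
// the mutex keeps concurrent Stop() calls from racing on that check.
func (c *controller) Stop() {
	c.lock.Lock()
	defer c.lock.Unlock()
	select {
	case <-c.stopCh:
		return // already stopped
	default:
		close(c.stopCh)
	}
	// post-shutdown cleanup (e.g. removing endpoints) would follow here
}

func (c *controller) run(stopCh <-chan struct{}) {
	<-stopCh // real code would do periodic work until stopCh closes
}

func main() {
	external := make(chan struct{})
	c := newController()
	c.Start(external)
	c.Stop() // safe even if called again or concurrently
}
```

Note that closing the channel only signals shutdown; anything that must run strictly after Run() has exited would still need an explicit wait (for example a sync.WaitGroup), which is the gap the reviewer points out above.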

c.lock.Lock()
defer c.lock.Unlock()

select {
case <-c.stopCh:
return // only close once
default:
close(c.stopCh)
}

endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https")
finishedReconciling := make(chan struct{})
go func() {
@@ -208,12 +131,12 @@ func (c *Controller) Stop() {
}
}

// RunKubernetesService periodically updates the kubernetes service
func (c *Controller) RunKubernetesService(ch chan struct{}) {
// Run periodically updates the kubernetes service
func (c *Controller) Run(ch <-chan struct{}) {
// wait until process is ready
wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
var code int
c.Client.CoreV1().RESTClient().Get().AbsPath("/readyz").Do(context.TODO()).StatusCode(&code)
c.client.CoreV1().RESTClient().Get().AbsPath("/readyz").Do(context.TODO()).StatusCode(&code)
return code == http.StatusOK, nil
}, ch)

@@ -233,8 +156,8 @@ func (c *Controller) UpdateKubernetesService(reconcile bool) error {
// TODO: when it becomes possible to change this stuff,
// stop polling and start watching.
// TODO: add endpoints of all replicas, not just the elected master.
if _, err := c.Client.CoreV1().Namespaces().Get(context.TODO(), metav1.NamespaceDefault, metav1.GetOptions{}); err != nil {
if _, err := c.Client.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{
if _, err := c.client.CoreV1().Namespaces().Get(context.TODO(), metav1.NamespaceDefault, metav1.GetOptions{}); err != nil {
if _, err := c.client.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: metav1.NamespaceDefault,
Namespace: "",
@@ -286,12 +209,12 @@ func createEndpointPortSpec(endpointPort int, endpointPortName string) []corev1.
// CreateOrUpdateMasterServiceIfNeeded will create the specified service if it
// doesn't already exist.
func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []corev1.ServicePort, serviceType corev1.ServiceType, reconcile bool) error {
if s, err := c.Client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), serviceName, metav1.GetOptions{}); err == nil {
if s, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), serviceName, metav1.GetOptions{}); err == nil {
// The service already exists.
if reconcile {
if svc, updated := getMasterServiceUpdateIfNeeded(s, servicePorts, serviceType); updated {
klog.Warningf("Resetting master service %q to %#v", serviceName, svc)
_, err := c.Client.CoreV1().Services(metav1.NamespaceDefault).Update(context.TODO(), svc, metav1.UpdateOptions{})
_, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Update(context.TODO(), svc, metav1.UpdateOptions{})
return err
}
}
@@ -315,7 +238,7 @@ func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, ser
},
}

_, err := c.Client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
_, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
if errors.IsAlreadyExists(err) {
return c.CreateOrUpdateMasterServiceIfNeeded(serviceName, serviceIP, servicePorts, serviceType, reconcile)
}
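Taken together, the controller's lifecycle after this diff is New(config, client), Start(stopCh), and an explicit Stop(). The snippet below is a hypothetical wiring sketch, not code from this PR: the function name, Config contents, and hook plumbing are assumptions; only the kubernetesservice package path and the New/Start signatures are taken from the diff above.

```go
package controlplane

import (
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"

	"k8s.io/kubernetes/pkg/controlplane/controller/kubernetesservice"
)

// startKubernetesServiceController shows how a caller (for example an
// apiserver post-start hook) might drive the reworked lifecycle.
func startKubernetesServiceController(loopback *rest.Config, hookStopCh <-chan struct{}) (*kubernetesservice.Controller, error) {
	client, err := kubernetes.NewForConfig(loopback)
	if err != nil {
		return nil, err
	}

	// Config still carries the public IP, endpoint reconciler, ports, and so
	// on (fields elided here); the client is now passed separately to New.
	c := kubernetesservice.New(kubernetesservice.Config{ /* ... */ }, client)

	// Start performs the initial endpoint cleanup and then runs the
	// reconciliation loop in the background until hookStopCh closes.
	c.Start(hookStopCh)

	// The caller is expected to invoke c.Stop() on shutdown (for example from
	// a pre-shutdown hook) so the endpoint lease is released promptly.
	return c, nil
}
```

As the author notes in the thread above, this wiring happens via hooks in the apiserver itself, so the explicit Stop() matters mostly for the controller's standalone semantics.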
@@ -67,7 +67,7 @@ func TestCreateOrUpdateMasterService(t *testing.T) {
for _, test := range createTests {
master := Controller{}
fakeClient := fake.NewSimpleClientset()
master.Client = fakeClient
master.client = fakeClient
master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false)
creates := []core.CreateAction{}
for _, action := range fakeClient.Actions() {
@@ -349,7 +349,7 @@ func TestCreateOrUpdateMasterService(t *testing.T) {
for _, test := range reconcileTests {
master := Controller{}
fakeClient := fake.NewSimpleClientset(test.service)
master.Client = fakeClient
master.client = fakeClient
err := master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, true)
if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)
@@ -408,7 +408,7 @@ func TestCreateOrUpdateMasterService(t *testing.T) {
for _, test := range nonReconcileTests {
master := Controller{}
fakeClient := fake.NewSimpleClientset(test.service)
master.Client = fakeClient
master.client = fakeClient
err := master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false)
if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)