Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rev k8s to 1.29 #8805

Open
wants to merge 20 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions api/pkg/client/informers_generated/externalversions/factory.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

102 changes: 26 additions & 76 deletions api/pkg/openapi/openapi_generated.go

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions apiserver/cmd/apiserver/server/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,6 @@ func (o *CalicoServerOptions) Config() (*apiserver.Config, error) {
if err := o.RecommendedOptions.Audit.ApplyTo(&serverConfig.Config); err != nil {
return nil, err
}
if err := o.RecommendedOptions.Features.ApplyTo(&serverConfig.Config); err != nil {
return nil, err
}

if err := o.RecommendedOptions.CoreAPI.ApplyTo(serverConfig); err != nil {
return nil, err
Expand All @@ -171,11 +168,16 @@ func (o *CalicoServerOptions) Config() (*apiserver.Config, error) {
if err != nil {
return nil, err
}

dynamicClient, err := dynamic.NewForConfig(serverConfig.ClientConfig)
if err != nil {
return nil, err
}

if err := o.RecommendedOptions.Features.ApplyTo(&serverConfig.Config, kubeClient, serverConfig.SharedInformerFactory); err != nil {
return nil, err
}

if initializers, err := o.RecommendedOptions.ExtraAdmissionInitializers(serverConfig); err != nil {
return nil, err
} else if err := o.RecommendedOptions.Admission.ApplyTo(
Expand Down
1 change: 1 addition & 0 deletions apiserver/test/integration/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func withConfigGetFreshApiserverServerAndClient(
t.Logf("Starting server on port: %d", securePort)
ro := genericoptions.NewRecommendedOptions(defaultEtcdPathPrefix, apiserver.Codecs.LegacyCodec(v3.SchemeGroupVersion))
ro.Etcd.StorageConfig.Transport.ServerList = serverConfig.etcdServerList
ro.Features.EnablePriorityAndFairness = false
options := &server.CalicoServerOptions{
RecommendedOptions: ro,
DisableAuth: true,
Expand Down
152 changes: 152 additions & 0 deletions felix/bpf/proxy/endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright (c) 2020-2021 Tigera, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
// NOTICE:
//
// Original code pulled from k8s.io@1.29.4, package: pkg/proxy/endpoints.go.
// Adapted for legacy use in this repository.

package proxy

import (
"net"
"strconv"

"k8s.io/apimachinery/pkg/util/sets"
k8sp "k8s.io/kubernetes/pkg/proxy"
)

// endpointInfo contains base information that defines an endpoint.
type endpointInfo struct {
// Cache this values to improve performance
ip string
port int
// endpoint is the same as net.JoinHostPort(ip,port)
endpoint string

// isLocal indicates whether the endpoint is running on same host as kube-proxy.
isLocal bool

// ready indicates whether this endpoint is ready and NOT terminating, unless
// PublishNotReadyAddresses is set on the service, in which case it will just
// always be true.
ready bool
// serving indicates whether this endpoint is ready regardless of its terminating state.
// For pods this is true if it has a ready status regardless of its deletion timestamp.
serving bool
// terminating indicates whether this endpoint is terminating.
// For pods this is true if it has a non-nil deletion timestamp.
terminating bool

// zoneHints represent the zone hints for the endpoint. This is based on
// endpoint.hints.forZones[*].name in the EndpointSlice API.
zoneHints sets.Set[string]
}

var _ k8sp.Endpoint = &endpointInfo{}

type EndpoiontInfoOpt func(*endpointInfo)

// EndpointInfoOptIsLocal applies the given bool to the endpoint's isLocal field.
func EndpointInfoOptIsLocal(b bool) EndpoiontInfoOpt {
return func(ep *endpointInfo) {
ep.isLocal = b
}
}

// EndpointInfoOptIsReady applies the given bool to the endpoint's ready field.
func EndpointInfoOptIsReady(b bool) EndpoiontInfoOpt {
return func(ep *endpointInfo) {
ep.ready = b
}
}

// EndpointInfoOptIsServing applies the given bool to the endpoint's serving field.
func EndpointInfoOptIsServing(b bool) EndpoiontInfoOpt {
return func(ep *endpointInfo) {
ep.serving = b
}
}

// EndpointInfoOptIsTerminating applies the given bool to the endpoint's terminating field.
func EndpointInfoOptIsTerminating(b bool) EndpoiontInfoOpt {
return func(ep *endpointInfo) {
ep.terminating = b
}
}

// EndpointInfoOptZoneHints applies the given set to the endpoint's zoneHints field.
func EndpointInfoOptZoneHints(b sets.Set[string]) EndpoiontInfoOpt {
return func(ep *endpointInfo) {
ep.zoneHints = b
}
}

// NewEndpointInfo creates a new endpointInfo, returning it as a k8s proxy Endpoint.
func NewEndpointInfo(ip string, port int, opts ...EndpoiontInfoOpt) k8sp.Endpoint {
ep := &endpointInfo{
ip: ip,
port: port,
endpoint: net.JoinHostPort(ip, strconv.Itoa(port)),
}

for _, opt := range opts {
opt(ep)
}

return ep
}

// String is part of proxy.Endpoint interface.
func (info *endpointInfo) String() string {
return info.endpoint
}

// IP returns just the IP part of the endpoint, it's a part of proxy.Endpoint interface.
func (info *endpointInfo) IP() string {
return info.ip
}

// Port returns just the Port part of the endpoint.
func (info *endpointInfo) Port() int {
return info.port
}

// IsLocal is part of proxy.Endpoint interface.
func (info *endpointInfo) IsLocal() bool {
return info.isLocal
}

// IsReady returns true if an endpoint is ready and not terminating.
func (info *endpointInfo) IsReady() bool {
return info.ready
}

// IsServing returns true if an endpoint is ready, regardless of if the
// endpoint is terminating.
func (info *endpointInfo) IsServing() bool {
return info.serving
}

// IsTerminating retruns true if an endpoint is terminating. For pods,
// that is any pod with a deletion timestamp.
func (info *endpointInfo) IsTerminating() bool {
return info.terminating
}

// ZoneHints returns the zone hint for the endpoint.
func (info *endpointInfo) ZoneHints() sets.Set[string] {
return info.zoneHints
}
2 changes: 1 addition & 1 deletion felix/bpf/proxy/lb_src_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func testfn(makeIPs func(ips []string) proxy.K8sServicePortOption) {
),
},
EpsMap: k8sp.EndpointsMap{
svcKey: []k8sp.Endpoint{&k8sp.BaseEndpointInfo{Endpoint: "10.1.0.1:5555"}},
svcKey: []k8sp.Endpoint{proxy.NewEndpointInfo("10.1.0.1", 5555)},
},
}
makestep := func(step func()) func() {
Expand Down
38 changes: 23 additions & 15 deletions felix/bpf/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type proxy struct {
k8s kubernetes.Interface
ipFamily int

epsChanges *k8sp.EndpointChangeTracker
epsChanges *k8sp.EndpointsChangeTracker
svcChanges *k8sp.ServiceChangeTracker

svcMap k8sp.ServicePortMap
Expand All @@ -105,7 +105,7 @@ type proxy struct {
// event recorder to update node events
recorder events.EventRecorder
svcHealthServer healthcheck.ServiceHealthServer
healthzServer healthcheck.ProxierHealthUpdater
healthzServer *healthcheck.ProxierHealthServer

stopCh chan struct{}
stopWg sync.WaitGroup
Expand Down Expand Up @@ -154,20 +154,11 @@ func New(k8s kubernetes.Interface, dp DPSyncer, hostname string, opts ...Option)
p.invokeDPSyncer, p.minDPSyncPeriod, time.Hour /* XXX might be infinite? */, 1)
dp.SetTriggerFn(p.runner.Run)

nodeRef := &v1.ObjectReference{
Kind: "Node",
Name: p.hostname,
UID: types.UID(p.hostname),
Namespace: "",
}
ipVersion := v1.IPv4Protocol
if p.ipFamily != 4 {
ipVersion = v1.IPv6Protocol
}
p.healthzServer = healthcheck.NewProxierHealthServer("0.0.0.0:10256", p.minDPSyncPeriod, p.recorder, nodeRef)
ipVersion := p.v1IPFamily()
p.healthzServer = healthcheck.NewProxierHealthServer("0.0.0.0:10256", p.minDPSyncPeriod)
p.svcHealthServer = healthcheck.NewServiceHealthServer(p.hostname, p.recorder, util.NewNodePortAddresses(ipVersion, []string{"0.0.0.0/0"}), p.healthzServer)

p.epsChanges = k8sp.NewEndpointChangeTracker(p.hostname,
p.epsChanges = k8sp.NewEndpointsChangeTracker(p.hostname,
nil, // change if you want to provide more ctx
ipVersion,
p.recorder,
Expand Down Expand Up @@ -213,6 +204,15 @@ func New(k8s kubernetes.Interface, dp DPSyncer, hostname string, opts ...Option)
return p, nil
}

func (p *proxy) v1IPFamily() v1.IPFamily {
pr := v1.IPv4Protocol
if p.ipFamily != 4 {
pr = v1.IPv6Protocol
}

return pr
}

func (p *proxy) setIpFamily(ipFamily int) {
p.ipFamily = ipFamily
}
Expand Down Expand Up @@ -279,15 +279,17 @@ func (p *proxy) invokeDPSyncer() {
}

if p.healthzServer != nil {
p.healthzServer.Updated()
p.healthzServer.Updated(p.v1IPFamily())
}
}

func (p *proxy) OnServiceAdd(svc *v1.Service) {
log.WithField("svc", svc).Debug("Add service")
p.OnServiceUpdate(nil, svc)
}

func (p *proxy) OnServiceUpdate(old, curr *v1.Service) {
log.WithFields(log.Fields{"old": old, "curr": curr}).Debug("Update service")
if p.svcChanges.Update(old, curr) && p.isInitialized() {
p.syncDP()
}
Expand All @@ -298,29 +300,35 @@ func (p *proxy) OnServiceDelete(svc *v1.Service) {
}

func (p *proxy) OnServiceSynced() {
log.Debug("Services cache synced. Forcing dataplane sync...")
p.setSvcsSynced()
p.forceSyncDP()
}

func (p *proxy) OnEndpointSliceAdd(eps *discovery.EndpointSlice) {
log.WithField("eps", eps).Debug("EndpointSlice add")
if p.epsChanges.EndpointSliceUpdate(eps, false) && p.isInitialized() {
p.syncDP()
}
}

func (p *proxy) OnEndpointSliceUpdate(_, eps *discovery.EndpointSlice) {
log.WithField("eps", eps).Debug("EndpointSlice update")
if p.epsChanges.EndpointSliceUpdate(eps, false) && p.isInitialized() {
p.syncDP()
}
}

func (p *proxy) OnEndpointSliceDelete(eps *discovery.EndpointSlice) {
log.WithField("eps", eps).Debug("EndpointSlice delete")
if p.epsChanges.EndpointSliceUpdate(eps, true) && p.isInitialized() {
p.syncDP()
}
}

func (p *proxy) OnEndpointSlicesSynced() {
log.Debug("EndpointSlices synced")

p.setEpsSynced()
p.forceSyncDP()
}
Expand Down
4 changes: 2 additions & 2 deletions felix/bpf/proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ var _ = Describe("BPF Proxy", func() {
}]

Expect(len(ep)).To(Equal(2))
Expect(ep[0].GetIsLocal).NotTo(Equal(ep[1].GetIsLocal))
Expect(ep[0].IsLocal()).NotTo(Equal(ep[1].IsLocal()))
}
})
})
Expand Down Expand Up @@ -506,7 +506,7 @@ var _ = Describe("BPF Proxy", func() {
Expect(s.SvcMap).To(HaveLen(1))
for k := range s.SvcMap {
for _, ep := range s.EpsMap[k] {
Expect(ep.GetIsLocal()).To(Equal(ep.String() == "10.1.2.1:1234"))
Expect(ep.IsLocal()).To(Equal(ep.String() == "10.1.2.1:1234"))
}
}
})
Expand Down