Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ambient: service entry initial impl #45621

Merged
merged 27 commits into from
Jul 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
8dee7e0
ambient: service entry initial impl
Jan 21, 2023
74cb6ed
update more tests to use common functions
GregHanson Jun 23, 2023
11781d2
add release note
GregHanson Jun 23, 2023
946171f
Prefer updates.Insert()
Jun 30, 2023
2473dc2
Move lock lower so there's less lock contention
Jun 30, 2023
7055f90
Ambient SE handler should work regardless of EnableK8SServiceSelectWo…
Jun 30, 2023
e6672a0
Merge branch 'master' into ambient-service-entries
Jul 3, 2023
a9f01a4
Update test after local testing for auto assign vips
Jul 5, 2023
527f88a
Stronger assertion for load balancing
Jul 5, 2023
4d5d92b
Merge branch 'master' into ambient-service-entries
Jul 5, 2023
929addf
Fix unit tests for ndots in test app
Jul 5, 2023
0eb683e
Remove dead code
Jul 5, 2023
5a0ee02
DRY SE ports construction
Jul 5, 2023
434a15a
DRY some internal network address conversion code
Jul 5, 2023
3f65da2
Stop testing with load balancing; tests run super slowly with that
Jul 5, 2023
5c095d9
Merge branch 'master' into ambient-service-entries
Jul 5, 2023
ce14466
Merge branch 'master' into ambient-service-entries
Jul 6, 2023
eb05c10
Ensure service entries cannot select across namespaces
Jul 6, 2023
bef3d72
Remove customizations for skipped test
Jul 6, 2023
eaac012
Optimize pod xds events if not selected by service entry
Jul 6, 2023
02882ed
Optimize workload entry xds events if not selected by service entry
Jul 6, 2023
707f66f
Move logic for HandleServiceEntry into AmbientIndex interface
Jul 6, 2023
13d1862
Don't pass around entire ambient index
Jul 6, 2023
b632775
Prefer ValidatedSetSelector for performance
Jul 6, 2023
04b7435
Rename function to be clearer
Jul 6, 2023
99b26b2
Don't pass around ambient index child
Jul 6, 2023
37e605d
Remove auto vip allocation
Jul 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
63 changes: 52 additions & 11 deletions pilot/pkg/serviceregistry/kube/controller/ambientindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"google.golang.org/protobuf/proto"
v1 "k8s.io/api/core/v1"
klabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"

"istio.io/api/networking/v1alpha3"
Expand Down Expand Up @@ -73,6 +74,11 @@ type AmbientIndexImpl struct {

// Map of Scope -> address
waypoints map[model.WaypointScope]*workloadapi.GatewayAddress

// map of service entry name/namespace to the service entry.
// used on pod updates to add VIPs to pods from service entries.
// also used on service entry updates to cleanup any old VIPs from pods/workloads maps.
servicesMap map[types.NamespacedName]*apiv1alpha3.ServiceEntry
}

func workloadToAddressInfo(w *workloadapi.Workload) *model.AddressInfo {
Expand Down Expand Up @@ -340,7 +346,7 @@ func (a *AmbientIndexImpl) extractWorkload(p *v1.Pod, c *Controller) *model.Work

policies := c.selectorAuthorizationPolicies(p.Namespace, p.Labels)
policies = append(policies, c.convertedSelectorPeerAuthentications(p.Namespace, p.Labels)...)
wl := c.constructWorkload(p, waypoint, policies)
wl := a.constructWorkload(p, waypoint, policies, c)
if wl == nil {
return nil
}
Expand Down Expand Up @@ -433,6 +439,29 @@ func (c *Controller) setupIndex() *AmbientIndexImpl {
}
})

// Handle ServiceEntries.
c.configController.RegisterEventHandler(gvk.ServiceEntry, func(_ config.Config, newCfg config.Config, ev model.Event) {
newSvcEntrySpec := serviceentry.ConvertServiceEntry(newCfg)
var newSvcEntry *apiv1alpha3.ServiceEntry
if newSvcEntrySpec != nil {
newSvcEntry = &apiv1alpha3.ServiceEntry{
ObjectMeta: newCfg.ToObjectMeta(),
Spec: *newSvcEntrySpec.DeepCopy(),
}
}

idx.mu.Lock()
defer idx.mu.Unlock()
updates := idx.handleServiceEntry(newSvcEntry, ev, c)
if len(updates) > 0 {
c.opts.XDSUpdater.ConfigUpdate(&model.PushRequest{
Full: false,
ConfigsUpdated: updates,
Reason: []model.TriggerReason{model.AmbientUpdate},
})
}
})

serviceHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
idx.mu.Lock()
Expand Down Expand Up @@ -477,6 +506,8 @@ func (c *Controller) setupIndex() *AmbientIndexImpl {
}
},
}

idx.servicesMap = make(map[types.NamespacedName]*apiv1alpha3.ServiceEntry)
c.services.AddEventHandler(serviceHandler)
return &idx
}
Expand Down Expand Up @@ -558,6 +589,19 @@ func networkAddressFromWorkload(wl *model.WorkloadInfo) []networkAddress {
return networkAddrs
}

func toInternalNetworkAddresses(nwAddrs []*workloadapi.NetworkAddress) []networkAddress {
networkAddrs := make([]networkAddress, 0, len(nwAddrs))
for _, addr := range nwAddrs {
if ip, ok := netip.AddrFromSlice(addr.Address); ok {
networkAddrs = append(networkAddrs, networkAddress{
ip: ip.String(),
network: addr.Network,
})
}
}
return networkAddrs
}

// NOTE: Mutex is locked prior to being called.
func (a *AmbientIndexImpl) handleService(obj any, isDelete bool, c *Controller) sets.Set[model.ConfigKey] {
svc := controllers.Extract[*v1.Service](obj)
Expand Down Expand Up @@ -604,15 +648,7 @@ func (a *AmbientIndexImpl) handleService(obj any, isDelete bool, c *Controller)
}

si := c.constructService(svc)
networkAddrs := make([]networkAddress, 0, len(si.Addresses))
for _, addr := range si.Addresses {
if vip, ok := netip.AddrFromSlice(addr.Address); ok {
networkAddrs = append(networkAddrs, networkAddress{
ip: vip.String(),
network: addr.Network,
})
}
}
networkAddrs := toInternalNetworkAddresses(si.GetAddresses())
pods := c.getPodsInService(svc)
wls := make(map[string]*model.WorkloadInfo, len(pods))
for _, p := range pods {
Expand Down Expand Up @@ -706,7 +742,9 @@ func (c *Controller) AddressInformation(addresses sets.String) ([]*model.Address
return wls, removed
}

func (c *Controller) constructWorkload(pod *v1.Pod, waypoint *workloadapi.GatewayAddress, policies []string) *workloadapi.Workload {
func (a *AmbientIndexImpl) constructWorkload(pod *v1.Pod, waypoint *workloadapi.GatewayAddress, policies []string,
c *Controller,
) *workloadapi.Workload {
workloadServices := map[string]*workloadapi.PortList{}
allServices := c.services.List(pod.Namespace, klabels.Everything())
if services := getPodServices(allServices, pod); len(services) > 0 {
Expand Down Expand Up @@ -736,6 +774,9 @@ func (c *Controller) constructWorkload(pod *v1.Pod, waypoint *workloadapi.Gatewa
for _, podIP := range pod.Status.PodIPs {
addresses = append(addresses, parseIP(podIP.IP))
}
for nsName, ports := range a.getWorkloadServicesFromServiceEntries(nil, pod.GetNamespace(), pod.Labels) {
workloadServices[nsName] = ports
}

wl := &workloadapi.Workload{
Uid: c.generatePodUID(pod),
Expand Down