Skip to content

Fix authz during shutdown #1395

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 15, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bpf/kmesh/workload/include/frontend.h
Original file line number Diff line number Diff line change
@@ -34,7 +34,7 @@ static inline int frontend_manager(struct kmesh_context *kmesh_ctx, frontend_val
direct_backend = true;
}

if (direct_backend) {
if (direct_backend) { // in this case, we donot check the healthy status of the backend, just let it go
Copy link
Preview

Copilot AI May 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider correcting 'donot' to 'do not' for improved clarity.

Suggested change
if (direct_backend) { // in this case, we donot check the healthy status of the backend, just let it go
if (direct_backend) { // in this case, we do not check the healthy status of the backend, just let it go

Copilot uses AI. Check for mistakes.

// For pod direct access, if a pod has waypoint captured, we will redirect to waypoint, otherwise we do nothing.
if (backend_v->waypoint_port != 0) {
BPF_LOG(
2 changes: 1 addition & 1 deletion bpf/kmesh/workload/xdp.c
Original file line number Diff line number Diff line change
@@ -70,7 +70,7 @@ static inline wl_policies_v *get_workload_policies(struct xdp_info *info, struct
}
frontend_v = kmesh_map_lookup_elem(&map_of_frontend, &frontend_k);
if (!frontend_v) {
BPF_LOG(INFO, XDP, "failed to get frontend in xdp");
BPF_LOG(DEBUG, XDP, "failed to get frontend in xdp");
return AUTH_ALLOW;
}
workload_uid = frontend_v->upstream_id;
24 changes: 21 additions & 3 deletions pkg/controller/workload/workload_processor.go
Original file line number Diff line number Diff line change
@@ -201,6 +201,24 @@
return p.removeWorkloadFromBpfMap(wl)
}

// handleUnhealthyWorkload is used to handle unhealthy workload, we only leave it in the frontend and backend map.
// Because we rely on the backend map to be aware of direct access in `frontend_manager`,
// and xdp rely on frontend map to get the authz policy and do authz.
func (p *Processor) handleUnhealthyWorkload(workload *workloadapi.Workload) error {
backendUid := p.hashName.Hash(workload.Uid)

// 1. find all endpoint keys related to this workload and delete them
if eks := p.bpf.GetEndpointKeys(backendUid); len(eks) > 0 {
if err := p.deleteEndpointRecords(eks.UnsortedList()); err != nil {
return fmt.Errorf("handleUnhealthyWorkload: deleteEndpointRecords for %s failed: %v", workload.Uid, err)
}

Check warning on line 214 in pkg/controller/workload/workload_processor.go

Codecov / codecov/patch

pkg/controller/workload/workload_processor.go#L213-L214

Added lines #L213 - L214 were not covered by tests
}

// TODO(hzxuzhonghu), maybe need to update backend map to update the workload status

return nil
}

func (p *Processor) removeWorkloadFromBpfMap(workload *workloadapi.Workload) error {
var (
err error
@@ -486,11 +504,11 @@
p.locality.SetLocality(p.nodeName, workload.GetClusterId(), workload.GetNetwork(), workload.GetLocality())
}

// Exclude unhealthy workload, which is not ready to serve traffic
// Exclude unhealthy workload, which is not ready to serve traffic, but keep it in the frontend
// backend map for authz
if workload.Status == workloadapi.WorkloadStatus_UNHEALTHY {
log.Debugf("workload %s is unhealthy", workload.ResourceName())
// If the workload is updated to unhealthy, we should remove it from the bpf map
return p.removeWorkloadFromBpfMap(workload)
return p.handleUnhealthyWorkload(workload)
}

// 1. update workload in backend map
18 changes: 16 additions & 2 deletions pkg/controller/workload/workload_processor_test.go
Original file line number Diff line number Diff line change
@@ -160,10 +160,12 @@ func Test_handleWorkload(t *testing.T) {
assert.Equal(t, got.Status, workloadapi.WorkloadStatus_UNHEALTHY)
checkNotExistInFrontEndMap(t, workload3.Addresses[0], p)

// 8. update workload from healthy to unhealthy, should remove it from bpf map
// 8. update workload from healthy to unhealthy, should remove it from bpf endpoint map
workload2.Status = workloadapi.WorkloadStatus_UNHEALTHY
_ = p.handleWorkload(workload2)
checkNotExistInFrontEndMap(t, workload2.Addresses[0], p)
checkNotExistInEndpointMap(t, p, fakeSvc, []uint32{workload2ID})
checkFrontEndMap(t, workload2.Addresses[0], p)
checkBackendMap(t, p, workload2ID, workload2)

// 9. delete service
p.handleRemovedAddresses([]string{fakeSvc.ResourceName()})
@@ -276,6 +278,18 @@ func checkServiceMap(t *testing.T, p *Processor, svcId uint32, fakeSvc *workload
assert.Equal(t, sv.WaypointPort, nets.ConvertPortToBigEndian(fakeSvc.Waypoint.GetHboneMtlsPort()))
}

// checkNotExistInEndpointMap asserts that none of the given backend UIDs are
// still present as endpoints of the given service in the bpf endpoint map.
func checkNotExistInEndpointMap(t *testing.T, p *Processor, fakeSvc *workloadapi.Service, backendUid []uint32) {
	svcID := p.hashName.Hash(fakeSvc.ResourceName())
	endpoints := p.bpf.GetAllEndpointsForService(svcID)
	t.Logf("endpoints number: %v", len(endpoints))

	// Build a lookup set of the UIDs that must be absent, then scan the map.
	unexpected := sets.New[uint32](backendUid...)
	for i := range endpoints {
		if unexpected.Contains(endpoints[i].BackendUid) {
			t.Fatalf("endpoint %v still exists", endpoints[i].BackendUid)
		}
	}
}

func checkEndpointMap(t *testing.T, p *Processor, fakeSvc *workloadapi.Service, backendUid []uint32) {
endpoints := p.bpf.GetAllEndpointsForService(p.hashName.Hash(fakeSvc.ResourceName()))
assert.Equal(t, len(endpoints), len(backendUid))
Loading
Oops, something went wrong.