diff --git a/bpf/kmesh/workload/include/frontend.h b/bpf/kmesh/workload/include/frontend.h index 8bb9a21f5..a16492983 100644 --- a/bpf/kmesh/workload/include/frontend.h +++ b/bpf/kmesh/workload/include/frontend.h @@ -34,7 +34,7 @@ static inline int frontend_manager(struct kmesh_context *kmesh_ctx, frontend_val direct_backend = true; } - if (direct_backend) { + if (direct_backend) { // in this case, we donot check the healthy status of the backend, just let it go // For pod direct access, if a pod has waypoint captured, we will redirect to waypoint, otherwise we do nothing. if (backend_v->waypoint_port != 0) { BPF_LOG( diff --git a/bpf/kmesh/workload/xdp.c b/bpf/kmesh/workload/xdp.c index ad6d61fe8..c8e9857dd 100644 --- a/bpf/kmesh/workload/xdp.c +++ b/bpf/kmesh/workload/xdp.c @@ -70,7 +70,7 @@ static inline wl_policies_v *get_workload_policies(struct xdp_info *info, struct } frontend_v = kmesh_map_lookup_elem(&map_of_frontend, &frontend_k); if (!frontend_v) { - BPF_LOG(INFO, XDP, "failed to get frontend in xdp"); + BPF_LOG(DEBUG, XDP, "failed to get frontend in xdp"); return AUTH_ALLOW; } workload_uid = frontend_v->upstream_id; diff --git a/pkg/controller/workload/workload_processor.go b/pkg/controller/workload/workload_processor.go index 7fdb587e6..c87bece99 100644 --- a/pkg/controller/workload/workload_processor.go +++ b/pkg/controller/workload/workload_processor.go @@ -201,6 +201,24 @@ func (p *Processor) removeWorkload(uid string) error { return p.removeWorkloadFromBpfMap(wl) } +// handleUnhealthyWorkload is used to handle unhealthy workload, we only leave it in the frontend and backend map. +// Because we rely on the backend map to be aware of direct access in `frontend_manager`, +// and xdp rely on frontend map to get the authz policy and do authz. +func (p *Processor) handleUnhealthyWorkload(workload *workloadapi.Workload) error { + backendUid := p.hashName.Hash(workload.Uid) + + // 1. find all endpoint keys related to this workload and delete them + if eks := p.bpf.GetEndpointKeys(backendUid); len(eks) > 0 { + if err := p.deleteEndpointRecords(eks.UnsortedList()); err != nil { + return fmt.Errorf("handleUnhealthyWorkload: deleteEndpointRecords for %s failed: %v", workload.Uid, err) + } + } + + // TODO(hzxuzhonghu), maybe need to update backend map to update the workload status + + return nil +} + func (p *Processor) removeWorkloadFromBpfMap(workload *workloadapi.Workload) error { var ( err error @@ -486,11 +504,11 @@ func (p *Processor) handleWorkload(workload *workloadapi.Workload) error { p.locality.SetLocality(p.nodeName, workload.GetClusterId(), workload.GetNetwork(), workload.GetLocality()) } - // Exclude unhealthy workload, which is not ready to serve traffic + // Exclude unhealthy workload, which is not ready to serve traffic, but keep it in the frontend + // backend map for authz if workload.Status == workloadapi.WorkloadStatus_UNHEALTHY { log.Debugf("workload %s is unhealthy", workload.ResourceName()) - // If the workload is updated to unhealthy, we should remove it from the bpf map - return p.removeWorkloadFromBpfMap(workload) + return p.handleUnhealthyWorkload(workload) } // 1. update workload in backend map diff --git a/pkg/controller/workload/workload_processor_test.go b/pkg/controller/workload/workload_processor_test.go index cebcff46e..b55026f1c 100644 --- a/pkg/controller/workload/workload_processor_test.go +++ b/pkg/controller/workload/workload_processor_test.go @@ -160,10 +160,12 @@ func Test_handleWorkload(t *testing.T) { assert.Equal(t, got.Status, workloadapi.WorkloadStatus_UNHEALTHY) checkNotExistInFrontEndMap(t, workload3.Addresses[0], p) - // 8. update workload from healthy to unhealthy, should remove it from bpf map + // 8. update workload from healthy to unhealthy, should remove it from bpf endpoint map workload2.Status = workloadapi.WorkloadStatus_UNHEALTHY _ = p.handleWorkload(workload2) - checkNotExistInFrontEndMap(t, workload2.Addresses[0], p) + checkNotExistInEndpointMap(t, p, fakeSvc, []uint32{workload2ID}) + checkFrontEndMap(t, workload2.Addresses[0], p) + checkBackendMap(t, p, workload2ID, workload2) // 9. delete service p.handleRemovedAddresses([]string{fakeSvc.ResourceName()}) @@ -276,6 +278,18 @@ func checkServiceMap(t *testing.T, p *Processor, svcId uint32, fakeSvc *workload assert.Equal(t, sv.WaypointPort, nets.ConvertPortToBigEndian(fakeSvc.Waypoint.GetHboneMtlsPort())) } +func checkNotExistInEndpointMap(t *testing.T, p *Processor, fakeSvc *workloadapi.Service, backendUid []uint32) { + endpoints := p.bpf.GetAllEndpointsForService(p.hashName.Hash(fakeSvc.ResourceName())) + t.Logf("endpoints number: %v", len(endpoints)) + + all := sets.New[uint32](backendUid...) + for _, endpoint := range endpoints { + if all.Contains(endpoint.BackendUid) { + t.Fatalf("endpoint %v still exists", endpoint.BackendUid) + } + } +} + func checkEndpointMap(t *testing.T, p *Processor, fakeSvc *workloadapi.Service, backendUid []uint32) { endpoints := p.bpf.GetAllEndpointsForService(p.hashName.Hash(fakeSvc.ResourceName())) assert.Equal(t, len(endpoints), len(backendUid))