Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug where deleting a waypoint does not update the bpf kmesh_service map #422

Merged
merged 2 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bpf/kmesh/workload/include/endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ endpoint_manager(ctx_buff_t *ctx, endpoint_value *endpoint_v, __u32 service_id,
backend_k.backend_uid = endpoint_v->backend_uid;
backend_v = map_lookup_backend(&backend_k);
if (!backend_v) {
BPF_LOG(WARN, ENDPOINT, "find backend failed");
BPF_LOG(WARN, ENDPOINT, "find backend %u failed", backend_k.backend_uid);
return -ENOENT;
}

Expand Down
2 changes: 1 addition & 1 deletion bpf/kmesh/workload/include/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ static inline int lb_random_handle(ctx_buff_t *ctx, __u32 service_id, service_va

endpoint_v = map_lookup_endpoint(&endpoint_k);
if (!endpoint_v) {
BPF_LOG(WARN, SERVICE, "find endpoint failed");
BPF_LOG(WARN, SERVICE, "find endpoint [%u/%u] failed", service_id, endpoint_k.backend_index);
return -ENOENT;
}

Expand Down
129 changes: 53 additions & 76 deletions pkg/controller/workload/workload_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,26 +49,19 @@ type Processor struct {
req *service_discovery_v3.DeltaDiscoveryRequest

hashName *HashName
// endpoint indexer, svc key -> workload id -> endpoint
endpointsByService map[string]map[string]Endpoint
// workloads indexer, svc key -> workload id
endpointsByService map[string]map[string]struct{}
bpf *bpf.Cache
Sm *kmeshsecurity.SecretManager
nodeName string
WorkloadCache cache.WorkloadCache
ServiceCache cache.ServiceCache
}

type Endpoint struct {
workloadUid string
serviceName string
portCount uint32
portList []*workloadapi.Port
}

func newProcessor(workloadMap bpf2go.KmeshCgroupSockWorkloadMaps) *Processor {
return &Processor{
hashName: NewHashName(),
endpointsByService: make(map[string]map[string]Endpoint),
endpointsByService: make(map[string]map[string]struct{}),
bpf: bpf.NewCache(workloadMap),
nodeName: os.Getenv("NODE_NAME"),
WorkloadCache: cache.NewWorkloadCache(),
Expand Down Expand Up @@ -334,20 +327,14 @@ func (p *Processor) storeEndpointWithService(sk *bpf.ServiceKey, sv *bpf.Service
return nil
}

func (p *Processor) storeServiceCache(workload_uid string, serviceName string, portList *workloadapi.PortList) {
var endpoint Endpoint
endpoint.workloadUid = workload_uid
endpoint.serviceName = serviceName
endpoint.portCount = uint32(len(portList.Ports))
endpoint.portList = portList.Ports

endpointCaches, ok := p.endpointsByService[serviceName]
func (p *Processor) storeServiceEndpoint(workload_uid string, serviceName string) {
wls, ok := p.endpointsByService[serviceName]
if !ok {
p.endpointsByService[serviceName] = make(map[string]Endpoint)
endpointCaches = p.endpointsByService[serviceName]
p.endpointsByService[serviceName] = make(map[string]struct{})
wls = p.endpointsByService[serviceName]
}

endpointCaches[workload_uid] = endpoint
wls[workload_uid] = struct{}{}
}

func (p *Processor) storeBackendData(uid uint32, ip []byte, waypoint *workloadapi.GatewayAddress, portList map[string]*workloadapi.PortList) error {
Expand Down Expand Up @@ -403,7 +390,7 @@ func (p *Processor) handleDataWithService(workload *workloadapi.Workload) error
return err
}

for serviceName, portList := range workload.GetServices() {
for serviceName := range workload.GetServices() {
bk.BackendUid = backend_uid
// for the update scenario: if the backend already exists, it only needs to be updated
if err = p.bpf.BackendLookup(&bk, &bv); err != nil {
Expand All @@ -414,8 +401,8 @@ func (p *Processor) handleDataWithService(workload *workloadapi.Workload) error
log.Errorf("storeEndpointWithService failed, err:%s", err)
return err
}
} else { // the service has not exist in the map yet, we need store it in the endpointsByService
p.storeServiceCache(workload.GetUid(), serviceName, portList)
} else {
p.storeServiceEndpoint(workload.GetUid(), serviceName)
}
}
}
Expand Down Expand Up @@ -518,20 +505,20 @@ func (p *Processor) storeServiceFrontendData(serviceId uint32, service *workload

func (p *Processor) storeServiceData(serviceName string, waypoint *workloadapi.GatewayAddress, ports []*workloadapi.Port) error {
var (
err error
ek = bpf.EndpointKey{}
ev = bpf.EndpointValue{}
sk = bpf.ServiceKey{}
sv = bpf.ServiceValue{}
err error
ek = bpf.EndpointKey{}
ev = bpf.EndpointValue{}
sk = bpf.ServiceKey{}
oldValue = bpf.ServiceValue{}
)

sk.ServiceId = p.hashName.StrToNum(serviceName)
sv.LbPolicy = LbPolicyRandom
sv.EndpointCount = 0 // there are 0 endpoints in the initial state

newValue := bpf.ServiceValue{}
newValue.LbPolicy = LbPolicyRandom
if waypoint != nil {
sv.WaypointAddr = nets.ConvertIpByteToUint32(waypoint.GetAddress().Address)
sv.WaypointPort = nets.ConvertPortToBigEndian(waypoint.GetHboneMtlsPort())
newValue.WaypointAddr = nets.ConvertIpByteToUint32(waypoint.GetAddress().Address)
newValue.WaypointPort = nets.ConvertPortToBigEndian(waypoint.GetHboneMtlsPort())
}

for i, port := range ports {
Expand All @@ -540,31 +527,39 @@ func (p *Processor) storeServiceData(serviceName string, waypoint *workloadapi.G
break
}

sv.ServicePort[i] = nets.ConvertPortToBigEndian(port.ServicePort)
newValue.ServicePort[i] = nets.ConvertPortToBigEndian(port.ServicePort)
if strings.Contains(serviceName, "waypoint") {
sv.TargetPort[i] = nets.ConvertPortToBigEndian(KmeshWaypointPort)
newValue.TargetPort[i] = nets.ConvertPortToBigEndian(KmeshWaypointPort)
} else {
sv.TargetPort[i] = nets.ConvertPortToBigEndian(port.TargetPort)
newValue.TargetPort[i] = nets.ConvertPortToBigEndian(port.TargetPort)
}
}

endpointCaches, ok := p.endpointsByService[serviceName]
if ok {
for workloadUid, endpoint := range endpointCaches {
sv.EndpointCount++
ek.ServiceId = p.hashName.StrToNum(endpoint.serviceName)
ek.BackendIndex = sv.EndpointCount
ev.BackendUid = p.hashName.StrToNum(workloadUid)

if err = p.bpf.EndpointUpdate(&ek, &ev); err != nil {
log.Errorf("Update Endpoint failed, err:%s", err)
return err
// The service already exists, which means this is a service update.
if err = p.bpf.ServiceLookup(&sk, &oldValue); err == nil {
newValue.EndpointCount = oldValue.EndpointCount
} else {
// Only update the endpoint map the first time the service is added
endpointCaches, ok := p.endpointsByService[serviceName]
if ok {
newValue.EndpointCount = uint32(len(endpointCaches))
endpointIndex := uint32(0)
for workloadUid := range endpointCaches {
endpointIndex++
ek.ServiceId = sk.ServiceId
ek.BackendIndex = endpointIndex
ev.BackendUid = p.hashName.StrToNum(workloadUid)

if err = p.bpf.EndpointUpdate(&ek, &ev); err != nil {
log.Errorf("Update Endpoint failed, err:%s", err)
return err
}
}
}
delete(p.endpointsByService, serviceName)
}

if err = p.bpf.ServiceUpdate(&sk, &sv); err != nil {
if err = p.bpf.ServiceUpdate(&sk, &newValue); err != nil {
log.Errorf("Update Service failed, err:%s", err)
}

Expand All @@ -573,38 +568,20 @@ func (p *Processor) storeServiceData(serviceName string, waypoint *workloadapi.G

func (p *Processor) handleService(service *workloadapi.Service) error {
log.Debugf("service resource name: %s/%s", service.Namespace, service.Hostname)
var (
err error
sk = bpf.ServiceKey{}
sv = bpf.ServiceValue{}
)
p.ServiceCache.AddOrUpdateService(service)
serviceName := service.ResourceName()
serviceId := p.hashName.StrToNum(serviceName)
sk.ServiceId = serviceId
// if the service already exists, only the frontend port info needs to be updated
if err = p.bpf.ServiceLookup(&sk, &sv); err == nil {
// update: delete then store
if err = p.deleteFrontendData(serviceId); err != nil {
log.Errorf("deleteFrontendData failed: %s", err)
return err
}
if err = p.storeServiceFrontendData(serviceId, service); err != nil {
log.Errorf("storeServiceFrontendData failed, err:%s", err)
return err
}
} else {
// store in frontend
if err = p.storeServiceFrontendData(serviceId, service); err != nil {
log.Errorf("storeServiceFrontendData failed, err:%s", err)
return err
}

// get endpoint from ServiceCache, and update service and endpoint map
if err = p.storeServiceData(serviceName, service.GetWaypoint(), service.GetPorts()); err != nil {
log.Errorf("storeServiceData failed, err:%s", err)
return err
}
// store in frontend
if err := p.storeServiceFrontendData(serviceId, service); err != nil {
log.Errorf("storeServiceFrontendData failed, err:%s", err)
return err
}

// get endpoint from ServiceCache, and update service and endpoint map
if err := p.storeServiceData(serviceName, service.GetWaypoint(), service.GetPorts()); err != nil {
log.Errorf("storeServiceData failed, err:%s", err)
return err
}
return nil
}
Expand Down
Loading