From 6bb0b03edf5017708c4c7d24c30452f26ac6a4be Mon Sep 17 00:00:00 2001 From: lec-bit Date: Sat, 26 Oct 2024 18:17:13 +0800 Subject: [PATCH 01/22] kernel-native mode support restart Signed-off-by: lec-bit --- pkg/bpf/ads/loader_enhanced.go | 4 +- pkg/bpf/ads/sock_connection.go | 35 +++++++++++---- pkg/bpf/ads/sock_ops.go | 34 +++++++++++---- pkg/bpf/ads/trace_point.go | 68 ++++++++++++++++++++++++----- pkg/bpf/bpf.go | 12 +++-- pkg/bpf/utils/bpf_helper.go | 32 +++++++++++++- pkg/bpf/workload/sendmsg.go | 12 ----- pkg/bpf/workload/sock_connection.go | 4 +- pkg/bpf/workload/sock_ops.go | 3 +- 9 files changed, 155 insertions(+), 49 deletions(-) diff --git a/pkg/bpf/ads/loader_enhanced.go b/pkg/bpf/ads/loader_enhanced.go index 13b9808ef..bd6266c77 100644 --- a/pkg/bpf/ads/loader_enhanced.go +++ b/pkg/bpf/ads/loader_enhanced.go @@ -45,7 +45,9 @@ type BpfAds struct { func NewBpfAds(cfg *options.BpfConfig) (*BpfAds, error) { sc := &BpfAds{} - sc.TracePoint.NewBpf(cfg) + if err := sc.TracePoint.NewBpf(cfg); err != nil { + return sc, err + } if err := sc.SockOps.NewBpf(cfg); err != nil { return sc, err diff --git a/pkg/bpf/ads/sock_connection.go b/pkg/bpf/ads/sock_connection.go index e2db0b1e5..9781065c0 100644 --- a/pkg/bpf/ads/sock_connection.go +++ b/pkg/bpf/ads/sock_connection.go @@ -22,12 +22,15 @@ package ads import "C" import ( "os" + "path/filepath" "reflect" "syscall" "github.com/cilium/ebpf" "github.com/cilium/ebpf/link" + "kmesh.net/kmesh/pkg/bpf/restart" + "kmesh.net/kmesh/bpf/kmesh/bpf2go" "kmesh.net/kmesh/daemon/options" "kmesh.net/kmesh/pkg/bpf/utils" @@ -100,11 +103,6 @@ func (sc *BpfSockConn) loadKmeshSockConnObjects() (*ebpf.CollectionSpec, error) return nil, err } - value := reflect.ValueOf(sc.KmeshCgroupSockObjects.KmeshCgroupSockPrograms) - if err = utils.PinPrograms(&value, sc.Info.BpfFsPath); err != nil { - return nil, err - } - return spec, nil } @@ -156,18 +154,37 @@ func (sc *BpfSockConn) close() error { } func (sc *BpfSockConn) Attach() error { + var err error cgopt := link.CgroupOptions{ Path: sc.Info.Cgroup2Path, Attach: sc.Info.AttachType, Program: sc.KmeshCgroupSockObjects.CgroupConnect4Prog, } - lk, err := link.AttachCgroup(cgopt) - if err != nil { + // pin bpf_tail_call map + // tail_call map cannot pin in SetMapPinType->LoadAndAssign, we pin them independent + mapPinPath := filepath.Join(sc.Info.BpfFsPath, "sockconn_tail_call_map") + progPinPath := filepath.Join(sc.Info.BpfFsPath, "sockconn_link") + if restart.GetStartType() == restart.Restart { + if sc.Link, err = utils.BpfProgUpdate(progPinPath, cgopt); err != nil { + return err + } + if err = utils.BpfMapDeleteByPinPath(mapPinPath); err != nil { + return err + } + } else { + sc.Link, err = link.AttachCgroup(cgopt) + if err != nil { + return err + } + + if err := sc.Link.Pin(progPinPath); err != nil { + return err + } + } + if err = sc.KmeshCgroupSockMaps.KmeshTailCallProg.Pin(mapPinPath); err != nil { return err } - sc.Link = lk - return nil } diff --git a/pkg/bpf/ads/sock_ops.go b/pkg/bpf/ads/sock_ops.go index 22ec2d3d4..0979bcda2 100644 --- a/pkg/bpf/ads/sock_ops.go +++ b/pkg/bpf/ads/sock_ops.go @@ -21,6 +21,7 @@ package ads import ( "os" + "path/filepath" "reflect" "syscall" @@ -29,6 +30,7 @@ import ( "kmesh.net/kmesh/bpf/kmesh/bpf2go" "kmesh.net/kmesh/daemon/options" + "kmesh.net/kmesh/pkg/bpf/restart" "kmesh.net/kmesh/pkg/bpf/utils" helper "kmesh.net/kmesh/pkg/utils" ) @@ -82,11 +84,6 @@ func (sc *BpfSockOps) loadKmeshSockopsObjects() (*ebpf.CollectionSpec, error) { return nil, err } - value := reflect.ValueOf(sc.KmeshSockopsObjects.KmeshSockopsPrograms) - if err = utils.PinPrograms(&value, sc.Info.BpfFsPath); err != nil { - return nil, err - } - return spec, nil } @@ -181,18 +178,37 @@ func (sc *BpfSockOps) Load() error { } func (sc *BpfSockOps) Attach() error { + var err error cgopt := link.CgroupOptions{ Path: sc.Info.Cgroup2Path, Attach: sc.Info.AttachType, Program: sc.KmeshSockopsObjects.SockopsProg, } - lk, err := link.AttachCgroup(cgopt) - if err != nil { + // pin bpf_link and bpf_tail_call map + // pin bpf_link, after restart, update prog in bpf_link + // tail_call map cannot pin in SetMapPinType->LoadAndAssign, we pin them independent + mapPinPath := filepath.Join(sc.Info.BpfFsPath, "sockops_tail_call_map") + progPinPath := filepath.Join(sc.Info.BpfFsPath, "sockops_link") + if restart.GetStartType() == restart.Restart { + if sc.Link, err = utils.BpfProgUpdate(progPinPath, cgopt); err != nil { + return err + } + if err = utils.BpfMapDeleteByPinPath(mapPinPath); err != nil { + return err + } + } else { + sc.Link, err = link.AttachCgroup(cgopt) + if err != nil { + return err + } + if err = sc.Link.Pin(progPinPath); err != nil { + return err + } + } + if err = sc.KmeshSockopsMaps.KmeshTailCallProg.Pin(mapPinPath); err != nil { return err } - sc.Link = lk - return nil } diff --git a/pkg/bpf/ads/trace_point.go b/pkg/bpf/ads/trace_point.go index 72e2849b9..d7008f205 100644 --- a/pkg/bpf/ads/trace_point.go +++ b/pkg/bpf/ads/trace_point.go @@ -20,11 +20,18 @@ package ads import ( + "os" + "path/filepath" + "reflect" + "syscall" + "github.com/cilium/ebpf" "github.com/cilium/ebpf/link" "kmesh.net/kmesh/bpf/kmesh/bpf2go" "kmesh.net/kmesh/daemon/options" + "kmesh.net/kmesh/pkg/bpf/restart" + "kmesh.net/kmesh/pkg/bpf/utils" helper "kmesh.net/kmesh/pkg/utils" ) @@ -34,10 +41,24 @@ type BpfTracePoint struct { bpf2go.KmeshTracePointObjects } -func (sc *BpfTracePoint) NewBpf(cfg *options.BpfConfig) { - sc.Info.MapPath = cfg.BpfFsPath - sc.Info.BpfFsPath = cfg.BpfFsPath +func (sc *BpfTracePoint) NewBpf(cfg *options.BpfConfig) error { + sc.Info.MapPath = cfg.BpfFsPath + "/bpf_kmesh/map/" + sc.Info.BpfFsPath = cfg.BpfFsPath + "/bpf_kmesh/tracepoint/" sc.Info.Cgroup2Path = cfg.Cgroup2Path + + if err := os.MkdirAll(sc.Info.MapPath, + syscall.S_IRUSR|syscall.S_IWUSR|syscall.S_IXUSR| + syscall.S_IRGRP|syscall.S_IXGRP); err != nil && !os.IsExist(err) { + return err + } + + if err := os.MkdirAll(sc.Info.BpfFsPath, + syscall.S_IRUSR|syscall.S_IWUSR|syscall.S_IXUSR| + syscall.S_IRGRP|syscall.S_IXGRP); err != nil && !os.IsExist(err) { + return err + } + + return nil } func (sc *BpfTracePoint) loadKmeshTracePointObjects() (*ebpf.CollectionSpec, error) { @@ -61,8 +82,23 @@ func (sc *BpfTracePoint) loadKmeshTracePointObjects() (*ebpf.CollectionSpec, err } } - if err = spec.LoadAndAssign(&sc.KmeshTracePointObjects, &opts); err != nil { - return nil, err + if restart.GetStartType() == restart.Restart { + pinPath := filepath.Join(sc.Info.BpfFsPath, "connect_ret") + _, err := ebpf.LoadPinnedProgram(pinPath, nil) + if err != nil { + log.Errorf("LoadPinnedProgram failed: %v", err) + return nil, err + } + } else { + if err = spec.LoadAndAssign(&sc.KmeshTracePointObjects, &opts); err != nil { + return nil, err + } + + value := reflect.ValueOf(sc.KmeshTracePointObjects.KmeshTracePointPrograms) + if err = utils.PinPrograms(&value, sc.Info.BpfFsPath); err != nil { + log.Errorf("tracepoint err: %v \n path is:%v", err, sc.Info.BpfFsPath) + return nil, err + } } return spec, nil @@ -81,12 +117,24 @@ func (sc *BpfTracePoint) Attach() error { Program: sc.KmeshTracePointObjects.ConnectRet, } - lk, err := link.AttachRawTracepoint(tpopt) - if err != nil { - return err - } - sc.Link = lk + pinPath := filepath.Join(sc.Info.BpfFsPath, "trace_point_link") + if restart.GetStartType() == restart.Restart { + lk, err := link.LoadPinnedLink(pinPath, &ebpf.LoadPinOptions{}) + if err != nil { + return err + } + sc.Link = lk + } else { + lk, err := link.AttachRawTracepoint(tpopt) + if err != nil { + return err + } + if err := lk.Pin(pinPath); err != nil { + return err + } + sc.Link = lk + } return nil } diff --git a/pkg/bpf/bpf.go b/pkg/bpf/bpf.go index 2d7c300e9..6c43745c3 100644 --- a/pkg/bpf/bpf.go +++ b/pkg/bpf/bpf.go @@ -132,10 +132,14 @@ func StopMda() error { func (l *BpfLoader) Stop() { var err error - if restart.GetExitType() == restart.Restart && l.config.DualEngineEnabled() { - C.deserial_uninit(true) - log.Infof("kmesh restart, not clean bpf map and prog") - return + if restart.GetExitType() == restart.Restart { + ret := C.deserial_uninit(true) + if ret == 0 { + log.Infof("kmesh restart, not clean bpf map and prog") + return + } else { + log.Errorf("kmesh restart, failed to save configuration, deserial_uninit failed, ret=%v\nClean all configuration", ret) + } } closeMap(l.versionMap) diff --git a/pkg/bpf/utils/bpf_helper.go b/pkg/bpf/utils/bpf_helper.go index eadc1ed69..83af4a184 100644 --- a/pkg/bpf/utils/bpf_helper.go +++ b/pkg/bpf/utils/bpf_helper.go @@ -16,7 +16,12 @@ package utils -import "github.com/cilium/ebpf" +import ( + "fmt" + + "github.com/cilium/ebpf" + "github.com/cilium/ebpf/link" +) func SetInnerMap(spec *ebpf.CollectionSpec) { var ( @@ -35,3 +40,28 @@ func SetInnerMap(spec *ebpf.CollectionSpec) { } } } + +func BpfProgUpdate(pinPath string, cgopt link.CgroupOptions) (link.Link, error) { + sclink, err := link.LoadPinnedLink(pinPath, &ebpf.LoadPinOptions{}) + if err != nil { + return nil, err + } + if err := sclink.Update(cgopt.Program); err != nil { + return nil, fmt.Errorf("updating link %s failed: %w", pinPath, err) + } + return sclink, nil +} + +func BpfMapDeleteByPinPath(bpfFsPath string) error { + + progMap, err := ebpf.LoadPinnedMap(bpfFsPath, nil) + if err != nil { + return fmt.Errorf("loadPinnedProgram failed for %s: %v", bpfFsPath, err) + } + defer progMap.Close() + if err := progMap.Unpin(); err != nil { + return fmt.Errorf("unpin failed for %s: %v", bpfFsPath, err) + } + + return nil +} diff --git a/pkg/bpf/workload/sendmsg.go b/pkg/bpf/workload/sendmsg.go index 7805e724e..71cd5497f 100644 --- a/pkg/bpf/workload/sendmsg.go +++ b/pkg/bpf/workload/sendmsg.go @@ -17,7 +17,6 @@ package workload import ( - "fmt" "os" "path/filepath" "reflect" @@ -33,17 +32,6 @@ import ( helper "kmesh.net/kmesh/pkg/utils" ) -func bpfProgUpdate(pinPath string, cgopt link.CgroupOptions) error { - sclink, err := link.LoadPinnedLink(pinPath, &ebpf.LoadPinOptions{}) - if err != nil { - return err - } - if err := sclink.Update(cgopt.Program); err != nil { - return fmt.Errorf("updating link %s failed: %w", pinPath, err) - } - return nil -} - type BpfSendMsgWorkload struct { Info BpfInfo AttachFD int diff --git a/pkg/bpf/workload/sock_connection.go b/pkg/bpf/workload/sock_connection.go index aa37748f2..ea720346d 100644 --- a/pkg/bpf/workload/sock_connection.go +++ b/pkg/bpf/workload/sock_connection.go @@ -144,11 +144,11 @@ func (sc *SockConnWorkload) Attach() error { pinPath6 := filepath.Join(sc.Info.BpfFsPath, "sockconn6_prog") if restart.GetStartType() == restart.Restart { - if err = bpfProgUpdate(pinPath4, cgopt4); err != nil { + if sc.Link, err = utils.BpfProgUpdate(pinPath4, cgopt4); err != nil { return err } - if err = bpfProgUpdate(pinPath6, cgopt6); err != nil { + if sc.Link6, err = utils.BpfProgUpdate(pinPath6, cgopt6); err != nil { return err } } else { diff --git a/pkg/bpf/workload/sock_ops.go b/pkg/bpf/workload/sock_ops.go index db58e8573..dbb6a06d1 100644 --- a/pkg/bpf/workload/sock_ops.go +++ b/pkg/bpf/workload/sock_ops.go @@ -105,6 +105,7 @@ func (so *BpfSockOpsWorkload) LoadSockOps() error { } func (so *BpfSockOpsWorkload) Attach() error { + var err error cgopt := link.CgroupOptions{ Path: so.Info.Cgroup2Path, Attach: so.Info.AttachType, @@ -113,7 +114,7 @@ func (so *BpfSockOpsWorkload) Attach() error { pinPath := filepath.Join(so.Info.BpfFsPath, "cgroup_sockops_prog") if restart.GetStartType() == restart.Restart { - if err := bpfProgUpdate(pinPath, cgopt); err != nil { + if so.Link, err = utils.BpfProgUpdate(pinPath, cgopt); err != nil { return err } } else { From 6193ad210dfa5fae99e38b0e6d425ccfc333fffd Mon Sep 17 00:00:00 2001 From: lec-bit Date: Sat, 26 Oct 2024 18:46:11 +0800 Subject: [PATCH 02/22] make gen Signed-off-by: lec-bit --- pkg/bpf/utils/bpf_helper.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/bpf/utils/bpf_helper.go b/pkg/bpf/utils/bpf_helper.go index 83af4a184..f573b41d3 100644 --- a/pkg/bpf/utils/bpf_helper.go +++ b/pkg/bpf/utils/bpf_helper.go @@ -53,7 +53,6 @@ func BpfProgUpdate(pinPath string, cgopt link.CgroupOptions) (link.Link, error) } func BpfMapDeleteByPinPath(bpfFsPath string) error { - progMap, err := ebpf.LoadPinnedMap(bpfFsPath, nil) if err != nil { return fmt.Errorf("loadPinnedProgram failed for %s: %v", bpfFsPath, err) From 753953c59d6d8fef9ac4923dd274a25b1b9c66ab Mon Sep 17 00:00:00 2001 From: lec-bit Date: Sat, 26 Oct 2024 19:51:28 +0800 Subject: [PATCH 03/22] update Signed-off-by: lec-bit --- pkg/bpf/ads/loader_enhanced.go | 6 +++--- pkg/bpf/ads/sock_connection.go | 5 +++-- pkg/bpf/ads/trace_point.go | 13 +++++++------ pkg/bpf/utils/bpf_helper.go | 2 +- pkg/constants/constants.go | 3 +++ 5 files changed, 17 insertions(+), 12 deletions(-) diff --git a/pkg/bpf/ads/loader_enhanced.go b/pkg/bpf/ads/loader_enhanced.go index bd6266c77..531776915 100644 --- a/pkg/bpf/ads/loader_enhanced.go +++ b/pkg/bpf/ads/loader_enhanced.go @@ -46,15 +46,15 @@ type BpfAds struct { func NewBpfAds(cfg *options.BpfConfig) (*BpfAds, error) { sc := &BpfAds{} if err := sc.TracePoint.NewBpf(cfg); err != nil { - return sc, err + return nil, err } if err := sc.SockOps.NewBpf(cfg); err != nil { - return sc, err + return nil, err } if err := sc.SockConn.NewBpf(cfg); err != nil { - return sc, err + return nil, err } return sc, nil } diff --git a/pkg/bpf/ads/sock_connection.go b/pkg/bpf/ads/sock_connection.go index 9781065c0..84d7fcaad 100644 --- a/pkg/bpf/ads/sock_connection.go +++ b/pkg/bpf/ads/sock_connection.go @@ -30,6 +30,7 @@ import ( "github.com/cilium/ebpf/link" "kmesh.net/kmesh/pkg/bpf/restart" + "kmesh.net/kmesh/pkg/constants" "kmesh.net/kmesh/bpf/kmesh/bpf2go" "kmesh.net/kmesh/daemon/options" @@ -163,8 +164,8 @@ func (sc *BpfSockConn) Attach() error { // pin bpf_tail_call map // tail_call map cannot pin in SetMapPinType->LoadAndAssign, we pin them independent - mapPinPath := filepath.Join(sc.Info.BpfFsPath, "sockconn_tail_call_map") - progPinPath := filepath.Join(sc.Info.BpfFsPath, "sockconn_link") + mapPinPath := filepath.Join(sc.Info.BpfFsPath, constants.Tail_call_map) + progPinPath := filepath.Join(sc.Info.BpfFsPath, constants.Prog_link) if restart.GetStartType() == restart.Restart { if sc.Link, err = utils.BpfProgUpdate(progPinPath, cgopt); err != nil { return err diff --git a/pkg/bpf/ads/trace_point.go b/pkg/bpf/ads/trace_point.go index d7008f205..8f0f951f2 100644 --- a/pkg/bpf/ads/trace_point.go +++ b/pkg/bpf/ads/trace_point.go @@ -32,6 +32,7 @@ import ( "kmesh.net/kmesh/daemon/options" "kmesh.net/kmesh/pkg/bpf/restart" "kmesh.net/kmesh/pkg/bpf/utils" + "kmesh.net/kmesh/pkg/constants" helper "kmesh.net/kmesh/pkg/utils" ) @@ -112,28 +113,28 @@ func (sc *BpfTracePoint) Load() error { } func (sc *BpfTracePoint) Attach() error { + var err error tpopt := link.RawTracepointOptions{ Name: "connect_ret", Program: sc.KmeshTracePointObjects.ConnectRet, } - pinPath := filepath.Join(sc.Info.BpfFsPath, "trace_point_link") + pinPath := filepath.Join(sc.Info.BpfFsPath, constants.Prog_link) if restart.GetStartType() == restart.Restart { - lk, err := link.LoadPinnedLink(pinPath, &ebpf.LoadPinOptions{}) + sc.Link, err = link.LoadPinnedLink(pinPath, &ebpf.LoadPinOptions{}) if err != nil { return err } - sc.Link = lk } else { - lk, err := link.AttachRawTracepoint(tpopt) + sc.Link, err = link.AttachRawTracepoint(tpopt) if err != nil { return err } - if err := lk.Pin(pinPath); err != nil { + if err := sc.Link.Pin(pinPath); err != nil { return err } - sc.Link = lk + } return nil } diff --git a/pkg/bpf/utils/bpf_helper.go b/pkg/bpf/utils/bpf_helper.go index f573b41d3..ef8097686 100644 --- a/pkg/bpf/utils/bpf_helper.go +++ b/pkg/bpf/utils/bpf_helper.go @@ -55,7 +55,7 @@ func BpfProgUpdate(pinPath string, cgopt link.CgroupOptions) (link.Link, error) func BpfMapDeleteByPinPath(bpfFsPath string) error { progMap, err := ebpf.LoadPinnedMap(bpfFsPath, nil) if err != nil { - return fmt.Errorf("loadPinnedProgram failed for %s: %v", bpfFsPath, err) + return fmt.Errorf("loadPinnedMap failed for %s: %v, when kmesh delete by pin path", bpfFsPath, err) } defer progMap.Close() if err := progMap.Unpin(); err != nil { diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index ac8568196..0f25db0b7 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -60,4 +60,7 @@ const ( VersionPath = "/bpf_kmesh/map/" WorkloadVersionPath = "/bpf_kmesh_workload/map/" + + Tail_call_map = "tail_call_map" + Prog_link = "prog_link" ) From 5a1aee9384eee5fed20c7fc9e43499f4152642f1 Mon Sep 17 00:00:00 2001 From: lec-bit Date: Wed, 30 Oct 2024 18:07:49 +0800 Subject: [PATCH 04/22] update constants.Tail_call_map Signed-off-by: lec-bit --- pkg/bpf/ads/sock_connection.go | 2 +- pkg/constants/constants.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/bpf/ads/sock_connection.go b/pkg/bpf/ads/sock_connection.go index 84d7fcaad..fcefffa14 100644 --- a/pkg/bpf/ads/sock_connection.go +++ b/pkg/bpf/ads/sock_connection.go @@ -164,7 +164,7 @@ func (sc *BpfSockConn) Attach() error { // pin bpf_tail_call map // tail_call map cannot pin in SetMapPinType->LoadAndAssign, we pin them independent - mapPinPath := filepath.Join(sc.Info.BpfFsPath, constants.Tail_call_map) + mapPinPath := filepath.Join(sc.Info.BpfFsPath, constants.TailCallMap) progPinPath := filepath.Join(sc.Info.BpfFsPath, constants.Prog_link) if restart.GetStartType() == restart.Restart { if sc.Link, err = utils.BpfProgUpdate(progPinPath, cgopt); err != nil { diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index 0f25db0b7..ad6c80e0e 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -61,6 +61,6 @@ const ( VersionPath = "/bpf_kmesh/map/" WorkloadVersionPath = "/bpf_kmesh_workload/map/" - Tail_call_map = "tail_call_map" + TailCallMap = "tail_call_map" Prog_link = "prog_link" ) From af523f6b537d981b1fa685b105818db3b1eff722 Mon Sep 17 00:00:00 2001 From: lec-bit Date: Tue, 5 Nov 2024 11:05:19 +0800 Subject: [PATCH 05/22] Configuration recovery after restart in kernel-native mode Signed-off-by: lec-bit --- pkg/cache/v2/cluster.go | 4 + pkg/cache/v2/listener.go | 4 + pkg/cache/v2/route.go | 4 + pkg/controller/ads/ads_hash.go | 121 ++++++++++++++++++++++++++++ pkg/controller/ads/ads_processor.go | 25 +++++- pkg/controller/client.go | 12 +++ 6 files changed, 167 insertions(+), 3 deletions(-) create mode 100644 pkg/controller/ads/ads_hash.go diff --git a/pkg/cache/v2/cluster.go b/pkg/cache/v2/cluster.go index 7023d0728..85303dffe 100644 --- a/pkg/cache/v2/cluster.go +++ b/pkg/cache/v2/cluster.go @@ -173,3 +173,7 @@ func (cache *ClusterCache) Dump() []*cluster_v2.Cluster { } return clusters } + +func (cache *ClusterCache) GetClusterHashPtr() *map[string][2]uint64 { + return &cache.resourceHash +} diff --git a/pkg/cache/v2/listener.go b/pkg/cache/v2/listener.go index 7228e7f74..61cfa73c7 100644 --- a/pkg/cache/v2/listener.go +++ b/pkg/cache/v2/listener.go @@ -136,3 +136,7 @@ func (cache *ListenerCache) Dump() []*listener_v2.Listener { } return listeners } + +func (cache *ListenerCache) GetListenerHashPtr() *map[string]uint64 { + return &cache.resourceHash +} diff --git a/pkg/cache/v2/route.go b/pkg/cache/v2/route.go index ce2e69e4b..328365217 100644 --- a/pkg/cache/v2/route.go +++ b/pkg/cache/v2/route.go @@ -117,3 +117,7 @@ func (cache *RouteConfigCache) Dump() []*route_v2.RouteConfiguration { } return mapCache } + +func (cache *RouteConfigCache) GetRouteHashPtr() *map[string]uint64 { + return &cache.resourceHash +} diff --git a/pkg/controller/ads/ads_hash.go b/pkg/controller/ads/ads_hash.go new file mode 100644 index 000000000..f183f67d8 --- /dev/null +++ b/pkg/controller/ads/ads_hash.go @@ -0,0 +1,121 @@ +/* + * Copyright The Kmesh Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ads + +import ( + "os" + + cache_v2 "kmesh.net/kmesh/pkg/cache/v2" + maps_v2 "kmesh.net/kmesh/pkg/cache/v2/maps" + "sigs.k8s.io/yaml" +) + +const ( + persistPath = "/mnt/kernel_native_hash_name.yaml" +) + +// HashName converts a string to a uint32 integer as the key of bpf map +type HashName struct { + NameToCds map[string][2]uint64 + NameToLds map[string]uint64 + NameToRds map[string]uint64 +} + +// HashName creates a new HashName instance +func NewHashName() *HashName { + return &HashName{ + NameToCds: make(map[string][2]uint64), + NameToLds: make(map[string]uint64), + NameToRds: make(map[string]uint64), + } +} + +func ReadFromPersistFile(h *HashName) error { + data, err := os.ReadFile(persistPath) + if err != nil { + return nil + } + + return yaml.Unmarshal(data, h) +} + +func WritePersistFile(h *HashName) error { + data, err := yaml.Marshal(h) + if err != nil { + return err + } + + return os.WriteFile(persistPath, data, 0644) +} + +// Should only be used by test +func ResetPersistFile() { + os.Remove(persistPath) +} + +func HandleRemovedCdsAndEdsDuringRestart(cache *cache_v2.ClusterCache) error { + hashName := NewHashName() + if ReadFromPersistFile(hashName) != nil { + return nil + } + + for key := range hashName.NameToCds { + if cache.GetEdsHash(key) == 0 { + err := maps_v2.ClusterDelete(key) + if err != nil { + return err + } + } + } + return nil + +} + +func HandleRemovedLdsDuringRestart(cache *cache_v2.ListenerCache) error { + hashName := NewHashName() + if ReadFromPersistFile(hashName) != nil { + return nil + } + for key := range hashName.NameToLds { + if cache.GetLdsHash(key) == 0 { + listener := cache.GetApiListener(key) + err := maps_v2.ListenerDelete(listener.GetAddress()) + if err != nil { + return err + } + } + } + return nil + +} + +func HandleRemovedRdsDuringRestart(cache *cache_v2.RouteConfigCache) error { + hashName := NewHashName() + if ReadFromPersistFile(hashName) != nil { + return nil + } + for key := range hashName.NameToRds { + if cache.GetRdsHash(key) == 0 { + err := maps_v2.RouteConfigDelete(key) + if err != nil { + return err + } + } + } + return nil + +} diff --git a/pkg/controller/ads/ads_processor.go b/pkg/controller/ads/ads_processor.go index d49f22de1..628496224 100644 --- a/pkg/controller/ads/ads_processor.go +++ b/pkg/controller/ads/ads_processor.go @@ -18,6 +18,7 @@ package ads import ( "fmt" + "sync" config_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" config_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" @@ -54,6 +55,7 @@ type processor struct { lastNonce *lastNonce // the channel used to send domains to dns resolver. key is domain name and value is refreshrate DnsResolverChan chan []*config_cluster_v3.Cluster + once [3]sync.Once } func newProcessor() *processor { @@ -202,6 +204,7 @@ func (p *processor) handleCdsResponse(resp *service_discovery_v3.DiscoveryRespon } func (p *processor) handleEdsResponse(resp *service_discovery_v3.DiscoveryResponse) error { + var err error var loadAssignment = &config_endpoint_v3.ClusterLoadAssignment{} p.lastNonce.edsNonce = resp.Nonce for _, resource := range resp.GetResources() { @@ -238,8 +241,13 @@ func (p *processor) handleEdsResponse(resp *service_discovery_v3.DiscoveryRespon } p.Cache.ClusterCache.Flush() + p.once[0].Do(func() { + if err = HandleRemovedCdsAndEdsDuringRestart(&p.Cache.ClusterCache); err != nil { + fmt.Println("Error:", err) + } + }) - return nil + return err } func (p *processor) handleLdsResponse(resp *service_discovery_v3.DiscoveryResponse) error { @@ -279,6 +287,11 @@ func (p *processor) handleLdsResponse(resp *service_discovery_v3.DiscoveryRespon } p.Cache.ListenerCache.Flush() + p.once[1].Do(func() { + if err := HandleRemovedLdsDuringRestart(&p.Cache.ListenerCache); err != nil { + fmt.Println("Error:", err) + } + }) if !slices.EqualUnordered(p.Cache.routeNames, lastRouteNames) { // we cannot set the nonce here. @@ -286,10 +299,11 @@ func (p *processor) handleLdsResponse(resp *service_discovery_v3.DiscoveryRespon // Then it will lead to this request been ignored, we will lose the new rds resource p.req = newAdsRequest(resource_v3.RouteType, p.Cache.routeNames, "") } - return nil + return err } func (p *processor) handleRdsResponse(resp *service_discovery_v3.DiscoveryResponse) error { + var err error routeConfiguration := &config_route_v3.RouteConfiguration{} p.lastNonce.rdsNonce = resp.Nonce @@ -318,7 +332,12 @@ func (p *processor) handleRdsResponse(resp *service_discovery_v3.DiscoveryRespon p.Cache.RouteCache.UpdateApiRouteStatus(key, core_v2.ApiStatus_DELETE) } p.Cache.RouteCache.Flush() - return nil + p.once[2].Do(func() { + if err = HandleRemovedRdsDuringRestart(&p.Cache.RouteCache); err != nil { + fmt.Println("Error:", err) + } + }) + return err } func (p *processor) Reset() { diff --git a/pkg/controller/client.go b/pkg/controller/client.go index 30f23da5c..5c89e3697 100644 --- a/pkg/controller/client.go +++ b/pkg/controller/client.go @@ -25,6 +25,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/metadata" + "kmesh.net/kmesh/pkg/bpf/restart" bpfwl "kmesh.net/kmesh/pkg/bpf/workload" "kmesh.net/kmesh/pkg/constants" "kmesh.net/kmesh/pkg/controller/ads" @@ -168,5 +169,16 @@ func (c *XdsClient) closeStreamClient() { } func (c *XdsClient) Close() error { + if restart.GetExitType() == restart.Restart { + hashName := &ads.HashName{ + NameToCds: *c.AdsController.Processor.Cache.ClusterCache.GetClusterHashPtr(), + NameToLds: *c.AdsController.Processor.Cache.ListenerCache.GetListenerHashPtr(), + NameToRds: *c.AdsController.Processor.Cache.RouteCache.GetRouteHashPtr(), + } + ads.ResetPersistFile() + if err := ads.WritePersistFile(hashName); err != nil { + return err + } + } return nil } From 9e9f6827a5010e4983eb847242dec5137153ecc5 Mon Sep 17 00:00:00 2001 From: lec-bit Date: Tue, 5 Nov 2024 11:53:56 +0800 Subject: [PATCH 06/22] typo Signed-off-by: lec-bit --- bpf/deserialization_to_bpf_map/deserialization_to_bpf_map.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bpf/deserialization_to_bpf_map/deserialization_to_bpf_map.c b/bpf/deserialization_to_bpf_map/deserialization_to_bpf_map.c index 2d03b506b..52107aa05 100644 --- a/bpf/deserialization_to_bpf_map/deserialization_to_bpf_map.c +++ b/bpf/deserialization_to_bpf_map/deserialization_to_bpf_map.c @@ -1789,7 +1789,7 @@ int inner_map_mng_restore_by_persist_stat(struct persist_info *p, struct inner_m // What is recorded in g_inner_map_mng.inner_maps[i].map_fd is map_id. // Use map_id to get the fd of the inner_map in this process and refresh // it to inner_maps for use. - for (int i = 0; i < p->max_allocated_idx; i++) { + for (int i = 0; i <= p->max_allocated_idx; i++) { if (g_inner_map_mng.inner_maps[i].allocated) { int map_fd = bpf_map_get_fd_by_id(g_inner_map_mng.inner_maps[i].map_fd); if (map_fd < 0) { From 976cdd20b40ec963e1c45aed682600308578c3fe Mon Sep 17 00:00:00 2001 From: lec-bit Date: Tue, 5 Nov 2024 13:07:33 +0800 Subject: [PATCH 07/22] gofmt update Signed-off-by: lec-bit --- pkg/constants/constants.go | 2 +- pkg/controller/ads/ads_hash.go | 4 ---- pkg/controller/client.go | 2 +- 3 files changed, 2 insertions(+), 6 deletions(-) diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index ad6c80e0e..9cc1caf01 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -62,5 +62,5 @@ const ( WorkloadVersionPath = "/bpf_kmesh_workload/map/" TailCallMap = "tail_call_map" - Prog_link = "prog_link" + Prog_link = "prog_link" ) diff --git a/pkg/controller/ads/ads_hash.go b/pkg/controller/ads/ads_hash.go index f183f67d8..e9d3781a9 100644 --- a/pkg/controller/ads/ads_hash.go +++ b/pkg/controller/ads/ads_hash.go @@ -18,7 +18,6 @@ package ads import ( "os" - cache_v2 "kmesh.net/kmesh/pkg/cache/v2" maps_v2 "kmesh.net/kmesh/pkg/cache/v2/maps" "sigs.k8s.io/yaml" @@ -82,7 +81,6 @@ func HandleRemovedCdsAndEdsDuringRestart(cache *cache_v2.ClusterCache) error { } } return nil - } func HandleRemovedLdsDuringRestart(cache *cache_v2.ListenerCache) error { @@ -100,7 +98,6 @@ func HandleRemovedLdsDuringRestart(cache *cache_v2.ListenerCache) error { } } return nil - } func HandleRemovedRdsDuringRestart(cache *cache_v2.RouteConfigCache) error { @@ -117,5 +114,4 @@ func HandleRemovedRdsDuringRestart(cache *cache_v2.RouteConfigCache) error { } } return nil - } diff --git a/pkg/controller/client.go b/pkg/controller/client.go index 59a023995..750e17c1b 100644 --- a/pkg/controller/client.go +++ b/pkg/controller/client.go @@ -26,8 +26,8 @@ import ( "google.golang.org/grpc/metadata" istioGrpc "istio.io/istio/pilot/pkg/grpc" - "kmesh.net/kmesh/pkg/bpf/restart" bpfads "kmesh.net/kmesh/pkg/bpf/ads" + "kmesh.net/kmesh/pkg/bpf/restart" bpfwl "kmesh.net/kmesh/pkg/bpf/workload" "kmesh.net/kmesh/pkg/constants" "kmesh.net/kmesh/pkg/controller/ads" From 5446dd1ac45db432dc2dd02c37c5c3a83879571b Mon Sep 17 00:00:00 2001 From: lec-bit Date: Wed, 6 Nov 2024 19:20:14 +0800 Subject: [PATCH 08/22] add ut test Signed-off-by: lec-bit --- pkg/bpf/bpf_test.go | 75 ++++++++++++++++++++++++++-------- pkg/controller/ads/ads_hash.go | 2 +- 2 files changed, 59 insertions(+), 18 deletions(-) diff --git a/pkg/bpf/bpf_test.go b/pkg/bpf/bpf_test.go index f3f97230b..bd03ce15b 100644 --- a/pkg/bpf/bpf_test.go +++ b/pkg/bpf/bpf_test.go @@ -31,15 +31,21 @@ import ( ) func TestRestart(t *testing.T) { - t.Run("new start", func(t *testing.T) { - runTestNormal(t) + t.Run("new start DualEengine", func(t *testing.T) { + runTestNormalDualEengine(t) }) - t.Run("restart", func(t *testing.T) { - runTestRestart(t) + t.Run("new start KernelNative", func(t *testing.T) { + runTestNormalKernelNative(t) + }) + t.Run("restart DualEngine", func(t *testing.T) { + runTestRestartDualEngine(t) + }) + t.Run("restart KernelNative", func(t *testing.T) { + runTestRestartKernelNative(t) }) } -func setDir(t *testing.T) options.BpfConfig { +func setDir(t *testing.T) { if err := os.MkdirAll("/mnt/kmesh_cgroup2", 0755); err != nil { t.Fatalf("Failed to create dir /mnt/kmesh_cgroup2: %v", err) } @@ -56,6 +62,20 @@ func setDir(t *testing.T) options.BpfConfig { CleanupBpfMap() t.Fatalf("Failed to remove mem limit: %v", err) } +} + +func NormalStart(t *testing.T, config options.BpfConfig) { + bpfLoader := NewBpfLoader(&config) + if err := bpfLoader.Start(); err != nil { + assert.ErrorIsf(t, err, nil, "bpfLoader start failed %v", err) + } + assert.Equal(t, restart.Normal, restart.GetStartType(), "set kmesh start status failed") + restart.SetExitType(restart.Normal) + bpfLoader.Stop() +} + +func setDirDualEngine(t *testing.T) options.BpfConfig { + setDir(t) return options.BpfConfig{ Mode: constants.DualEngineMode, @@ -64,23 +84,32 @@ func setDir(t *testing.T) options.BpfConfig { } } -// Test Kmesh Normal -func runTestNormal(t *testing.T) { - config := setDir(t) +func setDirKernelNative(t *testing.T) options.BpfConfig { + setDir(t) - bpfLoader := NewBpfLoader(&config) - if err := bpfLoader.Start(); err != nil { - assert.ErrorIsf(t, err, nil, "bpfLoader start failed %v", err) + return options.BpfConfig{ + Mode: constants.KernelNativeMode, + BpfFsPath: "/sys/fs/bpf", + Cgroup2Path: "/mnt/kmesh_cgroup2", } - assert.Equal(t, restart.Normal, restart.GetStartType(), "set kmesh start status failed") - restart.SetExitType(restart.Normal) - bpfLoader.Stop() } -// Test Kmesh Restart Normal -func runTestRestart(t *testing.T) { +// Test Kmesh Normal DualEengine +func runTestNormalDualEengine(t *testing.T) { + config := setDirDualEngine(t) + + NormalStart(t, config) +} + +// Test Kmesh Normal KernelNative +func runTestNormalKernelNative(t *testing.T) { + config := setDirKernelNative(t) + + NormalStart(t, config) +} + +func KmeshRestart(t *testing.T, config options.BpfConfig) { var versionPath string - config := setDir(t) bpfLoader := NewBpfLoader(&config) if err := bpfLoader.Start(); err != nil { assert.ErrorIsf(t, err, nil, "bpfLoader start failed %v", err) @@ -106,3 +135,15 @@ func runTestRestart(t *testing.T) { restart.SetExitType(restart.Normal) bpfLoader.Stop() } + +// Test Kmesh Restart DualEngine +func runTestRestartDualEngine(t *testing.T) { + config := setDirDualEngine(t) + KmeshRestart(t, config) +} + +// Test Kmesh Restart KernelNative +func runTestRestartKernelNative(t *testing.T) { + config := setDirKernelNative(t) + KmeshRestart(t, config) +} diff --git a/pkg/controller/ads/ads_hash.go b/pkg/controller/ads/ads_hash.go index e9d3781a9..36ea4db92 100644 --- a/pkg/controller/ads/ads_hash.go +++ b/pkg/controller/ads/ads_hash.go @@ -17,9 +17,9 @@ package ads import ( - "os" cache_v2 "kmesh.net/kmesh/pkg/cache/v2" maps_v2 "kmesh.net/kmesh/pkg/cache/v2/maps" + "os" "sigs.k8s.io/yaml" ) From 1cd03bf2a0fbeb1bf1b159afb1f2e2aa001b88a0 Mon Sep 17 00:00:00 2001 From: lec-bit Date: Wed, 6 Nov 2024 23:37:08 +0800 Subject: [PATCH 09/22] update golanglint Signed-off-by: lec-bit --- pkg/controller/ads/ads_hash.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/controller/ads/ads_hash.go b/pkg/controller/ads/ads_hash.go index 36ea4db92..5e61b3397 100644 --- a/pkg/controller/ads/ads_hash.go +++ b/pkg/controller/ads/ads_hash.go @@ -17,10 +17,12 @@ package ads import ( - cache_v2 "kmesh.net/kmesh/pkg/cache/v2" - maps_v2 "kmesh.net/kmesh/pkg/cache/v2/maps" "os" + "sigs.k8s.io/yaml" + + cache_v2 "kmesh.net/kmesh/pkg/cache/v2" + maps_v2 "kmesh.net/kmesh/pkg/cache/v2/maps" ) const ( From 9d2f19b6e47f8ce5d66082a089e43d3c7c4f6d79 Mon Sep 17 00:00:00 2001 From: lec-bit Date: Thu, 7 Nov 2024 19:52:57 +0800 Subject: [PATCH 10/22] fix bpf_restart ut Signed-off-by: lec-bit --- pkg/bpf/bpf_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/bpf/bpf_test.go b/pkg/bpf/bpf_test.go index bd03ce15b..03e7a27a6 100644 --- a/pkg/bpf/bpf_test.go +++ b/pkg/bpf/bpf_test.go @@ -110,6 +110,8 @@ func runTestNormalKernelNative(t *testing.T) { func KmeshRestart(t *testing.T, config options.BpfConfig) { var versionPath string + log.Info("start type:%v", restart.GetStartType()) + restart.SetStartType(restart.Normal) bpfLoader := NewBpfLoader(&config) if err := bpfLoader.Start(); err != nil { assert.ErrorIsf(t, err, nil, "bpfLoader start failed %v", err) From ec6c35d14c558eb594834662b450d606a131498e Mon Sep 17 00:00:00 2001 From: lec-bit Date: Thu, 7 Nov 2024 20:21:02 +0800 Subject: [PATCH 11/22] update Signed-off-by: lec-bit --- pkg/bpf/ads/sock_connection.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/bpf/ads/sock_connection.go b/pkg/bpf/ads/sock_connection.go index 4b13604b2..6257373c1 100644 --- a/pkg/bpf/ads/sock_connection.go +++ b/pkg/bpf/ads/sock_connection.go @@ -29,7 +29,6 @@ import ( "github.com/cilium/ebpf" "github.com/cilium/ebpf/link" - "kmesh.net/kmesh/pkg/bpf/restart" "kmesh.net/kmesh/pkg/constants" From b0a5cadb766ebc2bfd878ca938dde518cd825973 Mon Sep 17 00:00:00 2001 From: lec-bit Date: Tue, 19 Nov 2024 11:39:21 +0000 Subject: [PATCH 12/22] adapt deserial Signed-off-by: lec-bit --- pkg/bpf/bpf.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/bpf/bpf.go b/pkg/bpf/bpf.go index f698d7175..ba074af15 100644 --- a/pkg/bpf/bpf.go +++ b/pkg/bpf/bpf.go @@ -156,11 +156,11 @@ func StopMda() error { func (l *BpfLoader) Stop() { var err error if restart.GetExitType() == restart.Restart { - C.deserial_uninit() + return } closeMap(l.versionMap) - + C.deserial_uninit() if l.config.KernelNativeEnabled() { if err = l.obj.Stop(); err != nil { CleanupBpfMap() From a7b9bb8b93dbf8113f3cb0bfdfc4bd6414eb9f5c Mon Sep 17 00:00:00 2001 From: lec-bit Date: Tue, 19 Nov 2024 15:43:55 +0000 Subject: [PATCH 13/22] update Signed-off-by: lec-bit --- pkg/bpf/bpf_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/bpf/bpf_test.go b/pkg/bpf/bpf_test.go index 43ccec211..e24fcc955 100644 --- a/pkg/bpf/bpf_test.go +++ b/pkg/bpf/bpf_test.go @@ -26,7 +26,6 @@ import ( "github.com/cilium/ebpf/rlimit" "github.com/stretchr/testify/assert" - "istio.io/pkg/log" corev1 "k8s.io/api/core/v1" "kmesh.net/kmesh/daemon/options" @@ -114,7 +113,6 @@ func runTestNormalKernelNative(t *testing.T) { func KmeshRestart(t *testing.T, config options.BpfConfig) { var versionPath string - log.Info("start type:%v", restart.GetStartType()) restart.SetStartType(restart.Normal) bpfLoader := NewBpfLoader(&config) if err := bpfLoader.Start(); err != nil { From 8be1dfe616dad0986198cd9b18039971701aa329 Mon Sep 17 00:00:00 2001 From: lec-bit Date: Fri, 22 Nov 2024 01:48:56 +0000 Subject: [PATCH 14/22] optimize logic of Unpin tail_call map Signed-off-by: lec-bit --- pkg/bpf/ads/sock_connection.go | 9 ++++++--- pkg/bpf/ads/sock_ops.go | 10 +++++++--- pkg/bpf/utils/bpf_helper.go | 24 ++++++++++++------------ 3 files changed, 25 insertions(+), 18 deletions(-) diff --git a/pkg/bpf/ads/sock_connection.go b/pkg/bpf/ads/sock_connection.go index 1a1b6a414..ab3da019e 100644 --- a/pkg/bpf/ads/sock_connection.go +++ b/pkg/bpf/ads/sock_connection.go @@ -168,9 +168,12 @@ func (sc *BpfSockConn) Attach() error { if sc.Link, err = utils.BpfProgUpdate(progPinPath, cgopt); err != nil { return err } - if err = utils.BpfMapDeleteByPinPath(mapPinPath); err != nil { - return err - } + + // Unpin tailcallmap. Considering that kmesh coredump may not have + // this path after an unexpected restart, here we unpin the file by + // directly removing it without doing error handling. + os.Remove(mapPinPath) + } else { sc.Link, err = link.AttachCgroup(cgopt) if err != nil { diff --git a/pkg/bpf/ads/sock_ops.go b/pkg/bpf/ads/sock_ops.go index f1f7c55d2..a1642a6da 100644 --- a/pkg/bpf/ads/sock_ops.go +++ b/pkg/bpf/ads/sock_ops.go @@ -193,9 +193,13 @@ func (sc *BpfSockOps) Attach() error { if sc.Link, err = utils.BpfProgUpdate(progPinPath, cgopt); err != nil { return err } - if err = utils.BpfMapDeleteByPinPath(mapPinPath); err != nil { - return err - } + // Unpin tailcallmap. Considering that kmesh coredump may not have + // this path after an unexpected restart, here we unpin the file by + // directly removing it without doing error handling. + os.Remove(mapPinPath) + // if err = utils.BpfMapDeleteByPinPath(mapPinPath); err != nil { + // return err + // } } else { sc.Link, err = link.AttachCgroup(cgopt) if err != nil { diff --git a/pkg/bpf/utils/bpf_helper.go b/pkg/bpf/utils/bpf_helper.go index 6ed3356b2..cec49ae1c 100644 --- a/pkg/bpf/utils/bpf_helper.go +++ b/pkg/bpf/utils/bpf_helper.go @@ -44,15 +44,15 @@ func BpfProgUpdate(pinPath string, cgopt link.CgroupOptions) (link.Link, error) return sclink, nil } -func BpfMapDeleteByPinPath(bpfFsPath string) error { - progMap, err := ebpf.LoadPinnedMap(bpfFsPath, nil) - if err != nil { - return fmt.Errorf("loadPinnedMap failed for %s: %v, when kmesh delete by pin path", bpfFsPath, err) - } - defer progMap.Close() - if err := progMap.Unpin(); err != nil { - return fmt.Errorf("unpin failed for %s: %v", bpfFsPath, err) - } - - return nil -} +// func BpfMapDeleteByPinPath(bpfFsPath string) error { +// progMap, err := ebpf.LoadPinnedMap(bpfFsPath, nil) +// if err != nil { +// return fmt.Errorf("loadPinnedMap failed for %s: %v, when kmesh delete by pin path", bpfFsPath, err) +// } +// defer progMap.Close() +// if err := progMap.Unpin(); err != nil { +// return fmt.Errorf("unpin failed for %s: %v", bpfFsPath, err) +// } + +// return nil +// } From 8d9808c736f226c3c373eb726f5c4adb46a4f023 Mon Sep 17 00:00:00 2001 From: lec-bit Date: Fri, 22 Nov 2024 02:13:54 +0000 Subject: [PATCH 15/22] type Signed-off-by: lec-bit --- pkg/bpf/bpf_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/bpf/bpf_test.go b/pkg/bpf/bpf_test.go index e24fcc955..897561cca 100644 --- a/pkg/bpf/bpf_test.go +++ b/pkg/bpf/bpf_test.go @@ -34,8 +34,8 @@ import ( ) func TestRestart(t *testing.T) { - t.Run("new start DualEengine", func(t *testing.T) { - runTestNormalDualEengine(t) + t.Run("new start DualEngine", func(t *testing.T) { + runTestNormalDualEngine(t) }) t.Run("new start KernelNative", func(t *testing.T) { runTestNormalKernelNative(t) @@ -97,8 +97,8 @@ func setDirKernelNative(t *testing.T) options.BpfConfig { } } -// Test Kmesh Normal DualEengine -func runTestNormalDualEengine(t *testing.T) { +// Test Kmesh Normal DualEngine +func runTestNormalDualEngine(t *testing.T) { config := setDirDualEngine(t) NormalStart(t, config) From 1de44abf1f50c3925b9ea06cfdea5e3ea6c1beb8 Mon Sep 17 00:00:00 2001 From: lec-bit Date: Fri, 22 Nov 2024 17:46:22 +0800 Subject: [PATCH 16/22] optimize filepath Signed-off-by: lec-bit --- pkg/bpf/ads/sock_ops.go | 8 +++----- pkg/bpf/ads/trace_point.go | 21 ++------------------- 2 files changed, 5 insertions(+), 24 deletions(-) diff --git a/pkg/bpf/ads/sock_ops.go b/pkg/bpf/ads/sock_ops.go index a1642a6da..34dfd4c71 100644 --- a/pkg/bpf/ads/sock_ops.go +++ b/pkg/bpf/ads/sock_ops.go @@ -32,6 +32,7 @@ import ( "kmesh.net/kmesh/daemon/options" "kmesh.net/kmesh/pkg/bpf/restart" "kmesh.net/kmesh/pkg/bpf/utils" + "kmesh.net/kmesh/pkg/constants" helper "kmesh.net/kmesh/pkg/utils" ) @@ -187,8 +188,8 @@ func (sc *BpfSockOps) Attach() error { // pin bpf_link and bpf_tail_call map // pin bpf_link, after restart, update prog in bpf_link // tail_call map cannot pin in SetMapPinType->LoadAndAssign, we pin them independent - mapPinPath := filepath.Join(sc.Info.BpfFsPath, "sockops_tail_call_map") - progPinPath := filepath.Join(sc.Info.BpfFsPath, "sockops_link") + mapPinPath := filepath.Join(sc.Info.BpfFsPath, constants.TailCallMap) + progPinPath := filepath.Join(sc.Info.BpfFsPath, constants.Prog_link) if restart.GetStartType() == restart.Restart { if sc.Link, err = utils.BpfProgUpdate(progPinPath, cgopt); err != nil { return err @@ -197,9 +198,6 @@ func (sc *BpfSockOps) Attach() error { // this path after an unexpected restart, here we unpin the file by // directly removing it without doing error handling. os.Remove(mapPinPath) - // if err = utils.BpfMapDeleteByPinPath(mapPinPath); err != nil { - // return err - // } } else { sc.Link, err = link.AttachCgroup(cgopt) if err != nil { diff --git a/pkg/bpf/ads/trace_point.go b/pkg/bpf/ads/trace_point.go index 61fb53a58..5f0258935 100644 --- a/pkg/bpf/ads/trace_point.go +++ b/pkg/bpf/ads/trace_point.go @@ -22,7 +22,6 @@ package ads import ( "os" "path/filepath" - "reflect" "syscall" "github.com/cilium/ebpf" @@ -31,7 +30,6 @@ import ( bpf2go "kmesh.net/kmesh/bpf/kmesh/bpf2go/kernelnative/enhanced" "kmesh.net/kmesh/daemon/options" "kmesh.net/kmesh/pkg/bpf/restart" - "kmesh.net/kmesh/pkg/bpf/utils" "kmesh.net/kmesh/pkg/constants" helper "kmesh.net/kmesh/pkg/utils" ) @@ -83,23 +81,8 @@ func (sc *BpfTracePoint) loadKmeshTracePointObjects() (*ebpf.CollectionSpec, err } } - if restart.GetStartType() == restart.Restart { - pinPath := filepath.Join(sc.Info.BpfFsPath, "connect_ret") - _, err := ebpf.LoadPinnedProgram(pinPath, nil) - if err != nil { - log.Errorf("LoadPinnedProgram failed: %v", err) - return nil, err - } - } else { - if err = spec.LoadAndAssign(&sc.KmeshTracePointObjects, &opts); err != nil { - return nil, err - } - - value := reflect.ValueOf(sc.KmeshTracePointObjects.KmeshTracePointPrograms) - if err = utils.PinPrograms(&value, sc.Info.BpfFsPath); err != nil { - log.Errorf("tracepoint err: %v \n path is:%v", err, sc.Info.BpfFsPath) - return nil, err - } + if err = spec.LoadAndAssign(&sc.KmeshTracePointObjects, &opts); err != nil { + return nil, err } return spec, nil From e3e1066af6f4bdc5291c8be056b3e4a3ee20ecdc Mon Sep 17 00:00:00 2001 From: lec-bit Date: Sat, 23 Nov 2024 00:28:23 +0800 Subject: [PATCH 17/22] fix ut golanglint failed Signed-off-by: lec-bit --- pkg/bpf/bpf_test.go | 39 ++++++++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/pkg/bpf/bpf_test.go b/pkg/bpf/bpf_test.go index 897561cca..511f244bd 100644 --- a/pkg/bpf/bpf_test.go +++ b/pkg/bpf/bpf_test.go @@ -17,6 +17,7 @@ package bpf import ( + "fmt" "os" "path/filepath" "syscall" @@ -48,23 +49,28 @@ func TestRestart(t *testing.T) { }) } -func setDir(t *testing.T) { - if err := os.MkdirAll("/mnt/kmesh_cgroup2", 0755); err != nil { - t.Fatalf("Failed to create dir /mnt/kmesh_cgroup2: %v", err) +func setDir() (err error) { + defer func() { + if err != nil { + CleanupBpfMap() + } + }() + + if err = os.MkdirAll("/mnt/kmesh_cgroup2", 0755); err != nil { + return fmt.Errorf("Failed to create dir /mnt/kmesh_cgroup2: %v", err) } - if err := syscall.Mount("none", "/mnt/kmesh_cgroup2/", "cgroup2", 0, ""); err != nil { - CleanupBpfMap() - t.Fatalf("Failed to mount /mnt/kmesh_cgroup2/: %v", err) + + if err = syscall.Mount("none", "/mnt/kmesh_cgroup2/", "cgroup2", 0, ""); err != nil { + return fmt.Errorf("Failed to mount /mnt/kmesh_cgroup2/: %v", err) } - if err := syscall.Mount("/sys/fs/bpf", "/sys/fs/bpf", "bpf", 0, ""); err != nil { - CleanupBpfMap() - t.Fatalf("Failed to mount /sys/fs/bpf: %v", err) + if err = syscall.Mount("/sys/fs/bpf", "/sys/fs/bpf", "bpf", 0, ""); err != nil { + return fmt.Errorf("Failed to mount /sys/fs/bpf: %v", err) } - if err := rlimit.RemoveMemlock(); err != nil { - CleanupBpfMap() - t.Fatalf("Failed to remove mem limit: %v", err) + if err = rlimit.RemoveMemlock(); err != nil { + return fmt.Errorf("Failed to remove mem limit: %v", err) } + return nil } func NormalStart(t *testing.T, config options.BpfConfig) { @@ -78,7 +84,9 @@ func NormalStart(t *testing.T, config options.BpfConfig) { } func setDirDualEngine(t *testing.T) options.BpfConfig { - setDir(t) + if err := setDir(); err != nil { + t.Fatalf("setDir Failed: %v", err) + } return options.BpfConfig{ Mode: constants.DualEngineMode, @@ -88,8 +96,9 @@ func setDirDualEngine(t *testing.T) options.BpfConfig { } func setDirKernelNative(t *testing.T) options.BpfConfig { - setDir(t) - + if err := setDir(); err != nil { + t.Fatalf("setDir Failed: %v", err) + } return options.BpfConfig{ Mode: constants.KernelNativeMode, BpfFsPath: "/sys/fs/bpf", From 8f43e658db0eac1f198dd3f3ef39d25b2505a9b3 Mon Sep 17 00:00:00 2001 From: lec-bit Date: Sat, 23 Nov 2024 15:58:33 +0800 Subject: [PATCH 18/22] fix ut Signed-off-by: lec-bit --- pkg/bpf/bpf_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/bpf/bpf_test.go b/pkg/bpf/bpf_test.go index ed642554f..ef55d9dea 100644 --- a/pkg/bpf/bpf_test.go +++ b/pkg/bpf/bpf_test.go @@ -33,7 +33,7 @@ import ( ) func TestGetKmeshConfigMap(t *testing.T) { - config := setDir(t) + config := setDirDualEngine(t) bpfLoader := NewBpfLoader(&config) err := bpfLoader.Start() assert.NoError(t, err) @@ -44,7 +44,7 @@ func TestGetKmeshConfigMap(t *testing.T) { } func TestUpdateKmeshConfigMap(t *testing.T) { - config := setDir(t) + config := setDirDualEngine(t) bpfLoader := NewBpfLoader(&config) if err := bpfLoader.Start(); err != nil { assert.ErrorIsf(t, err, nil, "bpfLoader start failed %v", err) From 55c04f8cb2628b1cddd8db99be5febc8d1b02bfa Mon Sep 17 00:00:00 2001 From: lec-bit Date: Sat, 23 Nov 2024 16:42:53 +0800 Subject: [PATCH 19/22] typo Signed-off-by: lec-bit --- pkg/bpf/ads/sock_connection.go | 2 +- pkg/bpf/ads/sock_ops.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/bpf/ads/sock_connection.go b/pkg/bpf/ads/sock_connection.go index ab3da019e..a84be268a 100644 --- a/pkg/bpf/ads/sock_connection.go +++ b/pkg/bpf/ads/sock_connection.go @@ -161,7 +161,7 @@ func (sc *BpfSockConn) Attach() error { } // pin bpf_tail_call map - // tail_call map cannot pin in SetMapPinType->LoadAndAssign, we pin them independent + // tail_call map cannot pin in SetMapPinType->LoadAndAssign, we pin them independently mapPinPath := filepath.Join(sc.Info.BpfFsPath, constants.TailCallMap) progPinPath := filepath.Join(sc.Info.BpfFsPath, constants.Prog_link) if restart.GetStartType() == restart.Restart { diff --git a/pkg/bpf/ads/sock_ops.go b/pkg/bpf/ads/sock_ops.go index 34dfd4c71..13ac90658 100644 --- a/pkg/bpf/ads/sock_ops.go +++ b/pkg/bpf/ads/sock_ops.go @@ -187,7 +187,7 @@ func (sc *BpfSockOps) Attach() error { // pin bpf_link and bpf_tail_call map // pin bpf_link, after restart, update prog in bpf_link - // tail_call map cannot pin in SetMapPinType->LoadAndAssign, we pin them independent + // tail_call map cannot pin in SetMapPinType->LoadAndAssign, we pin them independently mapPinPath := filepath.Join(sc.Info.BpfFsPath, constants.TailCallMap) progPinPath := filepath.Join(sc.Info.BpfFsPath, constants.Prog_link) if restart.GetStartType() == restart.Restart { From 5a0e629b4bfc00d99cbc9585f42a72ba1ab93f0e Mon Sep 17 00:00:00 2001 From: lec-bit Date: Mon, 25 Nov 2024 01:50:55 +0000 Subject: [PATCH 20/22] typo, unnecessary code Signed-off-by: lec-bit --- pkg/bpf/utils/bpf_helper.go | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/pkg/bpf/utils/bpf_helper.go b/pkg/bpf/utils/bpf_helper.go index cec49ae1c..467f6af8c 100644 --- a/pkg/bpf/utils/bpf_helper.go +++ b/pkg/bpf/utils/bpf_helper.go @@ -43,16 +43,3 @@ func BpfProgUpdate(pinPath string, cgopt link.CgroupOptions) (link.Link, error) } return sclink, nil } - -// func BpfMapDeleteByPinPath(bpfFsPath string) error { -// progMap, err := ebpf.LoadPinnedMap(bpfFsPath, nil) -// if err != nil { -// return fmt.Errorf("loadPinnedMap failed for %s: %v, when kmesh delete by pin path", bpfFsPath, err) -// } -// defer progMap.Close() -// if err := progMap.Unpin(); err != nil { -// return fmt.Errorf("unpin failed for %s: %v", bpfFsPath, err) -// } - -// return nil -// } From f316c0540b2c5e8dd6b66caec105bd2233a9e37b Mon Sep 17 00:00:00 2001 From: lec-bit Date: Tue, 26 Nov 2024 06:22:12 +0000 Subject: [PATCH 21/22] Refactor configuration recovery Signed-off-by: lec-bit --- .../deserialization_to_bpf_map.c | 1 + pkg/bpf/bpf.go | 2 +- pkg/cache/v2/cluster.go | 19 ++- pkg/cache/v2/listener.go | 19 ++- pkg/cache/v2/route.go | 19 ++- pkg/controller/ads/ads_hash.go | 119 ------------------ pkg/controller/ads/ads_processor.go | 17 --- pkg/controller/client.go | 12 -- 8 files changed, 44 insertions(+), 164 deletions(-) delete mode 100644 pkg/controller/ads/ads_hash.go diff --git a/bpf/deserialization_to_bpf_map/deserialization_to_bpf_map.c b/bpf/deserialization_to_bpf_map/deserialization_to_bpf_map.c index 98d8ff267..37730a36c 100644 --- a/bpf/deserialization_to_bpf_map/deserialization_to_bpf_map.c +++ b/bpf/deserialization_to_bpf_map/deserialization_to_bpf_map.c @@ -869,6 +869,7 @@ static void *create_struct(struct op_context *ctx, int *err) } ctx->value = value; + ((ProtobufCMessage *)value)->descriptor = desc; for (i = 0; i < desc->n_fields; i++) { const ProtobufCFieldDescriptor *field = desc->fields + i; diff --git a/pkg/bpf/bpf.go b/pkg/bpf/bpf.go index ffc06c664..9e727b97b 100644 --- a/pkg/bpf/bpf.go +++ b/pkg/bpf/bpf.go @@ -156,12 +156,12 @@ func StopMda() error { func (l *BpfLoader) Stop() { var err error + C.deserial_uninit() if restart.GetExitType() == restart.Restart { return } closeMap(l.versionMap) - C.deserial_uninit() if l.config.KernelNativeEnabled() { if err = l.obj.Stop(); err != nil { CleanupBpfMap() diff --git a/pkg/cache/v2/cluster.go b/pkg/cache/v2/cluster.go index fb997990e..910aa38dd 100644 --- a/pkg/cache/v2/cluster.go +++ b/pkg/cache/v2/cluster.go @@ -28,6 +28,7 @@ import ( cluster_v2 "kmesh.net/kmesh/api/v2/cluster" core_v2 "kmesh.net/kmesh/api/v2/core" bpfads "kmesh.net/kmesh/pkg/bpf/ads" + "kmesh.net/kmesh/pkg/bpf/restart" maps_v2 "kmesh.net/kmesh/pkg/cache/v2/maps" "kmesh.net/kmesh/pkg/consistenthash/maglev" "kmesh.net/kmesh/pkg/utils" @@ -56,8 +57,20 @@ func NewClusterCache(bpfAds *bpfads.BpfAds, hashName *utils.HashName) ClusterCac if bpfAds != nil { clusterStatsMap = bpfAds.GetClusterStatsMap() } + apiClusterCache := newApiClusterCache() + if restart.GetStartType() == restart.Restart { + Clusters, err := maps_v2.ClusterLookupAll() + if err != nil { + log.Errorf("ClusterLookupAll failed: %v, restart with last xDS config failed, "+ + "may some old xDS config will not be cleanup or update", err) + } + + for _, Cluster := range Clusters { + apiClusterCache[Cluster.Name] = &cluster_v2.Cluster{} + } + } return ClusterCache{ - apiClusterCache: newApiClusterCache(), + apiClusterCache: apiClusterCache, resourceHash: make(map[string][2]uint64), hashName: hashName, clusterStatsMap: clusterStatsMap, @@ -232,7 +245,3 @@ func (cache *ClusterCache) Dump() []*cluster_v2.Cluster { } return clusters } - -func (cache *ClusterCache) GetClusterHashPtr() *map[string][2]uint64 { - return &cache.resourceHash -} diff --git a/pkg/cache/v2/listener.go b/pkg/cache/v2/listener.go index 61cfa73c7..17e3c91fe 100644 --- a/pkg/cache/v2/listener.go +++ b/pkg/cache/v2/listener.go @@ -26,6 +26,7 @@ import ( core_v2 "kmesh.net/kmesh/api/v2/core" listener_v2 "kmesh.net/kmesh/api/v2/listener" + "kmesh.net/kmesh/pkg/bpf/restart" maps_v2 "kmesh.net/kmesh/pkg/cache/v2/maps" "kmesh.net/kmesh/pkg/logger" ) @@ -41,8 +42,20 @@ type ListenerCache struct { } func NewListenerCache() ListenerCache { + apiListenerCache := NewApiListenerCache() + if restart.GetStartType() == restart.Restart { + listeners, err := maps_v2.ListenerLookupAll() + if err != nil { + log.Errorf("ListenerLookupAll failed: %v, restart with last xDS config failed, "+ + "may some old xDS config will not be cleanup or update", err) + } + + for _, listener := range listeners { + apiListenerCache[listener.Name] = &listener_v2.Listener{Address: listener.Address} + } + } return ListenerCache{ - apiListenerCache: NewApiListenerCache(), + apiListenerCache: apiListenerCache, resourceHash: make(map[string]uint64), } } @@ -136,7 +149,3 @@ func (cache *ListenerCache) Dump() []*listener_v2.Listener { } return listeners } - -func (cache *ListenerCache) GetListenerHashPtr() *map[string]uint64 { - return &cache.resourceHash -} diff --git a/pkg/cache/v2/route.go b/pkg/cache/v2/route.go index 328365217..3fe5b14cd 100644 --- a/pkg/cache/v2/route.go +++ b/pkg/cache/v2/route.go @@ -23,6 +23,7 @@ import ( core_v2 "kmesh.net/kmesh/api/v2/core" route_v2 "kmesh.net/kmesh/api/v2/route" + "kmesh.net/kmesh/pkg/bpf/restart" maps_v2 "kmesh.net/kmesh/pkg/cache/v2/maps" ) @@ -33,8 +34,20 @@ type RouteConfigCache struct { } func NewRouteConfigCache() RouteConfigCache { + apiRouteConfigCache := newApiRouteConfigurationCache() + if restart.GetStartType() == restart.Restart { + routes, err := maps_v2.RouteConfigLookupAll() + if err != nil { + log.Errorf("RouteConfigLookupAll failed: %v, restart with last xDS config failed, "+ + "may some old xDS config will not be cleanup or update", err) + } + + for _, route := range routes { + apiRouteConfigCache[route.Name] = &route_v2.RouteConfiguration{} + } + } return RouteConfigCache{ - apiRouteConfigCache: newApiRouteConfigurationCache(), + apiRouteConfigCache: apiRouteConfigCache, resourceHash: make(map[string]uint64), } } @@ -117,7 +130,3 @@ func (cache *RouteConfigCache) Dump() []*route_v2.RouteConfiguration { } return mapCache } - -func (cache *RouteConfigCache) GetRouteHashPtr() *map[string]uint64 { - return &cache.resourceHash -} diff --git a/pkg/controller/ads/ads_hash.go b/pkg/controller/ads/ads_hash.go deleted file mode 100644 index 5e61b3397..000000000 --- a/pkg/controller/ads/ads_hash.go +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Copyright The Kmesh Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ads - -import ( - "os" - - "sigs.k8s.io/yaml" - - cache_v2 "kmesh.net/kmesh/pkg/cache/v2" - maps_v2 "kmesh.net/kmesh/pkg/cache/v2/maps" -) - -const ( - persistPath = "/mnt/kernel_native_hash_name.yaml" -) - -// HashName converts a string to a uint32 integer as the key of bpf map -type HashName struct { - NameToCds map[string][2]uint64 - NameToLds map[string]uint64 - NameToRds map[string]uint64 -} - -// HashName creates a new HashName instance -func NewHashName() *HashName { - return &HashName{ - NameToCds: make(map[string][2]uint64), - NameToLds: make(map[string]uint64), - NameToRds: make(map[string]uint64), - } -} - -func ReadFromPersistFile(h *HashName) error { - data, err := os.ReadFile(persistPath) - if err != nil { - return nil - } - - return yaml.Unmarshal(data, h) -} - -func WritePersistFile(h *HashName) error { - data, err := yaml.Marshal(h) - if err != nil { - return err - } - - return os.WriteFile(persistPath, data, 0644) -} - -// Should only be used by test -func ResetPersistFile() { - os.Remove(persistPath) -} - -func HandleRemovedCdsAndEdsDuringRestart(cache *cache_v2.ClusterCache) error { - hashName := NewHashName() - if ReadFromPersistFile(hashName) != nil { - return nil - } - - for key := range hashName.NameToCds { - if cache.GetEdsHash(key) == 0 { - err := maps_v2.ClusterDelete(key) - if err != nil { - return err - } - } - } - return nil -} - -func HandleRemovedLdsDuringRestart(cache *cache_v2.ListenerCache) error { - hashName := NewHashName() - if ReadFromPersistFile(hashName) != nil { - return nil - } - for key := range hashName.NameToLds { - if cache.GetLdsHash(key) == 0 { - listener := cache.GetApiListener(key) - err := maps_v2.ListenerDelete(listener.GetAddress()) - if err != nil { - return err - } - } - } - return nil -} - -func HandleRemovedRdsDuringRestart(cache *cache_v2.RouteConfigCache) error { - hashName := NewHashName() - if ReadFromPersistFile(hashName) != nil { - return nil - } - for key := range hashName.NameToRds { - if cache.GetRdsHash(key) == 0 { - err := maps_v2.RouteConfigDelete(key) - if err != nil { - return err - } - } - } - return nil -} diff --git a/pkg/controller/ads/ads_processor.go b/pkg/controller/ads/ads_processor.go index 5d2343079..5892ab586 100644 --- a/pkg/controller/ads/ads_processor.go +++ b/pkg/controller/ads/ads_processor.go @@ -18,7 +18,6 @@ package ads import ( "fmt" - "sync" config_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" config_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" @@ -56,7 +55,6 @@ type processor struct { lastNonce *lastNonce // the channel used to send domains to dns resolver. key is domain name and value is refreshrate DnsResolverChan chan []*config_cluster_v3.Cluster - once [3]sync.Once } func newProcessor(bpfAds *bpfads.BpfAds) *processor { @@ -236,11 +234,6 @@ func (p *processor) handleEdsResponse(resp *service_discovery_v3.DiscoveryRespon } p.Cache.ClusterCache.Flush() - p.once[0].Do(func() { - if err = HandleRemovedCdsAndEdsDuringRestart(&p.Cache.ClusterCache); err != nil { - fmt.Println("Error:", err) - } - }) return err } @@ -282,11 +275,6 @@ func (p *processor) handleLdsResponse(resp *service_discovery_v3.DiscoveryRespon } p.Cache.ListenerCache.Flush() - p.once[1].Do(func() { - if err := HandleRemovedLdsDuringRestart(&p.Cache.ListenerCache); err != nil { - fmt.Println("Error:", err) - } - }) if !slices.EqualUnordered(p.Cache.routeNames, lastRouteNames) { // we cannot set the nonce here. @@ -327,11 +315,6 @@ func (p *processor) handleRdsResponse(resp *service_discovery_v3.DiscoveryRespon p.Cache.RouteCache.UpdateApiRouteStatus(key, core_v2.ApiStatus_DELETE) } p.Cache.RouteCache.Flush() - p.once[2].Do(func() { - if err = HandleRemovedRdsDuringRestart(&p.Cache.RouteCache); err != nil { - fmt.Println("Error:", err) - } - }) return err } diff --git a/pkg/controller/client.go b/pkg/controller/client.go index 53fb5bb2c..53183aba5 100644 --- a/pkg/controller/client.go +++ b/pkg/controller/client.go @@ -27,7 +27,6 @@ import ( istioGrpc "istio.io/istio/pilot/pkg/grpc" bpfads "kmesh.net/kmesh/pkg/bpf/ads" - "kmesh.net/kmesh/pkg/bpf/restart" bpfwl "kmesh.net/kmesh/pkg/bpf/workload" "kmesh.net/kmesh/pkg/constants" "kmesh.net/kmesh/pkg/controller/ads" @@ -174,16 +173,5 @@ func (c *XdsClient) closeStreamClient() { } func (c *XdsClient) Close() error { - if restart.GetExitType() == restart.Restart { - hashName := &ads.HashName{ - NameToCds: *c.AdsController.Processor.Cache.ClusterCache.GetClusterHashPtr(), - NameToLds: *c.AdsController.Processor.Cache.ListenerCache.GetListenerHashPtr(), - NameToRds: *c.AdsController.Processor.Cache.RouteCache.GetRouteHashPtr(), - } - ads.ResetPersistFile() - if err := ads.WritePersistFile(hashName); err != nil { - return err - } - } return nil } From 43f8129cba890719440a4334789b5102be3570da Mon Sep 17 00:00:00 2001 From: lec-bit Date: Thu, 28 Nov 2024 12:34:27 +0800 Subject: [PATCH 22/22] typo Signed-off-by: lec-bit --- pkg/bpf/ads/sock_connection.go | 7 ++++--- pkg/bpf/ads/sock_ops.go | 7 ++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/pkg/bpf/ads/sock_connection.go b/pkg/bpf/ads/sock_connection.go index a84be268a..dffe94d53 100644 --- a/pkg/bpf/ads/sock_connection.go +++ b/pkg/bpf/ads/sock_connection.go @@ -162,7 +162,8 @@ func (sc *BpfSockConn) Attach() error { // pin bpf_tail_call map // tail_call map cannot pin in SetMapPinType->LoadAndAssign, we pin them independently - mapPinPath := filepath.Join(sc.Info.BpfFsPath, constants.TailCallMap) + // When we need to update tail_call map, delete the old map and then pin the new one. + tailCallmapPinPath := filepath.Join(sc.Info.BpfFsPath, constants.TailCallMap) progPinPath := filepath.Join(sc.Info.BpfFsPath, constants.Prog_link) if restart.GetStartType() == restart.Restart { if sc.Link, err = utils.BpfProgUpdate(progPinPath, cgopt); err != nil { @@ -172,7 +173,7 @@ func (sc *BpfSockConn) Attach() error { // Unpin tailcallmap. Considering that kmesh coredump may not have // this path after an unexpected restart, here we unpin the file by // directly removing it without doing error handling. - os.Remove(mapPinPath) + os.Remove(tailCallmapPinPath) } else { sc.Link, err = link.AttachCgroup(cgopt) @@ -184,7 +185,7 @@ func (sc *BpfSockConn) Attach() error { return err } } - if err = sc.KmeshCgroupSockMaps.KmeshTailCallProg.Pin(mapPinPath); err != nil { + if err = sc.KmeshCgroupSockMaps.KmeshTailCallProg.Pin(tailCallmapPinPath); err != nil { return err } return nil diff --git a/pkg/bpf/ads/sock_ops.go b/pkg/bpf/ads/sock_ops.go index 13ac90658..838ed4f28 100644 --- a/pkg/bpf/ads/sock_ops.go +++ b/pkg/bpf/ads/sock_ops.go @@ -188,7 +188,8 @@ func (sc *BpfSockOps) Attach() error { // pin bpf_link and bpf_tail_call map // pin bpf_link, after restart, update prog in bpf_link // tail_call map cannot pin in SetMapPinType->LoadAndAssign, we pin them independently - mapPinPath := filepath.Join(sc.Info.BpfFsPath, constants.TailCallMap) + // When we need to update tail_call map, delete the old map and then pin the new one. + tailCallmapPinPath := filepath.Join(sc.Info.BpfFsPath, constants.TailCallMap) progPinPath := filepath.Join(sc.Info.BpfFsPath, constants.Prog_link) if restart.GetStartType() == restart.Restart { if sc.Link, err = utils.BpfProgUpdate(progPinPath, cgopt); err != nil { @@ -197,7 +198,7 @@ func (sc *BpfSockOps) Attach() error { // Unpin tailcallmap. Considering that kmesh coredump may not have // this path after an unexpected restart, here we unpin the file by // directly removing it without doing error handling. - os.Remove(mapPinPath) + os.Remove(tailCallmapPinPath) } else { sc.Link, err = link.AttachCgroup(cgopt) if err != nil { @@ -207,7 +208,7 @@ func (sc *BpfSockOps) Attach() error { return err } } - if err = sc.KmeshSockopsMaps.KmeshTailCallProg.Pin(mapPinPath); err != nil { + if err = sc.KmeshSockopsMaps.KmeshTailCallProg.Pin(tailCallmapPinPath); err != nil { return err } return nil