diff --git a/bpf/deserialization_to_bpf_map/deserialization_to_bpf_map.c b/bpf/deserialization_to_bpf_map/deserialization_to_bpf_map.c index 98d8ff267..37730a36c 100644 --- a/bpf/deserialization_to_bpf_map/deserialization_to_bpf_map.c +++ b/bpf/deserialization_to_bpf_map/deserialization_to_bpf_map.c @@ -869,6 +869,7 @@ static void *create_struct(struct op_context *ctx, int *err) } ctx->value = value; + ((ProtobufCMessage *)value)->descriptor = desc; for (i = 0; i < desc->n_fields; i++) { const ProtobufCFieldDescriptor *field = desc->fields + i; diff --git a/pkg/bpf/ads/loader_enhanced.go b/pkg/bpf/ads/loader_enhanced.go index ed8f5d59b..3944c5378 100644 --- a/pkg/bpf/ads/loader_enhanced.go +++ b/pkg/bpf/ads/loader_enhanced.go @@ -43,14 +43,16 @@ type BpfAds struct { func NewBpfAds(cfg *options.BpfConfig) (*BpfAds, error) { sc := &BpfAds{} - sc.TracePoint.NewBpf(cfg) + if err := sc.TracePoint.NewBpf(cfg); err != nil { + return nil, err + } if err := sc.SockOps.NewBpf(cfg); err != nil { - return sc, err + return nil, err } if err := sc.SockConn.NewBpf(cfg); err != nil { - return sc, err + return nil, err } return sc, nil } diff --git a/pkg/bpf/ads/sock_connection.go b/pkg/bpf/ads/sock_connection.go index af4fb3985..dffe94d53 100644 --- a/pkg/bpf/ads/sock_connection.go +++ b/pkg/bpf/ads/sock_connection.go @@ -22,12 +22,16 @@ package ads import "C" import ( "os" + "path/filepath" "reflect" "syscall" "github.com/cilium/ebpf" "github.com/cilium/ebpf/link" + "kmesh.net/kmesh/pkg/bpf/restart" + "kmesh.net/kmesh/pkg/constants" + bpf2go "kmesh.net/kmesh/bpf/kmesh/bpf2go/kernelnative/normal" "kmesh.net/kmesh/daemon/options" "kmesh.net/kmesh/pkg/bpf/utils" @@ -98,11 +102,6 @@ func (sc *BpfSockConn) loadKmeshSockConnObjects() (*ebpf.CollectionSpec, error) return nil, err } - value := reflect.ValueOf(sc.KmeshCgroupSockObjects.KmeshCgroupSockPrograms) - if err = utils.PinPrograms(&value, sc.Info.BpfFsPath); err != nil { - return nil, err - } - return spec, nil } @@ -154,18 +153,41 @@ func (sc *BpfSockConn) close() error { } func (sc *BpfSockConn) Attach() error { + var err error cgopt := link.CgroupOptions{ Path: sc.Info.Cgroup2Path, Attach: sc.Info.AttachType, Program: sc.KmeshCgroupSockObjects.CgroupConnect4Prog, } - lk, err := link.AttachCgroup(cgopt) - if err != nil { + // pin bpf_tail_call map + // tail_call map cannot pin in SetMapPinType->LoadAndAssign, we pin them independently + // When we need to update tail_call map, delete the old map and then pin the new one. + tailCallmapPinPath := filepath.Join(sc.Info.BpfFsPath, constants.TailCallMap) + progPinPath := filepath.Join(sc.Info.BpfFsPath, constants.Prog_link) + if restart.GetStartType() == restart.Restart { + if sc.Link, err = utils.BpfProgUpdate(progPinPath, cgopt); err != nil { + return err + } + + // Unpin tailcallmap. Considering that kmesh coredump may not have + // this path after an unexpected restart, here we unpin the file by + // directly removing it without doing error handling. + os.Remove(tailCallmapPinPath) + + } else { + sc.Link, err = link.AttachCgroup(cgopt) + if err != nil { + return err + } + + if err := sc.Link.Pin(progPinPath); err != nil { + return err + } + } + if err = sc.KmeshCgroupSockMaps.KmeshTailCallProg.Pin(tailCallmapPinPath); err != nil { return err } - sc.Link = lk - return nil } diff --git a/pkg/bpf/ads/sock_ops.go b/pkg/bpf/ads/sock_ops.go index 2d13e5c98..838ed4f28 100644 --- a/pkg/bpf/ads/sock_ops.go +++ b/pkg/bpf/ads/sock_ops.go @@ -21,6 +21,7 @@ package ads import ( "os" + "path/filepath" "reflect" "syscall" @@ -29,7 +30,9 @@ import ( bpf2go "kmesh.net/kmesh/bpf/kmesh/bpf2go/kernelnative/enhanced" "kmesh.net/kmesh/daemon/options" + "kmesh.net/kmesh/pkg/bpf/restart" "kmesh.net/kmesh/pkg/bpf/utils" + "kmesh.net/kmesh/pkg/constants" helper "kmesh.net/kmesh/pkg/utils" ) @@ -81,11 +84,6 @@ func (sc *BpfSockOps) loadKmeshSockopsObjects() (*ebpf.CollectionSpec, error) { return nil, err } - value := reflect.ValueOf(sc.KmeshSockopsObjects.KmeshSockopsPrograms) - if err = utils.PinPrograms(&value, sc.Info.BpfFsPath); err != nil { - return nil, err - } - return spec, nil } @@ -180,18 +178,39 @@ func (sc *BpfSockOps) Load() error { } func (sc *BpfSockOps) Attach() error { + var err error cgopt := link.CgroupOptions{ Path: sc.Info.Cgroup2Path, Attach: sc.Info.AttachType, Program: sc.KmeshSockopsObjects.SockopsProg, } - lk, err := link.AttachCgroup(cgopt) - if err != nil { + // pin bpf_link and bpf_tail_call map + // pin bpf_link, after restart, update prog in bpf_link + // tail_call map cannot pin in SetMapPinType->LoadAndAssign, we pin them independently + // When we need to update tail_call map, delete the old map and then pin the new one. + tailCallmapPinPath := filepath.Join(sc.Info.BpfFsPath, constants.TailCallMap) + progPinPath := filepath.Join(sc.Info.BpfFsPath, constants.Prog_link) + if restart.GetStartType() == restart.Restart { + if sc.Link, err = utils.BpfProgUpdate(progPinPath, cgopt); err != nil { + return err + } + // Unpin tailcallmap. Considering that kmesh coredump may not have + // this path after an unexpected restart, here we unpin the file by + // directly removing it without doing error handling. + os.Remove(tailCallmapPinPath) + } else { + sc.Link, err = link.AttachCgroup(cgopt) + if err != nil { + return err + } + if err = sc.Link.Pin(progPinPath); err != nil { + return err + } + } + if err = sc.KmeshSockopsMaps.KmeshTailCallProg.Pin(tailCallmapPinPath); err != nil { return err } - sc.Link = lk - return nil } diff --git a/pkg/bpf/ads/trace_point.go b/pkg/bpf/ads/trace_point.go index 0e2993272..5f0258935 100644 --- a/pkg/bpf/ads/trace_point.go +++ b/pkg/bpf/ads/trace_point.go @@ -20,11 +20,17 @@ package ads import ( + "os" + "path/filepath" + "syscall" + "github.com/cilium/ebpf" "github.com/cilium/ebpf/link" bpf2go "kmesh.net/kmesh/bpf/kmesh/bpf2go/kernelnative/enhanced" "kmesh.net/kmesh/daemon/options" + "kmesh.net/kmesh/pkg/bpf/restart" + "kmesh.net/kmesh/pkg/constants" helper "kmesh.net/kmesh/pkg/utils" ) @@ -34,10 +40,24 @@ type BpfTracePoint struct { bpf2go.KmeshTracePointObjects } -func (sc *BpfTracePoint) NewBpf(cfg *options.BpfConfig) { - sc.Info.MapPath = cfg.BpfFsPath - sc.Info.BpfFsPath = cfg.BpfFsPath +func (sc *BpfTracePoint) NewBpf(cfg *options.BpfConfig) error { + sc.Info.MapPath = cfg.BpfFsPath + "/bpf_kmesh/map/" + sc.Info.BpfFsPath = cfg.BpfFsPath + "/bpf_kmesh/tracepoint/" sc.Info.Cgroup2Path = cfg.Cgroup2Path + + if err := os.MkdirAll(sc.Info.MapPath, + syscall.S_IRUSR|syscall.S_IWUSR|syscall.S_IXUSR| + syscall.S_IRGRP|syscall.S_IXGRP); err != nil && !os.IsExist(err) { + return err + } + + if err := os.MkdirAll(sc.Info.BpfFsPath, + syscall.S_IRUSR|syscall.S_IWUSR|syscall.S_IXUSR| + syscall.S_IRGRP|syscall.S_IXGRP); err != nil && !os.IsExist(err) { + return err + } + + return nil } func (sc *BpfTracePoint) loadKmeshTracePointObjects() (*ebpf.CollectionSpec, error) { @@ -76,17 +96,29 @@ func (sc *BpfTracePoint) Load() error { } func (sc *BpfTracePoint) Attach() error { + var err error tpopt := link.RawTracepointOptions{ Name: "connect_ret", Program: sc.KmeshTracePointObjects.ConnectRet, } - lk, err := link.AttachRawTracepoint(tpopt) - if err != nil { - return err - } - sc.Link = lk + pinPath := filepath.Join(sc.Info.BpfFsPath, constants.Prog_link) + if restart.GetStartType() == restart.Restart { + sc.Link, err = link.LoadPinnedLink(pinPath, &ebpf.LoadPinOptions{}) + if err != nil { + return err + } + } else { + sc.Link, err = link.AttachRawTracepoint(tpopt) + if err != nil { + return err + } + + if err := sc.Link.Pin(pinPath); err != nil { + return err + } + } return nil } diff --git a/pkg/bpf/bpf.go b/pkg/bpf/bpf.go index c3fb7405e..9e727b97b 100644 --- a/pkg/bpf/bpf.go +++ b/pkg/bpf/bpf.go @@ -156,14 +156,12 @@ func StopMda() error { func (l *BpfLoader) Stop() { var err error - if restart.GetExitType() == restart.Restart && l.config.DualEngineEnabled() { - C.deserial_uninit() - log.Infof("kmesh restart, not clean bpf map and prog") + C.deserial_uninit() + if restart.GetExitType() == restart.Restart { return } closeMap(l.versionMap) - if l.config.KernelNativeEnabled() { if err = l.obj.Stop(); err != nil { CleanupBpfMap() diff --git a/pkg/bpf/bpf_test.go b/pkg/bpf/bpf_test.go index 8dc834dcc..ef55d9dea 100644 --- a/pkg/bpf/bpf_test.go +++ b/pkg/bpf/bpf_test.go @@ -17,6 +17,7 @@ package bpf import ( + "fmt" "os" "path/filepath" "syscall" @@ -32,7 +33,7 @@ import ( ) func TestGetKmeshConfigMap(t *testing.T) { - config := setDir(t) + config := setDirDualEngine(t) bpfLoader := NewBpfLoader(&config) err := bpfLoader.Start() assert.NoError(t, err) @@ -43,7 +44,7 @@ func TestGetKmeshConfigMap(t *testing.T) { } func TestUpdateKmeshConfigMap(t *testing.T) { - config := setDir(t) + config := setDirDualEngine(t) bpfLoader := NewBpfLoader(&config) if err := bpfLoader.Start(); err != nil { assert.ErrorIsf(t, err, nil, "bpfLoader start failed %v", err) @@ -64,43 +65,45 @@ func TestUpdateKmeshConfigMap(t *testing.T) { } func TestRestart(t *testing.T) { - t.Run("new start", func(t *testing.T) { - runTestNormal(t) + t.Run("new start DualEngine", func(t *testing.T) { + runTestNormalDualEngine(t) }) - t.Run("restart", func(t *testing.T) { - runTestRestart(t) + t.Run("new start KernelNative", func(t *testing.T) { + runTestNormalKernelNative(t) + }) + t.Run("restart DualEngine", func(t *testing.T) { + runTestRestartDualEngine(t) + }) + t.Run("restart KernelNative", func(t *testing.T) { + runTestRestartKernelNative(t) }) } -func setDir(t *testing.T) options.BpfConfig { - if err := os.MkdirAll("/mnt/kmesh_cgroup2", 0755); err != nil { - t.Fatalf("Failed to create dir /mnt/kmesh_cgroup2: %v", err) - } - if err := syscall.Mount("none", "/mnt/kmesh_cgroup2/", "cgroup2", 0, ""); err != nil { - CleanupBpfMap() - t.Fatalf("Failed to mount /mnt/kmesh_cgroup2/: %v", err) - } - if err := syscall.Mount("/sys/fs/bpf", "/sys/fs/bpf", "bpf", 0, ""); err != nil { - CleanupBpfMap() - t.Fatalf("Failed to mount /sys/fs/bpf: %v", err) +func setDir() (err error) { + defer func() { + if err != nil { + CleanupBpfMap() + } + }() + + if err = os.MkdirAll("/mnt/kmesh_cgroup2", 0755); err != nil { + return fmt.Errorf("Failed to create dir /mnt/kmesh_cgroup2: %v", err) } - if err := rlimit.RemoveMemlock(); err != nil { - CleanupBpfMap() - t.Fatalf("Failed to remove mem limit: %v", err) + if err = syscall.Mount("none", "/mnt/kmesh_cgroup2/", "cgroup2", 0, ""); err != nil { + return fmt.Errorf("Failed to mount /mnt/kmesh_cgroup2/: %v", err) + } + if err = syscall.Mount("/sys/fs/bpf", "/sys/fs/bpf", "bpf", 0, ""); err != nil { + return fmt.Errorf("Failed to mount /sys/fs/bpf: %v", err) } - return options.BpfConfig{ - Mode: constants.DualEngineMode, - BpfFsPath: "/sys/fs/bpf", - Cgroup2Path: "/mnt/kmesh_cgroup2", + if err = rlimit.RemoveMemlock(); err != nil { + return fmt.Errorf("Failed to remove mem limit: %v", err) } + return nil } -// Test Kmesh Normal -func runTestNormal(t *testing.T) { - config := setDir(t) - +func NormalStart(t *testing.T, config options.BpfConfig) { bpfLoader := NewBpfLoader(&config) if err := bpfLoader.Start(); err != nil { assert.ErrorIsf(t, err, nil, "bpfLoader start failed %v", err) @@ -110,10 +113,46 @@ func runTestNormal(t *testing.T) { bpfLoader.Stop() } -// Test Kmesh Restart Normal -func runTestRestart(t *testing.T) { +func setDirDualEngine(t *testing.T) options.BpfConfig { + if err := setDir(); err != nil { + t.Fatalf("setDir Failed: %v", err) + } + + return options.BpfConfig{ + Mode: constants.DualEngineMode, + BpfFsPath: "/sys/fs/bpf", + Cgroup2Path: "/mnt/kmesh_cgroup2", + } +} + +func setDirKernelNative(t *testing.T) options.BpfConfig { + if err := setDir(); err != nil { + t.Fatalf("setDir Failed: %v", err) + } + return options.BpfConfig{ + Mode: constants.KernelNativeMode, + BpfFsPath: "/sys/fs/bpf", + Cgroup2Path: "/mnt/kmesh_cgroup2", + } +} + +// Test Kmesh Normal DualEngine +func runTestNormalDualEngine(t *testing.T) { + config := setDirDualEngine(t) + + NormalStart(t, config) +} + +// Test Kmesh Normal KernelNative +func runTestNormalKernelNative(t *testing.T) { + config := setDirKernelNative(t) + + NormalStart(t, config) +} + +func KmeshRestart(t *testing.T, config options.BpfConfig) { var versionPath string - config := setDir(t) + restart.SetStartType(restart.Normal) bpfLoader := NewBpfLoader(&config) if err := bpfLoader.Start(); err != nil { assert.ErrorIsf(t, err, nil, "bpfLoader start failed %v", err) @@ -140,6 +179,18 @@ func runTestRestart(t *testing.T) { bpfLoader.Stop() } +// Test Kmesh Restart DualEngine +func runTestRestartDualEngine(t *testing.T) { + config := setDirDualEngine(t) + KmeshRestart(t, config) +} + +// Test Kmesh Restart KernelNative +func runTestRestartKernelNative(t *testing.T) { + config := setDirKernelNative(t) + KmeshRestart(t, config) +} + func TestGetNodePodSubGateway(t *testing.T) { type args struct { node *corev1.Node diff --git a/pkg/bpf/utils/bpf_helper.go b/pkg/bpf/utils/bpf_helper.go index bb44a651b..467f6af8c 100644 --- a/pkg/bpf/utils/bpf_helper.go +++ b/pkg/bpf/utils/bpf_helper.go @@ -20,7 +20,10 @@ import ( "os" "strconv" + "fmt" + "github.com/cilium/ebpf" + "github.com/cilium/ebpf/link" ) func SetEnvByBpfMapId(m *ebpf.Map, key string) error { @@ -29,3 +32,14 @@ func SetEnvByBpfMapId(m *ebpf.Map, key string) error { stringId := strconv.Itoa(int(id)) return os.Setenv(key, stringId) } + +func BpfProgUpdate(pinPath string, cgopt link.CgroupOptions) (link.Link, error) { + sclink, err := link.LoadPinnedLink(pinPath, &ebpf.LoadPinOptions{}) + if err != nil { + return nil, err + } + if err := sclink.Update(cgopt.Program); err != nil { + return nil, fmt.Errorf("updating link %s failed: %w", pinPath, err) + } + return sclink, nil +} diff --git a/pkg/bpf/workload/sendmsg.go b/pkg/bpf/workload/sendmsg.go index 9dfc451c8..84a6db9c7 100644 --- a/pkg/bpf/workload/sendmsg.go +++ b/pkg/bpf/workload/sendmsg.go @@ -17,7 +17,6 @@ package workload import ( - "fmt" "os" "path/filepath" "reflect" @@ -33,17 +32,6 @@ import ( helper "kmesh.net/kmesh/pkg/utils" ) -func bpfProgUpdate(pinPath string, cgopt link.CgroupOptions) error { - sclink, err := link.LoadPinnedLink(pinPath, &ebpf.LoadPinOptions{}) - if err != nil { - return err - } - if err := sclink.Update(cgopt.Program); err != nil { - return fmt.Errorf("updating link %s failed: %w", pinPath, err) - } - return nil -} - type BpfSendMsgWorkload struct { Info BpfInfo AttachFD int diff --git a/pkg/bpf/workload/sock_connection.go b/pkg/bpf/workload/sock_connection.go index c302462fb..b98ab969e 100644 --- a/pkg/bpf/workload/sock_connection.go +++ b/pkg/bpf/workload/sock_connection.go @@ -142,11 +142,11 @@ func (sc *SockConnWorkload) Attach() error { pinPath6 := filepath.Join(sc.Info.BpfFsPath, "sockconn6_prog") if restart.GetStartType() == restart.Restart { - if err = bpfProgUpdate(pinPath4, cgopt4); err != nil { + if sc.Link, err = utils.BpfProgUpdate(pinPath4, cgopt4); err != nil { return err } - if err = bpfProgUpdate(pinPath6, cgopt6); err != nil { + if sc.Link6, err = utils.BpfProgUpdate(pinPath6, cgopt6); err != nil { return err } } else { diff --git a/pkg/bpf/workload/sock_ops.go b/pkg/bpf/workload/sock_ops.go index dbd48b471..09770351e 100644 --- a/pkg/bpf/workload/sock_ops.go +++ b/pkg/bpf/workload/sock_ops.go @@ -103,6 +103,7 @@ func (so *BpfSockOpsWorkload) LoadSockOps() error { } func (so *BpfSockOpsWorkload) Attach() error { + var err error cgopt := link.CgroupOptions{ Path: so.Info.Cgroup2Path, Attach: so.Info.AttachType, @@ -111,7 +112,7 @@ func (so *BpfSockOpsWorkload) Attach() error { pinPath := filepath.Join(so.Info.BpfFsPath, "cgroup_sockops_prog") if restart.GetStartType() == restart.Restart { - if err := bpfProgUpdate(pinPath, cgopt); err != nil { + if so.Link, err = utils.BpfProgUpdate(pinPath, cgopt); err != nil { return err } } else { diff --git a/pkg/cache/v2/cluster.go b/pkg/cache/v2/cluster.go index fccf30cfe..910aa38dd 100644 --- a/pkg/cache/v2/cluster.go +++ b/pkg/cache/v2/cluster.go @@ -28,6 +28,7 @@ import ( cluster_v2 "kmesh.net/kmesh/api/v2/cluster" core_v2 "kmesh.net/kmesh/api/v2/core" bpfads "kmesh.net/kmesh/pkg/bpf/ads" + "kmesh.net/kmesh/pkg/bpf/restart" maps_v2 "kmesh.net/kmesh/pkg/cache/v2/maps" "kmesh.net/kmesh/pkg/consistenthash/maglev" "kmesh.net/kmesh/pkg/utils" @@ -56,8 +57,20 @@ func NewClusterCache(bpfAds *bpfads.BpfAds, hashName *utils.HashName) ClusterCac if bpfAds != nil { clusterStatsMap = bpfAds.GetClusterStatsMap() } + apiClusterCache := newApiClusterCache() + if restart.GetStartType() == restart.Restart { + Clusters, err := maps_v2.ClusterLookupAll() + if err != nil { + log.Errorf("ClusterLookupAll failed: %v, restart with last xDS config failed, "+ + "may some old xDS config will not be cleanup or update", err) + } + + for _, Cluster := range Clusters { + apiClusterCache[Cluster.Name] = &cluster_v2.Cluster{} + } + } return ClusterCache{ - apiClusterCache: newApiClusterCache(), + apiClusterCache: apiClusterCache, resourceHash: make(map[string][2]uint64), hashName: hashName, clusterStatsMap: clusterStatsMap, diff --git a/pkg/cache/v2/listener.go b/pkg/cache/v2/listener.go index 7228e7f74..17e3c91fe 100644 --- a/pkg/cache/v2/listener.go +++ b/pkg/cache/v2/listener.go @@ -26,6 +26,7 @@ import ( core_v2 "kmesh.net/kmesh/api/v2/core" listener_v2 "kmesh.net/kmesh/api/v2/listener" + "kmesh.net/kmesh/pkg/bpf/restart" maps_v2 "kmesh.net/kmesh/pkg/cache/v2/maps" "kmesh.net/kmesh/pkg/logger" ) @@ -41,8 +42,20 @@ type ListenerCache struct { } func NewListenerCache() ListenerCache { + apiListenerCache := NewApiListenerCache() + if restart.GetStartType() == restart.Restart { + listeners, err := maps_v2.ListenerLookupAll() + if err != nil { + log.Errorf("ListenerLookupAll failed: %v, restart with last xDS config failed, "+ + "may some old xDS config will not be cleanup or update", err) + } + + for _, listener := range listeners { + apiListenerCache[listener.Name] = &listener_v2.Listener{Address: listener.Address} + } + } return ListenerCache{ - apiListenerCache: NewApiListenerCache(), + apiListenerCache: apiListenerCache, resourceHash: make(map[string]uint64), } } diff --git a/pkg/cache/v2/route.go b/pkg/cache/v2/route.go index ce2e69e4b..3fe5b14cd 100644 --- a/pkg/cache/v2/route.go +++ b/pkg/cache/v2/route.go @@ -23,6 +23,7 @@ import ( core_v2 "kmesh.net/kmesh/api/v2/core" route_v2 "kmesh.net/kmesh/api/v2/route" + "kmesh.net/kmesh/pkg/bpf/restart" maps_v2 "kmesh.net/kmesh/pkg/cache/v2/maps" ) @@ -33,8 +34,20 @@ type RouteConfigCache struct { } func NewRouteConfigCache() RouteConfigCache { + apiRouteConfigCache := newApiRouteConfigurationCache() + if restart.GetStartType() == restart.Restart { + routes, err := maps_v2.RouteConfigLookupAll() + if err != nil { + log.Errorf("RouteConfigLookupAll failed: %v, restart with last xDS config failed, "+ + "may some old xDS config will not be cleanup or update", err) + } + + for _, route := range routes { + apiRouteConfigCache[route.Name] = &route_v2.RouteConfiguration{} + } + } return RouteConfigCache{ - apiRouteConfigCache: newApiRouteConfigurationCache(), + apiRouteConfigCache: apiRouteConfigCache, resourceHash: make(map[string]uint64), } } diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index 44aae9ef9..eb6b4f8a1 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -65,4 +65,7 @@ const ( VersionPath = "/bpf_kmesh/map/" WorkloadVersionPath = "/bpf_kmesh_workload/map/" + + TailCallMap = "tail_call_map" + Prog_link = "prog_link" ) diff --git a/pkg/controller/ads/ads_processor.go b/pkg/controller/ads/ads_processor.go index e38cdfab8..5892ab586 100644 --- a/pkg/controller/ads/ads_processor.go +++ b/pkg/controller/ads/ads_processor.go @@ -197,6 +197,7 @@ func (p *processor) handleCdsResponse(resp *service_discovery_v3.DiscoveryRespon } func (p *processor) handleEdsResponse(resp *service_discovery_v3.DiscoveryResponse) error { + var err error var loadAssignment = &config_endpoint_v3.ClusterLoadAssignment{} p.lastNonce.edsNonce = resp.Nonce for _, resource := range resp.GetResources() { @@ -234,7 +235,7 @@ func (p *processor) handleEdsResponse(resp *service_discovery_v3.DiscoveryRespon p.Cache.ClusterCache.Flush() - return nil + return err } func (p *processor) handleLdsResponse(resp *service_discovery_v3.DiscoveryResponse) error { @@ -281,10 +282,11 @@ func (p *processor) handleLdsResponse(resp *service_discovery_v3.DiscoveryRespon // Then it will lead to this request been ignored, we will lose the new rds resource p.req = newAdsRequest(resource_v3.RouteType, p.Cache.routeNames, "") } - return nil + return err } func (p *processor) handleRdsResponse(resp *service_discovery_v3.DiscoveryResponse) error { + var err error routeConfiguration := &config_route_v3.RouteConfiguration{} p.lastNonce.rdsNonce = resp.Nonce @@ -313,7 +315,7 @@ func (p *processor) handleRdsResponse(resp *service_discovery_v3.DiscoveryRespon p.Cache.RouteCache.UpdateApiRouteStatus(key, core_v2.ApiStatus_DELETE) } p.Cache.RouteCache.Flush() - return nil + return err } func (p *processor) Reset() {