diff --git a/api/cluster/cluster.proto b/api/cluster/cluster.proto index bf5048c45..ed9e83309 100644 --- a/api/cluster/cluster.proto +++ b/api/cluster/cluster.proto @@ -16,6 +16,7 @@ message Cluster { core.ApiStatus api_status = 128; string name = 1; + uint32 id = 2; uint32 connect_timeout = 4; LbPolicy lb_policy = 6; diff --git a/api/v2-c/cluster/cluster.pb-c.c b/api/v2-c/cluster/cluster.pb-c.c index 934ff42fb..6309db44e 100644 --- a/api/v2-c/cluster/cluster.pb-c.c +++ b/api/v2-c/cluster/cluster.pb-c.c @@ -82,7 +82,7 @@ const ProtobufCEnumDescriptor cluster__cluster__lb_policy__descriptor = cluster__cluster__lb_policy__value_ranges, NULL,NULL,NULL,NULL /* reserved[1234] */ }; -static const ProtobufCFieldDescriptor cluster__cluster__field_descriptors[6] = +static const ProtobufCFieldDescriptor cluster__cluster__field_descriptors[7] = { { "name", @@ -96,6 +96,18 @@ static const ProtobufCFieldDescriptor cluster__cluster__field_descriptors[6] = 0, /* flags */ 0,NULL,NULL /* reserved1,reserved2, etc */ }, + { + "id", + 2, + PROTOBUF_C_LABEL_NONE, + PROTOBUF_C_TYPE_UINT32, + 0, /* quantifier_offset */ + offsetof(Cluster__Cluster, id), + NULL, + NULL, + 0, /* flags */ + 0,NULL,NULL /* reserved1,reserved2, etc */ + }, { "connect_timeout", 4, @@ -158,22 +170,23 @@ static const ProtobufCFieldDescriptor cluster__cluster__field_descriptors[6] = }, }; static const unsigned cluster__cluster__field_indices_by_name[] = { - 5, /* field[5] = api_status */ - 3, /* field[3] = circuit_breakers */ - 1, /* field[1] = connect_timeout */ - 2, /* field[2] = lb_policy */ - 4, /* field[4] = load_assignment */ + 6, /* field[6] = api_status */ + 4, /* field[4] = circuit_breakers */ + 2, /* field[2] = connect_timeout */ + 1, /* field[1] = id */ + 3, /* field[3] = lb_policy */ + 5, /* field[5] = load_assignment */ 0, /* field[0] = name */ }; static const ProtobufCIntRange cluster__cluster__number_ranges[6 + 1] = { { 1, 0 }, - { 4, 1 }, - { 6, 2 }, - { 10, 3 }, - { 33, 4 }, - { 128, 5 }, - { 0, 6 } + { 4, 2 }, + { 6, 3 }, + { 10, 4 }, + { 33, 5 }, + { 128, 6 }, + { 0, 7 } }; const ProtobufCMessageDescriptor cluster__cluster__descriptor = { @@ -183,7 +196,7 @@ const ProtobufCMessageDescriptor cluster__cluster__descriptor = "Cluster__Cluster", "cluster", sizeof(Cluster__Cluster), - 6, + 7, cluster__cluster__field_descriptors, cluster__cluster__field_indices_by_name, 6, cluster__cluster__number_ranges, diff --git a/api/v2-c/cluster/cluster.pb-c.h b/api/v2-c/cluster/cluster.pb-c.h index 3c44b57db..a07d5dc6f 100644 --- a/api/v2-c/cluster/cluster.pb-c.h +++ b/api/v2-c/cluster/cluster.pb-c.h @@ -37,6 +37,7 @@ struct Cluster__Cluster ProtobufCMessage base; Core__ApiStatus api_status; char *name; + uint32_t id; uint32_t connect_timeout; Cluster__Cluster__LbPolicy lb_policy; Endpoint__ClusterLoadAssignment *load_assignment; @@ -44,7 +45,7 @@ struct Cluster__Cluster }; #define CLUSTER__CLUSTER__INIT \ { PROTOBUF_C_MESSAGE_INIT (&cluster__cluster__descriptor) \ - , CORE__API_STATUS__NONE, (char *)protobuf_c_empty_string, 0, CLUSTER__CLUSTER__LB_POLICY__ROUND_ROBIN, NULL, NULL } + , CORE__API_STATUS__NONE, (char *)protobuf_c_empty_string, 0, 0, CLUSTER__CLUSTER__LB_POLICY__ROUND_ROBIN, NULL, NULL } /* Cluster__Cluster methods */ diff --git a/api/v2/cluster/cluster.pb.go b/api/v2/cluster/cluster.pb.go index dd9e35b8b..6469fb3e0 100644 --- a/api/v2/cluster/cluster.pb.go +++ b/api/v2/cluster/cluster.pb.go @@ -78,6 +78,7 @@ type Cluster struct { ApiStatus core.ApiStatus `protobuf:"varint,128,opt,name=api_status,json=apiStatus,proto3,enum=core.ApiStatus" json:"api_status,omitempty"` Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Id uint32 `protobuf:"varint,2,opt,name=id,proto3" json:"id,omitempty"` ConnectTimeout uint32 `protobuf:"varint,4,opt,name=connect_timeout,json=connectTimeout,proto3" json:"connect_timeout,omitempty"` LbPolicy Cluster_LbPolicy `protobuf:"varint,6,opt,name=lb_policy,json=lbPolicy,proto3,enum=cluster.Cluster_LbPolicy" json:"lb_policy,omitempty"` LoadAssignment *endpoint.ClusterLoadAssignment `protobuf:"bytes,33,opt,name=load_assignment,json=loadAssignment,proto3" json:"load_assignment,omitempty"` @@ -130,6 +131,13 @@ func (x *Cluster) GetName() string { return "" } +func (x *Cluster) GetId() uint32 { + if x != nil { + return x.Id + } + return 0 +} + func (x *Cluster) GetConnectTimeout() uint32 { if x != nil { return x.ConnectTimeout @@ -168,12 +176,13 @@ var file_api_cluster_cluster_proto_rawDesc = []byte{ 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x61, 0x70, 0x69, 0x2f, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x2f, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x13, 0x61, 0x70, 0x69, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x62, - 0x61, 0x73, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xfa, 0x02, 0x0a, 0x07, 0x43, 0x6c, + 0x61, 0x73, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x8a, 0x03, 0x0a, 0x07, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x12, 0x2f, 0x0a, 0x0a, 0x61, 0x70, 0x69, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x80, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0f, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x41, 0x70, 0x69, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x09, 0x61, 0x70, 0x69, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x27, 0x0a, 0x0f, 0x63, 0x6f, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, 0x12, 0x27, 0x0a, 0x0f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0e, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x12, 0x36, 0x0a, 0x09, 0x6c, 0x62, 0x5f, 0x70, 0x6f, 0x6c, 0x69, 0x63, 0x79, diff --git a/bpf/kmesh/ads/include/circuit_breaker.h b/bpf/kmesh/ads/include/circuit_breaker.h new file mode 100644 index 000000000..50e36e8c7 --- /dev/null +++ b/bpf/kmesh/ads/include/circuit_breaker.h @@ -0,0 +1,178 @@ +/* SPDX-License-Identifier: (GPL-2.0-only OR BSD-2-Clause) */ +/* Copyright Authors of Kmesh */ + +#include "bpf_log.h" +#include "kmesh_common.h" +#include "bpf_common.h" + +#ifndef __KMESH_CIRCUIT_BREAKER_H__ +#define __KMESH_CIRCUIT_BREAKER_H__ + +#define CLUSTER_NAME_MAX_LEN BPF_DATA_MAX_LEN + +#pragma pack(1) +struct cluster_stats { + __u32 active_connections; +}; + +struct cluster_stats_key { + __u64 netns_cookie; + __u32 cluster_id; +}; +#pragma pack() + +struct { + __uint(type, BPF_MAP_TYPE_HASH); + __uint(key_size, sizeof(struct cluster_stats_key)); + __uint(value_size, sizeof(struct cluster_stats)); + __uint(map_flags, BPF_F_NO_PREALLOC); + __uint(max_entries, MAP_SIZE_OF_CLUSTER); +} map_of_cluster_stats SEC(".maps"); + +struct cluster_sock_data { + __u32 cluster_id; +}; + +struct { + __uint(type, BPF_MAP_TYPE_SK_STORAGE); + __uint(map_flags, BPF_F_NO_PREALLOC); + __type(key, int); + __type(value, struct cluster_sock_data); +} map_of_cluster_sock SEC(".maps"); + +static inline void update_cluster_active_connections(const struct cluster_stats_key *key, int delta) +{ + struct cluster_stats *stats = NULL; + if (!key) { + return; + } + stats = kmesh_map_lookup_elem(&map_of_cluster_stats, key); + if (!stats) { + struct cluster_stats init_value = {0}; + bpf_map_update_elem(&map_of_cluster_stats, key, &init_value, BPF_NOEXIST); + stats = kmesh_map_lookup_elem(&map_of_cluster_stats, key); + } + + if (!stats) { + BPF_LOG(ERR, CIRCUIT_BREAKER, "failed to get cluster stats"); + return; + } + if (delta < 0 && -delta > stats->active_connections) { + BPF_LOG(ERR, CIRCUIT_BREAKER, "invalid delta update"); + return; + } + + __sync_fetch_and_add(&stats->active_connections, delta); + + BPF_LOG( + DEBUG, + CIRCUIT_BREAKER, + "update existing stats(netns_cookie = %lld, cluster_id = %ld), " + "current active connections: %d", + key->netns_cookie, + key->cluster_id, + stats->active_connections); +} + +static inline int on_cluster_sock_bind(ctx_buff_t *ctx, const Cluster__Cluster *cluster) +{ + __u32 cluster_id = cluster->id; + struct cluster_stats_key key = {0}; + __u64 cookie = bpf_get_netns_cookie(ctx); + key.cluster_id = cluster_id; + key.netns_cookie = cookie; + struct cluster_stats *stats = NULL; + stats = kmesh_map_lookup_elem(&map_of_cluster_stats, &key); + + if (stats != NULL) { + Cluster__CircuitBreakers *cbs = NULL; + cbs = kmesh_get_ptr_val(cluster->circuit_breakers); + if (cbs != NULL && stats->active_connections >= cbs->max_connections) { + BPF_LOG( + DEBUG, + CIRCUIT_BREAKER, + "Current active connections %d exceeded max connections " + "%d, reject connection", + stats->active_connections, + cbs->max_connections); + return -1; + } + } + + BPF_LOG(DEBUG, CIRCUIT_BREAKER, "record sock bind for cluster id = %ld", cluster_id); + + struct cluster_sock_data *data = NULL; + if (!ctx->sk) { + BPF_LOG(WARN, CIRCUIT_BREAKER, "provided sock is NULL"); + return 0; + } + data = bpf_sk_storage_get(&map_of_cluster_sock, ctx->sk, 0, BPF_LOCAL_STORAGE_GET_F_CREATE); + if (!data) { + BPF_LOG(ERR, CIRCUIT_BREAKER, "on_cluster_sock_bind call bpf_sk_storage_get failed"); + return 0; + } + data->cluster_id = cluster_id; + return 0; +} + +static inline struct cluster_sock_data *get_cluster_sk_data(struct bpf_sock *sk) +{ + struct cluster_sock_data *data = NULL; + if (!sk) { + BPF_LOG(DEBUG, CIRCUIT_BREAKER, "provided sock is NULL"); + return NULL; + } + + data = bpf_sk_storage_get(&map_of_cluster_sock, sk, 0, 0); + return data; +} + +static inline void on_cluster_sock_connect(struct bpf_sock_ops *ctx) +{ + if (!ctx) { + return; + } + struct cluster_sock_data *data = get_cluster_sk_data(ctx->sk); + if (!data) { + return; + } + __u64 cookie = bpf_get_netns_cookie(ctx); + struct cluster_stats_key key = {0}; + key.netns_cookie = cookie; + key.cluster_id = data->cluster_id; + BPF_LOG( + DEBUG, + CIRCUIT_BREAKER, + "increase cluster active connections(netns_cookie = %lld, cluster " + "id = %ld)", + key.netns_cookie, + key.cluster_id); + update_cluster_active_connections(&key, 1); + BPF_LOG(DEBUG, CIRCUIT_BREAKER, "record sock connection for cluster id = %ld", data->cluster_id); +} + +static inline void on_cluster_sock_close(struct bpf_sock_ops *ctx) +{ + if (!ctx) { + return; + } + struct cluster_sock_data *data = get_cluster_sk_data(ctx->sk); + if (!data) { + return; + } + __u64 cookie = bpf_get_netns_cookie(ctx); + struct cluster_stats_key key = {0}; + key.netns_cookie = cookie; + key.cluster_id = data->cluster_id; + update_cluster_active_connections(&key, -1); + BPF_LOG( + DEBUG, + CIRCUIT_BREAKER, + "decrease cluster active connections(netns_cookie = %lld, cluster " + "id = %ld)", + key.netns_cookie, + key.cluster_id); + BPF_LOG(DEBUG, CIRCUIT_BREAKER, "record sock close for cluster id = %ld", data->cluster_id); +} + +#endif \ No newline at end of file diff --git a/bpf/kmesh/ads/include/cluster.h b/bpf/kmesh/ads/include/cluster.h index e5fd4a358..1c3af82c1 100644 --- a/bpf/kmesh/ads/include/cluster.h +++ b/bpf/kmesh/ads/include/cluster.h @@ -9,6 +9,7 @@ #include "tail_call.h" #include "cluster/cluster.pb-c.h" #include "endpoint/endpoint.pb-c.h" +#include "circuit_breaker.h" #define CLUSTER_NAME_MAX_LEN BPF_DATA_MAX_LEN @@ -263,7 +264,7 @@ static inline int cluster_handle_loadbalance(Cluster__Cluster *cluster, address_ name = kmesh_get_ptr_val(cluster->name); if (!name) { - BPF_LOG(ERR, CLUSTER, "filed to get cluster\n"); + BPF_LOG(ERR, CLUSTER, "failed to get cluster\n"); return -EAGAIN; } @@ -316,6 +317,12 @@ int cluster_manager(ctx_buff_t *ctx) if (cluster == NULL) return KMESH_TAIL_CALL_RET(ENOENT); + ret = on_cluster_sock_bind(ctx, cluster); + if (ret) { + // open circuit breaker, should reject here. + MARK_REJECTED(ctx); + return KMESH_TAIL_CALL_RET(ret); + } ret = cluster_handle_loadbalance(cluster, &addr, ctx); return KMESH_TAIL_CALL_RET(ret); } diff --git a/bpf/kmesh/ads/include/config.h b/bpf/kmesh/ads/include/config.h index e1f11366e..6eab55bce 100644 --- a/bpf/kmesh/ads/include/config.h +++ b/bpf/kmesh/ads/include/config.h @@ -46,6 +46,7 @@ #define map_of_virtual_host kmesh_virtual_host #define map_of_route kmesh_route #define map_of_cluster kmesh_cluster +#define map_of_cluster_stats kmesh_cluster_stats #define map_of_loadbalance kmesh_loadbalance #define map_of_endpoint kmesh_endpoint #define map_of_tail_call_prog kmesh_tail_call_prog diff --git a/bpf/kmesh/ads/include/ctx/sock_addr.h b/bpf/kmesh/ads/include/ctx/sock_addr.h index 0bfb2a1a8..a988450ac 100644 --- a/bpf/kmesh/ads/include/ctx/sock_addr.h +++ b/bpf/kmesh/ads/include/ctx/sock_addr.h @@ -21,4 +21,6 @@ typedef struct bpf_sock_addr ctx_buff_t; (ctx)->user_ip4 = (address)->ipv4; \ (ctx)->user_port = (address)->port +#define MARK_REJECTED(ctx) + #endif //__BPF_CTX_SOCK_ADDR_H diff --git a/bpf/kmesh/ads/include/ctx/sock_ops.h b/bpf/kmesh/ads/include/ctx/sock_ops.h index 2760635f6..e86cc9531 100644 --- a/bpf/kmesh/ads/include/ctx/sock_ops.h +++ b/bpf/kmesh/ads/include/ctx/sock_ops.h @@ -23,6 +23,13 @@ typedef struct bpf_sock_ops ctx_buff_t; #define SET_CTX_ADDRESS(ctx, address) \ (ctx)->remote_ip4 = (address)->ipv4; \ (ctx)->remote_port = (address)->port + +#define MARK_REJECTED(ctx) \ + BPF_LOG(DEBUG, KMESH, "mark reject"); \ + (ctx)->remote_ip4 = 0; \ + (ctx)->remote_port = 0 +#else +#define MARK_REJECTED(ctx) #endif #endif //__BPF_CTX_SOCK_OPS_H diff --git a/bpf/kmesh/ads/include/kmesh_common.h b/bpf/kmesh/ads/include/kmesh_common.h index 2df13531a..2a9c62010 100644 --- a/bpf/kmesh/ads/include/kmesh_common.h +++ b/bpf/kmesh/ads/include/kmesh_common.h @@ -11,13 +11,14 @@ #include "core/address.pb-c.h" #include "tail_call_index.h" -#define BPF_LOGTYPE_LISTENER BPF_DEBUG_ON -#define BPF_LOGTYPE_FILTERCHAIN BPF_DEBUG_ON -#define BPF_LOGTYPE_FILTER BPF_DEBUG_ON -#define BPF_LOGTYPE_CLUSTER BPF_DEBUG_ON -#define BPF_LOGTYPE_ROUTER BPF_DEBUG_ON -#define BPF_LOGTYPE_ROUTER_CONFIG BPF_DEBUG_ON -#define BPF_LOGTYPE_COMMON BPF_DEBUG_ON +#define BPF_LOGTYPE_LISTENER BPF_DEBUG_ON +#define BPF_LOGTYPE_FILTERCHAIN BPF_DEBUG_ON +#define BPF_LOGTYPE_FILTER BPF_DEBUG_ON +#define BPF_LOGTYPE_CLUSTER BPF_DEBUG_ON +#define BPF_LOGTYPE_ROUTER BPF_DEBUG_ON +#define BPF_LOGTYPE_ROUTER_CONFIG BPF_DEBUG_ON +#define BPF_LOGTYPE_COMMON BPF_DEBUG_ON +#define BPF_LOGTYPE_CIRCUIT_BREAKER BPF_DEBUG_ON #define BPF_OK 1 diff --git a/bpf/kmesh/ads/sockops.c b/bpf/kmesh/ads/sockops.c index 432f604ba..6cfac1773 100644 --- a/bpf/kmesh/ads/sockops.c +++ b/bpf/kmesh/ads/sockops.c @@ -9,6 +9,7 @@ #include "filter.h" #include "route_config.h" #include "cluster.h" +#include "circuit_breaker.h" #if KMESH_ENABLE_IPV4 #if KMESH_ENABLE_HTTP @@ -46,7 +47,6 @@ SEC("sockops") int sockops_prog(struct bpf_sock_ops *skops) { #define BPF_CONSTRUCT_PTR(low_32, high_32) (unsigned long long)(((unsigned long long)(high_32) << 32) + (low_32)) - struct bpf_mem_ptr *msg = NULL; if (skops->family != AF_INET) @@ -56,6 +56,19 @@ int sockops_prog(struct bpf_sock_ops *skops) case BPF_SOCK_OPS_TCP_DEFER_CONNECT_CB: msg = (struct bpf_mem_ptr *)BPF_CONSTRUCT_PTR(skops->args[0], skops->args[1]); (void)sockops_traffic_control(skops, msg); + break; + case BPF_SOCK_OPS_ACTIVE_ESTABLISHED_CB: + if (bpf_sock_ops_cb_flags_set(skops, BPF_SOCK_OPS_STATE_CB_FLAG) != 0) { + BPF_LOG(ERR, SOCKOPS, "set sockops cb failed!\n"); + } else { + on_cluster_sock_connect(skops); + } + break; + case BPF_SOCK_OPS_STATE_CB: + if (skops->args[1] == BPF_TCP_CLOSE) { + on_cluster_sock_close(skops); + } + break; } return BPF_OK; } diff --git a/daemon/manager/manager.go b/daemon/manager/manager.go index f634c19ce..87b9f6368 100644 --- a/daemon/manager/manager.go +++ b/daemon/manager/manager.go @@ -91,7 +91,7 @@ func Execute(configs *options.BootstrapConfigs) error { stopCh := make(chan struct{}) defer close(stopCh) - c := controller.NewController(configs, bpfLoader.GetBpfWorkload(), configs.BpfConfig.BpfFsPath, configs.BpfConfig.EnableBpfLog, configs.BpfConfig.EnableAccesslog) + c := controller.NewController(configs, bpfLoader.GetBpfKmesh(), bpfLoader.GetBpfWorkload(), configs.BpfConfig.BpfFsPath, configs.BpfConfig.EnableBpfLog, configs.BpfConfig.EnableAccesslog) if err := c.Start(stopCh); err != nil { return err } diff --git a/kernel/ko_src/kmesh/defer_connect.c b/kernel/ko_src/kmesh/defer_connect.c index 24eb200d0..ef7d1d7aa 100644 --- a/kernel/ko_src/kmesh/defer_connect.c +++ b/kernel/ko_src/kmesh/defer_connect.c @@ -72,6 +72,16 @@ static int defer_connect(struct sock *sk, struct msghdr *msg, size_t size) kbuf_size); daddr = sk->sk_daddr; dport = sk->sk_dport; + + // daddr == 0 && dport == 0 are special flags meaning the circuit breaker is open + // Should reject connection here + if (daddr == 0 && dport == 0) { + tcp_set_state(sk, TCP_CLOSE); + sk->sk_route_caps = 0; + inet_sk(sk)->inet_dport = 0; + err = -1; + goto out; + } #else memset(&sock_ops, 0, offsetof(struct bpf_sock_ops_kern, temp)); if (sk_fullsock(sk)) { diff --git a/pkg/bpf/ads/loader.go b/pkg/bpf/ads/loader.go index 6058ec5c6..a38e56f5d 100644 --- a/pkg/bpf/ads/loader.go +++ b/pkg/bpf/ads/loader.go @@ -141,6 +141,10 @@ func (sc *BpfAds) Detach() error { return nil } +func (sc *BpfAds) GetClusterStatsMap() *ebpf.Map { + return sc.SockConn.KmeshCgroupSockMaps.KmeshClusterStats +} + func AdsL7Enabled() bool { return false } diff --git a/pkg/bpf/ads/loader_enhanced.go b/pkg/bpf/ads/loader_enhanced.go index 13b9808ef..27b37219a 100644 --- a/pkg/bpf/ads/loader_enhanced.go +++ b/pkg/bpf/ads/loader_enhanced.go @@ -185,6 +185,10 @@ func (sc *BpfAds) Detach() error { return nil } +func (sc *BpfAds) GetClusterStatsMap() *ebpf.Map { + return sc.SockOps.KmeshSockopsMaps.KmeshClusterStats +} + func AdsL7Enabled() bool { return false } diff --git a/pkg/bpf/bpf.go b/pkg/bpf/bpf.go index 2d7c300e9..487857010 100644 --- a/pkg/bpf/bpf.go +++ b/pkg/bpf/bpf.go @@ -104,6 +104,13 @@ func (l *BpfLoader) Start() error { return nil } +func (l *BpfLoader) GetBpfKmesh() *ads.BpfAds { + if l == nil { + return nil + } + return l.obj +} + func (l *BpfLoader) GetBpfWorkload() *workload.BpfWorkload { if l == nil { return nil diff --git a/pkg/bpf/workload/sock_connection.go b/pkg/bpf/workload/sock_connection.go index aa37748f2..b38c6f912 100644 --- a/pkg/bpf/workload/sock_connection.go +++ b/pkg/bpf/workload/sock_connection.go @@ -22,13 +22,12 @@ import ( "reflect" "syscall" - "kmesh.net/kmesh/pkg/bpf/restart" - "github.com/cilium/ebpf" "github.com/cilium/ebpf/link" "kmesh.net/kmesh/bpf/kmesh/bpf2go" "kmesh.net/kmesh/daemon/options" + "kmesh.net/kmesh/pkg/bpf/restart" "kmesh.net/kmesh/pkg/bpf/utils" "kmesh.net/kmesh/pkg/constants" helper "kmesh.net/kmesh/pkg/utils" diff --git a/pkg/bpf/workload/sock_ops.go b/pkg/bpf/workload/sock_ops.go index db58e8573..6199594e9 100644 --- a/pkg/bpf/workload/sock_ops.go +++ b/pkg/bpf/workload/sock_ops.go @@ -23,13 +23,12 @@ import ( "reflect" "syscall" - "kmesh.net/kmesh/pkg/bpf/restart" - "github.com/cilium/ebpf" "github.com/cilium/ebpf/link" "kmesh.net/kmesh/bpf/kmesh/bpf2go" "kmesh.net/kmesh/daemon/options" + "kmesh.net/kmesh/pkg/bpf/restart" "kmesh.net/kmesh/pkg/bpf/utils" helper "kmesh.net/kmesh/pkg/utils" ) diff --git a/pkg/bpf/workload/xdp.go b/pkg/bpf/workload/xdp.go index 02cc77707..3b182dedc 100644 --- a/pkg/bpf/workload/xdp.go +++ b/pkg/bpf/workload/xdp.go @@ -22,13 +22,12 @@ import ( "reflect" "syscall" - "kmesh.net/kmesh/pkg/bpf/utils" - "github.com/cilium/ebpf" "github.com/cilium/ebpf/link" "kmesh.net/kmesh/bpf/kmesh/bpf2go" "kmesh.net/kmesh/daemon/options" + "kmesh.net/kmesh/pkg/bpf/utils" "kmesh.net/kmesh/pkg/constants" helper "kmesh.net/kmesh/pkg/utils" ) diff --git a/pkg/cache/v2/cluster.go b/pkg/cache/v2/cluster.go index 7023d0728..b1a03b2a6 100644 --- a/pkg/cache/v2/cluster.go +++ b/pkg/cache/v2/cluster.go @@ -22,24 +22,44 @@ package cache_v2 import ( "sync" + "github.com/cilium/ebpf" "k8s.io/apimachinery/pkg/util/sets" cluster_v2 "kmesh.net/kmesh/api/v2/cluster" core_v2 "kmesh.net/kmesh/api/v2/core" + bpfads "kmesh.net/kmesh/pkg/bpf/ads" maps_v2 "kmesh.net/kmesh/pkg/cache/v2/maps" + "kmesh.net/kmesh/pkg/utils" ) +type ClusterStatsKey struct { + NetnsCookie uint64 + ClusterId uint32 +} + +type ClusterStatsValue struct { + ActiveConnections uint32 +} + type ClusterCache struct { mutex sync.RWMutex apiClusterCache apiClusterCache // resourceHash[0]:cds resourceHash[1]:eds - resourceHash map[string][2]uint64 + resourceHash map[string][2]uint64 + hashName *utils.HashName + clusterStatsMap *ebpf.Map } -func NewClusterCache() ClusterCache { +func NewClusterCache(bpfAds *bpfads.BpfAds, hashName *utils.HashName) ClusterCache { + var clusterStatsMap *ebpf.Map + if bpfAds != nil { + clusterStatsMap = bpfAds.GetClusterStatsMap() + } return ClusterCache{ apiClusterCache: newApiClusterCache(), resourceHash: make(map[string][2]uint64), + hashName: hashName, + clusterStatsMap: clusterStatsMap, } } @@ -122,12 +142,41 @@ func (cache *ClusterCache) SetEdsHash(key string, value uint64) { cache.resourceHash[key] = [2]uint64{cache.resourceHash[key][0], value} } +func (cache *ClusterCache) clearClusterStats(clusterName string) { + if cache.clusterStatsMap == nil { + return + } + clusterId := cache.hashName.StrToNum(clusterName) + var key ClusterStatsKey + var value ClusterStatsValue + var keysToDelete []ClusterStatsKey + it := cache.clusterStatsMap.Iterate() + + for it.Next(&key, &value) { + if key.ClusterId == clusterId { + keysToDelete = append(keysToDelete, key) + } + } + if len(keysToDelete) > 0 { + log.Debugf("remove cluster stats: %v", keysToDelete) + _, err := cache.clusterStatsMap.BatchDelete(keysToDelete, nil) + if err != nil { + log.Errorf("failed to remove cluster stats: %v", err) + } + } + + if err := it.Err(); err != nil { + log.Errorf("delete iteration error: %s", err) + } +} + // Flush flushes the cluster to bpf map. func (cache *ClusterCache) Flush() { cache.mutex.Lock() defer cache.mutex.Unlock() for name, cluster := range cache.apiClusterCache { if cluster.GetApiStatus() == core_v2.ApiStatus_UPDATE { + cluster.Id = cache.hashName.StrToNum(name) err := maps_v2.ClusterUpdate(name, cluster) if err == nil { // reset api status after successfully updated @@ -136,10 +185,12 @@ func (cache *ClusterCache) Flush() { log.Errorf("cluster %s %s flush failed: %v", name, cluster.ApiStatus, err) } } else if cluster.GetApiStatus() == core_v2.ApiStatus_DELETE { + cache.clearClusterStats(name) err := maps_v2.ClusterDelete(name) if err == nil { delete(cache.apiClusterCache, name) delete(cache.resourceHash, name) + cache.hashName.Delete(name) } else { log.Errorf("cluster %s delete failed: %v", name, err) } @@ -157,6 +208,7 @@ func (cache *ClusterCache) Delete() { if err == nil { delete(cache.apiClusterCache, name) delete(cache.resourceHash, name) + cache.hashName.Delete(name) } else { log.Errorf("cluster %s delete failed: %v", name, err) } diff --git a/pkg/cache/v2/cluster_test.go b/pkg/cache/v2/cluster_test.go index e0fe1185b..f1ce6a400 100644 --- a/pkg/cache/v2/cluster_test.go +++ b/pkg/cache/v2/cluster_test.go @@ -33,6 +33,7 @@ import ( maps_v2 "kmesh.net/kmesh/pkg/cache/v2/maps" "kmesh.net/kmesh/pkg/constants" "kmesh.net/kmesh/pkg/nets" + "kmesh.net/kmesh/pkg/utils" "kmesh.net/kmesh/pkg/utils/hash" "kmesh.net/kmesh/pkg/utils/test" ) @@ -57,7 +58,7 @@ func TestClusterFlush(t *testing.T) { patches2.Reset() }() - cache := NewClusterCache() + cache := NewClusterCache(nil, utils.NewHashName()) cluster1 := &cluster_v2.Cluster{ ApiStatus: core_v2.ApiStatus_UPDATE, Name: "ut-cluster1", @@ -100,7 +101,7 @@ func TestClusterFlush(t *testing.T) { patches2.Reset() }() - cache := NewClusterCache() + cache := NewClusterCache(nil, utils.NewHashName()) cluster1 := &cluster_v2.Cluster{ ApiStatus: core_v2.ApiStatus_UPDATE, Name: "ut-cluster1", @@ -154,7 +155,7 @@ func TestClusterFlush(t *testing.T) { patches2.Reset() }() - cache := NewClusterCache() + cache := NewClusterCache(nil, utils.NewHashName()) cluster1 := &cluster_v2.Cluster{ ApiStatus: core_v2.ApiStatus_UNCHANGED, Name: "ut-cluster1", @@ -335,7 +336,7 @@ func BenchmarkClusterFlush(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - cache := NewClusterCache() + cache := NewClusterCache(nil, utils.NewHashName()) cluster.Name = rand.String(6) cluster.ApiStatus = core_v2.ApiStatus_UPDATE cache.SetApiCluster(cluster.Name, &cluster) @@ -344,3 +345,49 @@ func BenchmarkClusterFlush(b *testing.B) { assert.Equal(t, core_v2.ApiStatus_NONE, cluster.GetApiStatus()) } } + +func TestClearClusterStats(t *testing.T) { + config := options.BpfConfig{ + Mode: constants.KernelNativeMode, + BpfFsPath: "/sys/fs/bpf", + Cgroup2Path: "/mnt/kmesh_cgroup2", + } + cleanup, loader := test.InitBpfMap(t, config) + t.Cleanup(cleanup) + + adsObj := loader.GetBpfKmesh() + assert.NotNil(t, adsObj) + hashName := utils.NewHashName() + clusterCache := NewClusterCache(adsObj, hashName) + testClusters := []string{"test_cluster1", "test_cluster2", "test_cluster3"} + testKeys := []ClusterStatsKey{ + {NetnsCookie: 0, ClusterId: hashName.StrToNum(testClusters[0])}, + {NetnsCookie: 1, ClusterId: hashName.StrToNum(testClusters[0])}, + {NetnsCookie: 2, ClusterId: hashName.StrToNum(testClusters[0])}, + {NetnsCookie: 0, ClusterId: hashName.StrToNum(testClusters[1])}, + {NetnsCookie: 0, ClusterId: hashName.StrToNum(testClusters[2])}, + } + + testValues := []ClusterStatsValue{ + {ActiveConnections: 1}, + {ActiveConnections: 2}, + {ActiveConnections: 3}, + {ActiveConnections: 4}, + {ActiveConnections: 5}, + } + + clusterStatsMap := adsObj.GetClusterStatsMap() + clusterStatsMap.BatchUpdate(testKeys, testValues, nil) + + var key ClusterStatsKey + var value ClusterStatsValue + for _, cluster := range testClusters { + clusterCache.clearClusterStats(cluster) + iter := clusterStatsMap.Iterate() + clusterId := hashName.StrToNum(cluster) + for iter.Next(&key, &value) { + assert.NotEqual(t, clusterId, key.ClusterId) + } + assert.Nil(t, iter.Err()) + } +} diff --git a/pkg/controller/ads/ads_controller.go b/pkg/controller/ads/ads_controller.go index 7b57840d3..cc993f319 100644 --- a/pkg/controller/ads/ads_controller.go +++ b/pkg/controller/ads/ads_controller.go @@ -24,6 +24,7 @@ import ( resource_v3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "istio.io/istio/pkg/channels" + bpfads "kmesh.net/kmesh/pkg/bpf/ads" "kmesh.net/kmesh/pkg/logger" ) @@ -42,9 +43,9 @@ type connection struct { stopCh chan struct{} } -func NewController() *Controller { +func NewController(bpfAds *bpfads.BpfAds) *Controller { return &Controller{ - Processor: newProcessor(), + Processor: newProcessor(bpfAds), } } diff --git a/pkg/controller/ads/ads_controller_test.go b/pkg/controller/ads/ads_controller_test.go index b8f22e01d..70c4a36d6 100644 --- a/pkg/controller/ads/ads_controller_test.go +++ b/pkg/controller/ads/ads_controller_test.go @@ -117,7 +117,7 @@ func TestHandleAdsStream(t *testing.T) { } defer fakeClient.Cleanup() - adsStream := NewController() + adsStream := NewController(nil) adsStream.con = &connection{Stream: fakeClient.AdsClient, requestsChan: channels.NewUnbounded[*service_discovery_v3.DiscoveryRequest](), stopCh: make(chan struct{})} patches1 := gomonkey.NewPatches() diff --git a/pkg/controller/ads/ads_processor.go b/pkg/controller/ads/ads_processor.go index d49f22de1..7f0a17d1b 100644 --- a/pkg/controller/ads/ads_processor.go +++ b/pkg/controller/ads/ads_processor.go @@ -32,6 +32,7 @@ import ( admin_v2 "kmesh.net/kmesh/api/v2/admin" core_v2 "kmesh.net/kmesh/api/v2/core" + bpfads "kmesh.net/kmesh/pkg/bpf/ads" "kmesh.net/kmesh/pkg/constants" "kmesh.net/kmesh/pkg/controller/config" "kmesh.net/kmesh/pkg/utils/hash" @@ -56,9 +57,9 @@ type processor struct { DnsResolverChan chan []*config_cluster_v3.Cluster } -func newProcessor() *processor { +func newProcessor(bpfAds *bpfads.BpfAds) *processor { return &processor{ - Cache: NewAdsCache(), + Cache: NewAdsCache(bpfAds), ack: nil, req: nil, lastNonce: &lastNonce{}, diff --git a/pkg/controller/ads/ads_processor_test.go b/pkg/controller/ads/ads_processor_test.go index d53cdb936..dfdf46c15 100644 --- a/pkg/controller/ads/ads_processor_test.go +++ b/pkg/controller/ads/ads_processor_test.go @@ -36,6 +36,7 @@ import ( "kmesh.net/kmesh/daemon/options" cache_v2 "kmesh.net/kmesh/pkg/cache/v2" "kmesh.net/kmesh/pkg/constants" + "kmesh.net/kmesh/pkg/utils" "kmesh.net/kmesh/pkg/utils/hash" "kmesh.net/kmesh/pkg/utils/test" ) @@ -49,7 +50,7 @@ func TestHandleCdsResponse(t *testing.T) { cleanup, _ := test.InitBpfMap(t, config) t.Cleanup(cleanup) t.Run("new cluster, cluster type is eds", func(t *testing.T) { - p := newProcessor() + p := newProcessor(nil) p.DnsResolverChan = make(chan []*config_cluster_v3.Cluster, 5) cluster := &config_cluster_v3.Cluster{ Name: "ut-cluster", @@ -78,7 +79,7 @@ func TestHandleCdsResponse(t *testing.T) { }) t.Run("new cluster, cluster type is not eds", func(t *testing.T) { - p := newProcessor() + p := newProcessor(nil) p.DnsResolverChan = make(chan []*config_cluster_v3.Cluster, 5) cluster := &config_cluster_v3.Cluster{ Name: "ut-cluster", @@ -107,7 +108,7 @@ func TestHandleCdsResponse(t *testing.T) { }) t.Run("cluster update case", func(t *testing.T) { - p := newProcessor() + p := newProcessor(nil) p.DnsResolverChan = make(chan []*config_cluster_v3.Cluster, 5) cluster := &config_cluster_v3.Cluster{ Name: "ut-cluster", @@ -158,7 +159,7 @@ func TestHandleCdsResponse(t *testing.T) { }) t.Run("multiClusters: add a new eds cluster", func(t *testing.T) { - p := newProcessor() + p := newProcessor(nil) p.DnsResolverChan = make(chan []*config_cluster_v3.Cluster, 5) multiClusters := []*config_cluster_v3.Cluster{ { @@ -241,7 +242,7 @@ func TestHandleCdsResponse(t *testing.T) { }) t.Run("multiClusters: remove cluster", func(t *testing.T) { - p := newProcessor() + p := newProcessor(nil) p.DnsResolverChan = make(chan []*config_cluster_v3.Cluster, 5) cluster := &config_cluster_v3.Cluster{ Name: "ut-cluster", @@ -301,9 +302,9 @@ func TestHandleEdsResponse(t *testing.T) { cleanup, _ := test.InitBpfMap(t, config) t.Cleanup(cleanup) t.Run("cluster's apiStatus is UPDATE", func(t *testing.T) { - p := newProcessor() - adsLoader := NewAdsCache() - adsLoader.ClusterCache = cache_v2.NewClusterCache() + p := newProcessor(nil) + adsLoader := NewAdsCache(nil) + adsLoader.ClusterCache = cache_v2.NewClusterCache(nil, utils.NewHashName()) cluster := &cluster_v2.Cluster{ Name: "ut-cluster", ApiStatus: core_v2.ApiStatus_UPDATE, @@ -334,9 +335,9 @@ func TestHandleEdsResponse(t *testing.T) { }) t.Run("cluster's apiStatus is Waiting", func(t *testing.T) { - p := newProcessor() - adsLoader := NewAdsCache() - adsLoader.ClusterCache = cache_v2.NewClusterCache() + p := newProcessor(nil) + adsLoader := NewAdsCache(nil) + adsLoader.ClusterCache = cache_v2.NewClusterCache(nil, utils.NewHashName()) cluster := &cluster_v2.Cluster{ Name: "ut-cluster", ApiStatus: core_v2.ApiStatus_WAITING, @@ -362,14 +363,14 @@ func TestHandleEdsResponse(t *testing.T) { }) t.Run("not apiStatus_UPDATE", func(t *testing.T) { - adsLoader := NewAdsCache() - adsLoader.ClusterCache = cache_v2.NewClusterCache() + adsLoader := NewAdsCache(nil) + adsLoader.ClusterCache = cache_v2.NewClusterCache(nil, utils.NewHashName()) cluster := &cluster_v2.Cluster{ Name: "ut-cluster", ApiStatus: core_v2.ApiStatus_ALL, } adsLoader.ClusterCache.SetApiCluster("ut-cluster", cluster) - p := newProcessor() + p := newProcessor(nil) p.Cache = adsLoader loadAssignment := &config_endpoint_v3.ClusterLoadAssignment{ @@ -391,14 +392,14 @@ func TestHandleEdsResponse(t *testing.T) { }) t.Run("already have cluster, not update", func(t *testing.T) { - adsLoader := NewAdsCache() - adsLoader.ClusterCache = cache_v2.NewClusterCache() + adsLoader := NewAdsCache(nil) + adsLoader.ClusterCache = cache_v2.NewClusterCache(nil, utils.NewHashName()) cluster := &cluster_v2.Cluster{ Name: "ut-cluster", ApiStatus: core_v2.ApiStatus_WAITING, } adsLoader.ClusterCache.SetApiCluster("ut-cluster", cluster) - p := newProcessor() + p := newProcessor(nil) p.Cache = adsLoader loadAssignment := &config_endpoint_v3.ClusterLoadAssignment{ ClusterName: "ut-cluster", @@ -422,11 +423,11 @@ func TestHandleEdsResponse(t *testing.T) { }) t.Run("no apicluster, p.ack not be changed", func(t *testing.T) { - adsLoader := NewAdsCache() - adsLoader.ClusterCache = cache_v2.NewClusterCache() + adsLoader := NewAdsCache(nil) + adsLoader.ClusterCache = cache_v2.NewClusterCache(nil, utils.NewHashName()) cluster := &cluster_v2.Cluster{} adsLoader.ClusterCache.SetApiCluster("", cluster) - p := newProcessor() + p := newProcessor(nil) p.Cache = adsLoader loadAssignment := &config_endpoint_v3.ClusterLoadAssignment{ ClusterName: "ut-cluster", @@ -447,14 +448,14 @@ func TestHandleEdsResponse(t *testing.T) { }) t.Run("empty loadAssignment", func(t *testing.T) { - adsLoader := NewAdsCache() - adsLoader.ClusterCache = cache_v2.NewClusterCache() + adsLoader := NewAdsCache(nil) + adsLoader.ClusterCache = cache_v2.NewClusterCache(nil, utils.NewHashName()) cluster := &cluster_v2.Cluster{ Name: "ut-cluster", ApiStatus: core_v2.ApiStatus_WAITING, } adsLoader.ClusterCache.SetApiCluster("ut-cluster", cluster) - p := newProcessor() + p := newProcessor(nil) p.Cache = adsLoader loadAssignment := &config_endpoint_v3.ClusterLoadAssignment{ ClusterName: "ut-cluster", @@ -483,12 +484,12 @@ func TestHandleLdsResponse(t *testing.T) { } _, loader := test.InitBpfMap(t, config) t.Run("normal function test", func(t *testing.T) { - adsLoader := NewAdsCache() + adsLoader := NewAdsCache(nil) adsLoader.routeNames = []string{ "ut-route-to-client", "ut-route-to-service", } - p := newProcessor() + p := newProcessor(nil) p.Cache = adsLoader filterHttp := &filters_network_http.HttpConnectionManager{ RouteSpecifier: &filters_network_http.HttpConnectionManager_Rds{ @@ -543,12 +544,12 @@ func TestHandleLdsResponse(t *testing.T) { }) t.Run("listenerCache already has resource and it has not been changed", func(t *testing.T) { - adsLoader := NewAdsCache() + adsLoader := NewAdsCache(nil) adsLoader.routeNames = []string{ "ut-route-to-client", "ut-route-to-service", } - p := newProcessor() + p := newProcessor(nil) p.Cache = adsLoader listener := &config_listener_v3.Listener{ Name: "ut-listener", @@ -582,12 +583,12 @@ func TestHandleLdsResponse(t *testing.T) { }) t.Run("listenerCache already has resource and it has been changed", func(t *testing.T) { - adsLoader := NewAdsCache() + adsLoader := NewAdsCache(nil) adsLoader.routeNames = []string{ "ut-route-to-client", "ut-route-to-service", } - p := newProcessor() + p := newProcessor(nil) p.Cache = adsLoader listener := &config_listener_v3.Listener{ Name: "ut-listener", @@ -661,7 +662,7 @@ func TestHandleRdsResponse(t *testing.T) { } _, loader := test.InitBpfMap(t, config) t.Run("normal function test", func(t *testing.T) { - p := newProcessor() + p := newProcessor(nil) p.ack = &service_discovery_v3.DiscoveryRequest{ ResourceNames: []string{ "ut-routeclient", @@ -691,7 +692,7 @@ func TestHandleRdsResponse(t *testing.T) { }) t.Run("empty routeConfig", func(t *testing.T) { - p := newProcessor() + p := newProcessor(nil) p.ack = &service_discovery_v3.DiscoveryRequest{ ResourceNames: []string{ "ut-routeclient", @@ -714,7 +715,7 @@ func TestHandleRdsResponse(t *testing.T) { }) t.Run("already have a Rds, RdsHash has been changed", func(t *testing.T) { - p := newProcessor() + p := newProcessor(nil) p.ack = &service_discovery_v3.DiscoveryRequest{ ResourceNames: []string{ "ut-routeclient", @@ -756,7 +757,7 @@ func TestHandleRdsResponse(t *testing.T) { }) t.Run("already have a Rds, RdsHash has been change. And have multiRouteconfig in resp", func(t *testing.T) { - p := newProcessor() + p := newProcessor(nil) p.ack = &service_discovery_v3.DiscoveryRequest{ ResourceNames: []string{ "ut-routeclient", diff --git a/pkg/controller/ads/cache.go b/pkg/controller/ads/cache.go index c40767a29..f22b4e6bf 100644 --- a/pkg/controller/ads/cache.go +++ b/pkg/controller/ads/cache.go @@ -35,8 +35,10 @@ import ( filter_v2 "kmesh.net/kmesh/api/v2/filter" listener_v2 "kmesh.net/kmesh/api/v2/listener" route_v2 "kmesh.net/kmesh/api/v2/route" + bpfads "kmesh.net/kmesh/pkg/bpf/ads" cache_v2 "kmesh.net/kmesh/pkg/cache/v2" "kmesh.net/kmesh/pkg/nets" + "kmesh.net/kmesh/pkg/utils" ) type AdsCache struct { @@ -49,18 +51,20 @@ type AdsCache struct { RouteCache cache_v2.RouteConfigCache } -func NewAdsCache() *AdsCache { +func NewAdsCache(bpfAds *bpfads.BpfAds) *AdsCache { + hashName := utils.NewHashName() return &AdsCache{ ListenerCache: cache_v2.NewListenerCache(), - ClusterCache: cache_v2.NewClusterCache(), + ClusterCache: cache_v2.NewClusterCache(bpfAds, hashName), RouteCache: cache_v2.NewRouteConfigCache(), } } func (load *AdsCache) CreateApiClusterByCds(status core_v2.ApiStatus, cluster *config_cluster_v3.Cluster) { + clusterName := cluster.GetName() apiCluster := &cluster_v2.Cluster{ ApiStatus: status, - Name: cluster.GetName(), + Name: clusterName, ConnectTimeout: uint32(cluster.GetConnectTimeout().GetSeconds()), LbPolicy: cluster_v2.Cluster_LbPolicy(cluster.GetLbPolicy()), CircuitBreakers: newApiCircuitBreakers(cluster.GetCircuitBreakers()), @@ -69,14 +73,15 @@ func (load *AdsCache) CreateApiClusterByCds(status core_v2.ApiStatus, cluster *c if cluster.GetType() != config_cluster_v3.Cluster_EDS { apiCluster.LoadAssignment = newApiClusterLoadAssignment(cluster.GetLoadAssignment()) } - load.ClusterCache.SetApiCluster(cluster.GetName(), apiCluster) + load.ClusterCache.SetApiCluster(clusterName, apiCluster) } // UpdateApiClusterIfExists only update api cluster if it exists func (load *AdsCache) UpdateApiClusterIfExists(status core_v2.ApiStatus, cluster *config_cluster_v3.Cluster) bool { + clusterName := cluster.GetName() apiCluster := &cluster_v2.Cluster{ ApiStatus: status, - Name: cluster.GetName(), + Name: clusterName, ConnectTimeout: uint32(cluster.GetConnectTimeout().GetSeconds()), LbPolicy: cluster_v2.Cluster_LbPolicy(cluster.GetLbPolicy()), CircuitBreakers: newApiCircuitBreakers(cluster.GetCircuitBreakers()), @@ -84,7 +89,7 @@ func (load *AdsCache) UpdateApiClusterIfExists(status core_v2.ApiStatus, cluster if cluster.GetType() != config_cluster_v3.Cluster_EDS { apiCluster.LoadAssignment = newApiClusterLoadAssignment(cluster.GetLoadAssignment()) } - return load.ClusterCache.UpdateApiClusterIfExists(cluster.GetName(), apiCluster) + return load.ClusterCache.UpdateApiClusterIfExists(clusterName, apiCluster) } func (load *AdsCache) UpdateApiClusterStatus(key string, status core_v2.ApiStatus) { diff --git a/pkg/controller/ads/cache_test.go b/pkg/controller/ads/cache_test.go index 2c4df8b23..195341bb3 100644 --- a/pkg/controller/ads/cache_test.go +++ b/pkg/controller/ads/cache_test.go @@ -46,7 +46,7 @@ func TestCreateApiClusterByCds(t *testing.T) { }{ { name: "test1: ApiStatus is update, cluster type is EDS", - loader: NewAdsCache(), + loader: NewAdsCache(nil), status: core_v2.ApiStatus_UPDATE, cluster: &config_cluster_v3.Cluster{ Name: "ut-cluster", @@ -75,7 +75,7 @@ func TestCreateApiClusterByCds(t *testing.T) { }, { name: "test2: ApiStatus is update, cluster type is not EDS", - loader: NewAdsCache(), + loader: NewAdsCache(nil), status: core_v2.ApiStatus_UPDATE, cluster: &config_cluster_v3.Cluster{ Name: "ut-cluster", @@ -104,7 +104,7 @@ func TestCreateApiClusterByCds(t *testing.T) { }, { name: "test3: Apistatus is update, cluster type is EDS and cluster not has name", - loader: NewAdsCache(), + loader: NewAdsCache(nil), status: core_v2.ApiStatus_UPDATE, cluster: &config_cluster_v3.Cluster{ ConnectTimeout: &durationpb.Duration{ @@ -132,7 +132,7 @@ func TestCreateApiClusterByCds(t *testing.T) { }, { name: "test4: Apistatus is update, cluster type is not EDS and cluster not has name", - loader: NewAdsCache(), + loader: NewAdsCache(nil), status: core_v2.ApiStatus_UPDATE, cluster: &config_cluster_v3.Cluster{ ConnectTimeout: &durationpb.Duration{ @@ -421,7 +421,7 @@ func TestNewApiSocketAddress(t *testing.T) { func TestCreateApiListenerByLds(t *testing.T) { t.Run("listener filter configtype is filter_typedconfig", func(t *testing.T) { - loader := NewAdsCache() + loader := NewAdsCache(nil) loader.routeNames = []string{ "ut-route", } @@ -474,7 +474,7 @@ func TestCreateApiListenerByLds(t *testing.T) { }) t.Run("listener filter configtype is filter_ConfigDiscover", func(t *testing.T) { - loader := NewAdsCache() + loader := NewAdsCache(nil) loader.routeNames = []string{ "ut-route", } @@ -526,7 +526,7 @@ func TestCreateApiListenerByLds(t *testing.T) { }) t.Run("status is UNCHANGED", func(t *testing.T) { - loader := NewAdsCache() + loader := NewAdsCache(nil) loader.routeNames = []string{ "ut-route", } @@ -574,7 +574,7 @@ func TestCreateApiListenerByLds(t *testing.T) { }) t.Run("status is UNCHANGED, filterName is pkg_wellknown.HTTPConnectionManager", func(t *testing.T) { - loader := NewAdsCache() + loader := NewAdsCache(nil) loader.routeNames = []string{ "ut-route", } diff --git a/pkg/controller/client.go b/pkg/controller/client.go index 1bee12eac..9032c4c19 100644 --- a/pkg/controller/client.go +++ b/pkg/controller/client.go @@ -24,6 +24,7 @@ import ( discoveryv3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "google.golang.org/grpc" + bpfads "kmesh.net/kmesh/pkg/bpf/ads" bpfwl "kmesh.net/kmesh/pkg/bpf/workload" "kmesh.net/kmesh/pkg/constants" "kmesh.net/kmesh/pkg/controller/ads" @@ -47,7 +48,7 @@ type XdsClient struct { xdsConfig *config.XdsConfig } -func NewXdsClient(mode string, bpfWorkload *bpfwl.BpfWorkload, enableAccesslog bool) *XdsClient { +func NewXdsClient(mode string, bpfAds *bpfads.BpfAds, bpfWorkload *bpfwl.BpfWorkload, enableAccesslog bool) *XdsClient { client := &XdsClient{ mode: mode, xdsConfig: config.GetConfig(mode), @@ -56,7 +57,7 @@ func NewXdsClient(mode string, bpfWorkload *bpfwl.BpfWorkload, enableAccesslog b if mode == constants.DualEngineMode { client.WorkloadController = workload.NewController(bpfWorkload, enableAccesslog) } else if mode == constants.KernelNativeMode { - client.AdsController = ads.NewController() + client.AdsController = ads.NewController(bpfAds) } client.ctx, client.cancel = context.WithCancel(context.Background()) diff --git a/pkg/controller/client_test.go b/pkg/controller/client_test.go index c75a79691..4a82e5d86 100644 --- a/pkg/controller/client_test.go +++ b/pkg/controller/client_test.go @@ -28,6 +28,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + bpfads "kmesh.net/kmesh/pkg/bpf/ads" bpfwl "kmesh.net/kmesh/pkg/bpf/workload" "kmesh.net/kmesh/pkg/constants" "kmesh.net/kmesh/pkg/controller/workload" @@ -37,7 +38,7 @@ import ( func TestRecoverConnection(t *testing.T) { t.Run("test reconnect success", func(t *testing.T) { - utClient := NewXdsClient(constants.KernelNativeMode, &bpfwl.BpfWorkload{}, false) + utClient := NewXdsClient(constants.KernelNativeMode, &bpfads.BpfAds{}, &bpfwl.BpfWorkload{}, false) patches := gomonkey.NewPatches() defer patches.Reset() iteration := 0 @@ -78,7 +79,7 @@ func TestClientResponseProcess(t *testing.T) { })) }) - utClient := NewXdsClient(constants.KernelNativeMode, &bpfwl.BpfWorkload{}, false) + utClient := NewXdsClient(constants.KernelNativeMode, &bpfads.BpfAds{}, &bpfwl.BpfWorkload{}, false) err := utClient.createGrpcStreamClient() assert.NoError(t, err) @@ -125,7 +126,7 @@ func TestClientResponseProcess(t *testing.T) { })) }) - utClient := NewXdsClient(constants.DualEngineMode, &bpfwl.BpfWorkload{}, false) + utClient := NewXdsClient(constants.DualEngineMode, &bpfads.BpfAds{}, &bpfwl.BpfWorkload{}, false) err := utClient.createGrpcStreamClient() assert.NoError(t, err) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 78fe193a2..74fd2f7d4 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -21,6 +21,7 @@ import ( "fmt" "kmesh.net/kmesh/daemon/options" + bpfads "kmesh.net/kmesh/pkg/bpf/ads" bpfwl "kmesh.net/kmesh/pkg/bpf/workload" "kmesh.net/kmesh/pkg/constants" "kmesh.net/kmesh/pkg/controller/bypass" @@ -38,6 +39,7 @@ var ( type Controller struct { mode string + bpfAdsObj *bpfads.BpfAds bpfWorkloadObj *bpfwl.BpfWorkload client *XdsClient enableByPass bool @@ -47,10 +49,11 @@ type Controller struct { enableAccesslog bool } -func NewController(opts *options.BootstrapConfigs, bpfWorkloadObj *bpfwl.BpfWorkload, bpfFsPath string, enableBpfLog, enableAccesslog bool) *Controller { +func NewController(opts *options.BootstrapConfigs, bpfAdsObj *bpfads.BpfAds, bpfWorkloadObj *bpfwl.BpfWorkload, bpfFsPath string, enableBpfLog, enableAccesslog bool) *Controller { return &Controller{ mode: opts.BpfConfig.Mode, enableByPass: opts.ByPassConfig.EnableByPass, + bpfAdsObj: bpfAdsObj, bpfWorkloadObj: bpfWorkloadObj, enableSecretManager: opts.SecretManagerConfig.Enable, bpfFsPath: bpfFsPath, @@ -103,7 +106,7 @@ func (c *Controller) Start(stopCh <-chan struct{}) error { return fmt.Errorf("fail to start ringbuf reader: %v", err) } } - c.client = NewXdsClient(c.mode, c.bpfWorkloadObj, c.enableAccesslog) + c.client = NewXdsClient(c.mode, c.bpfAdsObj, c.bpfWorkloadObj, c.enableAccesslog) if c.client.WorkloadController != nil { c.client.WorkloadController.Run(ctx) diff --git a/pkg/controller/workload/workload_processor.go b/pkg/controller/workload/workload_processor.go index 1745d1585..f3f606641 100644 --- a/pkg/controller/workload/workload_processor.go +++ b/pkg/controller/workload/workload_processor.go @@ -41,6 +41,7 @@ import ( bpf "kmesh.net/kmesh/pkg/controller/workload/bpfcache" "kmesh.net/kmesh/pkg/controller/workload/cache" "kmesh.net/kmesh/pkg/nets" + "kmesh.net/kmesh/pkg/utils" ) const ( @@ -52,7 +53,7 @@ type Processor struct { ack *service_discovery_v3.DeltaDiscoveryRequest req *service_discovery_v3.DeltaDiscoveryRequest - hashName *HashName + hashName *utils.HashName bpf *bpf.Cache nodeName string WorkloadCache cache.WorkloadCache @@ -64,7 +65,7 @@ type Processor struct { func NewProcessor(workloadMap bpf2go.KmeshCgroupSockWorkloadMaps) *Processor { return &Processor{ - hashName: NewHashName(), + hashName: utils.NewHashName(), bpf: bpf.NewCache(workloadMap), nodeName: os.Getenv("NODE_NAME"), WorkloadCache: cache.NewWorkloadCache(), @@ -630,7 +631,7 @@ func (p *Processor) handleRemovedAddressesDuringRestart() { // but not in userspace cache, that means the data in the bpf map load // from the last epoch is inconsistent with the data that should // actually be stored now. then we should delete it from bpf map - for str, num := range p.hashName.strToNum { + for str, num := range p.hashName.GetStrToNum() { if p.WorkloadCache.GetWorkloadByUid(str) == nil && p.ServiceCache.GetService(str) == nil { log.Debugf("GetWorkloadByUid and GetService nil:%v", str) @@ -702,7 +703,7 @@ func (p *Processor) handleRemovedAuthzPolicyDuringRestart(rbac *auth.Rbac) { * actually be stored now. then we should delete it from bpf map */ policyCache := rbac.GetAllPolicies() - for str, num := range p.hashName.strToNum { + for str, num := range p.hashName.GetStrToNum() { if _, exists := policyCache[str]; !exists { if err := maps_v2.AuthorizationLookup(num, &policyValue); err == nil { log.Debugf("Find policy: [%v:%v] Remove authz policy", str, num) diff --git a/pkg/controller/workload/workload_processor_test.go b/pkg/controller/workload/workload_processor_test.go index 02fa61265..55c89ebea 100644 --- a/pkg/controller/workload/workload_processor_test.go +++ b/pkg/controller/workload/workload_processor_test.go @@ -626,7 +626,7 @@ func TestRestart(t *testing.T) { // The hashname will be saved as a file by default. // If it is not cleaned, it will affect other use cases. func hashNameClean(p *Processor) { - for str := range p.hashName.strToNum { + for str := range p.hashName.GetStrToNum() { if err := p.removeWorkloadFromBpfMap(str); err != nil { log.Errorf("RemoveWorkloadResource failed: %v", err) } diff --git a/pkg/dns/dns_test.go b/pkg/dns/dns_test.go index 78c91bad5..409e10d05 100644 --- a/pkg/dns/dns_test.go +++ b/pkg/dns/dns_test.go @@ -52,7 +52,7 @@ type fakeDNSServer struct { func TestDNS(t *testing.T) { fakeDNSServer := newFakeDNSServer() - testDNSResolver, err := NewDNSResolver(ads.NewAdsCache()) + testDNSResolver, err := NewDNSResolver(ads.NewAdsCache(nil)) if err != nil { t.Fatal(err) } @@ -170,7 +170,7 @@ func TestDNS(t *testing.T) { // This test aims to evaluate the concurrent writing behavior of the adsCache by utilizing the test race feature. // The test verifies the ability of the adsCache to handle concurrent access and updates correctly in a multi-goroutine environment. func TestADSCacheConcurrentWriting(t *testing.T) { - adsCache := ads.NewAdsCache() + adsCache := ads.NewAdsCache(nil) cluster := &clusterv3.Cluster{ Name: "ut-cluster", ClusterDiscoveryType: &clusterv3.Cluster_Type{ @@ -504,10 +504,10 @@ func TestHandleCdsResponseWithDns(t *testing.T) { }, } - p := ads.NewController().Processor + p := ads.NewController(nil).Processor stopCh := make(chan struct{}) defer close(stopCh) - dnsResolver, err := NewDNSResolver(ads.NewAdsCache()) + dnsResolver, err := NewDNSResolver(ads.NewAdsCache(nil)) assert.NoError(t, err) dnsResolver.StartDNSResolver(stopCh) p.DnsResolverChan = dnsResolver.DnsResolverChan diff --git a/pkg/controller/workload/workload_hash.go b/pkg/utils/hash_name.go similarity index 94% rename from pkg/controller/workload/workload_hash.go rename to pkg/utils/hash_name.go index a81fe6271..b542670b5 100644 --- a/pkg/controller/workload/workload_hash.go +++ b/pkg/utils/hash_name.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package workload +package utils import ( "fmt" @@ -27,7 +27,7 @@ import ( ) const ( - persistPath = "/mnt/workload_hash_name.yaml" + persistPath = "/mnt/hash_name.yaml" ) // HashName converts a string to a uint32 integer as the key of bpf map @@ -124,6 +124,14 @@ func (h *HashName) NumToStr(num uint32) string { return h.numToStr[num] } +func (h *HashName) StrToNum(str string) uint32 { + return h.strToNum[str] +} + +func (h *HashName) GetStrToNum() map[string]uint32 { + return h.strToNum +} + func (h *HashName) Delete(str string) { // only when the num exists, we do the logic if num, exists := h.strToNum[str]; exists { diff --git a/pkg/controller/workload/workload_hash_test.go b/pkg/utils/hash_name_test.go similarity index 99% rename from pkg/controller/workload/workload_hash_test.go rename to pkg/utils/hash_name_test.go index 82837c9cd..1acd0c3c2 100644 --- a/pkg/controller/workload/workload_hash_test.go +++ b/pkg/utils/hash_name_test.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package workload +package utils import ( "hash/fnv" diff --git a/samples/circuitbreaker/circuitbreaker.yml b/samples/circuitbreaker/circuitbreaker.yml new file mode 100644 index 000000000..78f0c3adb --- /dev/null +++ b/samples/circuitbreaker/circuitbreaker.yml @@ -0,0 +1,16 @@ +apiVersion: networking.istio.io/v1beta1 +kind: DestinationRule +metadata: + name: httpbin-destination-rule +spec: + host: httpbin + trafficPolicy: + portLevelSettings: + - port: + number: 8000 + connectionPool: + http: + http1MaxPendingRequests: 1 + maxRequestsPerConnection: 1 + tcp: + maxConnections: 1 \ No newline at end of file