From c6d6de3ab4aa6552326e70d40a79679909f954a0 Mon Sep 17 00:00:00 2001 From: Okabe-Rintarou-0 <923048992@qq.com> Date: Thu, 11 Jul 2024 13:43:39 +0800 Subject: [PATCH 01/19] monitor cluster connection Signed-off-by: Okabe-Rintarou-0 <923048992@qq.com> --- bpf/kmesh/ads/include/circuit_breaker.h | 53 +++++++++++++++++++++++++ bpf/kmesh/ads/include/cluster.h | 5 ++- bpf/kmesh/ads/include/kmesh_common.h | 46 +++++++++++++++++++++ bpf/kmesh/ads/sockops.c | 19 ++++++++- 4 files changed, 120 insertions(+), 3 deletions(-) create mode 100644 bpf/kmesh/ads/include/circuit_breaker.h diff --git a/bpf/kmesh/ads/include/circuit_breaker.h b/bpf/kmesh/ads/include/circuit_breaker.h new file mode 100644 index 000000000..0c922f792 --- /dev/null +++ b/bpf/kmesh/ads/include/circuit_breaker.h @@ -0,0 +1,53 @@ +#include "bpf_log.h" +#include "kmesh_common.h" +#include "bpf_common.h" + +#ifndef __KMESH_CIRCUIT_BREAKER_H__ +#define __KMESH_CIRCUIT_BREAKER_H__ + +static inline void on_cluster_sock_bind(struct bpf_sock *sk, const char* cluster_name) { + BPF_LOG(DEBUG, KMESH, "record sock bind for cluster %s\n", cluster_name); + struct cluster_sock_data *data = NULL; + if (!sk) { + BPF_LOG(WARN, KMESH, "provided sock is NULL\n"); + return; + } + + data = bpf_sk_storage_get(&map_of_cluster_sock, sk, 0, BPF_LOCAL_STORAGE_GET_F_CREATE); + if (!data) { + BPF_LOG(ERR, KMESH, "record_cluster_sock call bpf_sk_storage_get failed\n"); + return; + } + + bpf_strncpy(data->cluster_name, BPF_DATA_MAX_LEN, (char *)cluster_name); + BPF_LOG(DEBUG, KMESH, "record sock bind for cluster %s done\n", cluster_name); +} + +static inline struct cluster_sock_data* get_cluster_sk_data(struct bpf_sock *sk) { + struct cluster_sock_data *data = NULL; + if (!sk) { + BPF_LOG(DEBUG, KMESH, "provided sock is NULL\n"); + return NULL; + } + + data = bpf_sk_storage_get(&map_of_cluster_sock, sk, 0, 0); + return data; +} + +static inline void on_cluster_sock_connect(struct bpf_sock *sk) { + struct cluster_sock_data *data = get_cluster_sk_data(sk); + if (!data) { + return; + } + BPF_LOG(DEBUG, KMESH, "record sock connection for cluster %s\n", data->cluster_name); +} + +static inline void on_cluster_sock_close(struct bpf_sock *sk) { + struct cluster_sock_data *data = get_cluster_sk_data(sk); + if (!data) { + return; + } + BPF_LOG(DEBUG, KMESH, "record sock close for cluster %s", data->cluster_name); +} + +#endif \ No newline at end of file diff --git a/bpf/kmesh/ads/include/cluster.h b/bpf/kmesh/ads/include/cluster.h index e5fd4a358..58bdf64f0 100644 --- a/bpf/kmesh/ads/include/cluster.h +++ b/bpf/kmesh/ads/include/cluster.h @@ -9,6 +9,7 @@ #include "tail_call.h" #include "cluster/cluster.pb-c.h" #include "endpoint/endpoint.pb-c.h" +#include "circuit_breaker.h" #define CLUSTER_NAME_MAX_LEN BPF_DATA_MAX_LEN @@ -263,7 +264,7 @@ static inline int cluster_handle_loadbalance(Cluster__Cluster *cluster, address_ name = kmesh_get_ptr_val(cluster->name); if (!name) { - BPF_LOG(ERR, CLUSTER, "filed to get cluster\n"); + BPF_LOG(ERR, CLUSTER, "failed to get cluster\n"); return -EAGAIN; } @@ -315,7 +316,7 @@ int cluster_manager(ctx_buff_t *ctx) kmesh_tail_delete_ctx(&ctx_key); if (cluster == NULL) return KMESH_TAIL_CALL_RET(ENOENT); - + on_cluster_sock_bind(ctx->sk, (const char *)ctx_val->data); ret = cluster_handle_loadbalance(cluster, &addr, ctx); return KMESH_TAIL_CALL_RET(ret); } diff --git a/bpf/kmesh/ads/include/kmesh_common.h b/bpf/kmesh/ads/include/kmesh_common.h index 2df13531a..cedf90188 100644 --- a/bpf/kmesh/ads/include/kmesh_common.h +++ b/bpf/kmesh/ads/include/kmesh_common.h @@ -70,6 +70,52 @@ static inline char *bpf_strncpy(char *dst, int n, const char *src) } #endif +struct cluster_sock_data { + char cluster_name[BPF_DATA_MAX_LEN]; +}; + +struct resource { + // current value + __u64 curr; + __u64 max; +}; + +struct cluster_resources { + struct resource connections; +}; + +struct { + __uint(type, BPF_MAP_TYPE_SK_STORAGE); + __uint(map_flags, BPF_F_NO_PREALLOC); + __type(key, int); + __type(value, struct cluster_sock_data); +} map_of_cluster_sock SEC(".maps"); + +struct { + __uint(type, BPF_MAP_TYPE_ARRAY_OF_MAPS); + __uint(key_size, sizeof(__u32)); + __uint(value_size, sizeof(__u32)); + __uint(max_entries, MAP_SIZE_OF_OUTTER_MAP); + __uint(map_flags, 0); +} outer_map SEC(".maps"); + +struct { + __uint(type, BPF_MAP_TYPE_ARRAY); + __uint(key_size, sizeof(__u32)); + __uint(value_size, BPF_INNER_MAP_DATA_LEN); + __uint(max_entries, 1); + __uint(map_flags, 0); +} inner_map SEC(".maps"); + +typedef enum { + KMESH_TAIL_CALL_LISTENER = 1, + KMESH_TAIL_CALL_FILTER_CHAIN, + KMESH_TAIL_CALL_FILTER, + KMESH_TAIL_CALL_ROUTER, + KMESH_TAIL_CALL_CLUSTER, + KMESH_TAIL_CALL_ROUTER_CONFIG, +} tail_call_index_t; + typedef Core__SocketAddress address_t; // bpf return value diff --git a/bpf/kmesh/ads/sockops.c b/bpf/kmesh/ads/sockops.c index 432f604ba..c1482c721 100644 --- a/bpf/kmesh/ads/sockops.c +++ b/bpf/kmesh/ads/sockops.c @@ -9,6 +9,7 @@ #include "filter.h" #include "route_config.h" #include "cluster.h" +#include "circuit_breaker.h" #if KMESH_ENABLE_IPV4 #if KMESH_ENABLE_HTTP @@ -46,7 +47,6 @@ SEC("sockops") int sockops_prog(struct bpf_sock_ops *skops) { #define BPF_CONSTRUCT_PTR(low_32, high_32) (unsigned long long)(((unsigned long long)(high_32) << 32) + (low_32)) - struct bpf_mem_ptr *msg = NULL; if (skops->family != AF_INET) @@ -56,6 +56,23 @@ int sockops_prog(struct bpf_sock_ops *skops) case BPF_SOCK_OPS_TCP_DEFER_CONNECT_CB: msg = (struct bpf_mem_ptr *)BPF_CONSTRUCT_PTR(skops->args[0], skops->args[1]); (void)sockops_traffic_control(skops, msg); + break; + case BPF_SOCK_OPS_ACTIVE_ESTABLISHED_CB: + if (bpf_sock_ops_cb_flags_set(skops, BPF_SOCK_OPS_STATE_CB_FLAG) != 0) { + BPF_LOG(ERR, SOCKOPS, "set sockops cb failed!\n"); + } + on_cluster_sock_connect(skops->sk); + break; + case BPF_SOCK_OPS_PASSIVE_ESTABLISHED_CB: + if (bpf_sock_ops_cb_flags_set(skops, BPF_SOCK_OPS_STATE_CB_FLAG) != 0) { + BPF_LOG(ERR, SOCKOPS, "set sockops cb failed!\n"); + } + on_cluster_sock_connect(skops->sk); + case BPF_SOCK_OPS_STATE_CB: + if (skops->args[1] == BPF_TCP_CLOSE) { + on_cluster_sock_close(skops->sk); + } + break; } return BPF_OK; } From 4d496c745d897015515ab47e88fedef9b0076992 Mon Sep 17 00:00:00 2001 From: Okabe-Rintarou-0 <923048992@qq.com> Date: Fri, 12 Jul 2024 22:09:31 +0800 Subject: [PATCH 02/19] add basic apis Signed-off-by: Okabe-Rintarou-0 <923048992@qq.com> --- bpf/kmesh/ads/include/circuit_breaker.h | 103 +++++++++++++++++++++--- bpf/kmesh/ads/include/cluster.h | 15 ++++ bpf/kmesh/ads/include/kmesh_common.h | 21 ----- bpf/kmesh/ads/sockops.c | 9 ++- 4 files changed, 115 insertions(+), 33 deletions(-) diff --git a/bpf/kmesh/ads/include/circuit_breaker.h b/bpf/kmesh/ads/include/circuit_breaker.h index 0c922f792..5904e0466 100644 --- a/bpf/kmesh/ads/include/circuit_breaker.h +++ b/bpf/kmesh/ads/include/circuit_breaker.h @@ -5,7 +5,67 @@ #ifndef __KMESH_CIRCUIT_BREAKER_H__ #define __KMESH_CIRCUIT_BREAKER_H__ -static inline void on_cluster_sock_bind(struct bpf_sock *sk, const char* cluster_name) { +#define CLUSTER_NAME_MAX_LEN BPF_DATA_MAX_LEN + +struct cluster_stats { + __u32 active_connections; +}; + +struct cluster_stats_key { + __u64 netns_cookie; + __u64 cluster_id; +}; + +struct { + __uint(type, BPF_MAP_TYPE_HASH); + __uint(key_size, sizeof(struct cluster_stats_key)); + __uint(value_size, sizeof(struct cluster_stats)); + __uint(map_flags, BPF_F_NO_PREALLOC); + __uint(max_entries, MAP_SIZE_OF_CLUSTER); +} map_of_cluster_stats SEC(".maps"); + +struct cluster_sock_data { + __u64 cluster_id; +}; + +struct { + __uint(type, BPF_MAP_TYPE_SK_STORAGE); + __uint(map_flags, BPF_F_NO_PREALLOC); + __type(key, int); + __type(value, struct cluster_sock_data); +} map_of_cluster_sock SEC(".maps"); + +static inline void update_cluster_active_connections(const struct cluster_stats_key *key, int delta) +{ + if (!key) { + return; + } + struct cluster_stats *stats = NULL; + stats = kmesh_map_lookup_elem(&map_of_cluster_stats, key); + if (!stats) { + struct cluster_stats new_stats = {0}; + new_stats.active_connections = delta; + BPF_LOG( + DEBUG, + KMESH, + "create new stats(netns_cookie = %lld, cluster_id = %lld)", + key->netns_cookie, + key->cluster_id); + kmesh_map_update_elem(&map_of_cluster_stats, key, &new_stats); + } else { + stats->active_connections += delta; + kmesh_map_update_elem(&map_of_cluster_stats, key, stats); + BPF_LOG( + DEBUG, + KMESH, + "update existing stats(netns_cookie = %lld, cluster_id = %lld)", + key->netns_cookie, + key->cluster_id); + } +} + +static inline void on_cluster_sock_bind(struct bpf_sock *sk, const char *cluster_name) +{ BPF_LOG(DEBUG, KMESH, "record sock bind for cluster %s\n", cluster_name); struct cluster_sock_data *data = NULL; if (!sk) { @@ -19,11 +79,14 @@ static inline void on_cluster_sock_bind(struct bpf_sock *sk, const char* cluster return; } - bpf_strncpy(data->cluster_name, BPF_DATA_MAX_LEN, (char *)cluster_name); + // bpf_strncpy(data->cluster_name, CLUSTER_NAME_MAX_LEN, (char *)cluster_name); + // TODO(lzh): how to map cluster to id? + data->cluster_id = 1; BPF_LOG(DEBUG, KMESH, "record sock bind for cluster %s done\n", cluster_name); } -static inline struct cluster_sock_data* get_cluster_sk_data(struct bpf_sock *sk) { +static inline struct cluster_sock_data *get_cluster_sk_data(struct bpf_sock *sk) +{ struct cluster_sock_data *data = NULL; if (!sk) { BPF_LOG(DEBUG, KMESH, "provided sock is NULL\n"); @@ -34,20 +97,42 @@ static inline struct cluster_sock_data* get_cluster_sk_data(struct bpf_sock *sk) return data; } -static inline void on_cluster_sock_connect(struct bpf_sock *sk) { - struct cluster_sock_data *data = get_cluster_sk_data(sk); +static inline void on_cluster_sock_connect(struct bpf_sock_ops *ctx) +{ + if (!ctx) { + return; + } + struct cluster_sock_data *data = get_cluster_sk_data(ctx->sk); if (!data) { return; } - BPF_LOG(DEBUG, KMESH, "record sock connection for cluster %s\n", data->cluster_name); + __u64 cookie = bpf_get_netns_cookie(ctx); + BPF_LOG(INFO, KMESH, "here we got netns cookie: %lld", cookie); + struct cluster_stats_key key = {0}; + key.netns_cookie = cookie; + key.cluster_id = data->cluster_id; + BPF_LOG(DEBUG, KMESH, "increase cluster active connections(netns_cookie = %lld, cluster = %lld)", key.netns_cookie, key.cluster_id); + update_cluster_active_connections(&key, 1); + BPF_LOG(DEBUG, KMESH, "record sock connection for cluster %lld\n", data->cluster_id); } -static inline void on_cluster_sock_close(struct bpf_sock *sk) { - struct cluster_sock_data *data = get_cluster_sk_data(sk); +static inline void on_cluster_sock_close(struct bpf_sock_ops *ctx) +{ + if (!ctx) { + return; + } + struct cluster_sock_data *data = get_cluster_sk_data(ctx->sk); if (!data) { return; } - BPF_LOG(DEBUG, KMESH, "record sock close for cluster %s", data->cluster_name); + __u64 cookie = bpf_get_netns_cookie(ctx); + BPF_LOG(INFO, KMESH, "here we got netns cookie: %lld", cookie); + struct cluster_stats_key key = {0}; + key.netns_cookie = cookie; + key.cluster_id = data->cluster_id; + update_cluster_active_connections(&key, -1); + BPF_LOG(DEBUG, KMESH, "decrease cluster active connections(netns_cookie = %lld, cluster = %lld)", key.netns_cookie, key.cluster_id); + BPF_LOG(DEBUG, KMESH, "record sock close for cluster %lld", data->cluster_id); } #endif \ No newline at end of file diff --git a/bpf/kmesh/ads/include/cluster.h b/bpf/kmesh/ads/include/cluster.h index 58bdf64f0..4c6103dcb 100644 --- a/bpf/kmesh/ads/include/cluster.h +++ b/bpf/kmesh/ads/include/cluster.h @@ -69,6 +69,20 @@ static inline int map_add_cluster_eps(const char *cluster_name, const struct clu return kmesh_map_update_elem(&map_of_cluster_eps, cluster_name, eps); } +static inline Cluster__CircuitBreakers *get_cluster_circuit_breakers(const char *cluster_name) +{ + const Cluster__Cluster *cluster = NULL; + cluster = map_lookup_cluster(cluster_name); + if (!cluster) { + return NULL; + } + Cluster__CircuitBreakers *cbs = NULL; + cbs = kmesh_get_ptr_val(cluster->circuit_breakers); + if (cbs != NULL) + BPF_LOG(DEBUG, KMESH, "get cluster's circuit breaker: max connections = %ld\n", cbs->max_connections); + return cbs; +} + static inline int cluster_add_endpoints(const Endpoint__LocalityLbEndpoints *lb_ep, struct cluster_endpoints *cluster_eps) { @@ -316,6 +330,7 @@ int cluster_manager(ctx_buff_t *ctx) kmesh_tail_delete_ctx(&ctx_key); if (cluster == NULL) return KMESH_TAIL_CALL_RET(ENOENT); + on_cluster_sock_bind(ctx->sk, (const char *)ctx_val->data); ret = cluster_handle_loadbalance(cluster, &addr, ctx); return KMESH_TAIL_CALL_RET(ret); diff --git a/bpf/kmesh/ads/include/kmesh_common.h b/bpf/kmesh/ads/include/kmesh_common.h index cedf90188..aee16fe6b 100644 --- a/bpf/kmesh/ads/include/kmesh_common.h +++ b/bpf/kmesh/ads/include/kmesh_common.h @@ -70,27 +70,6 @@ static inline char *bpf_strncpy(char *dst, int n, const char *src) } #endif -struct cluster_sock_data { - char cluster_name[BPF_DATA_MAX_LEN]; -}; - -struct resource { - // current value - __u64 curr; - __u64 max; -}; - -struct cluster_resources { - struct resource connections; -}; - -struct { - __uint(type, BPF_MAP_TYPE_SK_STORAGE); - __uint(map_flags, BPF_F_NO_PREALLOC); - __type(key, int); - __type(value, struct cluster_sock_data); -} map_of_cluster_sock SEC(".maps"); - struct { __uint(type, BPF_MAP_TYPE_ARRAY_OF_MAPS); __uint(key_size, sizeof(__u32)); diff --git a/bpf/kmesh/ads/sockops.c b/bpf/kmesh/ads/sockops.c index c1482c721..9ca168814 100644 --- a/bpf/kmesh/ads/sockops.c +++ b/bpf/kmesh/ads/sockops.c @@ -60,17 +60,20 @@ int sockops_prog(struct bpf_sock_ops *skops) case BPF_SOCK_OPS_ACTIVE_ESTABLISHED_CB: if (bpf_sock_ops_cb_flags_set(skops, BPF_SOCK_OPS_STATE_CB_FLAG) != 0) { BPF_LOG(ERR, SOCKOPS, "set sockops cb failed!\n"); + } else { + on_cluster_sock_connect(skops); } - on_cluster_sock_connect(skops->sk); break; case BPF_SOCK_OPS_PASSIVE_ESTABLISHED_CB: if (bpf_sock_ops_cb_flags_set(skops, BPF_SOCK_OPS_STATE_CB_FLAG) != 0) { BPF_LOG(ERR, SOCKOPS, "set sockops cb failed!\n"); + } else { + on_cluster_sock_connect(skops); } - on_cluster_sock_connect(skops->sk); + break; case BPF_SOCK_OPS_STATE_CB: if (skops->args[1] == BPF_TCP_CLOSE) { - on_cluster_sock_close(skops->sk); + on_cluster_sock_close(skops); } break; } From 224dfecfc7ff6dbfc29c328eb15c3ba7f2dd5255 Mon Sep 17 00:00:00 2001 From: Okabe-Rintarou-0 <923048992@qq.com> Date: Sat, 13 Jul 2024 14:36:13 +0800 Subject: [PATCH 03/19] add copy right Signed-off-by: Okabe-Rintarou-0 <923048992@qq.com> --- bpf/kmesh/ads/include/circuit_breaker.h | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/bpf/kmesh/ads/include/circuit_breaker.h b/bpf/kmesh/ads/include/circuit_breaker.h index 5904e0466..3ab77f751 100644 --- a/bpf/kmesh/ads/include/circuit_breaker.h +++ b/bpf/kmesh/ads/include/circuit_breaker.h @@ -1,3 +1,6 @@ +/* SPDX-License-Identifier: (GPL-2.0-only OR BSD-2-Clause) */ +/* Copyright Authors of Kmesh */ + #include "bpf_log.h" #include "kmesh_common.h" #include "bpf_common.h" @@ -111,7 +114,12 @@ static inline void on_cluster_sock_connect(struct bpf_sock_ops *ctx) struct cluster_stats_key key = {0}; key.netns_cookie = cookie; key.cluster_id = data->cluster_id; - BPF_LOG(DEBUG, KMESH, "increase cluster active connections(netns_cookie = %lld, cluster = %lld)", key.netns_cookie, key.cluster_id); + BPF_LOG( + DEBUG, + KMESH, + "increase cluster active connections(netns_cookie = %lld, cluster = %lld)", + key.netns_cookie, + key.cluster_id); update_cluster_active_connections(&key, 1); BPF_LOG(DEBUG, KMESH, "record sock connection for cluster %lld\n", data->cluster_id); } @@ -131,7 +139,12 @@ static inline void on_cluster_sock_close(struct bpf_sock_ops *ctx) key.netns_cookie = cookie; key.cluster_id = data->cluster_id; update_cluster_active_connections(&key, -1); - BPF_LOG(DEBUG, KMESH, "decrease cluster active connections(netns_cookie = %lld, cluster = %lld)", key.netns_cookie, key.cluster_id); + BPF_LOG( + DEBUG, + KMESH, + "decrease cluster active connections(netns_cookie = %lld, cluster = %lld)", + key.netns_cookie, + key.cluster_id); BPF_LOG(DEBUG, KMESH, "record sock close for cluster %lld", data->cluster_id); } From 49aab5f84e8c4e4fd083afe4a969d9589ef7556a Mon Sep 17 00:00:00 2001 From: Okabe-Rintarou-0 <923048992@qq.com> Date: Tue, 16 Jul 2024 10:58:26 +0800 Subject: [PATCH 04/19] map cluster name to cluster id Signed-off-by: Okabe-Rintarou-0 <923048992@qq.com> --- api/cluster/cluster.proto | 1 + api/v2-c/cluster/cluster.pb-c.c | 39 ++++++++++++------- api/v2-c/cluster/cluster.pb-c.h | 3 +- api/v2/cluster/cluster.pb.go | 13 ++++++- bpf/kmesh/ads/include/circuit_breaker.h | 30 ++++++-------- bpf/kmesh/ads/include/cluster.h | 2 +- pkg/cache/v2/cluster.go | 7 +++- pkg/cache/v2/cluster_test.go | 9 +++-- pkg/controller/ads/ads_processor_test.go | 13 ++++--- pkg/controller/ads/cache.go | 18 ++++++--- pkg/controller/workload/workload_processor.go | 15 ++++--- .../workload_hash.go => utils/hash_name.go} | 4 +- .../hash_name_test.go} | 2 +- 13 files changed, 96 insertions(+), 60 deletions(-) rename pkg/{controller/workload/workload_hash.go => utils/hash_name.go} (98%) rename pkg/{controller/workload/workload_hash_test.go => utils/hash_name_test.go} (99%) diff --git a/api/cluster/cluster.proto b/api/cluster/cluster.proto index bf5048c45..ed9e83309 100644 --- a/api/cluster/cluster.proto +++ b/api/cluster/cluster.proto @@ -16,6 +16,7 @@ message Cluster { core.ApiStatus api_status = 128; string name = 1; + uint32 id = 2; uint32 connect_timeout = 4; LbPolicy lb_policy = 6; diff --git a/api/v2-c/cluster/cluster.pb-c.c b/api/v2-c/cluster/cluster.pb-c.c index 934ff42fb..6309db44e 100644 --- a/api/v2-c/cluster/cluster.pb-c.c +++ b/api/v2-c/cluster/cluster.pb-c.c @@ -82,7 +82,7 @@ const ProtobufCEnumDescriptor cluster__cluster__lb_policy__descriptor = cluster__cluster__lb_policy__value_ranges, NULL,NULL,NULL,NULL /* reserved[1234] */ }; -static const ProtobufCFieldDescriptor cluster__cluster__field_descriptors[6] = +static const ProtobufCFieldDescriptor cluster__cluster__field_descriptors[7] = { { "name", @@ -96,6 +96,18 @@ static const ProtobufCFieldDescriptor cluster__cluster__field_descriptors[6] = 0, /* flags */ 0,NULL,NULL /* reserved1,reserved2, etc */ }, + { + "id", + 2, + PROTOBUF_C_LABEL_NONE, + PROTOBUF_C_TYPE_UINT32, + 0, /* quantifier_offset */ + offsetof(Cluster__Cluster, id), + NULL, + NULL, + 0, /* flags */ + 0,NULL,NULL /* reserved1,reserved2, etc */ + }, { "connect_timeout", 4, @@ -158,22 +170,23 @@ static const ProtobufCFieldDescriptor cluster__cluster__field_descriptors[6] = }, }; static const unsigned cluster__cluster__field_indices_by_name[] = { - 5, /* field[5] = api_status */ - 3, /* field[3] = circuit_breakers */ - 1, /* field[1] = connect_timeout */ - 2, /* field[2] = lb_policy */ - 4, /* field[4] = load_assignment */ + 6, /* field[6] = api_status */ + 4, /* field[4] = circuit_breakers */ + 2, /* field[2] = connect_timeout */ + 1, /* field[1] = id */ + 3, /* field[3] = lb_policy */ + 5, /* field[5] = load_assignment */ 0, /* field[0] = name */ }; static const ProtobufCIntRange cluster__cluster__number_ranges[6 + 1] = { { 1, 0 }, - { 4, 1 }, - { 6, 2 }, - { 10, 3 }, - { 33, 4 }, - { 128, 5 }, - { 0, 6 } + { 4, 2 }, + { 6, 3 }, + { 10, 4 }, + { 33, 5 }, + { 128, 6 }, + { 0, 7 } }; const ProtobufCMessageDescriptor cluster__cluster__descriptor = { @@ -183,7 +196,7 @@ const ProtobufCMessageDescriptor cluster__cluster__descriptor = "Cluster__Cluster", "cluster", sizeof(Cluster__Cluster), - 6, + 7, cluster__cluster__field_descriptors, cluster__cluster__field_indices_by_name, 6, cluster__cluster__number_ranges, diff --git a/api/v2-c/cluster/cluster.pb-c.h b/api/v2-c/cluster/cluster.pb-c.h index 3c44b57db..a07d5dc6f 100644 --- a/api/v2-c/cluster/cluster.pb-c.h +++ b/api/v2-c/cluster/cluster.pb-c.h @@ -37,6 +37,7 @@ struct Cluster__Cluster ProtobufCMessage base; Core__ApiStatus api_status; char *name; + uint32_t id; uint32_t connect_timeout; Cluster__Cluster__LbPolicy lb_policy; Endpoint__ClusterLoadAssignment *load_assignment; @@ -44,7 +45,7 @@ struct Cluster__Cluster }; #define CLUSTER__CLUSTER__INIT \ { PROTOBUF_C_MESSAGE_INIT (&cluster__cluster__descriptor) \ - , CORE__API_STATUS__NONE, (char *)protobuf_c_empty_string, 0, CLUSTER__CLUSTER__LB_POLICY__ROUND_ROBIN, NULL, NULL } + , CORE__API_STATUS__NONE, (char *)protobuf_c_empty_string, 0, 0, CLUSTER__CLUSTER__LB_POLICY__ROUND_ROBIN, NULL, NULL } /* Cluster__Cluster methods */ diff --git a/api/v2/cluster/cluster.pb.go b/api/v2/cluster/cluster.pb.go index dd9e35b8b..6469fb3e0 100644 --- a/api/v2/cluster/cluster.pb.go +++ b/api/v2/cluster/cluster.pb.go @@ -78,6 +78,7 @@ type Cluster struct { ApiStatus core.ApiStatus `protobuf:"varint,128,opt,name=api_status,json=apiStatus,proto3,enum=core.ApiStatus" json:"api_status,omitempty"` Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Id uint32 `protobuf:"varint,2,opt,name=id,proto3" json:"id,omitempty"` ConnectTimeout uint32 `protobuf:"varint,4,opt,name=connect_timeout,json=connectTimeout,proto3" json:"connect_timeout,omitempty"` LbPolicy Cluster_LbPolicy `protobuf:"varint,6,opt,name=lb_policy,json=lbPolicy,proto3,enum=cluster.Cluster_LbPolicy" json:"lb_policy,omitempty"` LoadAssignment *endpoint.ClusterLoadAssignment `protobuf:"bytes,33,opt,name=load_assignment,json=loadAssignment,proto3" json:"load_assignment,omitempty"` @@ -130,6 +131,13 @@ func (x *Cluster) GetName() string { return "" } +func (x *Cluster) GetId() uint32 { + if x != nil { + return x.Id + } + return 0 +} + func (x *Cluster) GetConnectTimeout() uint32 { if x != nil { return x.ConnectTimeout @@ -168,12 +176,13 @@ var file_api_cluster_cluster_proto_rawDesc = []byte{ 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x61, 0x70, 0x69, 0x2f, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x2f, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x13, 0x61, 0x70, 0x69, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x62, - 0x61, 0x73, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xfa, 0x02, 0x0a, 0x07, 0x43, 0x6c, + 0x61, 0x73, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x8a, 0x03, 0x0a, 0x07, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x12, 0x2f, 0x0a, 0x0a, 0x61, 0x70, 0x69, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x80, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0f, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x41, 0x70, 0x69, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x09, 0x61, 0x70, 0x69, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x27, 0x0a, 0x0f, 0x63, 0x6f, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, 0x12, 0x27, 0x0a, 0x0f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0e, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x12, 0x36, 0x0a, 0x09, 0x6c, 0x62, 0x5f, 0x70, 0x6f, 0x6c, 0x69, 0x63, 0x79, diff --git a/bpf/kmesh/ads/include/circuit_breaker.h b/bpf/kmesh/ads/include/circuit_breaker.h index 3ab77f751..b39de2c11 100644 --- a/bpf/kmesh/ads/include/circuit_breaker.h +++ b/bpf/kmesh/ads/include/circuit_breaker.h @@ -16,7 +16,7 @@ struct cluster_stats { struct cluster_stats_key { __u64 netns_cookie; - __u64 cluster_id; + __u32 cluster_id; }; struct { @@ -28,7 +28,7 @@ struct { } map_of_cluster_stats SEC(".maps"); struct cluster_sock_data { - __u64 cluster_id; + __u32 cluster_id; }; struct { @@ -51,7 +51,7 @@ static inline void update_cluster_active_connections(const struct cluster_stats_ BPF_LOG( DEBUG, KMESH, - "create new stats(netns_cookie = %lld, cluster_id = %lld)", + "create new stats(netns_cookie = %lld, cluster_id = %ld)", key->netns_cookie, key->cluster_id); kmesh_map_update_elem(&map_of_cluster_stats, key, &new_stats); @@ -61,15 +61,15 @@ static inline void update_cluster_active_connections(const struct cluster_stats_ BPF_LOG( DEBUG, KMESH, - "update existing stats(netns_cookie = %lld, cluster_id = %lld)", + "update existing stats(netns_cookie = %lld, cluster_id = %ld)", key->netns_cookie, key->cluster_id); } } -static inline void on_cluster_sock_bind(struct bpf_sock *sk, const char *cluster_name) +static inline void on_cluster_sock_bind(struct bpf_sock *sk, __u32 cluster_id) { - BPF_LOG(DEBUG, KMESH, "record sock bind for cluster %s\n", cluster_name); + BPF_LOG(DEBUG, KMESH, "record sock bind for cluster id = %ld\n", cluster_id); struct cluster_sock_data *data = NULL; if (!sk) { BPF_LOG(WARN, KMESH, "provided sock is NULL\n"); @@ -78,14 +78,10 @@ static inline void on_cluster_sock_bind(struct bpf_sock *sk, const char *cluster data = bpf_sk_storage_get(&map_of_cluster_sock, sk, 0, BPF_LOCAL_STORAGE_GET_F_CREATE); if (!data) { - BPF_LOG(ERR, KMESH, "record_cluster_sock call bpf_sk_storage_get failed\n"); + BPF_LOG(ERR, KMESH, "on_cluster_sock_bind call bpf_sk_storage_get failed\n"); return; } - - // bpf_strncpy(data->cluster_name, CLUSTER_NAME_MAX_LEN, (char *)cluster_name); - // TODO(lzh): how to map cluster to id? - data->cluster_id = 1; - BPF_LOG(DEBUG, KMESH, "record sock bind for cluster %s done\n", cluster_name); + data->cluster_id = cluster_id; } static inline struct cluster_sock_data *get_cluster_sk_data(struct bpf_sock *sk) @@ -110,18 +106,17 @@ static inline void on_cluster_sock_connect(struct bpf_sock_ops *ctx) return; } __u64 cookie = bpf_get_netns_cookie(ctx); - BPF_LOG(INFO, KMESH, "here we got netns cookie: %lld", cookie); struct cluster_stats_key key = {0}; key.netns_cookie = cookie; key.cluster_id = data->cluster_id; BPF_LOG( DEBUG, KMESH, - "increase cluster active connections(netns_cookie = %lld, cluster = %lld)", + "increase cluster active connections(netns_cookie = %lld, cluster id = %ld)", key.netns_cookie, key.cluster_id); update_cluster_active_connections(&key, 1); - BPF_LOG(DEBUG, KMESH, "record sock connection for cluster %lld\n", data->cluster_id); + BPF_LOG(DEBUG, KMESH, "record sock connection for cluster id = %ld\n", data->cluster_id); } static inline void on_cluster_sock_close(struct bpf_sock_ops *ctx) @@ -134,7 +129,6 @@ static inline void on_cluster_sock_close(struct bpf_sock_ops *ctx) return; } __u64 cookie = bpf_get_netns_cookie(ctx); - BPF_LOG(INFO, KMESH, "here we got netns cookie: %lld", cookie); struct cluster_stats_key key = {0}; key.netns_cookie = cookie; key.cluster_id = data->cluster_id; @@ -142,10 +136,10 @@ static inline void on_cluster_sock_close(struct bpf_sock_ops *ctx) BPF_LOG( DEBUG, KMESH, - "decrease cluster active connections(netns_cookie = %lld, cluster = %lld)", + "decrease cluster active connections(netns_cookie = %lld, cluster id = %ld)", key.netns_cookie, key.cluster_id); - BPF_LOG(DEBUG, KMESH, "record sock close for cluster %lld", data->cluster_id); + BPF_LOG(DEBUG, KMESH, "record sock close for cluster id = %ld", data->cluster_id); } #endif \ No newline at end of file diff --git a/bpf/kmesh/ads/include/cluster.h b/bpf/kmesh/ads/include/cluster.h index 4c6103dcb..f39601518 100644 --- a/bpf/kmesh/ads/include/cluster.h +++ b/bpf/kmesh/ads/include/cluster.h @@ -331,7 +331,7 @@ int cluster_manager(ctx_buff_t *ctx) if (cluster == NULL) return KMESH_TAIL_CALL_RET(ENOENT); - on_cluster_sock_bind(ctx->sk, (const char *)ctx_val->data); + on_cluster_sock_bind(ctx->sk, cluster->id); ret = cluster_handle_loadbalance(cluster, &addr, ctx); return KMESH_TAIL_CALL_RET(ret); } diff --git a/pkg/cache/v2/cluster.go b/pkg/cache/v2/cluster.go index 7023d0728..62ae2c7e0 100644 --- a/pkg/cache/v2/cluster.go +++ b/pkg/cache/v2/cluster.go @@ -27,6 +27,7 @@ import ( cluster_v2 "kmesh.net/kmesh/api/v2/cluster" core_v2 "kmesh.net/kmesh/api/v2/core" maps_v2 "kmesh.net/kmesh/pkg/cache/v2/maps" + "kmesh.net/kmesh/pkg/utils" ) type ClusterCache struct { @@ -34,12 +35,14 @@ type ClusterCache struct { apiClusterCache apiClusterCache // resourceHash[0]:cds resourceHash[1]:eds resourceHash map[string][2]uint64 + hashName *utils.HashName } -func NewClusterCache() ClusterCache { +func NewClusterCache(hashName *utils.HashName) ClusterCache { return ClusterCache{ apiClusterCache: newApiClusterCache(), resourceHash: make(map[string][2]uint64), + hashName: hashName, } } @@ -140,6 +143,7 @@ func (cache *ClusterCache) Flush() { if err == nil { delete(cache.apiClusterCache, name) delete(cache.resourceHash, name) + cache.hashName.Delete(name) } else { log.Errorf("cluster %s delete failed: %v", name, err) } @@ -157,6 +161,7 @@ func (cache *ClusterCache) Delete() { if err == nil { delete(cache.apiClusterCache, name) delete(cache.resourceHash, name) + cache.hashName.Delete(name) } else { log.Errorf("cluster %s delete failed: %v", name, err) } diff --git a/pkg/cache/v2/cluster_test.go b/pkg/cache/v2/cluster_test.go index e0fe1185b..69e7b812e 100644 --- a/pkg/cache/v2/cluster_test.go +++ b/pkg/cache/v2/cluster_test.go @@ -33,6 +33,7 @@ import ( maps_v2 "kmesh.net/kmesh/pkg/cache/v2/maps" "kmesh.net/kmesh/pkg/constants" "kmesh.net/kmesh/pkg/nets" + "kmesh.net/kmesh/pkg/utils" "kmesh.net/kmesh/pkg/utils/hash" "kmesh.net/kmesh/pkg/utils/test" ) @@ -57,7 +58,7 @@ func TestClusterFlush(t *testing.T) { patches2.Reset() }() - cache := NewClusterCache() + cache := NewClusterCache(utils.NewHashName()) cluster1 := &cluster_v2.Cluster{ ApiStatus: core_v2.ApiStatus_UPDATE, Name: "ut-cluster1", @@ -100,7 +101,7 @@ func TestClusterFlush(t *testing.T) { patches2.Reset() }() - cache := NewClusterCache() + cache := NewClusterCache(utils.NewHashName()) cluster1 := &cluster_v2.Cluster{ ApiStatus: core_v2.ApiStatus_UPDATE, Name: "ut-cluster1", @@ -154,7 +155,7 @@ func TestClusterFlush(t *testing.T) { patches2.Reset() }() - cache := NewClusterCache() + cache := NewClusterCache(utils.NewHashName()) cluster1 := &cluster_v2.Cluster{ ApiStatus: core_v2.ApiStatus_UNCHANGED, Name: "ut-cluster1", @@ -335,7 +336,7 @@ func BenchmarkClusterFlush(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - cache := NewClusterCache() + cache := NewClusterCache(utils.NewHashName()) cluster.Name = rand.String(6) cluster.ApiStatus = core_v2.ApiStatus_UPDATE cache.SetApiCluster(cluster.Name, &cluster) diff --git a/pkg/controller/ads/ads_processor_test.go b/pkg/controller/ads/ads_processor_test.go index d53cdb936..1e703008e 100644 --- a/pkg/controller/ads/ads_processor_test.go +++ b/pkg/controller/ads/ads_processor_test.go @@ -36,6 +36,7 @@ import ( "kmesh.net/kmesh/daemon/options" cache_v2 "kmesh.net/kmesh/pkg/cache/v2" "kmesh.net/kmesh/pkg/constants" + "kmesh.net/kmesh/pkg/utils" "kmesh.net/kmesh/pkg/utils/hash" "kmesh.net/kmesh/pkg/utils/test" ) @@ -303,7 +304,7 @@ func TestHandleEdsResponse(t *testing.T) { t.Run("cluster's apiStatus is UPDATE", func(t *testing.T) { p := newProcessor() adsLoader := NewAdsCache() - adsLoader.ClusterCache = cache_v2.NewClusterCache() + adsLoader.ClusterCache = cache_v2.NewClusterCache(utils.NewHashName()) cluster := &cluster_v2.Cluster{ Name: "ut-cluster", ApiStatus: core_v2.ApiStatus_UPDATE, @@ -336,7 +337,7 @@ func TestHandleEdsResponse(t *testing.T) { t.Run("cluster's apiStatus is Waiting", func(t *testing.T) { p := newProcessor() adsLoader := NewAdsCache() - adsLoader.ClusterCache = cache_v2.NewClusterCache() + adsLoader.ClusterCache = cache_v2.NewClusterCache(utils.NewHashName()) cluster := &cluster_v2.Cluster{ Name: "ut-cluster", ApiStatus: core_v2.ApiStatus_WAITING, @@ -363,7 +364,7 @@ func TestHandleEdsResponse(t *testing.T) { t.Run("not apiStatus_UPDATE", func(t *testing.T) { adsLoader := NewAdsCache() - adsLoader.ClusterCache = cache_v2.NewClusterCache() + adsLoader.ClusterCache = cache_v2.NewClusterCache(utils.NewHashName()) cluster := &cluster_v2.Cluster{ Name: "ut-cluster", ApiStatus: core_v2.ApiStatus_ALL, @@ -392,7 +393,7 @@ func TestHandleEdsResponse(t *testing.T) { t.Run("already have cluster, not update", func(t *testing.T) { adsLoader := NewAdsCache() - adsLoader.ClusterCache = cache_v2.NewClusterCache() + adsLoader.ClusterCache = cache_v2.NewClusterCache(utils.NewHashName()) cluster := &cluster_v2.Cluster{ Name: "ut-cluster", ApiStatus: core_v2.ApiStatus_WAITING, @@ -423,7 +424,7 @@ func TestHandleEdsResponse(t *testing.T) { t.Run("no apicluster, p.ack not be changed", func(t *testing.T) { adsLoader := NewAdsCache() - adsLoader.ClusterCache = cache_v2.NewClusterCache() + adsLoader.ClusterCache = cache_v2.NewClusterCache(utils.NewHashName()) cluster := &cluster_v2.Cluster{} adsLoader.ClusterCache.SetApiCluster("", cluster) p := newProcessor() @@ -448,7 +449,7 @@ func TestHandleEdsResponse(t *testing.T) { t.Run("empty loadAssignment", func(t *testing.T) { adsLoader := NewAdsCache() - adsLoader.ClusterCache = cache_v2.NewClusterCache() + adsLoader.ClusterCache = cache_v2.NewClusterCache(utils.NewHashName()) cluster := &cluster_v2.Cluster{ Name: "ut-cluster", ApiStatus: core_v2.ApiStatus_WAITING, diff --git a/pkg/controller/ads/cache.go b/pkg/controller/ads/cache.go index c40767a29..c125ed0c1 100644 --- a/pkg/controller/ads/cache.go +++ b/pkg/controller/ads/cache.go @@ -37,6 +37,7 @@ import ( route_v2 "kmesh.net/kmesh/api/v2/route" cache_v2 "kmesh.net/kmesh/pkg/cache/v2" "kmesh.net/kmesh/pkg/nets" + "kmesh.net/kmesh/pkg/utils" ) type AdsCache struct { @@ -47,20 +48,25 @@ type AdsCache struct { ListenerCache cache_v2.ListenerCache ClusterCache cache_v2.ClusterCache RouteCache cache_v2.RouteConfigCache + hashName *utils.HashName } func NewAdsCache() *AdsCache { + hashName := utils.NewHashName() return &AdsCache{ ListenerCache: cache_v2.NewListenerCache(), - ClusterCache: cache_v2.NewClusterCache(), + ClusterCache: cache_v2.NewClusterCache(hashName), RouteCache: cache_v2.NewRouteConfigCache(), + hashName: hashName, } } func (load *AdsCache) CreateApiClusterByCds(status core_v2.ApiStatus, cluster *config_cluster_v3.Cluster) { + clusterName := cluster.GetName() apiCluster := &cluster_v2.Cluster{ ApiStatus: status, - Name: cluster.GetName(), + Name: clusterName, + Id: load.hashName.StrToNum(clusterName), ConnectTimeout: uint32(cluster.GetConnectTimeout().GetSeconds()), LbPolicy: cluster_v2.Cluster_LbPolicy(cluster.GetLbPolicy()), CircuitBreakers: newApiCircuitBreakers(cluster.GetCircuitBreakers()), @@ -69,14 +75,16 @@ func (load *AdsCache) CreateApiClusterByCds(status core_v2.ApiStatus, cluster *c if cluster.GetType() != config_cluster_v3.Cluster_EDS { apiCluster.LoadAssignment = newApiClusterLoadAssignment(cluster.GetLoadAssignment()) } - load.ClusterCache.SetApiCluster(cluster.GetName(), apiCluster) + load.ClusterCache.SetApiCluster(clusterName, apiCluster) } // UpdateApiClusterIfExists only update api cluster if it exists func (load *AdsCache) UpdateApiClusterIfExists(status core_v2.ApiStatus, cluster *config_cluster_v3.Cluster) bool { + clusterName := cluster.GetName() apiCluster := &cluster_v2.Cluster{ ApiStatus: status, - Name: cluster.GetName(), + Name: clusterName, + Id: load.hashName.StrToNum(clusterName), ConnectTimeout: uint32(cluster.GetConnectTimeout().GetSeconds()), LbPolicy: cluster_v2.Cluster_LbPolicy(cluster.GetLbPolicy()), CircuitBreakers: newApiCircuitBreakers(cluster.GetCircuitBreakers()), @@ -84,7 +92,7 @@ func (load *AdsCache) UpdateApiClusterIfExists(status core_v2.ApiStatus, cluster if cluster.GetType() != config_cluster_v3.Cluster_EDS { apiCluster.LoadAssignment = newApiClusterLoadAssignment(cluster.GetLoadAssignment()) } - return load.ClusterCache.UpdateApiClusterIfExists(cluster.GetName(), apiCluster) + return load.ClusterCache.UpdateApiClusterIfExists(clusterName, apiCluster) } func (load *AdsCache) UpdateApiClusterStatus(key string, status core_v2.ApiStatus) { diff --git a/pkg/controller/workload/workload_processor.go b/pkg/controller/workload/workload_processor.go index 1745d1585..be7172458 100644 --- a/pkg/controller/workload/workload_processor.go +++ b/pkg/controller/workload/workload_processor.go @@ -41,6 +41,7 @@ import ( bpf "kmesh.net/kmesh/pkg/controller/workload/bpfcache" "kmesh.net/kmesh/pkg/controller/workload/cache" "kmesh.net/kmesh/pkg/nets" + "kmesh.net/kmesh/pkg/utils" ) const ( @@ -52,11 +53,13 @@ type Processor struct { ack *service_discovery_v3.DeltaDiscoveryRequest req *service_discovery_v3.DeltaDiscoveryRequest - hashName *HashName - bpf *bpf.Cache - nodeName string - WorkloadCache cache.WorkloadCache - ServiceCache cache.ServiceCache + hashName *utils.HashName + // workloads indexer, svc key -> workload id + endpointsByService map[string]map[string]struct{} + bpf *bpf.Cache + nodeName string + WorkloadCache cache.WorkloadCache + ServiceCache cache.ServiceCache once sync.Once authzOnce sync.Once @@ -64,7 +67,7 @@ type Processor struct { func NewProcessor(workloadMap bpf2go.KmeshCgroupSockWorkloadMaps) *Processor { return &Processor{ - hashName: NewHashName(), + hashName: utils.NewHashName(), bpf: bpf.NewCache(workloadMap), nodeName: os.Getenv("NODE_NAME"), WorkloadCache: cache.NewWorkloadCache(), diff --git a/pkg/controller/workload/workload_hash.go b/pkg/utils/hash_name.go similarity index 98% rename from pkg/controller/workload/workload_hash.go rename to pkg/utils/hash_name.go index a81fe6271..22b977048 100644 --- a/pkg/controller/workload/workload_hash.go +++ b/pkg/utils/hash_name.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package workload +package utils import ( "fmt" @@ -27,7 +27,7 @@ import ( ) const ( - persistPath = "/mnt/workload_hash_name.yaml" + persistPath = "/mnt/hash_name.yaml" ) // HashName converts a string to a uint32 integer as the key of bpf map diff --git a/pkg/controller/workload/workload_hash_test.go b/pkg/utils/hash_name_test.go similarity index 99% rename from pkg/controller/workload/workload_hash_test.go rename to pkg/utils/hash_name_test.go index 82837c9cd..1acd0c3c2 100644 --- a/pkg/controller/workload/workload_hash_test.go +++ b/pkg/utils/hash_name_test.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package workload +package utils import ( "hash/fnv" From 641b686a42747128220985481d8f48ff78b45ddd Mon Sep 17 00:00:00 2001 From: "923048992@qq.com" <923048992@qq.com> Date: Tue, 16 Jul 2024 22:06:03 +0800 Subject: [PATCH 05/19] fix concurrency problem Signed-off-by: 923048992@qq.com <923048992@qq.com> --- pkg/cache/v2/cluster.go | 1 + pkg/controller/ads/cache.go | 7 +------ 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/pkg/cache/v2/cluster.go b/pkg/cache/v2/cluster.go index 62ae2c7e0..8e7f3db1b 100644 --- a/pkg/cache/v2/cluster.go +++ b/pkg/cache/v2/cluster.go @@ -135,6 +135,7 @@ func (cache *ClusterCache) Flush() { if err == nil { // reset api status after successfully updated cluster.ApiStatus = core_v2.ApiStatus_NONE + cluster.Id = cache.hashName.StrToNum(name) } else { log.Errorf("cluster %s %s flush failed: %v", name, cluster.ApiStatus, err) } diff --git a/pkg/controller/ads/cache.go b/pkg/controller/ads/cache.go index c125ed0c1..aafd4feb7 100644 --- a/pkg/controller/ads/cache.go +++ b/pkg/controller/ads/cache.go @@ -48,16 +48,13 @@ type AdsCache struct { ListenerCache cache_v2.ListenerCache ClusterCache cache_v2.ClusterCache RouteCache cache_v2.RouteConfigCache - hashName *utils.HashName } func NewAdsCache() *AdsCache { - hashName := utils.NewHashName() return &AdsCache{ ListenerCache: cache_v2.NewListenerCache(), - ClusterCache: cache_v2.NewClusterCache(hashName), + ClusterCache: cache_v2.NewClusterCache(utils.NewHashName()), RouteCache: cache_v2.NewRouteConfigCache(), - hashName: hashName, } } @@ -66,7 +63,6 @@ func (load *AdsCache) CreateApiClusterByCds(status core_v2.ApiStatus, cluster *c apiCluster := &cluster_v2.Cluster{ ApiStatus: status, Name: clusterName, - Id: load.hashName.StrToNum(clusterName), ConnectTimeout: uint32(cluster.GetConnectTimeout().GetSeconds()), LbPolicy: cluster_v2.Cluster_LbPolicy(cluster.GetLbPolicy()), CircuitBreakers: newApiCircuitBreakers(cluster.GetCircuitBreakers()), @@ -84,7 +80,6 @@ func (load *AdsCache) UpdateApiClusterIfExists(status core_v2.ApiStatus, cluster apiCluster := &cluster_v2.Cluster{ ApiStatus: status, Name: clusterName, - Id: load.hashName.StrToNum(clusterName), ConnectTimeout: uint32(cluster.GetConnectTimeout().GetSeconds()), LbPolicy: cluster_v2.Cluster_LbPolicy(cluster.GetLbPolicy()), CircuitBreakers: newApiCircuitBreakers(cluster.GetCircuitBreakers()), From e9d88a6e5cef84d15f98b77d2df080720300989c Mon Sep 17 00:00:00 2001 From: Okabe-Rintarou-0 <923048992@qq.com> Date: Thu, 18 Jul 2024 14:56:21 +0800 Subject: [PATCH 06/19] remove cluster_sock_connect hook in passive connect cb Signed-off-by: Okabe-Rintarou-0 <923048992@qq.com> --- bpf/kmesh/ads/sockops.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/bpf/kmesh/ads/sockops.c b/bpf/kmesh/ads/sockops.c index 9ca168814..63860ebeb 100644 --- a/bpf/kmesh/ads/sockops.c +++ b/bpf/kmesh/ads/sockops.c @@ -67,8 +67,6 @@ int sockops_prog(struct bpf_sock_ops *skops) case BPF_SOCK_OPS_PASSIVE_ESTABLISHED_CB: if (bpf_sock_ops_cb_flags_set(skops, BPF_SOCK_OPS_STATE_CB_FLAG) != 0) { BPF_LOG(ERR, SOCKOPS, "set sockops cb failed!\n"); - } else { - on_cluster_sock_connect(skops); } break; case BPF_SOCK_OPS_STATE_CB: From a2f7b49d71ca270703aba25f898aa20b3d882daa Mon Sep 17 00:00:00 2001 From: "923048992@qq.com" <923048992@qq.com> Date: Fri, 19 Jul 2024 18:16:15 +0800 Subject: [PATCH 07/19] fix logic Signed-off-by: 923048992@qq.com <923048992@qq.com> --- pkg/cache/v2/cluster.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cache/v2/cluster.go b/pkg/cache/v2/cluster.go index 8e7f3db1b..a4aa19383 100644 --- a/pkg/cache/v2/cluster.go +++ b/pkg/cache/v2/cluster.go @@ -131,11 +131,11 @@ func (cache *ClusterCache) Flush() { defer cache.mutex.Unlock() for name, cluster := range cache.apiClusterCache { if cluster.GetApiStatus() == core_v2.ApiStatus_UPDATE { + cluster.Id = cache.hashName.StrToNum(name) err := maps_v2.ClusterUpdate(name, cluster) if err == nil { // reset api status after successfully updated cluster.ApiStatus = core_v2.ApiStatus_NONE - cluster.Id = cache.hashName.StrToNum(name) } else { log.Errorf("cluster %s %s flush failed: %v", name, cluster.ApiStatus, err) } From 1125f80557e024ca95b33fdb4dc9ec0c6de6a683 Mon Sep 17 00:00:00 2001 From: Okabe-Rintarou-0 <923048992@qq.com> Date: Sat, 20 Jul 2024 20:21:36 +0800 Subject: [PATCH 08/19] add connection reject logic Signed-off-by: Okabe-Rintarou-0 <923048992@qq.com> --- bpf/kmesh/ads/include/circuit_breaker.h | 30 ++++++++++++++++++----- bpf/kmesh/ads/include/cluster.h | 21 +++++----------- bpf/kmesh/ads/include/ctx/sock_addr.h | 2 ++ bpf/kmesh/ads/include/ctx/sock_ops.h | 7 ++++++ kernel/ko_src/kmesh/defer_connect.c | 8 ++++++ samples/circuitbreaker/circuitbreaker.yml | 16 ++++++++++++ 6 files changed, 63 insertions(+), 21 deletions(-) create mode 100644 samples/circuitbreaker/circuitbreaker.yml diff --git a/bpf/kmesh/ads/include/circuit_breaker.h b/bpf/kmesh/ads/include/circuit_breaker.h index b39de2c11..22e7d1464 100644 --- a/bpf/kmesh/ads/include/circuit_breaker.h +++ b/bpf/kmesh/ads/include/circuit_breaker.h @@ -67,21 +67,39 @@ static inline void update_cluster_active_connections(const struct cluster_stats_ } } -static inline void on_cluster_sock_bind(struct bpf_sock *sk, __u32 cluster_id) +static inline int on_cluster_sock_bind(ctx_buff_t *ctx, const Cluster__Cluster *cluster) { + __u32 cluster_id = cluster->id; + struct cluster_stats_key key = {0}; + __u64 cookie = bpf_get_netns_cookie(ctx); + key.cluster_id = cluster_id; + key.netns_cookie = cookie; + struct cluster_stats *stats = NULL; + stats = kmesh_map_lookup_elem(&map_of_cluster_stats, &key); + + if (stats != NULL) { + Cluster__CircuitBreakers *cbs = NULL; + cbs = kmesh_get_ptr_val(cluster->circuit_breakers); + if (cbs != NULL && stats->active_connections >= cbs->max_connections) { + BPF_LOG(DEBUG, KMESH, "Current active connections %d exceeded max connections %d, reject connection\n", stats->active_connections, cbs->max_connections); + return 0; + } + } + BPF_LOG(DEBUG, KMESH, "record sock bind for cluster id = %ld\n", cluster_id); + struct cluster_sock_data *data = NULL; - if (!sk) { + if (!ctx->sk) { BPF_LOG(WARN, KMESH, "provided sock is NULL\n"); - return; + return 1; } - - data = bpf_sk_storage_get(&map_of_cluster_sock, sk, 0, BPF_LOCAL_STORAGE_GET_F_CREATE); + data = bpf_sk_storage_get(&map_of_cluster_sock, ctx->sk, 0, BPF_LOCAL_STORAGE_GET_F_CREATE); if (!data) { BPF_LOG(ERR, KMESH, "on_cluster_sock_bind call bpf_sk_storage_get failed\n"); - return; + return 1; } data->cluster_id = cluster_id; + return 1; } static inline struct cluster_sock_data *get_cluster_sk_data(struct bpf_sock *sk) diff --git a/bpf/kmesh/ads/include/cluster.h b/bpf/kmesh/ads/include/cluster.h index f39601518..1d1ae95b7 100644 --- a/bpf/kmesh/ads/include/cluster.h +++ b/bpf/kmesh/ads/include/cluster.h @@ -69,20 +69,6 @@ static inline int map_add_cluster_eps(const char *cluster_name, const struct clu return kmesh_map_update_elem(&map_of_cluster_eps, cluster_name, eps); } -static inline Cluster__CircuitBreakers *get_cluster_circuit_breakers(const char *cluster_name) -{ - const Cluster__Cluster *cluster = NULL; - cluster = map_lookup_cluster(cluster_name); - if (!cluster) { - return NULL; - } - Cluster__CircuitBreakers *cbs = NULL; - cbs = kmesh_get_ptr_val(cluster->circuit_breakers); - if (cbs != NULL) - BPF_LOG(DEBUG, KMESH, "get cluster's circuit breaker: max connections = %ld\n", cbs->max_connections); - return cbs; -} - static inline int cluster_add_endpoints(const Endpoint__LocalityLbEndpoints *lb_ep, struct cluster_endpoints *cluster_eps) { @@ -331,7 +317,12 @@ int cluster_manager(ctx_buff_t *ctx) if (cluster == NULL) return KMESH_TAIL_CALL_RET(ENOENT); - on_cluster_sock_bind(ctx->sk, cluster->id); + ret = on_cluster_sock_bind(ctx, cluster); + if (!ret) { + // open circuit breaker, should reject here. + MARK_REJECTED(ctx); + return KMESH_TAIL_CALL_RET(ret); + } ret = cluster_handle_loadbalance(cluster, &addr, ctx); return KMESH_TAIL_CALL_RET(ret); } diff --git a/bpf/kmesh/ads/include/ctx/sock_addr.h b/bpf/kmesh/ads/include/ctx/sock_addr.h index 0bfb2a1a8..a988450ac 100644 --- a/bpf/kmesh/ads/include/ctx/sock_addr.h +++ b/bpf/kmesh/ads/include/ctx/sock_addr.h @@ -21,4 +21,6 @@ typedef struct bpf_sock_addr ctx_buff_t; (ctx)->user_ip4 = (address)->ipv4; \ (ctx)->user_port = (address)->port +#define MARK_REJECTED(ctx) + #endif //__BPF_CTX_SOCK_ADDR_H diff --git a/bpf/kmesh/ads/include/ctx/sock_ops.h b/bpf/kmesh/ads/include/ctx/sock_ops.h index 2760635f6..516212496 100644 --- a/bpf/kmesh/ads/include/ctx/sock_ops.h +++ b/bpf/kmesh/ads/include/ctx/sock_ops.h @@ -23,6 +23,13 @@ typedef struct bpf_sock_ops ctx_buff_t; #define SET_CTX_ADDRESS(ctx, address) \ (ctx)->remote_ip4 = (address)->ipv4; \ (ctx)->remote_port = (address)->port + +#define MARK_REJECTED(ctx) \ + BPF_LOG(INFO, KMESH, "mark reject"); \ + (ctx)->remote_ip4 = 0; \ + (ctx)->remote_port = 0 +#else +#define MARK_REJECTED(ctx) #endif #endif //__BPF_CTX_SOCK_OPS_H diff --git a/kernel/ko_src/kmesh/defer_connect.c b/kernel/ko_src/kmesh/defer_connect.c index 24eb200d0..a8b4010c6 100644 --- a/kernel/ko_src/kmesh/defer_connect.c +++ b/kernel/ko_src/kmesh/defer_connect.c @@ -72,6 +72,14 @@ static int defer_connect(struct sock *sk, struct msghdr *msg, size_t size) kbuf_size); daddr = sk->sk_daddr; dport = sk->sk_dport; + + if (daddr == 0 && dport == 0) { + tcp_set_state(sk, TCP_CLOSE); + sk->sk_route_caps = 0; + inet_sk(sk)->inet_dport = 0; + err = -1; + goto out; + } #else memset(&sock_ops, 0, offsetof(struct bpf_sock_ops_kern, temp)); if (sk_fullsock(sk)) { diff --git a/samples/circuitbreaker/circuitbreaker.yml b/samples/circuitbreaker/circuitbreaker.yml new file mode 100644 index 000000000..78f0c3adb --- /dev/null +++ b/samples/circuitbreaker/circuitbreaker.yml @@ -0,0 +1,16 @@ +apiVersion: networking.istio.io/v1beta1 +kind: DestinationRule +metadata: + name: httpbin-destination-rule +spec: + host: httpbin + trafficPolicy: + portLevelSettings: + - port: + number: 8000 + connectionPool: + http: + http1MaxPendingRequests: 1 + maxRequestsPerConnection: 1 + tcp: + maxConnections: 1 \ No newline at end of file From 09a0dcca385c81d6564916a42018f91b2b65a088 Mon Sep 17 00:00:00 2001 From: Okabe-Rintarou-0 <923048992@qq.com> Date: Sat, 20 Jul 2024 20:26:37 +0800 Subject: [PATCH 09/19] fix lint Signed-off-by: Okabe-Rintarou-0 <923048992@qq.com> --- bpf/kmesh/ads/include/circuit_breaker.h | 7 ++++++- bpf/kmesh/ads/include/ctx/sock_ops.h | 6 +++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/bpf/kmesh/ads/include/circuit_breaker.h b/bpf/kmesh/ads/include/circuit_breaker.h index 22e7d1464..e893964b8 100644 --- a/bpf/kmesh/ads/include/circuit_breaker.h +++ b/bpf/kmesh/ads/include/circuit_breaker.h @@ -81,7 +81,12 @@ static inline int on_cluster_sock_bind(ctx_buff_t *ctx, const Cluster__Cluster * Cluster__CircuitBreakers *cbs = NULL; cbs = kmesh_get_ptr_val(cluster->circuit_breakers); if (cbs != NULL && stats->active_connections >= cbs->max_connections) { - BPF_LOG(DEBUG, KMESH, "Current active connections %d exceeded max connections %d, reject connection\n", stats->active_connections, cbs->max_connections); + BPF_LOG( + DEBUG, + KMESH, + "Current active connections %d exceeded max connections %d, reject connection\n", + stats->active_connections, + cbs->max_connections); return 0; } } diff --git a/bpf/kmesh/ads/include/ctx/sock_ops.h b/bpf/kmesh/ads/include/ctx/sock_ops.h index 516212496..fa3b6930d 100644 --- a/bpf/kmesh/ads/include/ctx/sock_ops.h +++ b/bpf/kmesh/ads/include/ctx/sock_ops.h @@ -24,9 +24,9 @@ typedef struct bpf_sock_ops ctx_buff_t; (ctx)->remote_ip4 = (address)->ipv4; \ (ctx)->remote_port = (address)->port -#define MARK_REJECTED(ctx) \ - BPF_LOG(INFO, KMESH, "mark reject"); \ - (ctx)->remote_ip4 = 0; \ +#define MARK_REJECTED(ctx) \ + BPF_LOG(INFO, KMESH, "mark reject"); \ + (ctx)->remote_ip4 = 0; \ (ctx)->remote_port = 0 #else #define MARK_REJECTED(ctx) From 2427a1d69099888ee34df7e3aa32c24de35e2563 Mon Sep 17 00:00:00 2001 From: Okabe-Rintarou-0 <923048992@qq.com> Date: Tue, 23 Jul 2024 09:47:59 +0800 Subject: [PATCH 10/19] fix lint Signed-off-by: Okabe-Rintarou-0 <923048992@qq.com> --- bpf/kmesh/ads/include/ctx/sock_ops.h | 2 +- kernel/ko_src/kmesh/defer_connect.c | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/bpf/kmesh/ads/include/ctx/sock_ops.h b/bpf/kmesh/ads/include/ctx/sock_ops.h index fa3b6930d..e86cc9531 100644 --- a/bpf/kmesh/ads/include/ctx/sock_ops.h +++ b/bpf/kmesh/ads/include/ctx/sock_ops.h @@ -25,7 +25,7 @@ typedef struct bpf_sock_ops ctx_buff_t; (ctx)->remote_port = (address)->port #define MARK_REJECTED(ctx) \ - BPF_LOG(INFO, KMESH, "mark reject"); \ + BPF_LOG(DEBUG, KMESH, "mark reject"); \ (ctx)->remote_ip4 = 0; \ (ctx)->remote_port = 0 #else diff --git a/kernel/ko_src/kmesh/defer_connect.c b/kernel/ko_src/kmesh/defer_connect.c index a8b4010c6..ef7d1d7aa 100644 --- a/kernel/ko_src/kmesh/defer_connect.c +++ b/kernel/ko_src/kmesh/defer_connect.c @@ -73,6 +73,8 @@ static int defer_connect(struct sock *sk, struct msghdr *msg, size_t size) daddr = sk->sk_daddr; dport = sk->sk_dport; + // daddr == 0 && dport == 0 are special flags meaning the circuit breaker is open + // Should reject connection here if (daddr == 0 && dport == 0) { tcp_set_state(sk, TCP_CLOSE); sk->sk_route_caps = 0; From 5eb14e09c4f01de05177626463b86ff3edbe64be Mon Sep 17 00:00:00 2001 From: Okabe-Rintarou-0 <923048992@qq.com> Date: Tue, 23 Jul 2024 11:16:55 +0800 Subject: [PATCH 11/19] fix some errors Signed-off-by: Okabe-Rintarou-0 <923048992@qq.com> --- bpf/kmesh/ads/include/circuit_breaker.h | 12 ++++++++---- bpf/kmesh/ads/include/cluster.h | 2 +- bpf/kmesh/ads/sockops.c | 5 ----- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/bpf/kmesh/ads/include/circuit_breaker.h b/bpf/kmesh/ads/include/circuit_breaker.h index e893964b8..b22049492 100644 --- a/bpf/kmesh/ads/include/circuit_breaker.h +++ b/bpf/kmesh/ads/include/circuit_breaker.h @@ -47,6 +47,10 @@ static inline void update_cluster_active_connections(const struct cluster_stats_ stats = kmesh_map_lookup_elem(&map_of_cluster_stats, key); if (!stats) { struct cluster_stats new_stats = {0}; + if (delta < 0) { + BPF_LOG(DEBUG, KMESH, "invalid delta update"); + return; + } new_stats.active_connections = delta; BPF_LOG( DEBUG, @@ -87,7 +91,7 @@ static inline int on_cluster_sock_bind(ctx_buff_t *ctx, const Cluster__Cluster * "Current active connections %d exceeded max connections %d, reject connection\n", stats->active_connections, cbs->max_connections); - return 0; + return 1; } } @@ -96,15 +100,15 @@ static inline int on_cluster_sock_bind(ctx_buff_t *ctx, const Cluster__Cluster * struct cluster_sock_data *data = NULL; if (!ctx->sk) { BPF_LOG(WARN, KMESH, "provided sock is NULL\n"); - return 1; + return 0; } data = bpf_sk_storage_get(&map_of_cluster_sock, ctx->sk, 0, BPF_LOCAL_STORAGE_GET_F_CREATE); if (!data) { BPF_LOG(ERR, KMESH, "on_cluster_sock_bind call bpf_sk_storage_get failed\n"); - return 1; + return 0; } data->cluster_id = cluster_id; - return 1; + return 0; } static inline struct cluster_sock_data *get_cluster_sk_data(struct bpf_sock *sk) diff --git a/bpf/kmesh/ads/include/cluster.h b/bpf/kmesh/ads/include/cluster.h index 1d1ae95b7..1c3af82c1 100644 --- a/bpf/kmesh/ads/include/cluster.h +++ b/bpf/kmesh/ads/include/cluster.h @@ -318,7 +318,7 @@ int cluster_manager(ctx_buff_t *ctx) return KMESH_TAIL_CALL_RET(ENOENT); ret = on_cluster_sock_bind(ctx, cluster); - if (!ret) { + if (ret) { // open circuit breaker, should reject here. MARK_REJECTED(ctx); return KMESH_TAIL_CALL_RET(ret); diff --git a/bpf/kmesh/ads/sockops.c b/bpf/kmesh/ads/sockops.c index 63860ebeb..6cfac1773 100644 --- a/bpf/kmesh/ads/sockops.c +++ b/bpf/kmesh/ads/sockops.c @@ -64,11 +64,6 @@ int sockops_prog(struct bpf_sock_ops *skops) on_cluster_sock_connect(skops); } break; - case BPF_SOCK_OPS_PASSIVE_ESTABLISHED_CB: - if (bpf_sock_ops_cb_flags_set(skops, BPF_SOCK_OPS_STATE_CB_FLAG) != 0) { - BPF_LOG(ERR, SOCKOPS, "set sockops cb failed!\n"); - } - break; case BPF_SOCK_OPS_STATE_CB: if (skops->args[1] == BPF_TCP_CLOSE) { on_cluster_sock_close(skops); From a8ff81f34c4624f0501061723ef1ae755382ef7b Mon Sep 17 00:00:00 2001 From: Okabe-Rintarou-0 <923048992@qq.com> Date: Tue, 23 Jul 2024 20:51:01 +0800 Subject: [PATCH 12/19] make it thread safe Signed-off-by: Okabe-Rintarou-0 <923048992@qq.com> --- bpf/kmesh/ads/include/circuit_breaker.h | 44 ++++++++++++------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/bpf/kmesh/ads/include/circuit_breaker.h b/bpf/kmesh/ads/include/circuit_breaker.h index b22049492..9a25e312a 100644 --- a/bpf/kmesh/ads/include/circuit_breaker.h +++ b/bpf/kmesh/ads/include/circuit_breaker.h @@ -46,29 +46,29 @@ static inline void update_cluster_active_connections(const struct cluster_stats_ struct cluster_stats *stats = NULL; stats = kmesh_map_lookup_elem(&map_of_cluster_stats, key); if (!stats) { - struct cluster_stats new_stats = {0}; - if (delta < 0) { - BPF_LOG(DEBUG, KMESH, "invalid delta update"); - return; - } - new_stats.active_connections = delta; - BPF_LOG( - DEBUG, - KMESH, - "create new stats(netns_cookie = %lld, cluster_id = %ld)", - key->netns_cookie, - key->cluster_id); - kmesh_map_update_elem(&map_of_cluster_stats, key, &new_stats); - } else { - stats->active_connections += delta; - kmesh_map_update_elem(&map_of_cluster_stats, key, stats); - BPF_LOG( - DEBUG, - KMESH, - "update existing stats(netns_cookie = %lld, cluster_id = %ld)", - key->netns_cookie, - key->cluster_id); + struct cluster_stats init_value = {0}; + bpf_map_update_elem(&map_of_cluster_stats, key, &init_value, BPF_NOEXIST); + stats = kmesh_map_lookup_elem(&map_of_cluster_stats, key); + } + + if (!stats) { + BPF_LOG(ERR, KMESH, "failed to get cluster stats"); + return; + } + if (delta < 0 && -delta > stats->active_connections) { + BPF_LOG(ERR, KMESH, "invalid delta update"); + return; } + + __sync_fetch_and_add(&stats->active_connections, delta); + BPF_LOG(DEBUG, KMESH, "current active connections: %d\n", stats->active_connections); + + BPF_LOG( + DEBUG, + KMESH, + "update existing stats(netns_cookie = %lld, cluster_id = %ld)", + key->netns_cookie, + key->cluster_id); } static inline int on_cluster_sock_bind(ctx_buff_t *ctx, const Cluster__Cluster *cluster) From 5d476ab9d664ba536e10cdb49d4f1b63479feeac Mon Sep 17 00:00:00 2001 From: Okabe-Rintarou-0 <923048992@qq.com> Date: Tue, 23 Jul 2024 21:01:36 +0800 Subject: [PATCH 13/19] change return val Signed-off-by: Okabe-Rintarou-0 <923048992@qq.com> --- bpf/kmesh/ads/include/circuit_breaker.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bpf/kmesh/ads/include/circuit_breaker.h b/bpf/kmesh/ads/include/circuit_breaker.h index 9a25e312a..617ea3701 100644 --- a/bpf/kmesh/ads/include/circuit_breaker.h +++ b/bpf/kmesh/ads/include/circuit_breaker.h @@ -91,7 +91,7 @@ static inline int on_cluster_sock_bind(ctx_buff_t *ctx, const Cluster__Cluster * "Current active connections %d exceeded max connections %d, reject connection\n", stats->active_connections, cbs->max_connections); - return 1; + return -1; } } From e9622807b5e38c97434372799e7c0ef7a680d212 Mon Sep 17 00:00:00 2001 From: Okabe-Rintarou-0 <923048992@qq.com> Date: Wed, 24 Jul 2024 15:08:19 +0800 Subject: [PATCH 14/19] add log type Signed-off-by: Okabe-Rintarou-0 <923048992@qq.com> --- bpf/kmesh/ads/include/circuit_breaker.h | 32 ++++++++++++------------- bpf/kmesh/ads/include/kmesh_common.h | 15 ++++++------ 2 files changed, 24 insertions(+), 23 deletions(-) diff --git a/bpf/kmesh/ads/include/circuit_breaker.h b/bpf/kmesh/ads/include/circuit_breaker.h index 617ea3701..76f5de3cb 100644 --- a/bpf/kmesh/ads/include/circuit_breaker.h +++ b/bpf/kmesh/ads/include/circuit_breaker.h @@ -52,23 +52,23 @@ static inline void update_cluster_active_connections(const struct cluster_stats_ } if (!stats) { - BPF_LOG(ERR, KMESH, "failed to get cluster stats"); + BPF_LOG(ERR, CIRCUIT_BREAKER, "failed to get cluster stats"); return; } if (delta < 0 && -delta > stats->active_connections) { - BPF_LOG(ERR, KMESH, "invalid delta update"); + BPF_LOG(ERR, CIRCUIT_BREAKER, "invalid delta update"); return; } __sync_fetch_and_add(&stats->active_connections, delta); - BPF_LOG(DEBUG, KMESH, "current active connections: %d\n", stats->active_connections); BPF_LOG( DEBUG, - KMESH, - "update existing stats(netns_cookie = %lld, cluster_id = %ld)", + CIRCUIT_BREAKER, + "update existing stats(netns_cookie = %lld, cluster_id = %ld), current active connections: %d", key->netns_cookie, - key->cluster_id); + key->cluster_id, + stats->active_connections); } static inline int on_cluster_sock_bind(ctx_buff_t *ctx, const Cluster__Cluster *cluster) @@ -87,24 +87,24 @@ static inline int on_cluster_sock_bind(ctx_buff_t *ctx, const Cluster__Cluster * if (cbs != NULL && stats->active_connections >= cbs->max_connections) { BPF_LOG( DEBUG, - KMESH, - "Current active connections %d exceeded max connections %d, reject connection\n", + CIRCUIT_BREAKER, + "Current active connections %d exceeded max connections %d, reject connection", stats->active_connections, cbs->max_connections); return -1; } } - BPF_LOG(DEBUG, KMESH, "record sock bind for cluster id = %ld\n", cluster_id); + BPF_LOG(DEBUG, CIRCUIT_BREAKER, "record sock bind for cluster id = %ld", cluster_id); struct cluster_sock_data *data = NULL; if (!ctx->sk) { - BPF_LOG(WARN, KMESH, "provided sock is NULL\n"); + BPF_LOG(WARN, CIRCUIT_BREAKER, "provided sock is NULL"); return 0; } data = bpf_sk_storage_get(&map_of_cluster_sock, ctx->sk, 0, BPF_LOCAL_STORAGE_GET_F_CREATE); if (!data) { - BPF_LOG(ERR, KMESH, "on_cluster_sock_bind call bpf_sk_storage_get failed\n"); + BPF_LOG(ERR, CIRCUIT_BREAKER, "on_cluster_sock_bind call bpf_sk_storage_get failed"); return 0; } data->cluster_id = cluster_id; @@ -115,7 +115,7 @@ static inline struct cluster_sock_data *get_cluster_sk_data(struct bpf_sock *sk) { struct cluster_sock_data *data = NULL; if (!sk) { - BPF_LOG(DEBUG, KMESH, "provided sock is NULL\n"); + BPF_LOG(DEBUG, CIRCUIT_BREAKER, "provided sock is NULL"); return NULL; } @@ -138,12 +138,12 @@ static inline void on_cluster_sock_connect(struct bpf_sock_ops *ctx) key.cluster_id = data->cluster_id; BPF_LOG( DEBUG, - KMESH, + CIRCUIT_BREAKER, "increase cluster active connections(netns_cookie = %lld, cluster id = %ld)", key.netns_cookie, key.cluster_id); update_cluster_active_connections(&key, 1); - BPF_LOG(DEBUG, KMESH, "record sock connection for cluster id = %ld\n", data->cluster_id); + BPF_LOG(DEBUG, CIRCUIT_BREAKER, "record sock connection for cluster id = %ld", data->cluster_id); } static inline void on_cluster_sock_close(struct bpf_sock_ops *ctx) @@ -162,11 +162,11 @@ static inline void on_cluster_sock_close(struct bpf_sock_ops *ctx) update_cluster_active_connections(&key, -1); BPF_LOG( DEBUG, - KMESH, + CIRCUIT_BREAKER, "decrease cluster active connections(netns_cookie = %lld, cluster id = %ld)", key.netns_cookie, key.cluster_id); - BPF_LOG(DEBUG, KMESH, "record sock close for cluster id = %ld", data->cluster_id); + BPF_LOG(DEBUG, CIRCUIT_BREAKER, "record sock close for cluster id = %ld", data->cluster_id); } #endif \ No newline at end of file diff --git a/bpf/kmesh/ads/include/kmesh_common.h b/bpf/kmesh/ads/include/kmesh_common.h index aee16fe6b..b7cc229f9 100644 --- a/bpf/kmesh/ads/include/kmesh_common.h +++ b/bpf/kmesh/ads/include/kmesh_common.h @@ -11,13 +11,14 @@ #include "core/address.pb-c.h" #include "tail_call_index.h" -#define BPF_LOGTYPE_LISTENER BPF_DEBUG_ON -#define BPF_LOGTYPE_FILTERCHAIN BPF_DEBUG_ON -#define BPF_LOGTYPE_FILTER BPF_DEBUG_ON -#define BPF_LOGTYPE_CLUSTER BPF_DEBUG_ON -#define BPF_LOGTYPE_ROUTER BPF_DEBUG_ON -#define BPF_LOGTYPE_ROUTER_CONFIG BPF_DEBUG_ON -#define BPF_LOGTYPE_COMMON BPF_DEBUG_ON +#define BPF_LOGTYPE_LISTENER BPF_DEBUG_ON +#define BPF_LOGTYPE_FILTERCHAIN BPF_DEBUG_ON +#define BPF_LOGTYPE_FILTER BPF_DEBUG_ON +#define BPF_LOGTYPE_CLUSTER BPF_DEBUG_ON +#define BPF_LOGTYPE_ROUTER BPF_DEBUG_ON +#define BPF_LOGTYPE_ROUTER_CONFIG BPF_DEBUG_ON +#define BPF_LOGTYPE_COMMON BPF_DEBUG_ON +#define BPF_LOGTYPE_CIRCUIT_BREAKER BPF_DEBUG_ON #define BPF_OK 1 From 573156f8e75adb5b425a05c7326b1fa1ed7f4e42 Mon Sep 17 00:00:00 2001 From: "923048992@qq.com" <923048992@qq.com> Date: Wed, 24 Jul 2024 15:56:30 +0800 Subject: [PATCH 15/19] clear cluster stats in user space Signed-off-by: 923048992@qq.com <923048992@qq.com> --- daemon/manager/manager.go | 2 +- pkg/bpf/ads/loader.go | 4 + pkg/bpf/ads/loader_enhanced.go | 4 + pkg/bpf/bpf.go | 7 + pkg/bpf/bpf_kmesh.go | 455 ++++++++++++++++++ pkg/bpf/workload/sock_connection.go | 3 +- pkg/bpf/workload/sock_ops.go | 3 +- pkg/bpf/workload/xdp.go | 3 +- pkg/cache/v2/cluster.go | 45 +- pkg/cache/v2/cluster_test.go | 8 +- pkg/controller/ads/ads_controller.go | 5 +- pkg/controller/ads/ads_controller_test.go | 2 +- pkg/controller/ads/ads_processor.go | 5 +- pkg/controller/ads/ads_processor_test.go | 66 +-- pkg/controller/ads/cache.go | 6 +- pkg/controller/ads/cache_test.go | 16 +- pkg/controller/client.go | 5 +- pkg/controller/client_test.go | 7 +- pkg/controller/controller.go | 7 +- pkg/controller/workload/workload_processor.go | 4 +- .../workload/workload_processor_test.go | 2 +- pkg/dns/dns_test.go | 4 +- pkg/utils/hash_name.go | 8 + 23 files changed, 597 insertions(+), 74 deletions(-) create mode 100644 pkg/bpf/bpf_kmesh.go diff --git a/daemon/manager/manager.go b/daemon/manager/manager.go index f634c19ce..87b9f6368 100644 --- a/daemon/manager/manager.go +++ b/daemon/manager/manager.go @@ -91,7 +91,7 @@ func Execute(configs *options.BootstrapConfigs) error { stopCh := make(chan struct{}) defer close(stopCh) - c := controller.NewController(configs, bpfLoader.GetBpfWorkload(), configs.BpfConfig.BpfFsPath, configs.BpfConfig.EnableBpfLog, configs.BpfConfig.EnableAccesslog) + c := controller.NewController(configs, bpfLoader.GetBpfKmesh(), bpfLoader.GetBpfWorkload(), configs.BpfConfig.BpfFsPath, configs.BpfConfig.EnableBpfLog, configs.BpfConfig.EnableAccesslog) if err := c.Start(stopCh); err != nil { return err } diff --git a/pkg/bpf/ads/loader.go b/pkg/bpf/ads/loader.go index 6058ec5c6..8068744aa 100644 --- a/pkg/bpf/ads/loader.go +++ b/pkg/bpf/ads/loader.go @@ -141,6 +141,10 @@ func (sc *BpfAds) Detach() error { return nil } +func (sc *BpfAds) GetClusterStatsMap() *ebpf.Map { + return nil +} + func AdsL7Enabled() bool { return false } diff --git a/pkg/bpf/ads/loader_enhanced.go b/pkg/bpf/ads/loader_enhanced.go index 13b9808ef..07ca3a02f 100644 --- a/pkg/bpf/ads/loader_enhanced.go +++ b/pkg/bpf/ads/loader_enhanced.go @@ -185,6 +185,10 @@ func (sc *BpfAds) Detach() error { return nil } +func (sc *BpfAds) GetClusterStatsMap() *ebpf.Map { + return sc.SockOps.KmeshSockopsMaps.MapOfClusterStats +} + func AdsL7Enabled() bool { return false } diff --git a/pkg/bpf/bpf.go b/pkg/bpf/bpf.go index 2d7c300e9..487857010 100644 --- a/pkg/bpf/bpf.go +++ b/pkg/bpf/bpf.go @@ -104,6 +104,13 @@ func (l *BpfLoader) Start() error { return nil } +func (l *BpfLoader) GetBpfKmesh() *ads.BpfAds { + if l == nil { + return nil + } + return l.obj +} + func (l *BpfLoader) GetBpfWorkload() *workload.BpfWorkload { if l == nil { return nil diff --git a/pkg/bpf/bpf_kmesh.go b/pkg/bpf/bpf_kmesh.go new file mode 100644 index 000000000..39ca56b9f --- /dev/null +++ b/pkg/bpf/bpf_kmesh.go @@ -0,0 +1,455 @@ +//go:build enhanced +// +build enhanced + +/* + * Copyright The Kmesh Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + + * Author: nlgwcy + * Create: 2022-02-26 + */ + +package bpf + +import ( + "os" + "reflect" + "strconv" + "syscall" + + "github.com/cilium/ebpf" + "github.com/cilium/ebpf/link" + + "kmesh.net/kmesh/bpf/kmesh/bpf2go" + "kmesh.net/kmesh/daemon/options" +) + +type BpfTracePoint struct { + Info BpfInfo + Link link.Link + bpf2go.KmeshTracePointObjects +} + +type BpfSockOps struct { + Info BpfInfo + Link link.Link + bpf2go.KmeshSockopsObjects +} + +type BpfKmesh struct { + TracePoint BpfTracePoint + SockConn BpfSockConn + SockOps BpfSockOps +} + +func (sc *BpfTracePoint) NewBpf(cfg *options.BpfConfig) { + sc.Info.MapPath = cfg.BpfFsPath + sc.Info.BpfFsPath = cfg.BpfFsPath + sc.Info.BpfVerifyLogSize = cfg.BpfVerifyLogSize + sc.Info.Cgroup2Path = cfg.Cgroup2Path +} + +func (sc *BpfSockOps) NewBpf(cfg *options.BpfConfig) error { + sc.Info.MapPath = cfg.BpfFsPath + "/bpf_kmesh/map/" + sc.Info.BpfFsPath = cfg.BpfFsPath + "/bpf_kmesh/sockops/" + sc.Info.BpfVerifyLogSize = cfg.BpfVerifyLogSize + sc.Info.Cgroup2Path = cfg.Cgroup2Path + + if err := os.MkdirAll(sc.Info.MapPath, + syscall.S_IRUSR|syscall.S_IWUSR|syscall.S_IXUSR| + syscall.S_IRGRP|syscall.S_IXGRP); err != nil && !os.IsExist(err) { + return err + } + + if err := os.MkdirAll(sc.Info.BpfFsPath, + syscall.S_IRUSR|syscall.S_IWUSR|syscall.S_IXUSR| + syscall.S_IRGRP|syscall.S_IXGRP); err != nil && !os.IsExist(err) { + return err + } + + return nil +} + +func NewBpfKmesh(cfg *options.BpfConfig) (*BpfKmesh, error) { + var err error + + sc := &BpfKmesh{} + + sc.TracePoint.NewBpf(cfg) + + if err = sc.SockOps.NewBpf(cfg); err != nil { + return sc, err + } + + if err = sc.SockConn.NewBpf(cfg); err != nil { + return sc, err + } + return sc, nil +} + +func (sc *BpfTracePoint) loadKmeshTracePointObjects() (*ebpf.CollectionSpec, error) { + var ( + err error + spec *ebpf.CollectionSpec + opts ebpf.CollectionOptions + ) + opts.Programs.LogSize = sc.Info.BpfVerifyLogSize + + spec, err = bpf2go.LoadKmeshTracePoint() + if err != nil || spec == nil { + return nil, err + } + + for _, v := range spec.Programs { + if v.Name == "connect_ret" { + v.Type = ebpf.RawTracepointWritable + } + } + + if err = spec.LoadAndAssign(&sc.KmeshTracePointObjects, &opts); err != nil { + return nil, err + } + + return spec, nil +} + +func (sc *BpfTracePoint) LoadTracePoint() error { + if _, err := sc.loadKmeshTracePointObjects(); err != nil { + return err + } + return nil +} + +func (sc *BpfSockOps) loadKmeshSockopsObjects() (*ebpf.CollectionSpec, error) { + var ( + err error + spec *ebpf.CollectionSpec + opts ebpf.CollectionOptions + ) + opts.Maps.PinPath = sc.Info.MapPath + opts.Programs.LogSize = sc.Info.BpfVerifyLogSize + + spec, err = bpf2go.LoadKmeshSockops() + + if err != nil || spec == nil { + return nil, err + } + + SetInnerMap(spec) + setMapPinType(spec, ebpf.PinByName) + if err = spec.LoadAndAssign(&sc.KmeshSockopsObjects, &opts); err != nil { + return nil, err + } + + value := reflect.ValueOf(sc.KmeshSockopsObjects.KmeshSockopsPrograms) + if err = pinPrograms(&value, sc.Info.BpfFsPath); err != nil { + return nil, err + } + + return spec, nil +} + +func (sc *BpfSockOps) loadKmeshFilterObjects() (*ebpf.CollectionSpec, error) { + var ( + err error + spec *ebpf.CollectionSpec + opts ebpf.CollectionOptions + ) + opts.Maps.PinPath = sc.Info.MapPath + opts.Programs.LogSize = sc.Info.BpfVerifyLogSize + + err = sc.KmeshTailCallProg.Update( + uint32(KMESH_TAIL_CALL_FILTER_CHAIN), + uint32(sc.FilterChainManager.FD()), + ebpf.UpdateAny) + if err != nil { + return nil, err + } + + err = sc.KmeshTailCallProg.Update( + uint32(KMESH_TAIL_CALL_FILTER), + uint32(sc.FilterManager.FD()), + ebpf.UpdateAny) + if err != nil { + return nil, err + } + + return spec, nil +} + +func (sc *BpfSockOps) loadRouteConfigObjects() (*ebpf.CollectionSpec, error) { + var ( + err error + spec *ebpf.CollectionSpec + opts ebpf.CollectionOptions + ) + opts.Maps.PinPath = sc.Info.MapPath + opts.Programs.LogSize = sc.Info.BpfVerifyLogSize + + err = sc.KmeshTailCallProg.Update( + uint32(KMESH_TAIL_CALL_ROUTER_CONFIG), + uint32(sc.RouteConfigManager.FD()), + ebpf.UpdateAny) + if err != nil { + return nil, err + } + + return spec, nil +} + +func (sc *BpfSockOps) loadKmeshClusterObjects() (*ebpf.CollectionSpec, error) { + var ( + err error + spec *ebpf.CollectionSpec + opts ebpf.CollectionOptions + ) + opts.Maps.PinPath = sc.Info.MapPath + opts.Programs.LogSize = sc.Info.BpfVerifyLogSize + + err = sc.KmeshTailCallProg.Update( + uint32(KMESH_TAIL_CALL_CLUSTER), + uint32(sc.ClusterManager.FD()), + ebpf.UpdateAny) + if err != nil { + return nil, err + } + + return spec, nil +} + +func (sc *BpfSockOps) LoadSockOps() error { + /* load kmesh sockops main bpf prog */ + spec, err := sc.loadKmeshSockopsObjects() + if err != nil { + return err + } + + prog := spec.Programs["sockops_prog"] + sc.Info.Type = prog.Type + sc.Info.AttachType = prog.AttachType + + /* load kmesh sockops tail call bpf prog */ + if _, err := sc.loadKmeshFilterObjects(); err != nil { + return err + } + + if _, err := sc.loadRouteConfigObjects(); err != nil { + return err + } + + if _, err := sc.loadKmeshClusterObjects(); err != nil { + return err + } + + return nil +} + +func (sc *BpfKmesh) Load() error { + var err error + + if err = sc.TracePoint.LoadTracePoint(); err != nil { + return err + } + + if err = sc.SockOps.LoadSockOps(); err != nil { + return err + } + + if err = sc.SockConn.LoadSockConn(); err != nil { + return err + } + + return nil +} + +func (sc *BpfKmesh) ApiEnvCfg() error { + var err error + var info *ebpf.MapInfo + var id ebpf.MapID + info, err = sc.SockOps.KmeshSockopsMaps.KmeshListener.Info() + + if err != nil { + return err + } + + id, _ = info.ID() + stringId := strconv.Itoa(int(id)) + if err = os.Setenv("Listener", stringId); err != nil { + return err + } + + info, _ = sc.SockOps.KmeshSockopsMaps.OuterMap.Info() + id, _ = info.ID() + stringId = strconv.Itoa(int(id)) + if err = os.Setenv("OUTTER_MAP_ID", stringId); err != nil { + return err + } + + info, _ = sc.SockOps.KmeshSockopsMaps.InnerMap.Info() + id, _ = info.ID() + stringId = strconv.Itoa(int(id)) + if err = os.Setenv("INNER_MAP_ID", stringId); err != nil { + return err + } + + info, _ = sc.SockOps.MapOfRouterConfig.Info() + id, _ = info.ID() + stringId = strconv.Itoa(int(id)) + if err = os.Setenv("RouteConfiguration", stringId); err != nil { + return err + } + + info, _ = sc.SockOps.KmeshCluster.Info() + id, _ = info.ID() + stringId = strconv.Itoa(int(id)) + if err = os.Setenv("Cluster", stringId); err != nil { + return err + } + + return nil +} + +func (sc *BpfTracePoint) Attach() error { + tpopt := link.RawTracepointOptions{ + Name: "connect_ret", + Program: sc.KmeshTracePointObjects.ConnectRet, + } + + lk, err := link.AttachRawTracepoint(tpopt) + if err != nil { + return err + } + sc.Link = lk + + return nil +} + +func (sc *BpfSockOps) Attach() error { + cgopt := link.CgroupOptions{ + Path: sc.Info.Cgroup2Path, + Attach: sc.Info.AttachType, + Program: sc.KmeshSockopsObjects.SockopsProg, + } + + lk, err := link.AttachCgroup(cgopt) + if err != nil { + return err + } + sc.Link = lk + + return nil +} + +func (sc *BpfKmesh) Attach() error { + var err error + + if err = sc.TracePoint.Attach(); err != nil { + return err + } + + if err = sc.SockOps.Attach(); err != nil { + return err + } + + if err = sc.SockConn.Attach(); err != nil { + return err + } + return nil +} + +func (sc *BpfTracePoint) close() error { + return sc.KmeshTracePointObjects.Close() +} + +func (sc *BpfSockOps) close() error { + if err := sc.KmeshSockopsObjects.Close(); err != nil { + return err + } + return nil +} + +func (sc *BpfKmesh) close() error { + var err error + + if err = sc.SockOps.close(); err != nil { + return err + } + + if err = sc.SockConn.close(); err != nil { + return err + } + + if err = sc.TracePoint.close(); err != nil { + return err + } + return nil +} + +func (sc *BpfTracePoint) Detach() error { + if err := sc.close(); err != nil { + return err + } + + if sc.Link != nil { + return sc.Link.Close() + } + return nil +} + +func (sc *BpfSockOps) Detach() error { + var value reflect.Value + + if err := sc.close(); err != nil { + return err + } + + value = reflect.ValueOf(sc.KmeshSockopsObjects.KmeshSockopsPrograms) + if err := unpinPrograms(&value); err != nil { + return err + } + value = reflect.ValueOf(sc.KmeshSockopsObjects.KmeshSockopsMaps) + if err := unpinMaps(&value); err != nil { + return err + } + + if err := os.RemoveAll(sc.Info.BpfFsPath); err != nil && !os.IsNotExist(err) { + return err + } + + if sc.Link != nil { + return sc.Link.Close() + } + return nil +} + +func (sc *BpfKmesh) Detach() error { + var err error + + if err = sc.TracePoint.Detach(); err != nil { + return err + } + + if err = sc.SockOps.Detach(); err != nil { + return err + } + + if err = sc.SockConn.Detach(); err != nil { + return err + } + return nil +} + +func (sc *BpfKmesh) GetClusterStatsMap() *ebpf.Map { + return sc.SockOps.KmeshSockopsMaps.MapOfClusterStats +} diff --git a/pkg/bpf/workload/sock_connection.go b/pkg/bpf/workload/sock_connection.go index aa37748f2..b38c6f912 100644 --- a/pkg/bpf/workload/sock_connection.go +++ b/pkg/bpf/workload/sock_connection.go @@ -22,13 +22,12 @@ import ( "reflect" "syscall" - "kmesh.net/kmesh/pkg/bpf/restart" - "github.com/cilium/ebpf" "github.com/cilium/ebpf/link" "kmesh.net/kmesh/bpf/kmesh/bpf2go" "kmesh.net/kmesh/daemon/options" + "kmesh.net/kmesh/pkg/bpf/restart" "kmesh.net/kmesh/pkg/bpf/utils" "kmesh.net/kmesh/pkg/constants" helper "kmesh.net/kmesh/pkg/utils" diff --git a/pkg/bpf/workload/sock_ops.go b/pkg/bpf/workload/sock_ops.go index db58e8573..6199594e9 100644 --- a/pkg/bpf/workload/sock_ops.go +++ b/pkg/bpf/workload/sock_ops.go @@ -23,13 +23,12 @@ import ( "reflect" "syscall" - "kmesh.net/kmesh/pkg/bpf/restart" - "github.com/cilium/ebpf" "github.com/cilium/ebpf/link" "kmesh.net/kmesh/bpf/kmesh/bpf2go" "kmesh.net/kmesh/daemon/options" + "kmesh.net/kmesh/pkg/bpf/restart" "kmesh.net/kmesh/pkg/bpf/utils" helper "kmesh.net/kmesh/pkg/utils" ) diff --git a/pkg/bpf/workload/xdp.go b/pkg/bpf/workload/xdp.go index 02cc77707..3b182dedc 100644 --- a/pkg/bpf/workload/xdp.go +++ b/pkg/bpf/workload/xdp.go @@ -22,13 +22,12 @@ import ( "reflect" "syscall" - "kmesh.net/kmesh/pkg/bpf/utils" - "github.com/cilium/ebpf" "github.com/cilium/ebpf/link" "kmesh.net/kmesh/bpf/kmesh/bpf2go" "kmesh.net/kmesh/daemon/options" + "kmesh.net/kmesh/pkg/bpf/utils" "kmesh.net/kmesh/pkg/constants" helper "kmesh.net/kmesh/pkg/utils" ) diff --git a/pkg/cache/v2/cluster.go b/pkg/cache/v2/cluster.go index a4aa19383..5634f3777 100644 --- a/pkg/cache/v2/cluster.go +++ b/pkg/cache/v2/cluster.go @@ -22,27 +22,44 @@ package cache_v2 import ( "sync" + "github.com/cilium/ebpf" "k8s.io/apimachinery/pkg/util/sets" cluster_v2 "kmesh.net/kmesh/api/v2/cluster" core_v2 "kmesh.net/kmesh/api/v2/core" + bpfads "kmesh.net/kmesh/pkg/bpf/ads" maps_v2 "kmesh.net/kmesh/pkg/cache/v2/maps" "kmesh.net/kmesh/pkg/utils" ) +type ClusterStatsKey struct { + NetnsCookie uint64 + ClusterId uint32 +} + +type ClusterStatsValue struct { + ActiveConnections uint32 +} + type ClusterCache struct { mutex sync.RWMutex apiClusterCache apiClusterCache // resourceHash[0]:cds resourceHash[1]:eds - resourceHash map[string][2]uint64 - hashName *utils.HashName + resourceHash map[string][2]uint64 + hashName *utils.HashName + clusterStatsMap *ebpf.Map } -func NewClusterCache(hashName *utils.HashName) ClusterCache { +func NewClusterCache(bpfAds *bpfads.BpfAds, hashName *utils.HashName) ClusterCache { + var clusterStatsMap *ebpf.Map + if bpfAds != nil { + clusterStatsMap = bpfAds.GetClusterStatsMap() + } return ClusterCache{ apiClusterCache: newApiClusterCache(), resourceHash: make(map[string][2]uint64), hashName: hashName, + clusterStatsMap: clusterStatsMap, } } @@ -125,6 +142,27 @@ func (cache *ClusterCache) SetEdsHash(key string, value uint64) { cache.resourceHash[key] = [2]uint64{cache.resourceHash[key][0], value} } +func (cache *ClusterCache) clearClusterStats(clusterName string) { + if cache.clusterStatsMap == nil { + return + } + clusterId := cache.hashName.StrToNum(clusterName) + var key ClusterStatsKey + it := cache.clusterStatsMap.Iterate() + for it.Next(&key, nil) { + if key.ClusterId == clusterId { + if err := cache.clusterStatsMap.Delete(&key); err != nil { + log.Errorf("failed to delete key %v: %s", key, err) + } else { + log.Debugf("remove cluster stats with key %v", key) + } + } + } + if err := it.Err(); err != nil { + log.Errorf("delete iteration error: %s", err) + } +} + // Flush flushes the cluster to bpf map. func (cache *ClusterCache) Flush() { cache.mutex.Lock() @@ -144,6 +182,7 @@ func (cache *ClusterCache) Flush() { if err == nil { delete(cache.apiClusterCache, name) delete(cache.resourceHash, name) + cache.clearClusterStats(name) cache.hashName.Delete(name) } else { log.Errorf("cluster %s delete failed: %v", name, err) diff --git a/pkg/cache/v2/cluster_test.go b/pkg/cache/v2/cluster_test.go index 69e7b812e..03f9916c9 100644 --- a/pkg/cache/v2/cluster_test.go +++ b/pkg/cache/v2/cluster_test.go @@ -58,7 +58,7 @@ func TestClusterFlush(t *testing.T) { patches2.Reset() }() - cache := NewClusterCache(utils.NewHashName()) + cache := NewClusterCache(nil, utils.NewHashName()) cluster1 := &cluster_v2.Cluster{ ApiStatus: core_v2.ApiStatus_UPDATE, Name: "ut-cluster1", @@ -101,7 +101,7 @@ func TestClusterFlush(t *testing.T) { patches2.Reset() }() - cache := NewClusterCache(utils.NewHashName()) + cache := NewClusterCache(nil, utils.NewHashName()) cluster1 := &cluster_v2.Cluster{ ApiStatus: core_v2.ApiStatus_UPDATE, Name: "ut-cluster1", @@ -155,7 +155,7 @@ func TestClusterFlush(t *testing.T) { patches2.Reset() }() - cache := NewClusterCache(utils.NewHashName()) + cache := NewClusterCache(nil, utils.NewHashName()) cluster1 := &cluster_v2.Cluster{ ApiStatus: core_v2.ApiStatus_UNCHANGED, Name: "ut-cluster1", @@ -336,7 +336,7 @@ func BenchmarkClusterFlush(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - cache := NewClusterCache(utils.NewHashName()) + cache := NewClusterCache(nil, utils.NewHashName()) cluster.Name = rand.String(6) cluster.ApiStatus = core_v2.ApiStatus_UPDATE cache.SetApiCluster(cluster.Name, &cluster) diff --git a/pkg/controller/ads/ads_controller.go b/pkg/controller/ads/ads_controller.go index 7b57840d3..cc993f319 100644 --- a/pkg/controller/ads/ads_controller.go +++ b/pkg/controller/ads/ads_controller.go @@ -24,6 +24,7 @@ import ( resource_v3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "istio.io/istio/pkg/channels" + bpfads "kmesh.net/kmesh/pkg/bpf/ads" "kmesh.net/kmesh/pkg/logger" ) @@ -42,9 +43,9 @@ type connection struct { stopCh chan struct{} } -func NewController() *Controller { +func NewController(bpfAds *bpfads.BpfAds) *Controller { return &Controller{ - Processor: newProcessor(), + Processor: newProcessor(bpfAds), } } diff --git a/pkg/controller/ads/ads_controller_test.go b/pkg/controller/ads/ads_controller_test.go index b8f22e01d..70c4a36d6 100644 --- a/pkg/controller/ads/ads_controller_test.go +++ b/pkg/controller/ads/ads_controller_test.go @@ -117,7 +117,7 @@ func TestHandleAdsStream(t *testing.T) { } defer fakeClient.Cleanup() - adsStream := NewController() + adsStream := NewController(nil) adsStream.con = &connection{Stream: fakeClient.AdsClient, requestsChan: channels.NewUnbounded[*service_discovery_v3.DiscoveryRequest](), stopCh: make(chan struct{})} patches1 := gomonkey.NewPatches() diff --git a/pkg/controller/ads/ads_processor.go b/pkg/controller/ads/ads_processor.go index d49f22de1..7f0a17d1b 100644 --- a/pkg/controller/ads/ads_processor.go +++ b/pkg/controller/ads/ads_processor.go @@ -32,6 +32,7 @@ import ( admin_v2 "kmesh.net/kmesh/api/v2/admin" core_v2 "kmesh.net/kmesh/api/v2/core" + bpfads "kmesh.net/kmesh/pkg/bpf/ads" "kmesh.net/kmesh/pkg/constants" "kmesh.net/kmesh/pkg/controller/config" "kmesh.net/kmesh/pkg/utils/hash" @@ -56,9 +57,9 @@ type processor struct { DnsResolverChan chan []*config_cluster_v3.Cluster } -func newProcessor() *processor { +func newProcessor(bpfAds *bpfads.BpfAds) *processor { return &processor{ - Cache: NewAdsCache(), + Cache: NewAdsCache(bpfAds), ack: nil, req: nil, lastNonce: &lastNonce{}, diff --git a/pkg/controller/ads/ads_processor_test.go b/pkg/controller/ads/ads_processor_test.go index 1e703008e..dfdf46c15 100644 --- a/pkg/controller/ads/ads_processor_test.go +++ b/pkg/controller/ads/ads_processor_test.go @@ -50,7 +50,7 @@ func TestHandleCdsResponse(t *testing.T) { cleanup, _ := test.InitBpfMap(t, config) t.Cleanup(cleanup) t.Run("new cluster, cluster type is eds", func(t *testing.T) { - p := newProcessor() + p := newProcessor(nil) p.DnsResolverChan = make(chan []*config_cluster_v3.Cluster, 5) cluster := &config_cluster_v3.Cluster{ Name: "ut-cluster", @@ -79,7 +79,7 @@ func TestHandleCdsResponse(t *testing.T) { }) t.Run("new cluster, cluster type is not eds", func(t *testing.T) { - p := newProcessor() + p := newProcessor(nil) p.DnsResolverChan = make(chan []*config_cluster_v3.Cluster, 5) cluster := &config_cluster_v3.Cluster{ Name: "ut-cluster", @@ -108,7 +108,7 @@ func TestHandleCdsResponse(t *testing.T) { }) t.Run("cluster update case", func(t *testing.T) { - p := newProcessor() + p := newProcessor(nil) p.DnsResolverChan = make(chan []*config_cluster_v3.Cluster, 5) cluster := &config_cluster_v3.Cluster{ Name: "ut-cluster", @@ -159,7 +159,7 @@ func TestHandleCdsResponse(t *testing.T) { }) t.Run("multiClusters: add a new eds cluster", func(t *testing.T) { - p := newProcessor() + p := newProcessor(nil) p.DnsResolverChan = make(chan []*config_cluster_v3.Cluster, 5) multiClusters := []*config_cluster_v3.Cluster{ { @@ -242,7 +242,7 @@ func TestHandleCdsResponse(t *testing.T) { }) t.Run("multiClusters: remove cluster", func(t *testing.T) { - p := newProcessor() + p := newProcessor(nil) p.DnsResolverChan = make(chan []*config_cluster_v3.Cluster, 5) cluster := &config_cluster_v3.Cluster{ Name: "ut-cluster", @@ -302,9 +302,9 @@ func TestHandleEdsResponse(t *testing.T) { cleanup, _ := test.InitBpfMap(t, config) t.Cleanup(cleanup) t.Run("cluster's apiStatus is UPDATE", func(t *testing.T) { - p := newProcessor() - adsLoader := NewAdsCache() - adsLoader.ClusterCache = cache_v2.NewClusterCache(utils.NewHashName()) + p := newProcessor(nil) + adsLoader := NewAdsCache(nil) + adsLoader.ClusterCache = cache_v2.NewClusterCache(nil, utils.NewHashName()) cluster := &cluster_v2.Cluster{ Name: "ut-cluster", ApiStatus: core_v2.ApiStatus_UPDATE, @@ -335,9 +335,9 @@ func TestHandleEdsResponse(t *testing.T) { }) t.Run("cluster's apiStatus is Waiting", func(t *testing.T) { - p := newProcessor() - adsLoader := NewAdsCache() - adsLoader.ClusterCache = cache_v2.NewClusterCache(utils.NewHashName()) + p := newProcessor(nil) + adsLoader := NewAdsCache(nil) + adsLoader.ClusterCache = cache_v2.NewClusterCache(nil, utils.NewHashName()) cluster := &cluster_v2.Cluster{ Name: "ut-cluster", ApiStatus: core_v2.ApiStatus_WAITING, @@ -363,14 +363,14 @@ func TestHandleEdsResponse(t *testing.T) { }) t.Run("not apiStatus_UPDATE", func(t *testing.T) { - adsLoader := NewAdsCache() - adsLoader.ClusterCache = cache_v2.NewClusterCache(utils.NewHashName()) + adsLoader := NewAdsCache(nil) + adsLoader.ClusterCache = cache_v2.NewClusterCache(nil, utils.NewHashName()) cluster := &cluster_v2.Cluster{ Name: "ut-cluster", ApiStatus: core_v2.ApiStatus_ALL, } adsLoader.ClusterCache.SetApiCluster("ut-cluster", cluster) - p := newProcessor() + p := newProcessor(nil) p.Cache = adsLoader loadAssignment := &config_endpoint_v3.ClusterLoadAssignment{ @@ -392,14 +392,14 @@ func TestHandleEdsResponse(t *testing.T) { }) t.Run("already have cluster, not update", func(t *testing.T) { - adsLoader := NewAdsCache() - adsLoader.ClusterCache = cache_v2.NewClusterCache(utils.NewHashName()) + adsLoader := NewAdsCache(nil) + adsLoader.ClusterCache = cache_v2.NewClusterCache(nil, utils.NewHashName()) cluster := &cluster_v2.Cluster{ Name: "ut-cluster", ApiStatus: core_v2.ApiStatus_WAITING, } adsLoader.ClusterCache.SetApiCluster("ut-cluster", cluster) - p := newProcessor() + p := newProcessor(nil) p.Cache = adsLoader loadAssignment := &config_endpoint_v3.ClusterLoadAssignment{ ClusterName: "ut-cluster", @@ -423,11 +423,11 @@ func TestHandleEdsResponse(t *testing.T) { }) t.Run("no apicluster, p.ack not be changed", func(t *testing.T) { - adsLoader := NewAdsCache() - adsLoader.ClusterCache = cache_v2.NewClusterCache(utils.NewHashName()) + adsLoader := NewAdsCache(nil) + adsLoader.ClusterCache = cache_v2.NewClusterCache(nil, utils.NewHashName()) cluster := &cluster_v2.Cluster{} adsLoader.ClusterCache.SetApiCluster("", cluster) - p := newProcessor() + p := newProcessor(nil) p.Cache = adsLoader loadAssignment := &config_endpoint_v3.ClusterLoadAssignment{ ClusterName: "ut-cluster", @@ -448,14 +448,14 @@ func TestHandleEdsResponse(t *testing.T) { }) t.Run("empty loadAssignment", func(t *testing.T) { - adsLoader := NewAdsCache() - adsLoader.ClusterCache = cache_v2.NewClusterCache(utils.NewHashName()) + adsLoader := NewAdsCache(nil) + adsLoader.ClusterCache = cache_v2.NewClusterCache(nil, utils.NewHashName()) cluster := &cluster_v2.Cluster{ Name: "ut-cluster", ApiStatus: core_v2.ApiStatus_WAITING, } adsLoader.ClusterCache.SetApiCluster("ut-cluster", cluster) - p := newProcessor() + p := newProcessor(nil) p.Cache = adsLoader loadAssignment := &config_endpoint_v3.ClusterLoadAssignment{ ClusterName: "ut-cluster", @@ -484,12 +484,12 @@ func TestHandleLdsResponse(t *testing.T) { } _, loader := test.InitBpfMap(t, config) t.Run("normal function test", func(t *testing.T) { - adsLoader := NewAdsCache() + adsLoader := NewAdsCache(nil) adsLoader.routeNames = []string{ "ut-route-to-client", "ut-route-to-service", } - p := newProcessor() + p := newProcessor(nil) p.Cache = adsLoader filterHttp := &filters_network_http.HttpConnectionManager{ RouteSpecifier: &filters_network_http.HttpConnectionManager_Rds{ @@ -544,12 +544,12 @@ func TestHandleLdsResponse(t *testing.T) { }) t.Run("listenerCache already has resource and it has not been changed", func(t *testing.T) { - adsLoader := NewAdsCache() + adsLoader := NewAdsCache(nil) adsLoader.routeNames = []string{ "ut-route-to-client", "ut-route-to-service", } - p := newProcessor() + p := newProcessor(nil) p.Cache = adsLoader listener := &config_listener_v3.Listener{ Name: "ut-listener", @@ -583,12 +583,12 @@ func TestHandleLdsResponse(t *testing.T) { }) t.Run("listenerCache already has resource and it has been changed", func(t *testing.T) { - adsLoader := NewAdsCache() + adsLoader := NewAdsCache(nil) adsLoader.routeNames = []string{ "ut-route-to-client", "ut-route-to-service", } - p := newProcessor() + p := newProcessor(nil) p.Cache = adsLoader listener := &config_listener_v3.Listener{ Name: "ut-listener", @@ -662,7 +662,7 @@ func TestHandleRdsResponse(t *testing.T) { } _, loader := test.InitBpfMap(t, config) t.Run("normal function test", func(t *testing.T) { - p := newProcessor() + p := newProcessor(nil) p.ack = &service_discovery_v3.DiscoveryRequest{ ResourceNames: []string{ "ut-routeclient", @@ -692,7 +692,7 @@ func TestHandleRdsResponse(t *testing.T) { }) t.Run("empty routeConfig", func(t *testing.T) { - p := newProcessor() + p := newProcessor(nil) p.ack = &service_discovery_v3.DiscoveryRequest{ ResourceNames: []string{ "ut-routeclient", @@ -715,7 +715,7 @@ func TestHandleRdsResponse(t *testing.T) { }) t.Run("already have a Rds, RdsHash has been changed", func(t *testing.T) { - p := newProcessor() + p := newProcessor(nil) p.ack = &service_discovery_v3.DiscoveryRequest{ ResourceNames: []string{ "ut-routeclient", @@ -757,7 +757,7 @@ func TestHandleRdsResponse(t *testing.T) { }) t.Run("already have a Rds, RdsHash has been change. And have multiRouteconfig in resp", func(t *testing.T) { - p := newProcessor() + p := newProcessor(nil) p.ack = &service_discovery_v3.DiscoveryRequest{ ResourceNames: []string{ "ut-routeclient", diff --git a/pkg/controller/ads/cache.go b/pkg/controller/ads/cache.go index aafd4feb7..f22b4e6bf 100644 --- a/pkg/controller/ads/cache.go +++ b/pkg/controller/ads/cache.go @@ -35,6 +35,7 @@ import ( filter_v2 "kmesh.net/kmesh/api/v2/filter" listener_v2 "kmesh.net/kmesh/api/v2/listener" route_v2 "kmesh.net/kmesh/api/v2/route" + bpfads "kmesh.net/kmesh/pkg/bpf/ads" cache_v2 "kmesh.net/kmesh/pkg/cache/v2" "kmesh.net/kmesh/pkg/nets" "kmesh.net/kmesh/pkg/utils" @@ -50,10 +51,11 @@ type AdsCache struct { RouteCache cache_v2.RouteConfigCache } -func NewAdsCache() *AdsCache { +func NewAdsCache(bpfAds *bpfads.BpfAds) *AdsCache { + hashName := utils.NewHashName() return &AdsCache{ ListenerCache: cache_v2.NewListenerCache(), - ClusterCache: cache_v2.NewClusterCache(utils.NewHashName()), + ClusterCache: cache_v2.NewClusterCache(bpfAds, hashName), RouteCache: cache_v2.NewRouteConfigCache(), } } diff --git a/pkg/controller/ads/cache_test.go b/pkg/controller/ads/cache_test.go index 2c4df8b23..195341bb3 100644 --- a/pkg/controller/ads/cache_test.go +++ b/pkg/controller/ads/cache_test.go @@ -46,7 +46,7 @@ func TestCreateApiClusterByCds(t *testing.T) { }{ { name: "test1: ApiStatus is update, cluster type is EDS", - loader: NewAdsCache(), + loader: NewAdsCache(nil), status: core_v2.ApiStatus_UPDATE, cluster: &config_cluster_v3.Cluster{ Name: "ut-cluster", @@ -75,7 +75,7 @@ func TestCreateApiClusterByCds(t *testing.T) { }, { name: "test2: ApiStatus is update, cluster type is not EDS", - loader: NewAdsCache(), + loader: NewAdsCache(nil), status: core_v2.ApiStatus_UPDATE, cluster: &config_cluster_v3.Cluster{ Name: "ut-cluster", @@ -104,7 +104,7 @@ func TestCreateApiClusterByCds(t *testing.T) { }, { name: "test3: Apistatus is update, cluster type is EDS and cluster not has name", - loader: NewAdsCache(), + loader: NewAdsCache(nil), status: core_v2.ApiStatus_UPDATE, cluster: &config_cluster_v3.Cluster{ ConnectTimeout: &durationpb.Duration{ @@ -132,7 +132,7 @@ func TestCreateApiClusterByCds(t *testing.T) { }, { name: "test4: Apistatus is update, cluster type is not EDS and cluster not has name", - loader: NewAdsCache(), + loader: NewAdsCache(nil), status: core_v2.ApiStatus_UPDATE, cluster: &config_cluster_v3.Cluster{ ConnectTimeout: &durationpb.Duration{ @@ -421,7 +421,7 @@ func TestNewApiSocketAddress(t *testing.T) { func TestCreateApiListenerByLds(t *testing.T) { t.Run("listener filter configtype is filter_typedconfig", func(t *testing.T) { - loader := NewAdsCache() + loader := NewAdsCache(nil) loader.routeNames = []string{ "ut-route", } @@ -474,7 +474,7 @@ func TestCreateApiListenerByLds(t *testing.T) { }) t.Run("listener filter configtype is filter_ConfigDiscover", func(t *testing.T) { - loader := NewAdsCache() + loader := NewAdsCache(nil) loader.routeNames = []string{ "ut-route", } @@ -526,7 +526,7 @@ func TestCreateApiListenerByLds(t *testing.T) { }) t.Run("status is UNCHANGED", func(t *testing.T) { - loader := NewAdsCache() + loader := NewAdsCache(nil) loader.routeNames = []string{ "ut-route", } @@ -574,7 +574,7 @@ func TestCreateApiListenerByLds(t *testing.T) { }) t.Run("status is UNCHANGED, filterName is pkg_wellknown.HTTPConnectionManager", func(t *testing.T) { - loader := NewAdsCache() + loader := NewAdsCache(nil) loader.routeNames = []string{ "ut-route", } diff --git a/pkg/controller/client.go b/pkg/controller/client.go index 1bee12eac..9032c4c19 100644 --- a/pkg/controller/client.go +++ b/pkg/controller/client.go @@ -24,6 +24,7 @@ import ( discoveryv3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "google.golang.org/grpc" + bpfads "kmesh.net/kmesh/pkg/bpf/ads" bpfwl "kmesh.net/kmesh/pkg/bpf/workload" "kmesh.net/kmesh/pkg/constants" "kmesh.net/kmesh/pkg/controller/ads" @@ -47,7 +48,7 @@ type XdsClient struct { xdsConfig *config.XdsConfig } -func NewXdsClient(mode string, bpfWorkload *bpfwl.BpfWorkload, enableAccesslog bool) *XdsClient { +func NewXdsClient(mode string, bpfAds *bpfads.BpfAds, bpfWorkload *bpfwl.BpfWorkload, enableAccesslog bool) *XdsClient { client := &XdsClient{ mode: mode, xdsConfig: config.GetConfig(mode), @@ -56,7 +57,7 @@ func NewXdsClient(mode string, bpfWorkload *bpfwl.BpfWorkload, enableAccesslog b if mode == constants.DualEngineMode { client.WorkloadController = workload.NewController(bpfWorkload, enableAccesslog) } else if mode == constants.KernelNativeMode { - client.AdsController = ads.NewController() + client.AdsController = ads.NewController(bpfAds) } client.ctx, client.cancel = context.WithCancel(context.Background()) diff --git a/pkg/controller/client_test.go b/pkg/controller/client_test.go index c75a79691..4a82e5d86 100644 --- a/pkg/controller/client_test.go +++ b/pkg/controller/client_test.go @@ -28,6 +28,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + bpfads "kmesh.net/kmesh/pkg/bpf/ads" bpfwl "kmesh.net/kmesh/pkg/bpf/workload" "kmesh.net/kmesh/pkg/constants" "kmesh.net/kmesh/pkg/controller/workload" @@ -37,7 +38,7 @@ import ( func TestRecoverConnection(t *testing.T) { t.Run("test reconnect success", func(t *testing.T) { - utClient := NewXdsClient(constants.KernelNativeMode, &bpfwl.BpfWorkload{}, false) + utClient := NewXdsClient(constants.KernelNativeMode, &bpfads.BpfAds{}, &bpfwl.BpfWorkload{}, false) patches := gomonkey.NewPatches() defer patches.Reset() iteration := 0 @@ -78,7 +79,7 @@ func TestClientResponseProcess(t *testing.T) { })) }) - utClient := NewXdsClient(constants.KernelNativeMode, &bpfwl.BpfWorkload{}, false) + utClient := NewXdsClient(constants.KernelNativeMode, &bpfads.BpfAds{}, &bpfwl.BpfWorkload{}, false) err := utClient.createGrpcStreamClient() assert.NoError(t, err) @@ -125,7 +126,7 @@ func TestClientResponseProcess(t *testing.T) { })) }) - utClient := NewXdsClient(constants.DualEngineMode, &bpfwl.BpfWorkload{}, false) + utClient := NewXdsClient(constants.DualEngineMode, &bpfads.BpfAds{}, &bpfwl.BpfWorkload{}, false) err := utClient.createGrpcStreamClient() assert.NoError(t, err) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 78fe193a2..74fd2f7d4 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -21,6 +21,7 @@ import ( "fmt" "kmesh.net/kmesh/daemon/options" + bpfads "kmesh.net/kmesh/pkg/bpf/ads" bpfwl "kmesh.net/kmesh/pkg/bpf/workload" "kmesh.net/kmesh/pkg/constants" "kmesh.net/kmesh/pkg/controller/bypass" @@ -38,6 +39,7 @@ var ( type Controller struct { mode string + bpfAdsObj *bpfads.BpfAds bpfWorkloadObj *bpfwl.BpfWorkload client *XdsClient enableByPass bool @@ -47,10 +49,11 @@ type Controller struct { enableAccesslog bool } -func NewController(opts *options.BootstrapConfigs, bpfWorkloadObj *bpfwl.BpfWorkload, bpfFsPath string, enableBpfLog, enableAccesslog bool) *Controller { +func NewController(opts *options.BootstrapConfigs, bpfAdsObj *bpfads.BpfAds, bpfWorkloadObj *bpfwl.BpfWorkload, bpfFsPath string, enableBpfLog, enableAccesslog bool) *Controller { return &Controller{ mode: opts.BpfConfig.Mode, enableByPass: opts.ByPassConfig.EnableByPass, + bpfAdsObj: bpfAdsObj, bpfWorkloadObj: bpfWorkloadObj, enableSecretManager: opts.SecretManagerConfig.Enable, bpfFsPath: bpfFsPath, @@ -103,7 +106,7 @@ func (c *Controller) Start(stopCh <-chan struct{}) error { return fmt.Errorf("fail to start ringbuf reader: %v", err) } } - c.client = NewXdsClient(c.mode, c.bpfWorkloadObj, c.enableAccesslog) + c.client = NewXdsClient(c.mode, c.bpfAdsObj, c.bpfWorkloadObj, c.enableAccesslog) if c.client.WorkloadController != nil { c.client.WorkloadController.Run(ctx) diff --git a/pkg/controller/workload/workload_processor.go b/pkg/controller/workload/workload_processor.go index be7172458..be23af2cc 100644 --- a/pkg/controller/workload/workload_processor.go +++ b/pkg/controller/workload/workload_processor.go @@ -633,7 +633,7 @@ func (p *Processor) handleRemovedAddressesDuringRestart() { // but not in userspace cache, that means the data in the bpf map load // from the last epoch is inconsistent with the data that should // actually be stored now. then we should delete it from bpf map - for str, num := range p.hashName.strToNum { + for str, num := range p.hashName.GetStrToNum() { if p.WorkloadCache.GetWorkloadByUid(str) == nil && p.ServiceCache.GetService(str) == nil { log.Debugf("GetWorkloadByUid and GetService nil:%v", str) @@ -705,7 +705,7 @@ func (p *Processor) handleRemovedAuthzPolicyDuringRestart(rbac *auth.Rbac) { * actually be stored now. then we should delete it from bpf map */ policyCache := rbac.GetAllPolicies() - for str, num := range p.hashName.strToNum { + for str, num := range p.hashName.GetStrToNum() { if _, exists := policyCache[str]; !exists { if err := maps_v2.AuthorizationLookup(num, &policyValue); err == nil { log.Debugf("Find policy: [%v:%v] Remove authz policy", str, num) diff --git a/pkg/controller/workload/workload_processor_test.go b/pkg/controller/workload/workload_processor_test.go index 02fa61265..55c89ebea 100644 --- a/pkg/controller/workload/workload_processor_test.go +++ b/pkg/controller/workload/workload_processor_test.go @@ -626,7 +626,7 @@ func TestRestart(t *testing.T) { // The hashname will be saved as a file by default. // If it is not cleaned, it will affect other use cases. func hashNameClean(p *Processor) { - for str := range p.hashName.strToNum { + for str := range p.hashName.GetStrToNum() { if err := p.removeWorkloadFromBpfMap(str); err != nil { log.Errorf("RemoveWorkloadResource failed: %v", err) } diff --git a/pkg/dns/dns_test.go b/pkg/dns/dns_test.go index 78c91bad5..68887f831 100644 --- a/pkg/dns/dns_test.go +++ b/pkg/dns/dns_test.go @@ -52,7 +52,7 @@ type fakeDNSServer struct { func TestDNS(t *testing.T) { fakeDNSServer := newFakeDNSServer() - testDNSResolver, err := NewDNSResolver(ads.NewAdsCache()) + testDNSResolver, err := NewDNSResolver(ads.NewAdsCache(nil)) if err != nil { t.Fatal(err) } @@ -170,7 +170,7 @@ func TestDNS(t *testing.T) { // This test aims to evaluate the concurrent writing behavior of the adsCache by utilizing the test race feature. // The test verifies the ability of the adsCache to handle concurrent access and updates correctly in a multi-goroutine environment. func TestADSCacheConcurrentWriting(t *testing.T) { - adsCache := ads.NewAdsCache() + adsCache := ads.NewAdsCache(nil) cluster := &clusterv3.Cluster{ Name: "ut-cluster", ClusterDiscoveryType: &clusterv3.Cluster_Type{ diff --git a/pkg/utils/hash_name.go b/pkg/utils/hash_name.go index 22b977048..b542670b5 100644 --- a/pkg/utils/hash_name.go +++ b/pkg/utils/hash_name.go @@ -124,6 +124,14 @@ func (h *HashName) NumToStr(num uint32) string { return h.numToStr[num] } +func (h *HashName) StrToNum(str string) uint32 { + return h.strToNum[str] +} + +func (h *HashName) GetStrToNum() map[string]uint32 { + return h.strToNum +} + func (h *HashName) Delete(str string) { // only when the num exists, we do the logic if num, exists := h.strToNum[str]; exists { From 9a3299dce67fe9f4f8a7a287405937e6320de753 Mon Sep 17 00:00:00 2001 From: Okabe-Rintarou-0 <923048992@qq.com> Date: Wed, 24 Jul 2024 17:59:56 +0800 Subject: [PATCH 16/19] fix delete logic Signed-off-by: Okabe-Rintarou-0 <923048992@qq.com> --- bpf/kmesh/ads/include/circuit_breaker.h | 2 ++ pkg/cache/v2/cluster.go | 19 ++++++++++++------- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/bpf/kmesh/ads/include/circuit_breaker.h b/bpf/kmesh/ads/include/circuit_breaker.h index 76f5de3cb..a19b5c9dc 100644 --- a/bpf/kmesh/ads/include/circuit_breaker.h +++ b/bpf/kmesh/ads/include/circuit_breaker.h @@ -10,6 +10,7 @@ #define CLUSTER_NAME_MAX_LEN BPF_DATA_MAX_LEN +#pragma pack(1) struct cluster_stats { __u32 active_connections; }; @@ -18,6 +19,7 @@ struct cluster_stats_key { __u64 netns_cookie; __u32 cluster_id; }; +#pragma pack() struct { __uint(type, BPF_MAP_TYPE_HASH); diff --git a/pkg/cache/v2/cluster.go b/pkg/cache/v2/cluster.go index 5634f3777..a414bbfe8 100644 --- a/pkg/cache/v2/cluster.go +++ b/pkg/cache/v2/cluster.go @@ -148,16 +148,21 @@ func (cache *ClusterCache) clearClusterStats(clusterName string) { } clusterId := cache.hashName.StrToNum(clusterName) var key ClusterStatsKey + var value ClusterStatsValue + var keysToDelete []ClusterStatsKey it := cache.clusterStatsMap.Iterate() - for it.Next(&key, nil) { + + for it.Next(&key, &value) { if key.ClusterId == clusterId { - if err := cache.clusterStatsMap.Delete(&key); err != nil { - log.Errorf("failed to delete key %v: %s", key, err) - } else { - log.Debugf("remove cluster stats with key %v", key) - } + log.Debugf("remove cluster stats with key %v", key) + keysToDelete = append(keysToDelete, key) } } + if len(keysToDelete) > 0 { + log.Debugf("remove cluster stats: %v", keysToDelete) + cache.clusterStatsMap.BatchDelete(keysToDelete, nil) + } + if err := it.Err(); err != nil { log.Errorf("delete iteration error: %s", err) } @@ -178,11 +183,11 @@ func (cache *ClusterCache) Flush() { log.Errorf("cluster %s %s flush failed: %v", name, cluster.ApiStatus, err) } } else if cluster.GetApiStatus() == core_v2.ApiStatus_DELETE { + cache.clearClusterStats(name) err := maps_v2.ClusterDelete(name) if err == nil { delete(cache.apiClusterCache, name) delete(cache.resourceHash, name) - cache.clearClusterStats(name) cache.hashName.Delete(name) } else { log.Errorf("cluster %s delete failed: %v", name, err) From 283ad727bc69f7ecbf717fc5bb68a16170afa6de Mon Sep 17 00:00:00 2001 From: "923048992@qq.com" <923048992@qq.com> Date: Wed, 24 Jul 2024 18:12:09 +0800 Subject: [PATCH 17/19] fix lint Signed-off-by: 923048992@qq.com <923048992@qq.com> --- pkg/cache/v2/cluster.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/cache/v2/cluster.go b/pkg/cache/v2/cluster.go index a414bbfe8..061bfdbfd 100644 --- a/pkg/cache/v2/cluster.go +++ b/pkg/cache/v2/cluster.go @@ -160,7 +160,10 @@ func (cache *ClusterCache) clearClusterStats(clusterName string) { } if len(keysToDelete) > 0 { log.Debugf("remove cluster stats: %v", keysToDelete) - cache.clusterStatsMap.BatchDelete(keysToDelete, nil) + _, err := cache.clusterStatsMap.BatchDelete(keysToDelete, nil) + if err != nil { + log.Errorf("failed to remove cluster stats: %v", err) + } } if err := it.Err(); err != nil { From 8615e75acc34443812714b96be62a7dcb255d148 Mon Sep 17 00:00:00 2001 From: "923048992@qq.com" <923048992@qq.com> Date: Thu, 25 Jul 2024 09:28:37 +0800 Subject: [PATCH 18/19] add ut Signed-off-by: 923048992@qq.com <923048992@qq.com> --- pkg/cache/v2/cluster_test.go | 45 ++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/pkg/cache/v2/cluster_test.go b/pkg/cache/v2/cluster_test.go index 03f9916c9..c52f05414 100644 --- a/pkg/cache/v2/cluster_test.go +++ b/pkg/cache/v2/cluster_test.go @@ -345,3 +345,48 @@ func BenchmarkClusterFlush(b *testing.B) { assert.Equal(t, core_v2.ApiStatus_NONE, cluster.GetApiStatus()) } } + +func TestClearClusterStats(t *testing.T) { + config := options.BpfConfig{ + Mode: "ads", + BpfFsPath: "/sys/fs/bpf", + Cgroup2Path: "/mnt/kmesh_cgroup2", + } + cleanup, loader := test.InitBpfMap(t, config) + t.Cleanup(cleanup) + + adsObj := loader.GetBpfKmesh() + hashName := utils.NewHashName() + clusterCache := NewClusterCache(adsObj, hashName) + testClusters := []string{"test_cluster1", "test_cluster2", "test_cluster3"} + testKeys := []ClusterStatsKey{ + {NetnsCookie: 0, ClusterId: hashName.StrToNum(testClusters[0])}, + {NetnsCookie: 1, ClusterId: hashName.StrToNum(testClusters[0])}, + {NetnsCookie: 2, ClusterId: hashName.StrToNum(testClusters[0])}, + {NetnsCookie: 0, ClusterId: hashName.StrToNum(testClusters[1])}, + {NetnsCookie: 0, ClusterId: hashName.StrToNum(testClusters[2])}, + } + + testValues := []ClusterStatsValue{ + {ActiveConnections: 1}, + {ActiveConnections: 2}, + {ActiveConnections: 3}, + {ActiveConnections: 4}, + {ActiveConnections: 5}, + } + + clusterStatsMap := adsObj.GetClusterStatsMap() + clusterStatsMap.BatchUpdate(testKeys, testValues, nil) + + var key ClusterStatsKey + var value ClusterStatsValue + for _, cluster := range testClusters { + clusterCache.clearClusterStats(cluster) + iter := clusterStatsMap.Iterate() + clusterId := hashName.StrToNum(cluster) + for iter.Next(&key, &value) { + assert.NotEqual(t, clusterId, key.ClusterId) + } + assert.Nil(t, iter.Err()) + } +} From e9af645d30cdaf00d7ed8b0f8cc0c4b13a761894 Mon Sep 17 00:00:00 2001 From: "923048992@qq.com" <923048992@qq.com> Date: Wed, 30 Oct 2024 13:17:31 +0800 Subject: [PATCH 19/19] optimize logic Signed-off-by: 923048992@qq.com <923048992@qq.com> --- bpf/kmesh/ads/include/circuit_breaker.h | 14 +- bpf/kmesh/ads/include/config.h | 1 + bpf/kmesh/ads/include/kmesh_common.h | 25 - pkg/bpf/ads/loader.go | 2 +- pkg/bpf/ads/loader_enhanced.go | 2 +- pkg/bpf/bpf_kmesh.go | 455 ------------------ pkg/cache/v2/cluster.go | 1 - pkg/cache/v2/cluster_test.go | 3 +- pkg/controller/workload/workload_processor.go | 12 +- pkg/dns/dns_test.go | 4 +- 10 files changed, 21 insertions(+), 498 deletions(-) delete mode 100644 pkg/bpf/bpf_kmesh.go diff --git a/bpf/kmesh/ads/include/circuit_breaker.h b/bpf/kmesh/ads/include/circuit_breaker.h index a19b5c9dc..50e36e8c7 100644 --- a/bpf/kmesh/ads/include/circuit_breaker.h +++ b/bpf/kmesh/ads/include/circuit_breaker.h @@ -42,10 +42,10 @@ struct { static inline void update_cluster_active_connections(const struct cluster_stats_key *key, int delta) { + struct cluster_stats *stats = NULL; if (!key) { return; } - struct cluster_stats *stats = NULL; stats = kmesh_map_lookup_elem(&map_of_cluster_stats, key); if (!stats) { struct cluster_stats init_value = {0}; @@ -67,7 +67,8 @@ static inline void update_cluster_active_connections(const struct cluster_stats_ BPF_LOG( DEBUG, CIRCUIT_BREAKER, - "update existing stats(netns_cookie = %lld, cluster_id = %ld), current active connections: %d", + "update existing stats(netns_cookie = %lld, cluster_id = %ld), " + "current active connections: %d", key->netns_cookie, key->cluster_id, stats->active_connections); @@ -90,7 +91,8 @@ static inline int on_cluster_sock_bind(ctx_buff_t *ctx, const Cluster__Cluster * BPF_LOG( DEBUG, CIRCUIT_BREAKER, - "Current active connections %d exceeded max connections %d, reject connection", + "Current active connections %d exceeded max connections " + "%d, reject connection", stats->active_connections, cbs->max_connections); return -1; @@ -141,7 +143,8 @@ static inline void on_cluster_sock_connect(struct bpf_sock_ops *ctx) BPF_LOG( DEBUG, CIRCUIT_BREAKER, - "increase cluster active connections(netns_cookie = %lld, cluster id = %ld)", + "increase cluster active connections(netns_cookie = %lld, cluster " + "id = %ld)", key.netns_cookie, key.cluster_id); update_cluster_active_connections(&key, 1); @@ -165,7 +168,8 @@ static inline void on_cluster_sock_close(struct bpf_sock_ops *ctx) BPF_LOG( DEBUG, CIRCUIT_BREAKER, - "decrease cluster active connections(netns_cookie = %lld, cluster id = %ld)", + "decrease cluster active connections(netns_cookie = %lld, cluster " + "id = %ld)", key.netns_cookie, key.cluster_id); BPF_LOG(DEBUG, CIRCUIT_BREAKER, "record sock close for cluster id = %ld", data->cluster_id); diff --git a/bpf/kmesh/ads/include/config.h b/bpf/kmesh/ads/include/config.h index e1f11366e..6eab55bce 100644 --- a/bpf/kmesh/ads/include/config.h +++ b/bpf/kmesh/ads/include/config.h @@ -46,6 +46,7 @@ #define map_of_virtual_host kmesh_virtual_host #define map_of_route kmesh_route #define map_of_cluster kmesh_cluster +#define map_of_cluster_stats kmesh_cluster_stats #define map_of_loadbalance kmesh_loadbalance #define map_of_endpoint kmesh_endpoint #define map_of_tail_call_prog kmesh_tail_call_prog diff --git a/bpf/kmesh/ads/include/kmesh_common.h b/bpf/kmesh/ads/include/kmesh_common.h index b7cc229f9..2a9c62010 100644 --- a/bpf/kmesh/ads/include/kmesh_common.h +++ b/bpf/kmesh/ads/include/kmesh_common.h @@ -71,31 +71,6 @@ static inline char *bpf_strncpy(char *dst, int n, const char *src) } #endif -struct { - __uint(type, BPF_MAP_TYPE_ARRAY_OF_MAPS); - __uint(key_size, sizeof(__u32)); - __uint(value_size, sizeof(__u32)); - __uint(max_entries, MAP_SIZE_OF_OUTTER_MAP); - __uint(map_flags, 0); -} outer_map SEC(".maps"); - -struct { - __uint(type, BPF_MAP_TYPE_ARRAY); - __uint(key_size, sizeof(__u32)); - __uint(value_size, BPF_INNER_MAP_DATA_LEN); - __uint(max_entries, 1); - __uint(map_flags, 0); -} inner_map SEC(".maps"); - -typedef enum { - KMESH_TAIL_CALL_LISTENER = 1, - KMESH_TAIL_CALL_FILTER_CHAIN, - KMESH_TAIL_CALL_FILTER, - KMESH_TAIL_CALL_ROUTER, - KMESH_TAIL_CALL_CLUSTER, - KMESH_TAIL_CALL_ROUTER_CONFIG, -} tail_call_index_t; - typedef Core__SocketAddress address_t; // bpf return value diff --git a/pkg/bpf/ads/loader.go b/pkg/bpf/ads/loader.go index 8068744aa..a38e56f5d 100644 --- a/pkg/bpf/ads/loader.go +++ b/pkg/bpf/ads/loader.go @@ -142,7 +142,7 @@ func (sc *BpfAds) Detach() error { } func (sc *BpfAds) GetClusterStatsMap() *ebpf.Map { - return nil + return sc.SockConn.KmeshCgroupSockMaps.KmeshClusterStats } func AdsL7Enabled() bool { diff --git a/pkg/bpf/ads/loader_enhanced.go b/pkg/bpf/ads/loader_enhanced.go index 07ca3a02f..27b37219a 100644 --- a/pkg/bpf/ads/loader_enhanced.go +++ b/pkg/bpf/ads/loader_enhanced.go @@ -186,7 +186,7 @@ func (sc *BpfAds) Detach() error { } func (sc *BpfAds) GetClusterStatsMap() *ebpf.Map { - return sc.SockOps.KmeshSockopsMaps.MapOfClusterStats + return sc.SockOps.KmeshSockopsMaps.KmeshClusterStats } func AdsL7Enabled() bool { diff --git a/pkg/bpf/bpf_kmesh.go b/pkg/bpf/bpf_kmesh.go deleted file mode 100644 index 39ca56b9f..000000000 --- a/pkg/bpf/bpf_kmesh.go +++ /dev/null @@ -1,455 +0,0 @@ -//go:build enhanced -// +build enhanced - -/* - * Copyright The Kmesh Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - - * Author: nlgwcy - * Create: 2022-02-26 - */ - -package bpf - -import ( - "os" - "reflect" - "strconv" - "syscall" - - "github.com/cilium/ebpf" - "github.com/cilium/ebpf/link" - - "kmesh.net/kmesh/bpf/kmesh/bpf2go" - "kmesh.net/kmesh/daemon/options" -) - -type BpfTracePoint struct { - Info BpfInfo - Link link.Link - bpf2go.KmeshTracePointObjects -} - -type BpfSockOps struct { - Info BpfInfo - Link link.Link - bpf2go.KmeshSockopsObjects -} - -type BpfKmesh struct { - TracePoint BpfTracePoint - SockConn BpfSockConn - SockOps BpfSockOps -} - -func (sc *BpfTracePoint) NewBpf(cfg *options.BpfConfig) { - sc.Info.MapPath = cfg.BpfFsPath - sc.Info.BpfFsPath = cfg.BpfFsPath - sc.Info.BpfVerifyLogSize = cfg.BpfVerifyLogSize - sc.Info.Cgroup2Path = cfg.Cgroup2Path -} - -func (sc *BpfSockOps) NewBpf(cfg *options.BpfConfig) error { - sc.Info.MapPath = cfg.BpfFsPath + "/bpf_kmesh/map/" - sc.Info.BpfFsPath = cfg.BpfFsPath + "/bpf_kmesh/sockops/" - sc.Info.BpfVerifyLogSize = cfg.BpfVerifyLogSize - sc.Info.Cgroup2Path = cfg.Cgroup2Path - - if err := os.MkdirAll(sc.Info.MapPath, - syscall.S_IRUSR|syscall.S_IWUSR|syscall.S_IXUSR| - syscall.S_IRGRP|syscall.S_IXGRP); err != nil && !os.IsExist(err) { - return err - } - - if err := os.MkdirAll(sc.Info.BpfFsPath, - syscall.S_IRUSR|syscall.S_IWUSR|syscall.S_IXUSR| - syscall.S_IRGRP|syscall.S_IXGRP); err != nil && !os.IsExist(err) { - return err - } - - return nil -} - -func NewBpfKmesh(cfg *options.BpfConfig) (*BpfKmesh, error) { - var err error - - sc := &BpfKmesh{} - - sc.TracePoint.NewBpf(cfg) - - if err = sc.SockOps.NewBpf(cfg); err != nil { - return sc, err - } - - if err = sc.SockConn.NewBpf(cfg); err != nil { - return sc, err - } - return sc, nil -} - -func (sc *BpfTracePoint) loadKmeshTracePointObjects() (*ebpf.CollectionSpec, error) { - var ( - err error - spec *ebpf.CollectionSpec - opts ebpf.CollectionOptions - ) - opts.Programs.LogSize = sc.Info.BpfVerifyLogSize - - spec, err = bpf2go.LoadKmeshTracePoint() - if err != nil || spec == nil { - return nil, err - } - - for _, v := range spec.Programs { - if v.Name == "connect_ret" { - v.Type = ebpf.RawTracepointWritable - } - } - - if err = spec.LoadAndAssign(&sc.KmeshTracePointObjects, &opts); err != nil { - return nil, err - } - - return spec, nil -} - -func (sc *BpfTracePoint) LoadTracePoint() error { - if _, err := sc.loadKmeshTracePointObjects(); err != nil { - return err - } - return nil -} - -func (sc *BpfSockOps) loadKmeshSockopsObjects() (*ebpf.CollectionSpec, error) { - var ( - err error - spec *ebpf.CollectionSpec - opts ebpf.CollectionOptions - ) - opts.Maps.PinPath = sc.Info.MapPath - opts.Programs.LogSize = sc.Info.BpfVerifyLogSize - - spec, err = bpf2go.LoadKmeshSockops() - - if err != nil || spec == nil { - return nil, err - } - - SetInnerMap(spec) - setMapPinType(spec, ebpf.PinByName) - if err = spec.LoadAndAssign(&sc.KmeshSockopsObjects, &opts); err != nil { - return nil, err - } - - value := reflect.ValueOf(sc.KmeshSockopsObjects.KmeshSockopsPrograms) - if err = pinPrograms(&value, sc.Info.BpfFsPath); err != nil { - return nil, err - } - - return spec, nil -} - -func (sc *BpfSockOps) loadKmeshFilterObjects() (*ebpf.CollectionSpec, error) { - var ( - err error - spec *ebpf.CollectionSpec - opts ebpf.CollectionOptions - ) - opts.Maps.PinPath = sc.Info.MapPath - opts.Programs.LogSize = sc.Info.BpfVerifyLogSize - - err = sc.KmeshTailCallProg.Update( - uint32(KMESH_TAIL_CALL_FILTER_CHAIN), - uint32(sc.FilterChainManager.FD()), - ebpf.UpdateAny) - if err != nil { - return nil, err - } - - err = sc.KmeshTailCallProg.Update( - uint32(KMESH_TAIL_CALL_FILTER), - uint32(sc.FilterManager.FD()), - ebpf.UpdateAny) - if err != nil { - return nil, err - } - - return spec, nil -} - -func (sc *BpfSockOps) loadRouteConfigObjects() (*ebpf.CollectionSpec, error) { - var ( - err error - spec *ebpf.CollectionSpec - opts ebpf.CollectionOptions - ) - opts.Maps.PinPath = sc.Info.MapPath - opts.Programs.LogSize = sc.Info.BpfVerifyLogSize - - err = sc.KmeshTailCallProg.Update( - uint32(KMESH_TAIL_CALL_ROUTER_CONFIG), - uint32(sc.RouteConfigManager.FD()), - ebpf.UpdateAny) - if err != nil { - return nil, err - } - - return spec, nil -} - -func (sc *BpfSockOps) loadKmeshClusterObjects() (*ebpf.CollectionSpec, error) { - var ( - err error - spec *ebpf.CollectionSpec - opts ebpf.CollectionOptions - ) - opts.Maps.PinPath = sc.Info.MapPath - opts.Programs.LogSize = sc.Info.BpfVerifyLogSize - - err = sc.KmeshTailCallProg.Update( - uint32(KMESH_TAIL_CALL_CLUSTER), - uint32(sc.ClusterManager.FD()), - ebpf.UpdateAny) - if err != nil { - return nil, err - } - - return spec, nil -} - -func (sc *BpfSockOps) LoadSockOps() error { - /* load kmesh sockops main bpf prog */ - spec, err := sc.loadKmeshSockopsObjects() - if err != nil { - return err - } - - prog := spec.Programs["sockops_prog"] - sc.Info.Type = prog.Type - sc.Info.AttachType = prog.AttachType - - /* load kmesh sockops tail call bpf prog */ - if _, err := sc.loadKmeshFilterObjects(); err != nil { - return err - } - - if _, err := sc.loadRouteConfigObjects(); err != nil { - return err - } - - if _, err := sc.loadKmeshClusterObjects(); err != nil { - return err - } - - return nil -} - -func (sc *BpfKmesh) Load() error { - var err error - - if err = sc.TracePoint.LoadTracePoint(); err != nil { - return err - } - - if err = sc.SockOps.LoadSockOps(); err != nil { - return err - } - - if err = sc.SockConn.LoadSockConn(); err != nil { - return err - } - - return nil -} - -func (sc *BpfKmesh) ApiEnvCfg() error { - var err error - var info *ebpf.MapInfo - var id ebpf.MapID - info, err = sc.SockOps.KmeshSockopsMaps.KmeshListener.Info() - - if err != nil { - return err - } - - id, _ = info.ID() - stringId := strconv.Itoa(int(id)) - if err = os.Setenv("Listener", stringId); err != nil { - return err - } - - info, _ = sc.SockOps.KmeshSockopsMaps.OuterMap.Info() - id, _ = info.ID() - stringId = strconv.Itoa(int(id)) - if err = os.Setenv("OUTTER_MAP_ID", stringId); err != nil { - return err - } - - info, _ = sc.SockOps.KmeshSockopsMaps.InnerMap.Info() - id, _ = info.ID() - stringId = strconv.Itoa(int(id)) - if err = os.Setenv("INNER_MAP_ID", stringId); err != nil { - return err - } - - info, _ = sc.SockOps.MapOfRouterConfig.Info() - id, _ = info.ID() - stringId = strconv.Itoa(int(id)) - if err = os.Setenv("RouteConfiguration", stringId); err != nil { - return err - } - - info, _ = sc.SockOps.KmeshCluster.Info() - id, _ = info.ID() - stringId = strconv.Itoa(int(id)) - if err = os.Setenv("Cluster", stringId); err != nil { - return err - } - - return nil -} - -func (sc *BpfTracePoint) Attach() error { - tpopt := link.RawTracepointOptions{ - Name: "connect_ret", - Program: sc.KmeshTracePointObjects.ConnectRet, - } - - lk, err := link.AttachRawTracepoint(tpopt) - if err != nil { - return err - } - sc.Link = lk - - return nil -} - -func (sc *BpfSockOps) Attach() error { - cgopt := link.CgroupOptions{ - Path: sc.Info.Cgroup2Path, - Attach: sc.Info.AttachType, - Program: sc.KmeshSockopsObjects.SockopsProg, - } - - lk, err := link.AttachCgroup(cgopt) - if err != nil { - return err - } - sc.Link = lk - - return nil -} - -func (sc *BpfKmesh) Attach() error { - var err error - - if err = sc.TracePoint.Attach(); err != nil { - return err - } - - if err = sc.SockOps.Attach(); err != nil { - return err - } - - if err = sc.SockConn.Attach(); err != nil { - return err - } - return nil -} - -func (sc *BpfTracePoint) close() error { - return sc.KmeshTracePointObjects.Close() -} - -func (sc *BpfSockOps) close() error { - if err := sc.KmeshSockopsObjects.Close(); err != nil { - return err - } - return nil -} - -func (sc *BpfKmesh) close() error { - var err error - - if err = sc.SockOps.close(); err != nil { - return err - } - - if err = sc.SockConn.close(); err != nil { - return err - } - - if err = sc.TracePoint.close(); err != nil { - return err - } - return nil -} - -func (sc *BpfTracePoint) Detach() error { - if err := sc.close(); err != nil { - return err - } - - if sc.Link != nil { - return sc.Link.Close() - } - return nil -} - -func (sc *BpfSockOps) Detach() error { - var value reflect.Value - - if err := sc.close(); err != nil { - return err - } - - value = reflect.ValueOf(sc.KmeshSockopsObjects.KmeshSockopsPrograms) - if err := unpinPrograms(&value); err != nil { - return err - } - value = reflect.ValueOf(sc.KmeshSockopsObjects.KmeshSockopsMaps) - if err := unpinMaps(&value); err != nil { - return err - } - - if err := os.RemoveAll(sc.Info.BpfFsPath); err != nil && !os.IsNotExist(err) { - return err - } - - if sc.Link != nil { - return sc.Link.Close() - } - return nil -} - -func (sc *BpfKmesh) Detach() error { - var err error - - if err = sc.TracePoint.Detach(); err != nil { - return err - } - - if err = sc.SockOps.Detach(); err != nil { - return err - } - - if err = sc.SockConn.Detach(); err != nil { - return err - } - return nil -} - -func (sc *BpfKmesh) GetClusterStatsMap() *ebpf.Map { - return sc.SockOps.KmeshSockopsMaps.MapOfClusterStats -} diff --git a/pkg/cache/v2/cluster.go b/pkg/cache/v2/cluster.go index 061bfdbfd..b1a03b2a6 100644 --- a/pkg/cache/v2/cluster.go +++ b/pkg/cache/v2/cluster.go @@ -154,7 +154,6 @@ func (cache *ClusterCache) clearClusterStats(clusterName string) { for it.Next(&key, &value) { if key.ClusterId == clusterId { - log.Debugf("remove cluster stats with key %v", key) keysToDelete = append(keysToDelete, key) } } diff --git a/pkg/cache/v2/cluster_test.go b/pkg/cache/v2/cluster_test.go index c52f05414..f1ce6a400 100644 --- a/pkg/cache/v2/cluster_test.go +++ b/pkg/cache/v2/cluster_test.go @@ -348,7 +348,7 @@ func BenchmarkClusterFlush(b *testing.B) { func TestClearClusterStats(t *testing.T) { config := options.BpfConfig{ - Mode: "ads", + Mode: constants.KernelNativeMode, BpfFsPath: "/sys/fs/bpf", Cgroup2Path: "/mnt/kmesh_cgroup2", } @@ -356,6 +356,7 @@ func TestClearClusterStats(t *testing.T) { t.Cleanup(cleanup) adsObj := loader.GetBpfKmesh() + assert.NotNil(t, adsObj) hashName := utils.NewHashName() clusterCache := NewClusterCache(adsObj, hashName) testClusters := []string{"test_cluster1", "test_cluster2", "test_cluster3"} diff --git a/pkg/controller/workload/workload_processor.go b/pkg/controller/workload/workload_processor.go index be23af2cc..f3f606641 100644 --- a/pkg/controller/workload/workload_processor.go +++ b/pkg/controller/workload/workload_processor.go @@ -53,13 +53,11 @@ type Processor struct { ack *service_discovery_v3.DeltaDiscoveryRequest req *service_discovery_v3.DeltaDiscoveryRequest - hashName *utils.HashName - // workloads indexer, svc key -> workload id - endpointsByService map[string]map[string]struct{} - bpf *bpf.Cache - nodeName string - WorkloadCache cache.WorkloadCache - ServiceCache cache.ServiceCache + hashName *utils.HashName + bpf *bpf.Cache + nodeName string + WorkloadCache cache.WorkloadCache + ServiceCache cache.ServiceCache once sync.Once authzOnce sync.Once diff --git a/pkg/dns/dns_test.go b/pkg/dns/dns_test.go index 68887f831..409e10d05 100644 --- a/pkg/dns/dns_test.go +++ b/pkg/dns/dns_test.go @@ -504,10 +504,10 @@ func TestHandleCdsResponseWithDns(t *testing.T) { }, } - p := ads.NewController().Processor + p := ads.NewController(nil).Processor stopCh := make(chan struct{}) defer close(stopCh) - dnsResolver, err := NewDNSResolver(ads.NewAdsCache()) + dnsResolver, err := NewDNSResolver(ads.NewAdsCache(nil)) assert.NoError(t, err) dnsResolver.StartDNSResolver(stopCh) p.DnsResolverChan = dnsResolver.DnsResolverChan