Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/cluster/cluster.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ message Cluster {

core.ApiStatus api_status = 128;
string name = 1;
uint32 id = 2;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

weired, seems the tag number is random

uint32 connect_timeout = 4;
LbPolicy lb_policy = 6;

Expand Down
39 changes: 26 additions & 13 deletions api/v2-c/cluster/cluster.pb-c.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ const ProtobufCEnumDescriptor cluster__cluster__lb_policy__descriptor =
cluster__cluster__lb_policy__value_ranges,
NULL,NULL,NULL,NULL /* reserved[1234] */
};
static const ProtobufCFieldDescriptor cluster__cluster__field_descriptors[6] =
static const ProtobufCFieldDescriptor cluster__cluster__field_descriptors[7] =
{
{
"name",
Expand All @@ -96,6 +96,18 @@ static const ProtobufCFieldDescriptor cluster__cluster__field_descriptors[6] =
0, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
{
"id",
2,
PROTOBUF_C_LABEL_NONE,
PROTOBUF_C_TYPE_UINT32,
0, /* quantifier_offset */
offsetof(Cluster__Cluster, id),
NULL,
NULL,
0, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
{
"connect_timeout",
4,
Expand Down Expand Up @@ -158,22 +170,23 @@ static const ProtobufCFieldDescriptor cluster__cluster__field_descriptors[6] =
},
};
static const unsigned cluster__cluster__field_indices_by_name[] = {
5, /* field[5] = api_status */
3, /* field[3] = circuit_breakers */
1, /* field[1] = connect_timeout */
2, /* field[2] = lb_policy */
4, /* field[4] = load_assignment */
6, /* field[6] = api_status */
4, /* field[4] = circuit_breakers */
2, /* field[2] = connect_timeout */
1, /* field[1] = id */
3, /* field[3] = lb_policy */
5, /* field[5] = load_assignment */
0, /* field[0] = name */
};
static const ProtobufCIntRange cluster__cluster__number_ranges[6 + 1] =
{
{ 1, 0 },
{ 4, 1 },
{ 6, 2 },
{ 10, 3 },
{ 33, 4 },
{ 128, 5 },
{ 0, 6 }
{ 4, 2 },
{ 6, 3 },
{ 10, 4 },
{ 33, 5 },
{ 128, 6 },
{ 0, 7 }
};
const ProtobufCMessageDescriptor cluster__cluster__descriptor =
{
Expand All @@ -183,7 +196,7 @@ const ProtobufCMessageDescriptor cluster__cluster__descriptor =
"Cluster__Cluster",
"cluster",
sizeof(Cluster__Cluster),
6,
7,
cluster__cluster__field_descriptors,
cluster__cluster__field_indices_by_name,
6, cluster__cluster__number_ranges,
Expand Down
3 changes: 2 additions & 1 deletion api/v2-c/cluster/cluster.pb-c.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 11 additions & 2 deletions api/v2/cluster/cluster.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

178 changes: 178 additions & 0 deletions bpf/kmesh/ads/include/circuit_breaker.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/* SPDX-License-Identifier: (GPL-2.0-only OR BSD-2-Clause) */
/* Copyright Authors of Kmesh */

#include "bpf_log.h"
#include "kmesh_common.h"
#include "bpf_common.h"

#ifndef __KMESH_CIRCUIT_BREAKER_H__
#define __KMESH_CIRCUIT_BREAKER_H__

#define CLUSTER_NAME_MAX_LEN BPF_DATA_MAX_LEN

#pragma pack(1)
struct cluster_stats {
__u32 active_connections;
};

struct cluster_stats_key {
__u64 netns_cookie;
__u32 cluster_id;
};
#pragma pack()

struct {
__uint(type, BPF_MAP_TYPE_HASH);
__uint(key_size, sizeof(struct cluster_stats_key));
__uint(value_size, sizeof(struct cluster_stats));
__uint(map_flags, BPF_F_NO_PREALLOC);
__uint(max_entries, MAP_SIZE_OF_CLUSTER);
} map_of_cluster_stats SEC(".maps");

struct cluster_sock_data {
__u32 cluster_id;
};

struct {
__uint(type, BPF_MAP_TYPE_SK_STORAGE);
__uint(map_flags, BPF_F_NO_PREALLOC);
__type(key, int);
__type(value, struct cluster_sock_data);
} map_of_cluster_sock SEC(".maps");

static inline void update_cluster_active_connections(const struct cluster_stats_key *key, int delta)
{
struct cluster_stats *stats = NULL;
if (!key) {
return;
}
stats = kmesh_map_lookup_elem(&map_of_cluster_stats, key);
if (!stats) {
struct cluster_stats init_value = {0};
bpf_map_update_elem(&map_of_cluster_stats, key, &init_value, BPF_NOEXIST);
stats = kmesh_map_lookup_elem(&map_of_cluster_stats, key);
}

if (!stats) {
BPF_LOG(ERR, CIRCUIT_BREAKER, "failed to get cluster stats");
return;
}
if (delta < 0 && -delta > stats->active_connections) {
BPF_LOG(ERR, CIRCUIT_BREAKER, "invalid delta update");
return;
}

__sync_fetch_and_add(&stats->active_connections, delta);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems it only protect stats->active_connections, not the whole stats. At the same time, if a nother prog fetches stats, it can also update the value?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since it's atomic, update will be before-or-after, only one processor will update stats->active_connections at a time.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, map_lookup is a memory reference, so this atomic op is concurrent safe


BPF_LOG(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove, it makes no sense

DEBUG,
CIRCUIT_BREAKER,
"update existing stats(netns_cookie = %lld, cluster_id = %ld), "
"current active connections: %d",
key->netns_cookie,
key->cluster_id,
stats->active_connections);
}

static inline int on_cluster_sock_bind(ctx_buff_t *ctx, const Cluster__Cluster *cluster)
{
__u32 cluster_id = cluster->id;
struct cluster_stats_key key = {0};
__u64 cookie = bpf_get_netns_cookie(ctx);
key.cluster_id = cluster_id;
key.netns_cookie = cookie;
struct cluster_stats *stats = NULL;
stats = kmesh_map_lookup_elem(&map_of_cluster_stats, &key);

if (stats != NULL) {
Cluster__CircuitBreakers *cbs = NULL;
cbs = kmesh_get_ptr_val(cluster->circuit_breakers);
if (cbs != NULL && stats->active_connections >= cbs->max_connections) {
BPF_LOG(
DEBUG,
CIRCUIT_BREAKER,
"Current active connections %d exceeded max connections "
"%d, reject connection",
stats->active_connections,
cbs->max_connections);
return -1;
}
}

BPF_LOG(DEBUG, CIRCUIT_BREAKER, "record sock bind for cluster id = %ld", cluster_id);

struct cluster_sock_data *data = NULL;
if (!ctx->sk) {
BPF_LOG(WARN, CIRCUIT_BREAKER, "provided sock is NULL");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
BPF_LOG(WARN, CIRCUIT_BREAKER, "provided sock is NULL");
BPF_LOG(WARN, CIRCUIT_BREAKER, "provided sock is NULL\n");

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, i am busy these weeks. will modify it this weekend.

return 0;
}
data = bpf_sk_storage_get(&map_of_cluster_sock, ctx->sk, 0, BPF_LOCAL_STORAGE_GET_F_CREATE);
if (!data) {
BPF_LOG(ERR, CIRCUIT_BREAKER, "on_cluster_sock_bind call bpf_sk_storage_get failed");
return 0;
}
data->cluster_id = cluster_id;
return 0;
}

static inline struct cluster_sock_data *get_cluster_sk_data(struct bpf_sock *sk)
{
struct cluster_sock_data *data = NULL;
if (!sk) {
BPF_LOG(DEBUG, CIRCUIT_BREAKER, "provided sock is NULL");
return NULL;
}

data = bpf_sk_storage_get(&map_of_cluster_sock, sk, 0, 0);
return data;
}

static inline void on_cluster_sock_connect(struct bpf_sock_ops *ctx)
{
if (!ctx) {
return;
}
struct cluster_sock_data *data = get_cluster_sk_data(ctx->sk);
if (!data) {
return;
}
__u64 cookie = bpf_get_netns_cookie(ctx);
struct cluster_stats_key key = {0};
key.netns_cookie = cookie;
key.cluster_id = data->cluster_id;
BPF_LOG(
DEBUG,
CIRCUIT_BREAKER,
"increase cluster active connections(netns_cookie = %lld, cluster "
"id = %ld)",
key.netns_cookie,
key.cluster_id);
update_cluster_active_connections(&key, 1);
BPF_LOG(DEBUG, CIRCUIT_BREAKER, "record sock connection for cluster id = %ld", data->cluster_id);
}

static inline void on_cluster_sock_close(struct bpf_sock_ops *ctx)
{
if (!ctx) {
return;
}
struct cluster_sock_data *data = get_cluster_sk_data(ctx->sk);
if (!data) {
return;
}
__u64 cookie = bpf_get_netns_cookie(ctx);
struct cluster_stats_key key = {0};
key.netns_cookie = cookie;
key.cluster_id = data->cluster_id;
update_cluster_active_connections(&key, -1);
BPF_LOG(
DEBUG,
CIRCUIT_BREAKER,
"decrease cluster active connections(netns_cookie = %lld, cluster "
"id = %ld)",
key.netns_cookie,
key.cluster_id);
BPF_LOG(DEBUG, CIRCUIT_BREAKER, "record sock close for cluster id = %ld", data->cluster_id);
}

#endif
9 changes: 8 additions & 1 deletion bpf/kmesh/ads/include/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "tail_call.h"
#include "cluster/cluster.pb-c.h"
#include "endpoint/endpoint.pb-c.h"
#include "circuit_breaker.h"

#define CLUSTER_NAME_MAX_LEN BPF_DATA_MAX_LEN

Expand Down Expand Up @@ -263,7 +264,7 @@ static inline int cluster_handle_loadbalance(Cluster__Cluster *cluster, address_

name = kmesh_get_ptr_val(cluster->name);
if (!name) {
BPF_LOG(ERR, CLUSTER, "filed to get cluster\n");
BPF_LOG(ERR, CLUSTER, "failed to get cluster\n");
return -EAGAIN;
}

Expand Down Expand Up @@ -316,6 +317,12 @@ int cluster_manager(ctx_buff_t *ctx)
if (cluster == NULL)
return KMESH_TAIL_CALL_RET(ENOENT);

ret = on_cluster_sock_bind(ctx, cluster);
if (ret) {
// open circuit breaker, should reject here.
MARK_REJECTED(ctx);
return KMESH_TAIL_CALL_RET(ret);
}
ret = cluster_handle_loadbalance(cluster, &addr, ctx);
return KMESH_TAIL_CALL_RET(ret);
}
Expand Down
1 change: 1 addition & 0 deletions bpf/kmesh/ads/include/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#define map_of_virtual_host kmesh_virtual_host
#define map_of_route kmesh_route
#define map_of_cluster kmesh_cluster
#define map_of_cluster_stats kmesh_cluster_stats
#define map_of_loadbalance kmesh_loadbalance
#define map_of_endpoint kmesh_endpoint
#define map_of_tail_call_prog kmesh_tail_call_prog
Expand Down
2 changes: 2 additions & 0 deletions bpf/kmesh/ads/include/ctx/sock_addr.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,6 @@ typedef struct bpf_sock_addr ctx_buff_t;
(ctx)->user_ip4 = (address)->ipv4; \
(ctx)->user_port = (address)->port

#define MARK_REJECTED(ctx)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure what is this for

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cgroups/connect4 and sockops both use cluster_manager, without it will compile fail

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In kernel-native mode, if the running environment is not enhanced kernel(that is, L7 kernel traffic management is not supported), Kmesh completes L4 traffic management in the connect hook.

// cgroup_sock.c
cgroup_connect4_prog -> sock4_traffic_control -> listener_manager -> SEC_TAIL(KMESH_TAIL_CALL_FILTER_CHAIN) -> SEC_TAIL(KMESH_TAIL_CALL_FILTER) -> tcp_proxy_manager -> SEC_TAIL(KMESH_TAIL_CALL_CLUSTER) -> on_cluster_sock_bind

So, on this path, if circuit breaker is triggered, the service also needs to be rejected.


#endif //__BPF_CTX_SOCK_ADDR_H
7 changes: 7 additions & 0 deletions bpf/kmesh/ads/include/ctx/sock_ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ typedef struct bpf_sock_ops ctx_buff_t;
#define SET_CTX_ADDRESS(ctx, address) \
(ctx)->remote_ip4 = (address)->ipv4; \
(ctx)->remote_port = (address)->port

#define MARK_REJECTED(ctx) \
BPF_LOG(DEBUG, KMESH, "mark reject"); \
(ctx)->remote_ip4 = 0; \
(ctx)->remote_port = 0
#else
#define MARK_REJECTED(ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MARK_REJECTED does not need to be included in the compilation macro.

#endif

#endif //__BPF_CTX_SOCK_OPS_H
15 changes: 8 additions & 7 deletions bpf/kmesh/ads/include/kmesh_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@
#include "core/address.pb-c.h"
#include "tail_call_index.h"

#define BPF_LOGTYPE_LISTENER BPF_DEBUG_ON
#define BPF_LOGTYPE_FILTERCHAIN BPF_DEBUG_ON
#define BPF_LOGTYPE_FILTER BPF_DEBUG_ON
#define BPF_LOGTYPE_CLUSTER BPF_DEBUG_ON
#define BPF_LOGTYPE_ROUTER BPF_DEBUG_ON
#define BPF_LOGTYPE_ROUTER_CONFIG BPF_DEBUG_ON
#define BPF_LOGTYPE_COMMON BPF_DEBUG_ON
#define BPF_LOGTYPE_LISTENER BPF_DEBUG_ON
#define BPF_LOGTYPE_FILTERCHAIN BPF_DEBUG_ON
#define BPF_LOGTYPE_FILTER BPF_DEBUG_ON
#define BPF_LOGTYPE_CLUSTER BPF_DEBUG_ON
#define BPF_LOGTYPE_ROUTER BPF_DEBUG_ON
#define BPF_LOGTYPE_ROUTER_CONFIG BPF_DEBUG_ON
#define BPF_LOGTYPE_COMMON BPF_DEBUG_ON
#define BPF_LOGTYPE_CIRCUIT_BREAKER BPF_DEBUG_ON

#define BPF_OK 1

Expand Down
Loading
Loading