Skip to content

Commit

Permalink
some workload mode bugfixes and waypoint optimization
Browse files Browse the repository at this point in the history
1. Fix a potential target-port modification error when accessing a backend directly
2. Fix handling of a backend that belongs to multiple services
3. Store waypoint info in the service to accelerate waypoint access

Signed-off-by: kwb0523 <kwb0523@163.com>
  • Loading branch information
kwb0523 committed May 27, 2024
1 parent 542a5fd commit 1eb233d
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 101 deletions.
70 changes: 44 additions & 26 deletions bpf/kmesh/workload/include/backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,52 +30,70 @@ static inline backend_value *map_lookup_backend(const backend_key *key)
return kmesh_map_lookup_elem(&map_of_backend, key);
}

static inline int backend_manager(ctx_buff_t *ctx, backend_value *backend_v)
/*
 * waypoint_manager - redirect the current connection to a waypoint.
 *
 * Records the original destination (ctx->user_ip4 / ctx->user_port) in
 * map_of_dst_info, keyed by the socket pointer taken from ctx->sk, then
 * rewrites the context address to addr:port and tail-calls the program at
 * TAIL_CALL_CONNECT4_INDEX.
 *
 * @ctx:  connect context whose destination is rewritten
 * @addr: waypoint IPv4 address, stored into the context unchanged
 * @port: waypoint port, stored into the context unchanged
 *        (NOTE(review): callers log it through bpf_ntohs(), so it is
 *        presumably in network byte order - confirm)
 *
 * Return: the bpf_map_update_elem() error code if the original destination
 * could not be recorded; -ENOEXEC if execution continues past the tail call
 * (i.e. the tail call failed).
 */
static inline int waypoint_manager(ctx_buff_t *ctx, __u32 addr, __u32 port)
{
    int ret;
    address_t target_addr;
    __u64 *sk = (__u64 *)ctx->sk;
    struct bpf_sock_tuple value_tuple = {0};

    // Remember where the caller originally wanted to connect.
    value_tuple.ipv4.daddr = ctx->user_ip4;
    value_tuple.ipv4.dport = ctx->user_port;

    // BPF_NOEXIST: only create a new entry; the update fails if this socket
    // already has one.
    ret = bpf_map_update_elem(&map_of_dst_info, &sk, &value_tuple, BPF_NOEXIST);
    if (ret) {
        BPF_LOG(ERR, BACKEND, "record metadata origin address and port failed, ret is %d\n", ret);
        return ret;
    }

    target_addr.ipv4 = addr;
    target_addr.port = port;
    SET_CTX_ADDRESS(ctx, target_addr);
    kmesh_workload_tail_call(ctx, TAIL_CALL_CONNECT4_INDEX);

    // if tail call failed will run this code
    // ret still holds 0 from the successful map update above, so set the real
    // error code before logging it; otherwise the log would report "err is 0".
    ret = -ENOEXEC;
    BPF_LOG(ERR, BACKEND, "workload tail call failed, err is %d\n", ret);
    return ret;
}

static inline int backend_manager(ctx_buff_t *ctx, backend_value *backend_v, __u32 service_id, service_value *service_v)
{
int ret;
address_t target_addr;
__u32 user_port = ctx->user_port;

if (backend_v->waypoint_addr != 0 && backend_v->waypoint_port != 0) {
BPF_LOG(
DEBUG,
BACKEND,
"find waypoint addr=[%pI4h:%u]\n",
&backend_v->waypoint_addr,
bpf_ntohs(backend_v->waypoint_port));
value_tuple.ipv4.daddr = ctx->user_ip4;
value_tuple.ipv4.dport = ctx->user_port;

ret = bpf_map_update_elem(&map_of_dst_info, &sk, &value_tuple, BPF_NOEXIST);
if (ret) {
BPF_LOG(ERR, BACKEND, "record metadata origin address and port failed, ret is %d\n", ret);
ret = waypoint_manager(ctx, backend_v->waypoint_addr, backend_v->waypoint_port);
if (ret == -ENOEXEC) {
BPF_LOG(ERR, BACKEND, "waypoint_manager failed, ret:%d\n", ret);
return ret;
}
target_addr.ipv4 = backend_v->waypoint_addr;
target_addr.port = backend_v->waypoint_port;
SET_CTX_ADDRESS(ctx, target_addr);
kmesh_workload_tail_call(ctx, TAIL_CALL_CONNECT4_INDEX);

// if tail call failed will run this code
BPF_LOG(ERR, BACKEND, "workload tail call failed, err is %d\n", ret);
return -ENOEXEC;
}

#pragma unroll
for (unsigned int i = 0; i < backend_v->port_count; i++) {
if (i >= MAX_PORT_COUNT) {
BPF_LOG(WARN, BACKEND, "exceed the max port count\n");
for (__u32 i = 0; i < backend_v->service_count; i++) {
if (i >= MAX_SERVICE_COUNT) {
BPF_LOG(WARN, BACKEND, "exceed the max port count:%d\n", MAX_SERVICE_COUNT);
return -EINVAL;
}

if (ctx->user_port == backend_v->service_port[i]) {
target_addr.ipv4 = backend_v->ipv4;
target_addr.port = backend_v->target_port[i];
SET_CTX_ADDRESS(ctx, target_addr);
BPF_LOG(
DEBUG, BACKEND, "get the backend addr=[%pI4h:%u]\n", &target_addr.ipv4, bpf_ntohs(target_addr.port));
return 0;
if (service_id == backend_v->service[i]) {
BPF_LOG(DEBUG, BACKEND, "access the backend by service:%d\n", service_id);
#pragma unroll
for (__u32 j = 0; j < MAX_PORT_COUNT; j++) {
if (user_port == service_v->service_port[j]) {
target_addr.ipv4 = backend_v->ipv4;
target_addr.port = service_v->target_port[j];
SET_CTX_ADDRESS(ctx, target_addr);
BPF_LOG(DEBUG,
BACKEND, "get the backend addr=[%pI4h:%u]\n", &target_addr.ipv4, bpf_ntohs(target_addr.port));
return 0;
}
}
}
}

Expand Down
5 changes: 3 additions & 2 deletions bpf/kmesh/workload/include/endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ static inline endpoint_value *map_lookup_endpoint(const endpoint_key *key)
return kmesh_map_lookup_elem(&map_of_endpoint, key);
}

static inline int endpoint_manager(ctx_buff_t *ctx, endpoint_value *endpoint_v)
static inline int
endpoint_manager(ctx_buff_t *ctx, endpoint_value *endpoint_v, __u32 service_id, service_value *service_v)
{
int ret = 0;
backend_key backend_k = {0};
Expand All @@ -41,7 +42,7 @@ static inline int endpoint_manager(ctx_buff_t *ctx, endpoint_value *endpoint_v)
return -ENOENT;
}

ret = backend_manager(ctx, backend_v);
ret = service_backend_manager(ctx, backend_v, service_id, service_v);
if (ret != 0) {
if (ret != -ENOENT)
BPF_LOG(ERR, ENDPOINT, "backend_manager failed, ret:%d\n", ret);
Expand Down
17 changes: 12 additions & 5 deletions bpf/kmesh/workload/include/frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,18 @@ static inline int frontend_manager(ctx_buff_t *ctx, frontend_value *frontend_v)
}

if (direct_backend) {
ret = backend_manager(ctx, backend_v);
if (ret != 0) {
if (ret != -ENOENT)
BPF_LOG(ERR, FRONTEND, "backend_manager failed, ret:%d\n", ret);
return ret;
// For pod direct access, if a pod has a waypoint captured, we redirect to the waypoint; otherwise we do nothing.
if (backend_v->waypoint_addr != 0 && backend_v->waypoint_port != 0) {
BPF_LOG(DEBUG,
FRONTEND,
"find waypoint addr=[%pI4h:%u]\n",
&backend_v->waypoint_addr,
bpf_ntohs(backend_v->waypoint_port));
ret = waypoint_manager(ctx, backend_v->waypoint_addr, backend_v->waypoint_port);
if (ret == -ENOEXEC) {
BPF_LOG(ERR, BACKEND, "waypoint_manager failed, ret:%d\n", ret);
return ret;
}
}
} else {
ret = service_manager(ctx, frontend_v->upstream_id, service_v);
Expand Down
20 changes: 17 additions & 3 deletions bpf/kmesh/workload/include/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ static inline service_value *map_lookup_service(const service_key *key)
return kmesh_map_lookup_elem(&map_of_service, key);
}

static inline int lb_random_handle(ctx_buff_t *ctx, int service_id, service_value *service_v)
static inline int lb_random_handle(ctx_buff_t *ctx, __u32 service_id, service_value *service_v)
{
int ret = 0;
endpoint_key endpoint_k = {0};
Expand All @@ -42,7 +42,7 @@ static inline int lb_random_handle(ctx_buff_t *ctx, int service_id, service_valu
return -ENOENT;
}

ret = endpoint_manager(ctx, endpoint_v);
ret = endpoint_manager(ctx, endpoint_v, service_id, service_v);
if (ret != 0) {
if (ret != -ENOENT)
BPF_LOG(ERR, SERVICE, "endpoint_manager failed, ret:%d\n", ret);
Expand All @@ -52,10 +52,24 @@ static inline int lb_random_handle(ctx_buff_t *ctx, int service_id, service_valu
return 0;
}

static inline int service_manager(ctx_buff_t *ctx, int service_id, service_value *service_v)
static inline int service_manager(ctx_buff_t *ctx, __u32 service_id, service_value *service_v)
{
int ret = 0;

if (service_v->waypoint_addr != 0 && service_v->waypoint_port != 0) {
BPF_LOG(
DEBUG,
SERVICE,
"find waypoint addr=[%pI4h:%u]\n",
&service_v->waypoint_addr,
bpf_ntohs(service_v->waypoint_port));
ret = waypoint_manager(ctx, service_v->waypoint_addr, service_v->waypoint_port);
if (ret == -ENOEXEC) {
BPF_LOG(ERR, BACKEND, "waypoint_manager failed, ret:%d\n", ret);
return ret;
}
}

BPF_LOG(DEBUG, SERVICE, "load balance type:%u", service_v->lb_policy);
switch (service_v->lb_policy) {
case LB_POLICY_RANDOM:
Expand Down
19 changes: 12 additions & 7 deletions bpf/kmesh/workload/include/workload.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@

#include "config.h"

#define MAX_PORT_COUNT 10
#define RINGBUF_SIZE (1 << 12)
#define MAX_PORT_COUNT 10
#define MAX_SERVICE_COUNT 10
#define RINGBUF_SIZE (1 << 12)

// frontend map
typedef struct {
Expand All @@ -40,8 +41,13 @@ typedef struct {
} __attribute__((packed)) service_key;

typedef struct {
__u32 endpoint_count; // endpoint count of current service
__u32 lb_policy; // load balancing algorithm, currently only supports random algorithm
__u32 endpoint_count; // endpoint count of current service
__u32 lb_policy; // load balancing algorithm, currently only supports random algorithm
__u32 service_port[MAX_PORT_COUNT]; // service_port[i] and target_port[i] are a pair, i starts from 0 and max value
// is MAX_PORT_COUNT-1
__u32 target_port[MAX_PORT_COUNT];
__u32 waypoint_addr;
__u32 waypoint_port;
} __attribute__((packed)) service_value;

// endpoint map
Expand All @@ -61,9 +67,8 @@ typedef struct {

typedef struct {
__u32 ipv4; // backend ip
__u32 port_count;
__u32 service_port[MAX_PORT_COUNT];
__u32 target_port[MAX_PORT_COUNT];
__u32 service_count;
__u32 service[MAX_SERVICE_COUNT];
__u32 waypoint_addr;
__u32 waypoint_port;
} __attribute__((packed)) backend_value;
Expand Down
12 changes: 5 additions & 7 deletions pkg/controller/workload/bpfcache/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,20 @@ import (
)

const (
ConverNumBase = 10
MaxPortPairNum = 10
ConverNumBase = 10
MaxServiceNum = 10
)

// BackendKey holds the uint32 identifier of a backend; per the field comment,
// the value is the workload UID converted to uint32.
type BackendKey struct {
BackendUid uint32 // workloadUid to uint32
}

type ServicePorts [MaxPortPairNum]uint32
type TargetPorts [MaxPortPairNum]uint32
// ServiceList is a fixed-size array of MaxServiceNum uint32 entries; it is the
// type of BackendValue.Services.
// NOTE(review): entries are presumably service ids — confirm against the code that fills it.
type ServiceList [MaxServiceNum]uint32

type BackendValue struct {
IPv4 uint32 // backend ip
PortCount uint32
ServicePort ServicePorts // ServicePort[i] and TargetPort[i] are a pair, i starts from 0 and max value is PortCount-1
TargetPort TargetPorts
ServiceCount uint32
Services ServiceList
WaypointAddr uint32
WaypointPort uint32
}
Expand Down
15 changes: 13 additions & 2 deletions pkg/controller/workload/bpfcache/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,24 @@ import (
"github.com/cilium/ebpf"
)

// MaxPortNum is the length of the ServicePorts and TargetPorts arrays.
// NOTE(review): presumably must stay equal to MAX_PORT_COUNT on the BPF side
// so that the Go and C struct layouts match — confirm.
const (
MaxPortNum = 10
)

// ServiceKey is the key type passed to Cache.ServiceUpdate together with a
// ServiceValue.
type ServiceKey struct {
ServiceId uint32 // service id
}

// ServicePorts is a fixed-size array of MaxPortNum uint32 entries; it is the
// type of ServiceValue.ServicePort.
type ServicePorts [MaxPortNum]uint32
// TargetPorts is a fixed-size array of MaxPortNum uint32 entries; it is the
// type of ServiceValue.TargetPort.
type TargetPorts [MaxPortNum]uint32

type ServiceValue struct {
EndpointCount uint32 // endpoint count of current service
LbPolicy uint32 // load balancing algorithm, currently only supports random algorithm
EndpointCount uint32 // endpoint count of current service
LbPolicy uint32 // load balancing algorithm, currently only supports random algorithm
ServicePort ServicePorts // ServicePort[i] and TargetPort[i] are a pair, i starts from 0 and max value is MaxPortNum-1
TargetPort TargetPorts
WaypointAddr uint32
WaypointPort uint32
}

func (c *Cache) ServiceUpdate(key *ServiceKey, value *ServiceValue) error {
Expand Down
Loading

0 comments on commit 1eb233d

Please sign in to comment.