Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add static rate limit on slice traffic #324

Merged
merged 23 commits into from
Sep 20, 2021
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions conf/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def __init__(self, fname):
self.enable_ntf = False
self.notify_sockaddr = "/tmp/notifycp"
self.endmarker_sockaddr = "/tmp/pfcpport"
self.enable_slice_metering = False

def parse(self, ifaces):
# Maximum number of flows to manage ip4 frags for re-assembly
Expand Down Expand Up @@ -149,6 +150,13 @@ def parse(self, ifaces):
print('Can\'t parse interface name(s)! Setting it to default values ({}, {})'.format(
"access", "core"))

# Slice rate limits
try:
self.conf["slice_rate_limit_config"]
self.enable_slice_metering = True
except KeyError:
print("No slice rate limit! Disabling meter.")

# UnixPort Paths
try:
self.notify_sockaddr = self.conf["notify_sockaddr"]
Expand Down
35 changes: 29 additions & 6 deletions conf/up4.bess
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,33 @@ if ntf:
_in -> ntf
_in = ntf

_in -> qerLookup::Qos(fields=[ {'attr_name':'src_iface', 'num_bytes':1}, \
{'attr_name':'qer_id', 'num_bytes':4}, \
{'attr_name':'fseid', 'num_bytes':8}], \
values=[{'attr_name':'qfi', 'num_bytes':1}])
_in -> qerLookup::Qos(fields=[{'attr_name':'src_iface', 'num_bytes':1}, \
{'attr_name':'qer_id', 'num_bytes':4}, \
{'attr_name':'fseid', 'num_bytes':8}], \
values=[{'attr_name':'qfi', 'num_bytes':1}])

executeFAR::Split(size=1, attribute='action')
_in = executeFAR
if parser.enable_slice_metering:
# sliceMeter enforces a per slice, per direction meter rate limit
sliceMeter::Qos(fields=[{'attr_name':'action', 'num_bytes':1}, \
{'attr_name':'tunnel_out_type', 'num_bytes':1}])
# Reserved gates, reject rule adds with gate=1/2/3
m_meter = 0 # Placeholder gate not connected. Will meter if lookup result returns this gate
m_green = 1 # For green traffic
m_yellow = 2 # For yellow traffic
m_red = 3 # For red traffic
# User defined gates
m_fail = 4 # For lookup failure traffic
m_unmeter = 5 # For unmetered traffic
# Admit green and yellow, drop red
sliceMeter:m_green -> executeFAR
sliceMeter:m_yellow -> executeFAR
sliceMeter:m_red -> sliceMeterRed::Sink()
sliceMeter:m_fail -> sliceMeterLookupFail::Sink()
sliceMeter:m_unmeter -> executeFAR
sliceMeter.set_default_gate(gate=m_fail)
_in = sliceMeter

farLookup::ExactMatch(fields=[{'attr_name':'far_id', 'num_bytes':4}, \
{'attr_name':'fseid', 'num_bytes':8}], \
Expand All @@ -213,7 +236,7 @@ farLookup::ExactMatch(fields=[{'attr_name':'far_id', 'num_bytes':4}, \
{'attr_name':'tunnel_out_teid', 'num_bytes':4}, \
{'attr_name':'tunnel_out_udp_port', 'num_bytes':2}]):noGTPUEncap \
-> farMerge::Merge() \
-> executeFAR::Split(size=1, attribute='action')
-> _in

# Add logical pipeline when gtpudecap is needed
pdrLookup:GTPUDecap \
Expand Down Expand Up @@ -338,7 +361,7 @@ accessFastBPF:GTPUGate \
_in = accessRxUDPCksum
gate = 0

# 2. Build the remaining first half of the UL pipeline before entering the shard pipeline
# 2. Build the remaining first half of the UL pipeline before entering the shared pipeline
#ports[parser.access_ifname].rewrite \
_in:gate \
-> SetMetadata(attrs=[{'name':'src_iface', 'size':1, 'value_int':Access}]) \
Expand Down
11 changes: 11 additions & 0 deletions conf/upf.json
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,17 @@
"priority": 5
}
],

"": "Optional slice-wide meter rate limits",
"slice_rate_limit_config": {
"": "uplink policer",
"n6_bps": 500000000,
"n6_burst_bytes": 3000,
"": "downlink policer",
"n3_bps": 500000000,
"n3_burst_bytes": 3000
},

"": "Control plane controller settings",
"cpiface": {
"enable_ue_ip_alloc": false,
Expand Down
15 changes: 9 additions & 6 deletions core/modules/qos.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,12 @@

#include "qos.h"
#include "utils/endian.h"
#include "utils/ether.h"
#include "utils/format.h"

#include <rte_cycles.h>
#include <string>
#include <vector>

using bess::utils::Ethernet;

typedef enum { FIELD_TYPE = 0, VALUE_TYPE } Type;
using bess::metadata::Attribute;
#define metering_test 0
Expand Down Expand Up @@ -140,7 +137,7 @@ void Qos::ProcessBatch(Context *ctx, bess::PacketBatch *batch) {
default_gate = ACCESS_ONCE(default_gate_);

int cnt = batch->cnt();
struct value *val[cnt];
value *val[cnt];

for (const auto &field : fields_) {
int offset;
Expand Down Expand Up @@ -190,7 +187,7 @@ void Qos::ProcessBatch(Context *ctx, bess::PacketBatch *batch) {
// meter if ogate is 0
if (ogate == METER_GATE) {
uint64_t time = rte_rdtsc();
uint32_t pkt_len = pkt->total_len() - sizeof(Ethernet);
uint32_t pkt_len = pkt->total_len() - val[j]->deduct_len;
uint8_t color = rte_meter_trtcm_color_blind_check(&val[j]->m, &val[j]->p,
time, pkt_len);

Expand Down Expand Up @@ -377,7 +374,7 @@ CommandResponse Qos::CommandAdd(const bess::pb::QosCommandAddArg &arg) {
MeteringKey key = {{0}};

MKey l;
struct value v;
value v;
v.ogate = gate;
CommandResponse err = ExtractKeyMask(arg, &key, &v.Data, &l);

Expand All @@ -391,6 +388,12 @@ CommandResponse Qos::CommandAdd(const bess::pb::QosCommandAddArg &arg) {
v.cbs = arg.cbs();
v.pbs = arg.pbs();
v.ebs = arg.ebs();
if (arg.optional_deduct_len_case() ==
bess::pb::QosCommandAddArg::OPTIONAL_DEDUCT_LEN_NOT_SET) {
v.deduct_len = 14; // Exclude Ethernet header by default
} else {
v.deduct_len = arg.deduct_len();
}

DLOG(INFO) << "Adding entry"
<< " cir: " << v.cir << " pir: " << v.pir << " cbs: " << v.cbs
Expand Down
1 change: 1 addition & 0 deletions core/modules/qos.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ struct value {
uint64_t cbs;
uint64_t pbs;
uint64_t ebs;
int64_t deduct_len;
struct rte_meter_trtcm_profile p;
struct rte_meter_trtcm m;
MeteringKey Data;
Expand Down
7 changes: 5 additions & 2 deletions patches/bess/0015-Protobuf-changes-for-Qos.patch
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ diff --git a/protobuf/module_msg.proto b/protobuf/module_msg.proto
index 25dfc81e..1f7058cd 100644
--- a/protobuf/module_msg.proto
+++ b/protobuf/module_msg.proto
@@ -1291,3 +1291,38 @@ message MplsPopArg {
@@ -1291,3 +1291,41 @@ message MplsPopArg {
message WorkerSplitArg {
map<uint32, uint32> worker_gates = 1; // ogate -> worker mask
}
Expand All @@ -28,6 +28,9 @@ index 25dfc81e..1f7058cd 100644
+ uint64 cbs = 4;
+ uint64 pbs = 5;
+ uint64 ebs = 6;
+ oneof optional_deduct_len {
+ int64 deduct_len = 9;
+ }
+ repeated FieldData fields = 7;
+ repeated FieldData values = 8;
+}
Expand All @@ -50,6 +53,6 @@ index 25dfc81e..1f7058cd 100644
+message QosCommandSetDefaultGateArg {
+ uint64 gate = 1;
+}
--
--
2.17.1

124 changes: 124 additions & 0 deletions pfcpiface/bess.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ const (
qerGateUnmeter
)

const (
// Internal gates for Slice meter.
sliceMeterGateMeter uint64 = iota
sliceMeterGateLookupFail = iota + 4
sliceMeterGateUnmeter
)

var intEnc = func(u uint64) *pb.FieldData {
return &pb.FieldData{Encoding: &pb.FieldData_ValueInt{ValueInt: u}}
}
Expand Down Expand Up @@ -400,6 +407,17 @@ func (b *bess) setUpfInfo(u *upf, conf *Conf) {

go b.endMarkerSendLoop(b.endMarkerChan)
}

if conf.SliceMeterConfig.N6RateBps > 0 || conf.SliceMeterConfig.N3RateBps > 0 {
ctx, cancel := context.WithTimeout(context.Background(), Timeout)
defer cancel()
done := make(chan bool)
b.addSliceMeter(ctx, done, conf.SliceMeterConfig)
rc := b.GRPCJoin(1, Timeout, done)
if !rc {
log.Errorln("Unable to make GRPC calls")
}
}
}

func (b *bess) sim(u *upf, method string) {
Expand Down Expand Up @@ -1010,6 +1028,112 @@ func (b *bess) delCounter(ctx context.Context, done chan<- bool, ctrID uint32, c
}()
}

func (b *bess) processSliceMeter(ctx context.Context, any *anypb.Any, method upfMsgType) {
if method != upfMsgTypeAdd && method != upfMsgTypeDel && method != upfMsgTypeClear {
log.Errorln("Invalid method name: ", method)
return
}

methods := [...]string{"add", "add", "delete", "clear"}

_, err := b.client.ModuleCommand(
ctx, &pb.CommandRequest{
Name: "sliceMeter",
Cmd: methods[method],
Arg: any,
},
)
if err != nil {
log.Errorln("sliceMeter method failed!:", err)
}
}

func (b *bess) addSliceMeter(ctx context.Context, done chan<- bool, meterConfig SliceMeterConfig) {
go func() {
var (
any *anypb.Any
err error
cir, pir, cbs, ebs, pbs, gate uint64
)

// Uplink N6 slice meter config
if meterConfig.N6RateBps != 0 {
gate = sliceMeterGateMeter
cir = 1 // Mark all traffic as yellow
pir = meterConfig.N6RateBps / 8 // bit/s to byte/s
} else {
gate = sliceMeterGateUnmeter
}
if meterConfig.N6BurstBytes != 0 {
cbs = 1 // Mark all traffic as yellow
pbs = meterConfig.N6BurstBytes
ebs = 0 // Unused
} else {
cbs = DefaultBurstSize
pbs = DefaultBurstSize
ebs = 0 // Unused
}
q := &pb.QosCommandAddArg{
Gate: gate,
Cir: cir, /* committed info rate */
Pir: pir, /* peak info rate */
Cbs: cbs, /* committed burst size */
Pbs: pbs, /* Peak burst size */
Ebs: ebs, /* Excess burst size */
OptionalDeductLen: &pb.QosCommandAddArg_DeductLen{0}, /* Include all headers */
Fields: []*pb.FieldData{
intEnc(uint64(farForwardU)), /* Action */
intEnc(uint64(0)), /* tunnel_out_type */
},
}
any, err = anypb.New(q)
if err != nil {
log.Errorln("Error marshalling the rule", q, err)
return
}
b.processSliceMeter(ctx, any, upfMsgTypeAdd)

// Downlink N3 slice meter config
if meterConfig.N3RateBps != 0 {
gate = sliceMeterGateMeter
cir = 1 // Mark all traffic as yellow
pir = meterConfig.N3RateBps / 8 // bit/s to byte/s
} else {
gate = sliceMeterGateUnmeter
}
if meterConfig.N3BurstBytes != 0 {
cbs = 1 // Mark all traffic as yellow
pbs = meterConfig.N3BurstBytes
ebs = 0 // Unused
} else {
cbs = DefaultBurstSize
pbs = DefaultBurstSize
ebs = 0 // Unused
}
// TODO: packet deduction should take GTPU extension header into account
q = &pb.QosCommandAddArg{
Gate: gate,
Cir: cir, /* committed info rate */
Pir: pir, /* peak info rate */
Cbs: cbs, /* committed burst size */
Pbs: pbs, /* Peak burst size */
Ebs: ebs, /* Excess burst size */
OptionalDeductLen: &pb.QosCommandAddArg_DeductLen{50}, /* Exclude Ethernet,IP,UDP,GTP header */
Fields: []*pb.FieldData{
intEnc(uint64(farForwardD)), /* Action */
intEnc(uint64(1)), /* tunnel_out_type */
},
}
any, err = anypb.New(q)
if err != nil {
log.Errorln("Error marshalling the rule", q, err)
return
}
b.processSliceMeter(ctx, any, upfMsgTypeAdd)
done <- true
}()
}

func (b *bess) removeAllPDRs(ctx context.Context, done chan<- bool) {
go func() {
var (
Expand Down