Skip to content

Commit

Permalink
bandwidth shaper implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Roberto De Ioris committed Aug 18, 2012
1 parent d2aa655 commit cc9977c
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 19 deletions.
24 changes: 23 additions & 1 deletion blastbeat.h
Expand Up @@ -63,6 +63,8 @@
#define BLASTBEAT_CACHE_MISS -1
#define BLASTBEAT_CACHE_ERROR -2

#define BB_TOKEN_BUCKET_SPEED 0.03

struct bb_virtualhost;
struct bb_session;

Expand Down Expand Up @@ -167,6 +169,17 @@ struct bb_cache_entry {
struct bb_cache_item *tail;
};

struct bb_throttle {
ev_timer throttle;
struct bb_virtualhost *vhost;
};

struct bb_connection_throttle {
ev_prepare throttle;
struct bb_virtualhost *vhost;
struct bb_connection *connection;
};

// a blastbeat virtualhost
struct bb_virtualhost {
char *name;
Expand Down Expand Up @@ -194,7 +207,9 @@ struct bb_virtualhost {
// bandwidth control
uint64_t bandwidth;
uint64_t bandwidth_bucket;
uint64_t bandwidth_last_sent;
struct bb_throttle throttle;
// set to 1 if a virtualhost is throttled
int throttled;

char *ssl_certificate;
char *ssl_key;
Expand Down Expand Up @@ -334,6 +349,8 @@ struct bb_connection {

// write queue
struct bb_writer writer;
// throttle system
struct bb_connection_throttle throttle;

struct bb_session *sessions_head;
struct bb_session *sessions_tail;
Expand Down Expand Up @@ -522,6 +539,8 @@ struct blastbeat_server {
uint32_t sht_size;
struct bb_session_entry *sht;

uint64_t writequeue_buffer;

struct bb_hostname *hnht[BLASTBEAT_HOSTNAME_HTSIZE];

struct bb_dealer *dealers;
Expand Down Expand Up @@ -640,3 +659,6 @@ int bb_pipe_add(struct bb_session *, char *, size_t);
int bb_check_for_pipe(struct bb_session *, char *, size_t, char *, size_t);

SSL_CTX *bb_new_ssl_ctx(void);

void bb_throttle_cb(struct ev_loop *, struct ev_timer *, int);
void bb_connection_throttle_cb(struct ev_loop *, struct ev_prepare *, int);
7 changes: 4 additions & 3 deletions blastbeat.ini
Expand Up @@ -17,7 +17,8 @@ max-hops = 17

sessions = 100

daemon = 1
; 10 mbytes per-connection
writequeue-buffer = 10485760

[blastbeat:quantal64.local:8443]
bind-ssl = 192.168.173.5:8443
Expand All @@ -40,8 +41,8 @@ alias = 192.168.173.5:8083
cache = 1
;timeout = 3

; 10 kbit/s
;bandwidth = 10
; 100 kbit/s
bandwidth = 100

[blastbeat:192.168.173.5:8081]
certificate = quantal.crt
Expand Down
5 changes: 5 additions & 0 deletions src/config.c
Expand Up @@ -483,6 +483,11 @@ static void bb_main_config_add(char *key, char *value) {
return;
}

is_opt( "writequeue-buffer") {
blastbeat.writequeue_buffer = strtoll(value, NULL, 10);
return;
}

}

static void bb_vhost_config_add(char *vhostname, char *key, char *value) {
Expand Down
21 changes: 18 additions & 3 deletions src/main.c
Expand Up @@ -107,6 +107,8 @@ void bb_connection_close(struct bb_connection *bbc) {
// stop I/O
ev_io_stop(blastbeat.loop, &bbc->reader.reader);
ev_io_stop(blastbeat.loop, &bbc->writer.writer);
// stop throttling
ev_prepare_stop(blastbeat.loop, &bbc->throttle.throttle);
// clear SSL if required
if (bbc->ssl) {
// this should be better managed, but why wasting resources ?
Expand Down Expand Up @@ -399,6 +401,9 @@ static void bb_accept_callback(struct ev_loop *loop, struct ev_io *w, int revent
ev_io_init(&bbc->writer.writer, bb_wq_callback, client, EV_WRITE);
bbc->writer.connection = bbc;

// prepare the throttle hook
ev_prepare_init(&bbc->throttle.throttle, bb_connection_throttle_cb);

// prepare a low level connection timeout
ev_timer_init(&bbc->timeout, connection_timer_cb, 0.0, 0.0);
// set the deafult timeout
Expand Down Expand Up @@ -578,6 +583,13 @@ static void bb_vhosts_fix() {
}
memset(vhosts->cache, 0, sizeof(struct bb_cache_entry) * vhosts->cht_size);
}

if (vhosts->bandwidth > 0) {
// start filling the token bucket (30ms frequency);
ev_timer_init(&vhosts->throttle.throttle, bb_throttle_cb, BB_TOKEN_BUCKET_SPEED, BB_TOKEN_BUCKET_SPEED);
vhosts->throttle.vhost = vhosts;
ev_timer_start(blastbeat.loop, &vhosts->throttle.throttle);
}
vhosts = vhosts->next;
}

Expand Down Expand Up @@ -683,6 +695,8 @@ int main(int argc, char *argv[]) {
blastbeat.gid = "nogroup";
blastbeat.max_hops = 10;
blastbeat.max_sessions = 10000;
// 8 MB per-connection
blastbeat.writequeue_buffer = 8*1024*1024;
// 2GB max_memory
blastbeat.max_memory = (uint64_t) 2048*1024*1024;
// default 30 minutes timeout
Expand All @@ -703,7 +717,10 @@ int main(int argc, char *argv[]) {
exit(1);
}

// fix acceptors/vhosts/cache...

blastbeat.loop = EV_DEFAULT;

// fix acceptors/vhosts/cache/bandwidth...
bb_vhosts_fix();

fprintf(stderr,"*** starting BlastBeat ***\n");
Expand Down Expand Up @@ -736,8 +753,6 @@ int main(int argc, char *argv[]) {
memset(blastbeat.sht, 0, sizeof(struct bb_session_entry) * blastbeat.sht_size);


blastbeat.loop = EV_DEFAULT;

// report config, bind sockets and assign ssl keys/certificates
struct bb_acceptor *acceptor = blastbeat.acceptors;
while(acceptor) {
Expand Down
94 changes: 82 additions & 12 deletions src/writequeue.c
Expand Up @@ -14,12 +14,29 @@ extern struct blastbeat_server blastbeat;
The offset in each item is required for managing incomplete writes
Bandwidth limiting
limiting per-vhost bandwidth is vital for QoS
the writequeue uses a token bucket algorithm
->bandwidth -> is the bandiwdth limit (in bytes per second)
->bandwidth_bucket -> is initialized to 0 and incremented by ((->bandwidth*30)/1000) bytes every 30ms
(30ms has been choosen as a good compromise between performance and load, but could be tunable)
as soon as ->bandwidth_bucket == ->bandwidth the token-add hook is stopped (will be restarted as soon as it decrease)
when the underlying socket is ready to WRITE data, N tokens are removed from the bucket (where N is the size of the packet,
if the packet is bigger it will be split)
if a WRITE event happens when the token is 0, we have a non conformant packet, the write event will be stopped, and will be restarted
at the next token-add hook
*/
static int wq_push(struct bb_writer *bbw, char *buf, size_t len, int flags, struct bb_session *bbs) {


// do not enqueue more than 8 megabytes (TODO configure that value)
if (bbw->len+len > 8*1024*1024) {
// check writequeue_buffer
if (bbw->len+len > blastbeat.writequeue_buffer) {
fprintf(stderr,"too much queued datas\n");
return -1;
}
Expand Down Expand Up @@ -64,6 +81,17 @@ static void wq_decapitate(struct bb_writer *bbw) {
bb_free(head, sizeof(struct bb_writer_item));
}

static void bb_throttle(struct bb_virtualhost *vhost, struct bb_connection *bbc) {
// mark the vhost as throttled
vhost->throttled = 1;
// stop the writer
ev_io_stop(blastbeat.loop, &bbc->writer.writer);
bbc->throttle.vhost = vhost;
bbc->throttle.connection = bbc;
// wait for unthrottling
ev_prepare_start(blastbeat.loop, &bbc->throttle.throttle);
}

void bb_wq_callback(struct ev_loop *loop, struct ev_io *w, int revents) {
struct bb_writer *bbw = (struct bb_writer *) w;
struct bb_connection *bbc = bbw->connection;
Expand All @@ -79,18 +107,18 @@ void bb_wq_callback(struct ev_loop *loop, struct ev_io *w, int revents) {
if (bbwi->session) {
// bandwidth check
uint64_t bandwidth = bbwi->session->vhost->bandwidth;
if (bandwidth) {
// if packet is bigger than bucket size, split it
size_t available = bandwidth - bbwi->session->vhost->bandwidth_bucket;
if (bbw_len > available) {
bbw_len -= available;
if (bandwidth > 0) {
// full bucket detected throttle the connection
if (bbwi->session->vhost->bandwidth_bucket == 0) {
bb_throttle(bbwi->session->vhost, bbc);
return;
}
// ok, we now have a valid chunk, can we send it ?
// we have milliseconds resolution
ev_tstamp now = bb_milliseconds;
// too fast, we have to throttle
if (now - bbwi->session->vhost->bandwidth_last_sent <= 0) {

// if packet is bigger than bucket size, split it
if (bbw_len > bbwi->session->vhost->bandwidth_bucket) {
bbw_len = bbwi->session->vhost->bandwidth_bucket;
}

}
}

Expand All @@ -113,6 +141,9 @@ void bb_wq_callback(struct ev_loop *loop, struct ev_io *w, int revents) {
// account transferred bytes to the virtualhost
if (bbwi->session) {
bbwi->session->vhost->tx+=wlen;
if (bbwi->session->vhost->bandwidth > 0) {
bbwi->session->vhost->bandwidth_bucket -= wlen;
}
}

bbw->len -= wlen;
Expand Down Expand Up @@ -206,3 +237,42 @@ int bb_wq_push_copy(struct bb_session *bbs, char *buf, size_t len, int flags) {
bb_wq_start(&bbc->writer);
return 0;
}

void bb_connection_throttle_cb(struct ev_loop *loop, struct ev_prepare *w, int revents) {
struct bb_connection_throttle *bbct = (struct bb_connection_throttle *) w;
struct bb_virtualhost *vhost = bbct->vhost;
struct bb_connection *bbc = bbct->connection;
if (!vhost || !bbc) {
fprintf(stderr,"BUG in throttle system !!!\n");
return;
}

// no more throttled
if (vhost->throttled == 0) {
ev_prepare_stop(blastbeat.loop, w);
// just for safety (could be useful in future implementations)
bbct->vhost = NULL;
bbct->connection = NULL;
bb_wq_start(&bbc->writer);
}
}

void bb_throttle_cb(struct ev_loop *loop, struct ev_timer *w, int revents) {
struct bb_throttle *bbt = (struct bb_throttle *) w;
struct bb_virtualhost *vhost = bbt->vhost;

// bucket could be bigger if we decrease bandwidth from the dealer
// (will be possibile soon ;)
if (vhost->bandwidth_bucket >= vhost->bandwidth) return;

uint64_t token = ((vhost->bandwidth*30)/1000);
if (vhost->bandwidth_bucket + token > vhost->bandwidth) {
vhost->bandwidth_bucket = vhost->bandwidth;
}
else {
vhost->bandwidth_bucket += token;
}
// unthrottle
vhost->throttled = 0;

}

0 comments on commit cc9977c

Please sign in to comment.