Skip to content

Commit

Permalink
Implement evil optimization which involves mapping the same physical …
Browse files Browse the repository at this point in the history
…page to two adjacent virtual pages
  • Loading branch information
tmick0 committed Aug 15, 2017
1 parent df5bc9d commit 642f89c
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 44 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CCFLAGS=-Wall -Wpedantic -Werror -std=c99 -fsanitize=address
CCFLAGS=-Wall -Wpedantic -Werror -std=c99 -fsanitize=address -D_GNU_SOURCE
LDFLAGS=-lpthread -lasan

test: test.c queue.o | queue.h
Expand Down
100 changes: 68 additions & 32 deletions queue.c
Original file line number Diff line number Diff line change
@@ -1,39 +1,83 @@
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
#include <errno.h>

#include <linux/memfd.h>
#include <sys/syscall.h>
#include <sys/mman.h>
#include <sys/types.h>

#include "queue_internal.h"

void queue_init(queue_t *q, uint8_t *b, size_t s) {
if(s <= sizeof(message_t)) {
fprintf(stderr, "queue error: Tried to initialize queue backed by buffer of %lu bytes, but %lu a minimum of bytes are needed\n", s, sizeof(message_t));
static inline int memfd_create(const char *name, unsigned int flags) {
return syscall(__NR_memfd_create, name, flags);
}

void queue_init(queue_t *q, size_t s) {

/* we're going to use a trick where we mmap two adjacent pages (in virtual memory) that point to the
* same physical memory. this lets us optimize memory access, by virtue of the fact that we don't need
* to even worry about wrapping our pointers around until we go through the entire buffer. too bad this
* isn't portable, because it's so fun.
*/

if(s % getpagesize() != 0) {
fprintf(stderr, "queue error: Requested size (%lu) is not a multiple of the page size (%d)\n", s, getpagesize());
abort();
}

// create an anonymous file backed by memory
if((q->fd = memfd_create("queue_region", 0)) == -1){
fprintf(stderr, "queue error: Could not obtain anonymous file (errno %d)\n", errno);
abort();
}

// set buffer size
if(ftruncate(q->fd, s) != 0){
fprintf(stderr, "queue error: Could not set size of anonymous file (errno %d)\n", errno);
abort();
}

// mmap first region
if((q->buffer = mmap(NULL, s, PROT_READ | PROT_WRITE, MAP_SHARED, q->fd, 0)) == MAP_FAILED){
fprintf(stderr, "queue error: Could not map buffer into virtual memory (errno %d)\n", errno);
abort();
}

// mmap second region, with exact address
if(mmap(q->buffer + s, s, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_FIXED, q->fd, 0) == MAP_FAILED){
fprintf(stderr, "queue error: Could not map buffer into virtual memory (errno %d)\n", errno);
abort();
}

pthread_mutex_init(&q->lock, NULL);
pthread_cond_init(&q->readable, NULL);
pthread_cond_init(&q->writeable, NULL);

q->buffer = b;
q->size = s;

q->head = 0;
q->tail = 0;
q->head_seq = 0;
q->tail_seq = 0;
q->avail = 0;
}

void queue_destroy(queue_t *q) {
pthread_mutex_destroy(&q->lock);
pthread_cond_destroy(&q->readable);
pthread_cond_destroy(&q->writeable);
munmap(q->buffer + q->size, q->size);
munmap(q->buffer, q->size);
close(q->fd);
}

void queue_put(queue_t *q, uint8_t *buffer, size_t size) {
pthread_mutex_lock(&q->lock);

// wait for space to become available
while(q->size - q->avail < size + sizeof(message_t)) {
while(q->size - (q->tail - q->head) < size + sizeof(message_t)) {
pthread_cond_wait(&q->writeable, &q->lock);
}

Expand All @@ -42,37 +86,23 @@ void queue_put(queue_t *q, uint8_t *buffer, size_t size) {
m.len = size;
m.seq = q->tail_seq++;

// write header
size_t i;
for(i = 0; i < sizeof(message_t); i++) {
q->buffer[q->tail] = ((uint8_t *) &m)[i];
q->tail = (q->tail + 1) % q->size;
}
// write message
memcpy(&q->buffer[q->tail ], &m, sizeof(message_t));
memcpy(&q->buffer[q->tail + sizeof(message_t)], buffer, size );

// write body
for(i = 0; i < size; i++){
q->buffer[q->tail] = buffer[i];
q->tail = (q->tail + 1) % q->size;
}

q->avail += size + sizeof(message_t);
// increment write index
q->tail += size + sizeof(message_t);

pthread_cond_signal(&q->readable);
pthread_mutex_unlock(&q->lock);
}

void queue_peek_header(queue_t const *q, message_t *m) {
size_t i;
for(i = 0; i < sizeof(message_t); i++){
((uint8_t *) m)[i] = q->buffer[(q->head + i) % q->size];
}
memcpy(m, &q->buffer[q->head], sizeof(message_t));
}

void queue_peek_body(queue_t const *q, uint8_t *buffer, size_t len) {
size_t i;
for(i = 0; i < len; i++) {
buffer[i] = q->buffer[(q->head + sizeof(message_t) + i) % q->size];
}
memcpy(buffer, &q->buffer[q->head + sizeof(message_t)], len);
}

size_t queue_get(queue_t *q, uint8_t *buffer, size_t max) {
Expand All @@ -83,7 +113,7 @@ size_t queue_get(queue_t *q, uint8_t *buffer, size_t max) {
for(;;) {

// wait for a message to arrive
while(q->avail == 0){
while((q->tail - q->head) == 0){
pthread_cond_wait(&q->readable, &q->lock);
}

Expand All @@ -105,9 +135,15 @@ size_t queue_get(queue_t *q, uint8_t *buffer, size_t max) {
// read message body
queue_peek_body(q, buffer, m.len);

// consume the message
q->head = (q->head + m.len + sizeof(message_t)) % q->size;
q->avail -= m.len + sizeof(message_t);
// consume the message by incrementing the read pointer
q->head += m.len + sizeof(message_t);

// when read buffer moves into 2nd memory region, we can reset to the 1st region
if(q->head >= q->size) {
q->head -= q->size;
q->tail -= q->size;
}

q->head_seq++;

pthread_cond_signal(&q->writeable);
Expand Down
10 changes: 5 additions & 5 deletions queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ typedef struct __queue_t__ {
uint8_t *buffer;
size_t size;

// backing buffer's memfd descriptor
int fd;

// read / write indices
size_t head;
size_t tail;
Expand All @@ -21,18 +24,15 @@ typedef struct __queue_t__ {
// sequence number of last written message
size_t tail_seq;

// number of bytes readable
size_t avail;

// synchronization primitives
pthread_cond_t readable;
pthread_cond_t writeable;
pthread_mutex_t lock;
} queue_t;

/** Initialize a blocking queue *q*, backed by the given buffer *b* of size *s*
/** Initialize a blocking queue *q* of size *s*
*/
void queue_init(queue_t *q, uint8_t *b, size_t s);
void queue_init(queue_t *q, size_t s);

/** Destroy the blocking queue *q*
*/
Expand Down
11 changes: 5 additions & 6 deletions test.c
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
#include <stdio.h>
#include <unistd.h>

#include "queue.h"

#define BUFFER_SIZE (4096)
#define NUM_THREADS (24)
#define MESSAGES_PER_THREAD (100)
#define BUFFER_SIZE (getpagesize())
#define NUM_THREADS (8)
#define MESSAGES_PER_THREAD (getpagesize() * 2)

void *consumer_loop(void *arg) {
queue_t *q = (queue_t *) arg;
Expand All @@ -29,10 +30,8 @@ void *publisher_loop(void *arg) {

int main(int argc, char *argv[]){

uint8_t buffer[BUFFER_SIZE];

queue_t q;
queue_init(&q, buffer, BUFFER_SIZE);
queue_init(&q, BUFFER_SIZE);

pthread_t publisher;
pthread_t consumers[NUM_THREADS];
Expand Down

0 comments on commit 642f89c

Please sign in to comment.