Skip to content

Commit

Permalink
Add a concurrent map
Browse files Browse the repository at this point in the history
  • Loading branch information
jserv committed Aug 26, 2023
1 parent 388d590 commit 980a953
Show file tree
Hide file tree
Showing 16 changed files with 1,282 additions and 1 deletion.
3 changes: 3 additions & 0 deletions .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,6 @@ ForEachMacros:
- rb_list_foreach_safe
- EV_FOREACH
- LIST_FOREACH
- LIST_FOREACH_POP
- MAP_FOREACH
- MAP_FOREACH_WITH_HASH
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@ purpose of these programs is to be illustrative and educational.
* [Synchronization](https://en.wikipedia.org/wiki/Synchronization_(computer_science))
- [mcslock](mcslock/): An MCS lock implementation.
- [seqlock](seqlock/): A seqlock (sequence lock) implementation.
- [hp\_list](hp_list)/: A concurrent linked list utilizing Hazard Pointers.
- [hp\_list](hp_list/)/: A concurrent linked list utilizing Hazard Pointers.
- [rcu-list](rcu-list/): A concurrent linked list utilizing the simplified RCU algorithm.
- [qsbr](qsbr/): An implementation of Quiescent state based reclamation (QSBR).
- [list-move](list-move/): Evaluation of two concurrent linked lists: QSBR and lock-based.
- [rcu\_queue](rcu_queue/): An efficient concurrent queue based on QSBR.
- [thread-rcu](thread-rcu/): A Linux Kernel style thread-based simple RCU.
- [cmap](cmap/): A concurrent map implementation based on RCU.
* Applications
- [httpd](httpd/): A multi-threaded web server.
- [map-reduce](map-reduce/): word counting using MapReduce.
Expand Down
7 changes: 7 additions & 0 deletions cmap/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
all:
gcc -Wall -O2 -o test-cmap \
cmap.c random.c rcu.c test-cmap.c \
-lpthread

clean:
rm -f test-cmap
262 changes: 262 additions & 0 deletions cmap/cmap.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
#include "cmap.h"
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>
#include "locks.h"
#include "util.h"

#define MAP_INITIAL_SIZE 512

struct cmap_entry {
struct cmap_node *first;
};

struct cmap_impl {
struct cmap_entry *arr; /* Map entreis */
size_t count; /* Number of elements in this */
size_t max; /* Capacity of this */
size_t utilization; /* Number of utialized entries */
struct cond fence; /* Prevent new reads while old still exist */
};

struct cmap_impl_pair {
struct cmap_impl *old, *new;
};

static void cmap_expand(struct cmap *cmap);
static void cmap_expand_callback(void *args);
static void cmap_destroy_callback(void *args);
static size_t cmap_count__(const struct cmap *cmap);
static void cmap_insert__(struct cmap_impl *, struct cmap_node *);

/* Only a single concurrent writer to cmap is allowed */
static void cmap_insert__(struct cmap_impl *impl, struct cmap_node *node)
{
size_t i = node->hash & impl->max;
node->next = impl->arr[i].first;
if (!impl->arr[i].first)
impl->utilization++;
impl->arr[i].first = node;
}

static void cmap_destroy_callback(void *args)
{
struct cmap_impl *impl = (struct cmap_impl *) args;
cond_destroy(&impl->fence);
free(impl);
}

static struct cmap_impl *cmap_impl_init(size_t entry_num)
{
size_t size =
sizeof(struct cmap_impl) + sizeof(struct cmap_entry) * entry_num;
struct cmap_impl *impl = xmalloc(size);
impl->max = entry_num - 1;
impl->count = 0;
impl->utilization = 0;
impl->arr = OBJECT_END(struct cmap_entry *, impl);
cond_init(&impl->fence);

for (int i = 0; i < entry_num; ++i)
impl->arr[i].first = NULL;
return impl;
}

static void cmap_expand_callback(void *args)
{
struct cmap_impl_pair *pair = (struct cmap_impl_pair *) args;
struct cmap_node *c, *n;

/* Rehash */
for (int i = 0; i <= pair->old->max; i++) {
for (c = pair->old->arr[i].first; c; c = n) {
n = c->next;
cmap_insert__(pair->new, c);
}
}

/* Remove fence */
cond_unlock(&pair->new->fence);
free(pair->old);
free(pair);
}

/* Only a single concurrent writer to cmap is allowed */
static void cmap_expand(struct cmap *cmap)
{
struct rcu *impl_rcu = rcu_acquire(cmap->impl->p);
struct cmap_impl_pair *pair = xmalloc(sizeof(*pair));
pair->old = rcu_get(impl_rcu, struct cmap_impl *);

/* Do not allow two expansions in parallel */
/* Prevent new reads while old still exist */
while (cond_is_locked(&pair->old->fence)) {
rcu_release(impl_rcu);
cond_wait(&pair->old->fence);
impl_rcu = rcu_acquire(cmap->impl->p);
pair->old = rcu_get(impl_rcu, struct cmap_impl *);
}

/* Initiate new rehash array */
pair->new = cmap_impl_init((pair->old->max + 1) * 2);
pair->new->count = pair->old->count;

/* Prevent new reads/updates while old reads still exist */
cond_lock(&pair->new->fence);

rcu_postpone(impl_rcu, cmap_expand_callback, pair);
rcu_release(impl_rcu);
rcu_set(cmap->impl->p, pair->new);
}

void cmap_init(struct cmap *cmap)
{
struct cmap_impl *impl = cmap_impl_init(MAP_INITIAL_SIZE);
cmap->impl = xmalloc(sizeof(*cmap->impl));
rcu_init(cmap->impl->p, impl);
}

void cmap_destroy(struct cmap *cmap)
{
if (!cmap)
return;

struct rcu *impl_rcu = rcu_acquire(cmap->impl->p);
struct cmap_impl *impl = rcu_get(impl_rcu, struct cmap_impl *);
rcu_postpone(impl_rcu, cmap_destroy_callback, impl);
rcu_release(impl_rcu);
rcu_destroy(impl_rcu);
free(cmap->impl);
}

static size_t cmap_count__(const struct cmap *cmap)
{
struct rcu *impl_rcu = rcu_acquire(cmap->impl->p);
struct cmap_impl *impl = rcu_get(impl_rcu, struct cmap_impl *);
size_t count = impl->count;
rcu_release(impl_rcu);
return count;
}

double cmap_utilization(const struct cmap *cmap)
{
struct rcu *impl_rcu = rcu_acquire(cmap->impl->p);
struct cmap_impl *impl = rcu_get(impl_rcu, struct cmap_impl *);
double res = (double) impl->utilization / (impl->max + 1);
rcu_release(impl_rcu);
return res;
}

size_t cmap_size(const struct cmap *cmap)
{
return cmap_count__(cmap);
}

/* Only one concurrent writer */
size_t cmap_insert(struct cmap *cmap, struct cmap_node *node, uint32_t hash)
{
node->hash = hash;

struct rcu *impl_rcu = rcu_acquire(cmap->impl->p);
struct cmap_impl *impl = rcu_get(impl_rcu, struct cmap_impl *);
cmap_insert__(impl, node);
impl->count++;
size_t count = impl->count;
bool expand = impl->count > impl->max * 2;
rcu_release(impl_rcu);

if (expand)
cmap_expand(cmap);
return count;
}

/* Only one concurrent writer */
size_t cmap_remove(struct cmap *cmap, struct cmap_node *node)
{
struct rcu *impl_rcu = rcu_acquire(cmap->impl->p);
struct cmap_impl *impl = rcu_get(impl_rcu, struct cmap_impl *);
size_t pos = node->hash & impl->max;
struct cmap_entry *cmap_entry = &impl->arr[pos];
size_t count = impl->count;

struct cmap_node **node_p = &cmap_entry->first;
while (*node_p) {
if (*node_p == node) {
*node_p = node->next;
count--;
break;
}
node_p = &(*node_p)->next;
}
impl->count = count;
rcu_release(impl_rcu);
return count;
}

struct cmap_state cmap_state_acquire(struct cmap *cmap)
{
struct cmap_state state = {.p = rcu_acquire(cmap->impl->p)};
return state;
}

void cmap_state_release(struct cmap_state state)
{
rcu_release(state.p);
}

struct cmap_cursor cmap_find__(struct cmap_state state, uint32_t hash)
{
struct cmap_impl *impl = rcu_get(state.p, struct cmap_impl *);

/* Prevent new reads while old still exist */
while (cond_is_locked(&impl->fence))
cond_wait(&impl->fence);

struct cmap_cursor cursor = {
.entry_idx = hash & impl->max,
.node = impl->arr[hash & impl->max].first,
.next = NULL,
.accross_entries = false,
};
if (cursor.node)
cursor.next = cursor.node->next;
return cursor;
}

struct cmap_cursor cmap_start__(struct cmap_state state)
{
struct cmap_cursor cursor = cmap_find__(state, 0);
cursor.accross_entries = true;
/* Don't start with an empty node */
if (!cursor.node)
cmap_next__(state, &cursor);
return cursor;
}

void cmap_next__(struct cmap_state state, struct cmap_cursor *cursor)
{
struct cmap_impl *impl = rcu_get(state.p, struct cmap_impl *);

cursor->node = cursor->next;
if (cursor->node) {
cursor->next = cursor->node->next;
return;
}

/* We got to the end of the current entry. Try to find
* a valid node in next entries
*/
while (cursor->accross_entries) {
cursor->entry_idx++;
if (cursor->entry_idx > impl->max)
break;
cursor->node = impl->arr[cursor->entry_idx].first;
if (cursor->node) {
cursor->next = cursor->node->next;
return;
}
}

cursor->node = NULL;
cursor->next = NULL;
}
83 changes: 83 additions & 0 deletions cmap/cmap.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#pragma once

#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#include "rcu.h"
#include "util.h"

/* Concurrent cmap.
* It supports multiple concurrent readers and a single concurrent writer.
* To iterate, the user needs to acquire a "cmap state" (snapshot).
*/
struct cmap_node {
struct cmap_node *next; /* Next node with same hash. */
uint32_t hash;
};

/* Used for going over all cmap nodes */
struct cmap_cursor {
struct cmap_node *node; /* Pointer to cmap_node */
struct cmap_node *next; /* Pointer to cmap_node */
size_t entry_idx; /* Current entry */
bool accross_entries; /* Hold cursor accross cmap entries */
};

/* Map state (snapshot), must be acquired before cmap iteration, and released
* afterwards.
*/
struct cmap_state {
struct rcu *p;
};

/* Concurrent hash cmap. */
struct cmap {
struct cmap_state *impl;
};

/* Initialization. */
void cmap_init(struct cmap *);
void cmap_destroy(struct cmap *);

/* Counters. */
size_t cmap_size(const struct cmap *);
double cmap_utilization(const struct cmap *cmap);

/* Insertion and deletion. Return the current count after the operation. */
size_t cmap_insert(struct cmap *, struct cmap_node *, uint32_t hash);
size_t cmap_remove(struct cmap *, struct cmap_node *);

/* Acquire/release cmap concurrent state. Use with iteration macros.
* Each acquired state must be released. */
struct cmap_state cmap_state_acquire(struct cmap *cmap);
void cmap_state_release(struct cmap_state state);

/* Iteration macros. Usage example:
*
* struct {
* struct cmap_node node;
* int value;
* } *data;
* struct cmap_state *cmap_state = cmap_state_acquire(&cmap);
* MAP_FOREACH(data, node, cmap_state) {
* ...
* }
* cmap_state_release(cmap_state);
*/
#define MAP_FOREACH(NODE, MEMBER, STATE) \
MAP_FOREACH__(NODE, MEMBER, MAP, cmap_start__(STATE), STATE)

#define MAP_FOREACH_WITH_HASH(NODE, MEMBER, HASH, STATE) \
MAP_FOREACH__(NODE, MEMBER, MAP, cmap_find__(STATE, HASH), STATE)

/* Ieration, private methods. Use iteration macros instead */
struct cmap_cursor cmap_start__(struct cmap_state state);
struct cmap_cursor cmap_find__(struct cmap_state state, uint32_t hash);
void cmap_next__(struct cmap_state state, struct cmap_cursor *cursor);

#define MAP_FOREACH__(NODE, MEMBER, MAP, START, STATE) \
for (struct cmap_cursor cursor_ = START; \
(cursor_.node ? (INIT_CONTAINER(NODE, cursor_.node, MEMBER), true) \
: false); \
cmap_next__(STATE, &cursor_))
Loading

0 comments on commit 980a953

Please sign in to comment.