Skip to content

Commit

Permalink
WIP: ass_cache: experiment with non-blocking optimizations for the LRU list

Browse files Browse the repository at this point in the history

- Instead of adding/promoting items in the LRU list within ass_cache_get, add them to a simpler singly-linked lock-free list of items used this frame
- In ass_cache_cut, add/promote the contents of the list from the previous frame
- Take locks as little as possible (by using double-checked locking patterns, making more things atomic, etc)
- Shard locks by bucket (this should really change to be by thread, via a mechanism suggested by MrSmile)
  • Loading branch information
rcombs committed Jul 31, 2022
1 parent 59da7ff commit cc1e523
Showing 1 changed file with 135 additions and 76 deletions.
211 changes: 135 additions & 76 deletions libass/ass_cache.c
Expand Up @@ -300,36 +300,45 @@ const CacheDesc glyph_metrics_cache_desc = {
};


// ATOMIC expands to the C11 _Atomic qualifier only when thread support is
// compiled in; in single-threaded builds it expands to nothing so the same
// declarations compile as plain (non-atomic) objects with zero overhead.
#if CONFIG_PTHREAD
#define ATOMIC _Atomic
#else
#define ATOMIC
#endif

// Cache data
//
// NOTE(review): the scraped diff interleaved removed and added members
// (duplicate `next`/`prev`, `size`, `ref_count` declarations), which does not
// compile; this is the resolved post-commit form of the struct.
typedef struct cache_item {
    Cache *cache;               // owning cache (set on insertion)
    const CacheDesc *desc;      // descriptor with hash/compare/construct hooks
    // Intrusive hash-bucket chain. Atomic so ass_cache_get can walk the chain
    // before taking the sharded bucket lock (double-checked locking pattern
    // per the commit description).
    struct cache_item * ATOMIC next, * ATOMIC *prev;
    // LRU queue links. NOTE(review): appears to be mutated only from
    // ass_cache_cut / ass_cache_dec_ref — confirm locking discipline.
    struct cache_item *queue_next, **queue_prev;
    // Lock-free singly-linked list of items touched this frame; drained and
    // promoted into the LRU queue by ass_cache_cut.
    struct cache_item * ATOMIC frame_next;
    // Stays 0 until construct_func completes; ass_cache_get waits on the
    // shard's condvar until it becomes nonzero, so it doubles as a
    // "construction finished" flag.
    size_t ATOMIC size;
    unsigned bucket;            // hash bucket index; selects bmtx/bcond shard
    size_t ATOMIC ref_count;
} CacheItem;

// Number of lock shards; bucket b uses shard b % N_MTX_BUCKETS.
#define N_MTX_BUCKETS 16

// NOTE(review): the scraped diff interleaved removed and added members
// (duplicate `map` and counter declarations), which does not compile; this is
// the resolved post-commit form of the struct.
struct cache {
    unsigned buckets;           // number of hash buckets in map
    // Bucket heads are atomic so lookups can read them without the lock.
    CacheItem * ATOMIC *map;
    // LRU eviction queue (oldest first); queue_last points at the tail link.
    CacheItem *queue_first, **queue_last;
    // Head of the lock-free per-frame list; (void*)-1 marks an empty list
    // (see ass_cache_create / ass_cache_cut), distinguishing it from NULL,
    // which marks "item not on the frame list".
    CacheItem * ATOMIC frame_first;

    const CacheDesc *desc;      // item type descriptor

    // Statistics / accounting; atomic because they are updated outside the
    // main mutex in the sharded-lock paths.
    size_t ATOMIC cache_size;
    unsigned ATOMIC hits;
    unsigned ATOMIC misses;
    unsigned ATOMIC items;

#if CONFIG_PTHREAD
    pthread_mutex_t mutex;      // main cache lock (LRU queue, global state)
    pthread_cond_t cond;

    // Per-shard locks/condvars; the condvar signals completion of an item's
    // construction (item->size becoming nonzero).
    pthread_mutex_t bmtx[N_MTX_BUCKETS];
    pthread_cond_t bcond[N_MTX_BUCKETS];
#endif
};

Expand Down Expand Up @@ -357,6 +366,7 @@ Cache *ass_cache_create(const CacheDesc *desc)
cache->queue_last = &cache->queue_first;
cache->desc = desc;
cache->map = calloc(cache->buckets, sizeof(CacheItem *));
cache->frame_first = (void*)-1;
if (!cache->map) {
goto fail;
}
Expand All @@ -372,6 +382,11 @@ Cache *ass_cache_create(const CacheDesc *desc)
if (pthread_mutex_init(&cache->mutex, &attr) != 0)
goto faila;

for (int i = 0; i < N_MTX_BUCKETS; i++) {
pthread_mutex_init(&cache->bmtx[i], &attr);
pthread_cond_init(&cache->bcond[i], NULL);
}

if (pthread_cond_init(&cache->cond, NULL) != 0)
goto failm;

Expand Down Expand Up @@ -400,95 +415,125 @@ void *ass_cache_get(Cache *cache, void *key, void *priv)
unsigned bucket = desc->hash_func(key, ASS_HASH_INIT) % cache->buckets;

#if CONFIG_PTHREAD
pthread_mutex_lock(&cache->mutex);
pthread_mutex_t *cmtx = &cache->mutex;
pthread_mutex_t *mtx = &cache->bmtx[bucket % N_MTX_BUCKETS];
pthread_cond_t *cond = &cache->bcond[bucket % N_MTX_BUCKETS];
bool locked = false;
#endif

CacheItem *item = cache->map[bucket];
while (item) {
if (desc->compare_func(key, (char *) item + key_offs)) {
if (!item->queue_prev || item->queue_next) {
if (item->queue_prev) {
item->queue_next->queue_prev = item->queue_prev;
*item->queue_prev = item->queue_next;
} else
item->ref_count++;
*cache->queue_last = item;
item->queue_prev = cache->queue_last;
cache->queue_last = &item->queue_next;
item->queue_next = NULL;
}
cache->hits++;
desc->key_move_func(NULL, key);
item->ref_count++;
CacheItem *stopat = NULL;
#if CONFIG_PTHREAD
CacheItem *prev = NULL;
while ((item = cache->map[bucket]) != prev) {
if (locked)
pthread_mutex_unlock(mtx);
stopat = prev;
prev = item;
#endif
while (item != stopat) {
bool matched = desc->compare_func(key, (char *) item + key_offs);
if (matched) {
item->ref_count++;

if (!item->frame_next) {
CacheItem *next;
do {
next = cache->frame_first;
CacheItem *next2 = next;
if (atomic_compare_exchange_strong(&item->frame_next, &next2, NULL))
break;
} while (!atomic_compare_exchange_weak(&cache->frame_first, &next, item));
}
cache->hits++;

#if CONFIG_PTHREAD
while (!item->size)
pthread_cond_wait(&cache->cond, &cache->mutex);
if (!item->size) {
pthread_mutex_lock(mtx);

while (!item->size)
pthread_cond_wait(cond, mtx);

pthread_mutex_unlock(&cache->mutex);
pthread_mutex_unlock(mtx);
}
#endif

return (char *) item + CACHE_ITEM_SIZE;
desc->key_move_func(NULL, key);

return (char *) item + CACHE_ITEM_SIZE;
}
item = item->next;
}
item = item->next;
#if CONFIG_PTHREAD
pthread_mutex_lock(mtx);
locked = true;
}
cache->misses++;
#endif

item = malloc(key_offs + desc->key_size);
if (!item) {
desc->key_move_func(NULL, key);
#if CONFIG_PTHREAD
pthread_mutex_unlock(&cache->mutex);
pthread_mutex_unlock(mtx);
#endif
desc->key_move_func(NULL, key);
return NULL;
}
void *new_key = (char *) item + key_offs;
if (!desc->key_move_func(new_key, key)) {
free(item);
#if CONFIG_PTHREAD
pthread_mutex_unlock(&cache->mutex);
pthread_mutex_unlock(mtx);
#endif
free(item);
return NULL;
}

item->cache = cache;
item->desc = desc;
item->size = 0;
item->ref_count = 2;
item->queue_next = NULL;
item->queue_prev = (void*)-1;
item->bucket = bucket;

CacheItem **bucketptr = &cache->map[bucket];
CacheItem * ATOMIC *bucketptr = &cache->map[bucket];
if (*bucketptr)
(*bucketptr)->prev = &item->next;
item->prev = bucketptr;
item->next = *bucketptr;
*bucketptr = item;

*cache->queue_last = item;
item->queue_prev = cache->queue_last;
cache->queue_last = &item->queue_next;
item->queue_next = NULL;
item->ref_count = 2;

cache->items++;

#if CONFIG_PTHREAD
pthread_mutex_unlock(&cache->mutex);
pthread_mutex_unlock(mtx);
#endif

void *value = (char *) item + CACHE_ITEM_SIZE;
size_t size = desc->construct_func(new_key, value, priv);
assert(size);

cache->cache_size += size;
cache->items++;
cache->misses++;

CacheItem *next;
do {
next = cache->frame_first;
item->frame_next = next;
} while (!atomic_compare_exchange_weak(&cache->frame_first, &next, item));

#if CONFIG_PTHREAD
pthread_mutex_lock(&cache->mutex);
pthread_mutex_lock(cmtx);
#endif

item->size = size;
#if CONFIG_PTHREAD
pthread_mutex_unlock(cmtx);
pthread_mutex_lock(mtx);
#endif

cache->cache_size += size;
item->size = size;

#if CONFIG_PTHREAD
pthread_mutex_unlock(&cache->mutex);
pthread_cond_broadcast(&cache->cond);
pthread_mutex_unlock(mtx);
pthread_cond_broadcast(cond);
#endif

return value;
Expand Down Expand Up @@ -547,13 +592,19 @@ void ass_cache_dec_ref(void *value)
Cache *cache = item->cache;

#if CONFIG_PTHREAD
pthread_mutex_lock(&cache->mutex);
pthread_mutex_t *mtx = &cache->bmtx[item->bucket % N_MTX_BUCKETS];
pthread_mutex_lock(mtx);
#endif

if (item->next)
item->next->prev = item->prev;
*item->prev = item->next;

#if CONFIG_PTHREAD
pthread_mutex_unlock(mtx);
pthread_mutex_lock(&cache->mutex);
#endif

cache->cache_size -= item->size;
cache->items--;

Expand All @@ -564,17 +615,29 @@ void ass_cache_dec_ref(void *value)

void ass_cache_cut(Cache *cache, size_t max_size)
{
#if CONFIG_PTHREAD
pthread_mutex_lock(&cache->mutex);
#endif
CacheItem *next;
for (CacheItem *item = cache->frame_first; item && item != (void*)-1; item = next) {
if (!item->queue_prev || item->queue_prev == (void*)-1 || item->queue_next) {
if (item->queue_prev && item->queue_prev != (void*)-1) {
item->queue_next->queue_prev = item->queue_prev;
*item->queue_prev = item->queue_next;
} else if (!item->queue_prev)
item->ref_count++;
*cache->queue_last = item;
item->queue_prev = cache->queue_last;
cache->queue_last = &item->queue_next;
item->queue_next = NULL;
}

if (cache->cache_size <= max_size) {
#if CONFIG_PTHREAD
pthread_mutex_unlock(&cache->mutex);
#endif
return;
next = item->frame_next;
item->frame_next = NULL;
}

cache->frame_first = (void*)-1;

if (cache->cache_size <= max_size)
return;

do {
CacheItem *item = cache->queue_first;
if (!item)
Expand All @@ -599,19 +662,11 @@ void ass_cache_cut(Cache *cache, size_t max_size)
cache->queue_first->queue_prev = &cache->queue_first;
else
cache->queue_last = &cache->queue_first;

#if CONFIG_PTHREAD
pthread_mutex_unlock(&cache->mutex);
#endif
}

void ass_cache_stats(Cache *cache, size_t *size, unsigned *hits,
unsigned *misses, unsigned *count)
{
#if CONFIG_PTHREAD
pthread_mutex_lock(&cache->mutex);
#endif

if (size)
*size = cache->cache_size;
if (hits)
Expand All @@ -620,10 +675,6 @@ void ass_cache_stats(Cache *cache, size_t *size, unsigned *hits,
*misses = cache->misses;
if (count)
*count = cache->items;

#if CONFIG_PTHREAD
pthread_mutex_unlock(&cache->mutex);
#endif
}

void ass_cache_empty(Cache *cache)
Expand All @@ -633,6 +684,10 @@ void ass_cache_empty(Cache *cache)
#endif

for (int i = 0; i < cache->buckets; i++) {
#if CONFIG_PTHREAD
pthread_mutex_t *mtx = &cache->bmtx[i % N_MTX_BUCKETS];
pthread_mutex_lock(mtx);
#endif
CacheItem *item = cache->map[i];
while (item) {
assert(item->size);
Expand All @@ -652,11 +707,15 @@ void ass_cache_empty(Cache *cache)
item = next;
}
cache->map[i] = NULL;
#if CONFIG_PTHREAD
pthread_mutex_unlock(mtx);
#endif
}

cache->queue_first = NULL;
cache->queue_last = &cache->queue_first;
cache->hits = cache->misses = cache->cache_size = 0;
cache->hits = cache->misses = 0;
cache->cache_size = 0;

#if CONFIG_PTHREAD
pthread_mutex_unlock(&cache->mutex);
Expand Down

0 comments on commit cc1e523

Please sign in to comment.