Skip to content

Commit

Permalink
ass_cache: shard promotion queue and sync primitives per-thread
Browse files Browse the repository at this point in the history
  • Loading branch information
rcombs committed May 10, 2024
1 parent 3394def commit 14e0359
Show file tree
Hide file tree
Showing 9 changed files with 192 additions and 65 deletions.
186 changes: 142 additions & 44 deletions libass/ass_cache.c
Expand Up @@ -378,29 +378,45 @@ typedef struct cache_item {
const CacheDesc *desc;
struct cache_item *_Atomic next, *_Atomic *prev;
struct cache_item *_Atomic queue_next, *_Atomic *queue_prev;
struct cache_item *_Atomic promote_next;
struct cache_item *promote_next;
_Atomic uintptr_t size, ref_count;
ass_hashcode hash;

_Atomic uintptr_t last_used_frame;

#if ENABLE_THREADS
pthread_mutex_t mutex;
pthread_cond_t cond;
struct cache_client *creating_client;
#endif
} CacheItem;

struct cache {
unsigned buckets;
CacheItem *_Atomic *map;
CacheItem *_Atomic queue_first, *_Atomic *_Atomic queue_last;
CacheItem *_Atomic promote_first;

const CacheDesc *desc;

_Atomic uintptr_t cache_size;

uintptr_t cur_frame;

size_t n_clients;
struct cache_client **clients;

#if ENABLE_THREADS
pthread_mutex_t mutex;
#endif
};

struct cache_client {
Cache *cache;
CacheItem *promote_first;
size_t idx;

#if ENABLE_THREADS
pthread_mutex_t mutex;
pthread_cond_t cond;
#endif
};

#define CACHE_ALIGN 8
Expand Down Expand Up @@ -432,11 +448,95 @@ Cache *ass_cache_create(const CacheDesc *desc)
return NULL;
}

#if ENABLE_THREADS
if (pthread_mutex_init(&cache->mutex, NULL) != 0) {
free(cache->map);
free(cache);
return NULL;
}
#endif

return cache;
}

void *ass_cache_get(Cache *cache, void *key, void *priv)
CacheClient *ass_cache_client_create(Cache *cache)
{
CacheClient *client = NULL;

#if ENABLE_THREADS
pthread_mutex_lock(&cache->mutex);
#endif

size_t idx;

for (idx = 0; idx < cache->n_clients; idx++) {
if (!cache->clients[idx])
break;
}

if (idx >= cache->n_clients) {
if (!ASS_REALLOC_ARRAY(cache->clients, idx + 1))
goto fail;
}

client = calloc(1, sizeof(*client));
if (!client)
goto fail;

client->cache = cache;
client->idx = idx;

#if ENABLE_THREADS
if (pthread_mutex_init(&client->mutex, NULL) != 0) {
free(client);
goto fail;
}

if (pthread_cond_init(&client->cond, NULL) != 0) {
pthread_cond_destroy(&client->cond);
free(client);
goto fail;
}
#endif

cache->clients[idx] = client;
if (idx >= cache->n_clients)
cache->n_clients = idx + 1;

fail:
#if ENABLE_THREADS
pthread_mutex_unlock(&cache->mutex);
#endif

return client;
}

void ass_cache_client_done(CacheClient *client)
{
if (!client)
return;

Cache *cache = client->cache;

#if ENABLE_THREADS
pthread_mutex_lock(&cache->mutex);
#endif

cache->clients[client->idx] = NULL;

#if ENABLE_THREADS
pthread_mutex_destroy(&client->mutex);
pthread_cond_destroy(&client->cond);

pthread_mutex_unlock(&cache->mutex);
#endif

free(client);
}

void *ass_cache_get(CacheClient *client, void *key, void *priv)
{
Cache *cache = client->cache;
const CacheDesc *desc = cache->desc;
size_t key_offs = CACHE_ITEM_SIZE + align_cache(desc->value_size);
ass_hashcode hash = desc->hash_func(key, ASS_HASH_INIT);
Expand All @@ -459,8 +559,8 @@ void *ass_cache_get(Cache *cache, void *key, void *priv)
uintptr_t last_used = atomic_exchange_explicit(&item->last_used_frame, cache->cur_frame, memory_order_consume);

if (last_used != cache->cur_frame) {
CacheItem *old_first = atomic_exchange_explicit(&cache->promote_first, item, memory_order_acq_rel);
atomic_store_explicit(&item->promote_next, old_first, memory_order_relaxed);
item->promote_next = client->promote_first;
client->promote_first = item;
}
}

Expand All @@ -471,18 +571,15 @@ void *ass_cache_get(Cache *cache, void *key, void *priv)

#if ENABLE_THREADS
if (!atomic_load_explicit(&item->size, memory_order_acquire)) {
pthread_mutex_lock(&item->mutex);
pthread_mutex_lock(&item->creating_client->mutex);

while (!item->size)
pthread_cond_wait(&item->cond, &item->mutex);
pthread_cond_wait(&item->creating_client->cond, &item->creating_client->mutex);

pthread_mutex_unlock(&item->mutex);
pthread_mutex_unlock(&item->creating_client->mutex);
}

if (new_item) {
pthread_mutex_destroy(&new_item->mutex);
pthread_cond_destroy(&new_item->cond);

free(new_item);
}
#endif
Expand All @@ -503,23 +600,10 @@ void *ass_cache_get(Cache *cache, void *key, void *priv)

new_key = (char *) new_item + key_offs;
if (!desc->key_move_func(new_key, key)) {
#if ENABLE_THREADS
fail_move:
#endif
free(new_item);
goto fail_item;
}

#if ENABLE_THREADS
if (pthread_cond_init(&new_item->cond, NULL) != 0)
goto fail_move;

if (pthread_mutex_init(&new_item->mutex, NULL) != 0) {
pthread_cond_destroy(&new_item->cond);
goto fail_move;
}
#endif

key = new_key;

new_item->desc = desc;
Expand All @@ -529,6 +613,9 @@ void *ass_cache_get(Cache *cache, void *key, void *priv)
new_item->ref_count = 1;
new_item->queue_next = NULL;
new_item->queue_prev = NULL;
#if ENABLE_THREADS
new_item->creating_client = client;
#endif
}

new_item->next = start;
Expand All @@ -554,14 +641,14 @@ void *ass_cache_get(Cache *cache, void *key, void *priv)
atomic_fetch_add_explicit(&cache->cache_size, size + (size == 1 ? 0 : CACHE_ITEM_SIZE), memory_order_relaxed);

#if ENABLE_THREADS
pthread_mutex_lock(&item->mutex);
pthread_mutex_lock(&client->mutex);
#endif

item->size = size;

#if ENABLE_THREADS
pthread_mutex_unlock(&item->mutex);
pthread_cond_broadcast(&item->cond);
pthread_mutex_unlock(&client->mutex);
pthread_cond_broadcast(&client->cond);
#endif

return value;
Expand All @@ -579,10 +666,6 @@ static inline void destroy_item(const CacheDesc *desc, CacheItem *item)
assert(!item->next && !item->prev);
char *value = (char *) item + CACHE_ITEM_SIZE;
desc->destruct_func(value + align_cache(desc->value_size), value);
#if ENABLE_THREADS
pthread_mutex_destroy(&item->mutex);
pthread_cond_destroy(&item->cond);
#endif
free(item);
}

Expand Down Expand Up @@ -612,19 +695,25 @@ void ass_cache_dec_ref(void *value)

void ass_cache_cut(Cache *cache, size_t max_size)
{
while (cache->promote_first) {
CacheItem *item = cache->promote_first;
for (size_t i = 0; i < cache->n_clients; i++) {
CacheClient *client = cache->clients[i];
if (!client)
continue;

*item->queue_prev = item->queue_next;
if (item->queue_next)
item->queue_next->queue_prev = item->queue_prev;
item->queue_next = NULL;
while (client->promote_first) {
CacheItem *item = client->promote_first;

*item->queue_prev = item->queue_next;
if (item->queue_next)
item->queue_next->queue_prev = item->queue_prev;
item->queue_next = NULL;

item->queue_prev = cache->queue_last;
*cache->queue_last = item;
cache->queue_last = &item->queue_next;
item->queue_prev = cache->queue_last;
*cache->queue_last = item;
cache->queue_last = &item->queue_next;

cache->promote_first = item->promote_next;
client->promote_first = item->promote_next;
}
}

while (cache->cache_size > max_size && cache->queue_first) {
Expand Down Expand Up @@ -690,14 +779,23 @@ void ass_cache_empty(Cache *cache)

cache->queue_first = NULL;
cache->queue_last = &cache->queue_first;
cache->promote_first = NULL;
cache->cache_size = 0;

for (size_t i = 0; i < cache->n_clients; i++) {
CacheClient *client = cache->clients[i];
if (client)
client->promote_first = NULL;
}
}

void ass_cache_done(Cache *cache)
{
ass_cache_empty(cache);
free(cache->map);
free(cache->clients);
#if ENABLE_THREADS
pthread_mutex_destroy(&cache->mutex);
#endif
free(cache);
}

Expand Down
5 changes: 4 additions & 1 deletion libass/ass_cache.h
Expand Up @@ -26,6 +26,7 @@
#include "ass_bitmap.h"

typedef struct cache Cache;
typedef struct cache_client CacheClient;
typedef uint64_t ass_hashcode;

// cache values
Expand Down Expand Up @@ -97,7 +98,9 @@ typedef struct
} CacheDesc;

Cache *ass_cache_create(const CacheDesc *desc);
void *ass_cache_get(Cache *cache, void *key, void *priv);
CacheClient *ass_cache_client_create(Cache *cache);
void ass_cache_client_done(CacheClient *client);
void *ass_cache_get(CacheClient *cache, void *key, void *priv);
void *ass_cache_key(void *value);
void ass_cache_inc_ref(void *value);
void ass_cache_dec_ref(void *value);
Expand Down
4 changes: 2 additions & 2 deletions libass/ass_font.c
Expand Up @@ -479,9 +479,9 @@ static int add_face(ASS_FontSelector *fontsel, ASS_Font *font, uint32_t ch)
/**
* \brief Create a new ASS_Font according to "desc" argument
*/
ASS_Font *ass_font_new(ASS_Renderer *render_priv, ASS_FontDesc *desc)
ASS_Font *ass_font_new(struct render_context *context, ASS_FontDesc *desc)
{
ASS_Font *font = ass_cache_get(render_priv->cache.font_cache, desc, render_priv);
ASS_Font *font = ass_cache_get(context->cache.font_cache, desc, context->renderer);
if (!font)
return NULL;
if (font->library)
Expand Down
2 changes: 1 addition & 1 deletion libass/ass_font.h
Expand Up @@ -54,7 +54,7 @@ struct ass_font {
};

void ass_charmap_magic(ASS_Library *library, FT_Face face);
ASS_Font *ass_font_new(ASS_Renderer *render_priv, ASS_FontDesc *desc);
ASS_Font *ass_font_new(struct render_context *context, ASS_FontDesc *desc);
void ass_face_set_size(FT_Face face, double size);
int ass_face_get_weight(FT_Face face);
void ass_font_get_asc_desc(ASS_Font *font, int face_index,
Expand Down
2 changes: 1 addition & 1 deletion libass/ass_parse.c
Expand Up @@ -111,7 +111,7 @@ void ass_update_font(RenderContext *state)
val = 0; // normal
desc.italic = val;

state->font = ass_font_new(state->renderer, &desc);
state->font = ass_font_new(state, &desc);
}

/**
Expand Down

0 comments on commit 14e0359

Please sign in to comment.