Skip to content

Commit

Permalink
object-store: allow threaded access to object reading
Browse files Browse the repository at this point in the history
Allow object reading to be performed by multiple threads by protecting it
with a set of locks. Use of the locks can be toggled with
enable_obj_read_locks() and disable_obj_read_locks(). Currently, the
functions which can be safely called in parallel are:
read_object_file_extended(), repo_read_object_file(),
read_object_file(), read_object_with_reference(), read_object(),
oid_object_info() and oid_object_info_extended().

Probably there are many spots in these functions that could be executed
unlocked (and thus, in parallel). But, for now, we are most interested
in allowing parallel access to zlib inflation. This is one of the
sections where object reading spends most of the time and it's already
thread-safe. So, to take advantage of that, the respective lock is
released when entering it and re-acquired right after. We may refine the
locks to also exploit other possible parallel spots in the future, but
threaded zlib inflation should already give great speedups.

Note that add_delta_base_cache() was also modified to skip adding
entries that are already present in the cache. Such duplicates could not
occur before, but now they can, since phase I and phase III of
unpack_entry() may execute concurrently.

Signed-off-by: Matheus Tavares <matheus.bernardino@usp.br>
  • Loading branch information
matheustavares committed Aug 19, 2019
1 parent dc2e663 commit 84b7ca5
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 6 deletions.
25 changes: 25 additions & 0 deletions object-store.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "list.h"
#include "sha1-array.h"
#include "strbuf.h"
#include "thread-utils.h"

struct object_directory {
struct object_directory *next;
Expand Down Expand Up @@ -228,6 +229,30 @@ int has_loose_object_nonlocal(const struct object_id *);

void assert_oid_type(const struct object_id *oid, enum object_type expect);

/*
 * Enabling the object read lock allows multiple threads to safely call the
 * following functions in parallel: repo_read_object_file(), read_object_file(),
 * read_object_file_extended(), read_object_with_reference(), read_object(),
 * oid_object_info() and oid_object_info_extended().
 *
 * enable_obj_read_locks() turns locking on and initializes the mutexes;
 * disable_obj_read_locks() turns locking off and destroys them. Each call
 * is a no-op when the locks are already in the requested state.
 */
void enable_obj_read_locks(void);
void disable_obj_read_locks(void);

/*
 * Non-zero while the object read locks are enabled; set and cleared by
 * enable_obj_read_locks() and disable_obj_read_locks().
 */
extern int obj_read_use_locks;
/* Mutex taken by oid_object_info_lock() and released by oid_object_info_unlock(). */
extern pthread_mutex_t oid_object_info_mutex;

/*
 * Take oid_object_info_mutex, but only while obj_read_use_locks is set;
 * otherwise this is a no-op.
 */
static inline void oid_object_info_lock(void)
{
	if (!obj_read_use_locks)
		return;
	pthread_mutex_lock(&oid_object_info_mutex);
}

/*
 * Release oid_object_info_mutex, but only while obj_read_use_locks is set;
 * otherwise this is a no-op.
 */
static inline void oid_object_info_unlock(void)
{
	if (!obj_read_use_locks)
		return;
	pthread_mutex_unlock(&oid_object_info_mutex);
}

struct object_info {
/* Request */
enum object_type *typep;
Expand Down
7 changes: 7 additions & 0 deletions packfile.c
Original file line number Diff line number Diff line change
Expand Up @@ -1115,7 +1115,9 @@ unsigned long get_size_from_delta(struct packed_git *p,
do {
in = use_pack(p, w_curs, curpos, &stream.avail_in);
stream.next_in = in;
oid_object_info_unlock();
st = git_inflate(&stream, Z_FINISH);
oid_object_info_lock();
curpos += stream.next_in - in;
} while ((st == Z_OK || st == Z_BUF_ERROR) &&
stream.total_out < sizeof(delta_head));
Expand Down Expand Up @@ -1468,6 +1470,9 @@ static void add_delta_base_cache(struct packed_git *p, off_t base_offset,
struct delta_base_cache_entry *ent = xmalloc(sizeof(*ent));
struct list_head *lru, *tmp;

if (get_delta_base_cache_entry(p, base_offset))
return;

delta_base_cached += base_size;

list_for_each_safe(lru, tmp, &delta_base_cache_lru) {
Expand Down Expand Up @@ -1597,7 +1602,9 @@ static void *unpack_compressed_entry(struct packed_git *p,
do {
in = use_pack(p, w_curs, curpos, &stream.avail_in);
stream.next_in = in;
oid_object_info_unlock();
st = git_inflate(&stream, Z_FINISH);
oid_object_info_lock();
if (!stream.avail_out)
break; /* the payload is larger than it should be */
curpos += stream.next_in - in;
Expand Down
97 changes: 91 additions & 6 deletions sha1-file.c
Original file line number Diff line number Diff line change
Expand Up @@ -1416,19 +1416,84 @@ static int loose_object_info(struct repository *r,
return (status < 0) ? status : 0;
}

/* Set to 1 by enable_obj_read_locks() and back to 0 by disable_obj_read_locks(). */
int obj_read_use_locks = 0;
/*
 * Held by oid_object_info_extended() around its call to
 * do_oid_object_info_extended().
 */
pthread_mutex_t oid_object_info_mutex;

/* Guards the part of read_object_file_extended() that is not thread-safe. */
static pthread_mutex_t read_object_file_mutex;

/*
 * Take read_object_file_mutex when the object read locks are enabled;
 * do nothing otherwise.
 */
static inline void read_object_file_lock(void)
{
	if (!obj_read_use_locks)
		return;
	pthread_mutex_lock(&read_object_file_mutex);
}

/*
 * Release read_object_file_mutex when the object read locks are enabled;
 * do nothing otherwise.
 */
static inline void read_object_file_unlock(void)
{
	if (!obj_read_use_locks)
		return;
	pthread_mutex_unlock(&read_object_file_mutex);
}

/*
 * Serializes the lookup_replace_object() calls made from
 * read_object_file_extended() and do_oid_object_info_extended().
 */
static pthread_mutex_t lookup_replace_mutex;

/*
 * Take lookup_replace_mutex when the object read locks are enabled;
 * do nothing otherwise.
 */
static inline void lookup_replace_lock(void)
{
	if (!obj_read_use_locks)
		return;
	pthread_mutex_lock(&lookup_replace_mutex);
}

/*
 * Release lookup_replace_mutex when the object read locks are enabled;
 * do nothing otherwise.
 */
static inline void lookup_replace_unlock(void)
{
	if (!obj_read_use_locks)
		return;
	pthread_mutex_unlock(&lookup_replace_mutex);
}

/*
 * Turn on the object read locks: set obj_read_use_locks and initialize the
 * three mutexes with default attributes. Does nothing if the locks are
 * already enabled.
 */
void enable_obj_read_locks(void)
{
	/* Same order as the mutexes have always been initialized in. */
	pthread_mutex_t *const locks[] = {
		&oid_object_info_mutex,
		&read_object_file_mutex,
		&lookup_replace_mutex,
	};
	size_t i;

	if (obj_read_use_locks)
		return;

	obj_read_use_locks = 1;
	for (i = 0; i < sizeof(locks) / sizeof(locks[0]); i++)
		pthread_mutex_init(locks[i], NULL);
}

/*
 * Turn off the object read locks: clear obj_read_use_locks and destroy the
 * three mutexes. Does nothing if the locks are not currently enabled.
 */
void disable_obj_read_locks(void)
{
	/* Same order as the mutexes have always been destroyed in. */
	pthread_mutex_t *const locks[] = {
		&oid_object_info_mutex,
		&read_object_file_mutex,
		&lookup_replace_mutex,
	};
	size_t i;

	if (!obj_read_use_locks)
		return;

	obj_read_use_locks = 0;
	for (i = 0; i < sizeof(locks) / sizeof(locks[0]); i++)
		pthread_mutex_destroy(locks[i]);
}


int fetch_if_missing = 1;

int oid_object_info_extended(struct repository *r, const struct object_id *oid,
struct object_info *oi, unsigned flags)
static int do_oid_object_info_extended(struct repository *r,
const struct object_id *oid,
struct object_info *oi, unsigned flags)
{
static struct object_info blank_oi = OBJECT_INFO_INIT;
struct pack_entry e;
int rtype;
const struct object_id *real = oid;
int already_retried = 0;

if (flags & OBJECT_INFO_LOOKUP_REPLACE)

if (flags & OBJECT_INFO_LOOKUP_REPLACE) {
lookup_replace_lock();
real = lookup_replace_object(r, oid);
lookup_replace_unlock();

}

if (is_null_oid(real))
return -1;
Expand Down Expand Up @@ -1501,7 +1566,7 @@ int oid_object_info_extended(struct repository *r, const struct object_id *oid,
rtype = packed_object_info(r, e.p, e.offset, oi);
if (rtype < 0) {
mark_bad_packed_object(e.p, real->hash);
return oid_object_info_extended(r, real, oi, 0);
return do_oid_object_info_extended(r, real, oi, 0);
} else if (oi->whence == OI_PACKED) {
oi->u.packed.offset = e.offset;
oi->u.packed.pack = e.p;
Expand All @@ -1512,6 +1577,17 @@ int oid_object_info_extended(struct repository *r, const struct object_id *oid,
return 0;
}

/*
 * Locked entry point for do_oid_object_info_extended(): the call is made
 * between oid_object_info_lock() and oid_object_info_unlock(), and its
 * return value is handed back to the caller unchanged.
 */
int oid_object_info_extended(struct repository *r, const struct object_id *oid,
			     struct object_info *oi, unsigned flags)
{
	int status;

	oid_object_info_lock();
	status = do_oid_object_info_extended(r, oid, oi, flags);
	oid_object_info_unlock();

	return status;
}


/* returns enum object_type or negative */
int oid_object_info(struct repository *r,
const struct object_id *oid,
Expand Down Expand Up @@ -1576,14 +1652,22 @@ void *read_object_file_extended(struct repository *r,
const struct packed_git *p;
const char *path;
struct stat st;
const struct object_id *repl = lookup_replace ?
lookup_replace_object(r, oid) : oid;
const struct object_id *repl;

if (lookup_replace) {
lookup_replace_lock();
repl = lookup_replace_object(r, oid);
lookup_replace_unlock();
} else {
repl = oid;
}

errno = 0;
data = read_object(r, repl, type, size);
if (data)
return data;

read_object_file_lock();
if (errno && errno != ENOENT)
die_errno(_("failed to read object %s"), oid_to_hex(oid));

Expand All @@ -1599,6 +1683,7 @@ void *read_object_file_extended(struct repository *r,
if ((p = has_packed_and_bad(r, repl->hash)) != NULL)
die(_("packed object %s (stored in %s) is corrupt"),
oid_to_hex(repl), p->pack_name);
read_object_file_unlock();

return NULL;
}
Expand Down

0 comments on commit 84b7ca5

Please sign in to comment.