Permalink
Browse files

allow multiple concurrent writers

  • Loading branch information...
1 parent 8b649ef commit f0defbfd4c34acf971f5ff1d1e95d65bd848707d @wmorgan committed with twilliam Apr 1, 2012
Showing with 136 additions and 52 deletions.
  1. +135 −51 index.c
  2. +1 −1 lock.c
View
186 index.c
@@ -14,6 +14,25 @@ int wp_index_exists(const char* pathname_base) {
return wp_segment_exists(buf);
}
+RAISING_STATIC(grab_writelock(wp_index* index)) {
+ index_info* ii = MMAP_OBJ(index->indexinfo, index_info);
+ RELAY_ERROR(wp_lock_grab(&ii->lock, WP_LOCK_WRITELOCK));
+ return NO_ERROR;
+}
+
+RAISING_STATIC(grab_readlock(wp_index* index)) {
+ index_info* ii = MMAP_OBJ(index->indexinfo, index_info);
+ RELAY_ERROR(wp_lock_grab(&ii->lock, WP_LOCK_READLOCK));
+ return NO_ERROR;
+}
+
+RAISING_STATIC(release_lock(wp_index* index)) {
+ index_info* ii = MMAP_OBJ(index->indexinfo, index_info);
+ RELAY_ERROR(wp_lock_release(&ii->lock));
+ return NO_ERROR;
+}
+
+
RAISING_STATIC(index_info_init(index_info* ii, uint32_t index_version)) {
ii->index_version = index_version;
ii->num_segments = 0;
@@ -52,44 +71,41 @@ wp_error* wp_index_create(wp_index** indexptr, const char* pathname_base) {
return NO_ERROR;
}
-RAISING_STATIC(ensure_num_segments(wp_index* index)) {
+// increases the index->segments array until we have enough
+// space to represent index->num_segments
+RAISING_STATIC(ensure_segment_pointer_fit(wp_index* index)) {
if(index->num_segments >= index->sizeof_segments) {
- index->sizeof_segments *= 2;
+ if(index->sizeof_segments == 0) index->sizeof_segments = 1; // lame
+ while(index->sizeof_segments < index->num_segments) index->sizeof_segments *= 2; // lame
index->segments = realloc(index->segments, sizeof(wp_segment) * index->sizeof_segments);
index->docid_offsets = realloc(index->docid_offsets, sizeof(uint64_t) * index->sizeof_segments);
if(index->segments == NULL) RAISE_ERROR("oom");
+ if(index->segments == NULL) RAISE_ERROR("oom");
}
return NO_ERROR;
}
-wp_error* wp_index_load(wp_index** indexptr, const char* pathname_base) {
+// ensures that we know about all segments. should be wrapped
+// in a global read mutex to prevent creation.
+RAISING_STATIC(ensure_all_segments(wp_index* index)) {
char buf[PATH_BUF_SIZE];
- wp_index* index = *indexptr = malloc(sizeof(wp_index));
- snprintf(buf, PATH_BUF_SIZE, "%s.ii", pathname_base);
- RELAY_ERROR(mmap_obj_load(&index->indexinfo, "wp/indexinfo", buf));
- RELAY_ERROR(index_info_validate(MMAP_OBJ(index->indexinfo, index_info), INDEX_VERSION));
-
index_info* ii = MMAP_OBJ(index->indexinfo, index_info);
+ if(ii->num_segments < index->num_segments) RAISE_ERROR("invalid value for num_segments: %u vs %u", index->num_segments, ii->num_segments);
+ if(ii->num_segments == index->num_segments) return NO_ERROR;
- index->pathname_base = pathname_base;
- index->open = 1;
+ // otherwise, we need to load some more segments
+ uint16_t old_num_segments = index->num_segments;
index->num_segments = ii->num_segments;
- index->sizeof_segments = ii->num_segments;
- index->segments = calloc(ii->num_segments, sizeof(wp_segment));
- index->docid_offsets = malloc(sizeof(uint64_t));
+ RELAY_ERROR(ensure_segment_pointer_fit(index));
- // load all the segments we can
- for(int i = 0; i < index->num_segments; i++) {
- snprintf(buf, PATH_BUF_SIZE, "%s%d", pathname_base, i);
- if(!wp_segment_exists(buf)) RAISE_ERROR("cannot load segment %s", buf);
-
- RELAY_ERROR(ensure_num_segments(index));
- DEBUG("loading segment %s", buf);
+ for(uint16_t i = old_num_segments; i < index->num_segments; i++) {
+ snprintf(buf, PATH_BUF_SIZE, "%s%u", index->pathname_base, i);
+ DEBUG("trying to loading segment %u from %s", i, buf);
RELAY_ERROR(wp_segment_load(&index->segments[i], buf));
- if(i == 0)
- index->docid_offsets[i] = 0;
+
+ if(i == 0) index->docid_offsets[i] = 0;
else {
// segments return docids 1 through N, so the num_docs in a segment is
// also the max document id
@@ -101,6 +117,26 @@ wp_error* wp_index_load(wp_index** indexptr, const char* pathname_base) {
return NO_ERROR;
}
+wp_error* wp_index_load(wp_index** indexptr, const char* pathname_base) {
+ char buf[PATH_BUF_SIZE];
+
+ wp_index* index = *indexptr = malloc(sizeof(wp_index));
+ snprintf(buf, PATH_BUF_SIZE, "%s.ii", pathname_base);
+ RELAY_ERROR(mmap_obj_load(&index->indexinfo, "wp/indexinfo", buf));
+ RELAY_ERROR(index_info_validate(MMAP_OBJ(index->indexinfo, index_info), INDEX_VERSION));
+
+ index->pathname_base = pathname_base;
+ index->open = 1;
+ index->num_segments = 0;
+ index->sizeof_segments = 0;
+ index->segments = NULL;
+ index->docid_offsets = NULL;
+
+ RELAY_ERROR(ensure_all_segments(index));
+
+ return NO_ERROR;
+}
+
// we have two special values at our disposal to mark where we are in
// the sequence of segments
#define SEGMENT_UNINITIALIZED WP_MAX_SEGMENTS
@@ -116,6 +152,12 @@ wp_error* wp_index_setup_query(wp_index* index, wp_query* query) {
// can be called multiple times to resume
wp_error* wp_index_run_query(wp_index* index, wp_query* query, uint32_t max_num_results, uint32_t* num_results, uint64_t* results) {
*num_results = 0;
+
+ // make sure we have know about all segments (one could've been added by a writer)
+ RELAY_ERROR(grab_readlock(index));
+ RELAY_ERROR(ensure_all_segments(index));
+ RELAY_ERROR(release_lock(index));
+
if(index->num_segments == 0) return NO_ERROR;
if(query->segment_idx == SEGMENT_UNINITIALIZED) {
@@ -196,42 +238,70 @@ wp_error* wp_index_teardown_query(wp_index* index, wp_query* query) {
return NO_ERROR;
}
-wp_error* wp_index_add_entry(wp_index* index, wp_entry* entry, uint64_t* doc_id) {
- int success;
- wp_segment* seg = &index->segments[index->num_segments - 1];
+RAISING_STATIC(get_and_writelock_last_segment(wp_index* index, wp_entry* entry, wp_segment** returned_seg)) {
+ // assume we have a writelock on the index object here, so that no one can
+ // add segments while we're doing this stuff.
- // first, ensure we have enough space in the current segment
- uint32_t postings_bytes;
+ int success;
+ RELAY_ERROR(ensure_all_segments(index)); // make sure we know about all segments
+ wp_segment* seg = &index->segments[index->num_segments - 1]; // get last segment
+ RELAY_ERROR(wp_segment_grab_writelock(seg)); // grab the writelock
+ uint32_t postings_bytes; // calculate how much space we'll need to fit this entry in there
RELAY_ERROR(wp_entry_sizeof_postings_region(entry, seg, &postings_bytes));
- RELAY_ERROR(wp_segment_grab_writelock(seg));
RELAY_ERROR(wp_segment_ensure_fit(seg, postings_bytes, 0, &success));
- RELAY_ERROR(wp_segment_release_writelock(seg));
-
- // if not, we need to open a new one
- if(!success) {
- DEBUG("segment %d is full, loading a new one", index->num_segments - 1);
- char buf[PATH_BUF_SIZE];
- snprintf(buf, PATH_BUF_SIZE, "%s%d", index->pathname_base, index->num_segments);
- RELAY_ERROR(ensure_num_segments(index));
- RELAY_ERROR(wp_segment_create(&index->segments[index->num_segments], buf));
- index->num_segments++;
-
- // set the docid_offset
- segment_info* prevsi = MMAP_OBJ(index->segments[index->num_segments - 2].seginfo, segment_info);
- index->docid_offsets[index->num_segments - 1] = prevsi->num_docs + index->docid_offsets[index->num_segments - 2];
-
- seg = &index->segments[index->num_segments - 1];
- DEBUG("loaded new segment %d at %p", index->num_segments - 1, &index->segments[index->num_segments - 1]);
-
- RELAY_ERROR(wp_entry_sizeof_postings_region(entry, seg, &postings_bytes));
- RELAY_ERROR(wp_segment_ensure_fit(seg, postings_bytes, 0, &success));
- if(!success) RAISE_ERROR("can't fit new entry into fresh segment. that's crazy");
+
+ // if we can fit in there, then return it! (still locked)
+ if(success) {
+ *returned_seg = seg;
+ return NO_ERROR;
}
- //RELAY_ERROR(release_lock(index)); // release full-index lock
+ RAISE_ERROR("making new");
+ // otherwise, unlock it and let's make a new one
+ RELAY_ERROR(wp_segment_release_lock(seg));
+
+ char buf[PATH_BUF_SIZE];
+ DEBUG("segment %d is full, loading a new one", index->num_segments - 1);
+ snprintf(buf, PATH_BUF_SIZE, "%s%d", index->pathname_base, index->num_segments);
+
+ // increase the two counters
+ index_info* ii = MMAP_OBJ(index->indexinfo, index_info);
+ ii->num_segments++;
+ index->num_segments++;
+
+ // make sure we have a pointer for this guy
+ RELAY_ERROR(ensure_segment_pointer_fit(index));
+
+ // create the new segment
+ RELAY_ERROR(wp_segment_create(&index->segments[index->num_segments - 1], buf));
+
+ // set the docid_offset
+ segment_info* prevsi = MMAP_OBJ(index->segments[index->num_segments - 2].seginfo, segment_info);
+ index->docid_offsets[index->num_segments - 1] = prevsi->num_docs + index->docid_offsets[index->num_segments - 2];
+
+ seg = &index->segments[index->num_segments - 1];
+ DEBUG("loaded new segment %d at %p", index->num_segments - 1, seg);
+
+ RELAY_ERROR(wp_segment_grab_writelock(seg)); // lock it
+ RELAY_ERROR(wp_entry_sizeof_postings_region(entry, seg, &postings_bytes));
+ RELAY_ERROR(wp_segment_ensure_fit(seg, postings_bytes, 0, &success));
+ if(!success) RAISE_ERROR("can't fit new entry into fresh segment. that's crazy");
+
+ *returned_seg = seg;
+ return NO_ERROR;
+}
+
+wp_error* wp_index_add_entry(wp_index* index, wp_entry* entry, uint64_t* doc_id) {
+ wp_segment* seg = NULL;
docid_t seg_doc_id;
- RELAY_ERROR(wp_segment_grab_writelock(seg));
+
+ // interleaving lock access -- potential for deadlock is high. :(
+ RELAY_ERROR(grab_writelock(index)); // grab full-index lock
+ RELAY_ERROR(get_and_writelock_last_segment(index, entry, &seg));
+ RELAY_ERROR(release_lock(index)); // release full-index lock
+
+ RELAY_ERROR(wp_segment_reload(seg));
RELAY_ERROR(wp_segment_grab_docid(seg, &seg_doc_id));
RELAY_ERROR(wp_entry_write_to_segment(entry, seg, seg_doc_id));
RELAY_ERROR(wp_segment_release_lock(seg));
@@ -293,12 +363,17 @@ wp_error* wp_index_delete(const char* pathname_base) {
wp_error* wp_index_add_label(wp_index* index, const char* label, uint64_t doc_id) {
int found = 0;
+ RELAY_ERROR(grab_writelock(index));
+ RELAY_ERROR(ensure_all_segments(index));
+ RELAY_ERROR(release_lock(index));
+
for(uint32_t i = index->num_segments; i > 0; i--) {
if(doc_id > index->docid_offsets[i - 1]) {
wp_segment* seg = &index->segments[i - 1];
DEBUG("found doc %llu in segment %u", doc_id, i - 1);
RELAY_ERROR(wp_segment_grab_writelock(seg));
+ RELAY_ERROR(wp_segment_reload(seg));
RELAY_ERROR(wp_segment_add_label(seg, label, (docid_t)(doc_id - index->docid_offsets[i - 1])));
RELAY_ERROR(wp_segment_release_lock(seg));
found = 1;
@@ -315,12 +390,17 @@ wp_error* wp_index_add_label(wp_index* index, const char* label, uint64_t doc_id
wp_error* wp_index_remove_label(wp_index* index, const char* label, uint64_t doc_id) {
int found = 0;
+ RELAY_ERROR(grab_writelock(index));
+ RELAY_ERROR(ensure_all_segments(index));
+ RELAY_ERROR(release_lock(index));
+
for(uint32_t i = index->num_segments; i > 0; i--) {
if(doc_id > index->docid_offsets[i - 1]) {
wp_segment* seg = &index->segments[i - 1];
DEBUG("found doc %llu in segment %u", doc_id, i - 1);
RELAY_ERROR(wp_segment_grab_writelock(seg));
+ RELAY_ERROR(wp_segment_reload(seg));
RELAY_ERROR(wp_segment_remove_label(seg, label, (docid_t)(doc_id - index->docid_offsets[i - 1])));
RELAY_ERROR(wp_segment_release_lock(seg));
found = 1;
@@ -337,6 +417,10 @@ wp_error* wp_index_remove_label(wp_index* index, const char* label, uint64_t doc
wp_error* wp_index_num_docs(wp_index* index, uint64_t* num_docs) {
*num_docs = 0;
+ RELAY_ERROR(grab_readlock(index));
+ RELAY_ERROR(ensure_all_segments(index));
+ RELAY_ERROR(release_lock(index));
+
// TODO check for overflow or some shit
for(uint32_t i = index->num_segments; i > 0; i--) {
wp_segment* seg = &index->segments[i - 1];
View
2 lock.c
@@ -87,7 +87,7 @@ wp_error* wp_lock_grab(pthread_rwlock_t* lock, int lock_type) {
total_delay_ms += delay_ms;
}
- if(total_delay_ms > 0) DEBUG(":( acquired %slock for segment %p after %ums\n", lock_name, seg, total_delay_ms);
+ if(total_delay_ms > 0) DEBUG(":( acquired %slock for after %ums\n", lock_name, total_delay_ms);
return NO_ERROR;
}

0 comments on commit f0defbf

Please sign in to comment.