diff --git a/src/cursor/cur_join.c b/src/cursor/cur_join.c index 2cbefa68c5e..556b6f7ea2a 100644 --- a/src/cursor/cur_join.c +++ b/src/cursor/cur_join.c @@ -53,7 +53,9 @@ __curjoin_entry_iter_init(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, iter->session = session; iter->entry = entry; iter->cursor = newcur; - iter->advance = false; + iter->positioned = false; + iter->isequal = (entry->ends_next == 1 && + WT_CURJOIN_END_RANGE(&entry->ends[0]) == WT_CURJOIN_END_EQ); *iterp = iter; if (0) { @@ -101,10 +103,10 @@ __curjoin_entry_iter_next(WT_CURSOR_JOIN_ITER *iter, WT_ITEM *primkey, WT_SESSION_IMPL *session; uint64_t r; - if (iter->advance) + if (iter->positioned) WT_ERR(iter->cursor->next(iter->cursor)); else - iter->advance = true; + iter->positioned = true; session = iter->session; cjoin = iter->cjoin; @@ -143,11 +145,11 @@ __curjoin_entry_iter_reset(WT_CURSOR_JOIN_ITER *iter) { WT_DECL_RET; - if (iter->advance) { + if (iter->positioned) { WT_ERR(iter->cursor->reset(iter->cursor)); WT_ERR(__wt_cursor_dup_position( iter->cjoin->entries[0].ends[0].cursor, iter->cursor)); - iter->advance = false; + iter->positioned = false; iter->entry->stats.actual_count = 0; } @@ -162,7 +164,7 @@ err: return (ret); static bool __curjoin_entry_iter_ready(WT_CURSOR_JOIN_ITER *iter) { - return (iter->advance); + return (iter->positioned); } /* @@ -255,18 +257,16 @@ __curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, WT_CURSOR_JOIN_ENDPOINT *end, *endmax; WT_DECL_RET; WT_DECL_ITEM(uribuf); - WT_ITEM curkey, curvalue, *k; + WT_ITEM curkey, curvalue; WT_TABLE *maintable; const char *raw_cfg[] = { WT_CONFIG_BASE( session, WT_SESSION_open_cursor), "raw", NULL }; const char *mainkey_str, *p; - void *allocbuf; size_t mainkey_len, size; u_int i; int cmp, skip; c = NULL; - allocbuf = NULL; skip = 0; if (entry->index != NULL) { @@ -305,7 +305,7 @@ __curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, if ((end = &entry->ends[0]) < endmax && F_ISSET(end, WT_CURJOIN_END_GE)) { WT_ERR(__wt_cursor_dup_position(end->cursor, c)); - if (end->flags == WT_CURJOIN_END_GE) + if (WT_CURJOIN_END_RANGE(end) == WT_CURJOIN_END_GE) skip = 1; } collator = (entry->index == NULL) ? NULL : entry->index->collator; @@ -313,18 +313,15 @@ __curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, WT_ERR(c->get_key(c, &curkey)); if (entry->index != NULL) { cindex = (WT_CURSOR_INDEX *)c; - if (cindex->index->extractor == NULL) { - /* - * Repack so it's comparable to the - * reference endpoints. - */ - k = &cindex->child->key; - WT_ERR(__wt_struct_repack(session, - cindex->child->key_format, - entry->main->value_format, k, &curkey, - &allocbuf)); - } else - curkey = cindex->child->key; + /* + * Repack so it's comparable to the + * reference endpoints. + */ + WT_ERR(__wt_struct_repack(session, + cindex->child->key_format, + (entry->repack_format != NULL ? + entry->repack_format : cindex->iface.key_format), + &cindex->child->key, &curkey)); } for (end = &entry->ends[skip]; end < endmax; end++) { WT_ERR(__wt_compare(session, collator, &curkey, @@ -361,7 +358,6 @@ __curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, err: if (c != NULL) WT_TRET(c->close(c)); __wt_scr_free(session, &uribuf); - __wt_free(session, allocbuf); return (ret); } @@ -378,19 +374,16 @@ __curjoin_endpoint_init_key(WT_SESSION_IMPL *session, WT_DECL_RET; WT_ITEM *k; uint64_t r; - void *allocbuf; - allocbuf = NULL; if ((cursor = endpoint->cursor) != NULL) { if (entry->index != NULL) { /* Extract and save the index's logical key. */ cindex = (WT_CURSOR_INDEX *)endpoint->cursor; WT_ERR(__wt_struct_repack(session, cindex->child->key_format, - cindex->iface.key_format, - &cindex->child->key, &endpoint->key, &allocbuf)); - if (allocbuf != NULL) - F_SET(endpoint, WT_CURJOIN_END_OWN_KEY); + (entry->repack_format != NULL ? + entry->repack_format : cindex->iface.key_format), + &cindex->child->key, &endpoint->key)); } else { k = &((WT_CURSOR_TABLE *)cursor)->cg_cursors[0]->key; if (WT_CURSOR_RECNO(cursor)) { @@ -404,10 +397,8 @@ __curjoin_endpoint_init_key(WT_SESSION_IMPL *session, endpoint->key = *k; } } - if (0) { -err: __wt_free(session, allocbuf); - } - return (ret); + +err: return (ret); } /* @@ -548,7 +539,7 @@ err: return (ret); typedef struct { WT_CURSOR iface; WT_CURSOR_JOIN_ENTRY *entry; - int ismember; + bool ismember; } WT_CURJOIN_EXTRACTOR; /* @@ -584,8 +575,8 @@ __curjoin_extract_insert(WT_CURSOR *cursor) { ret = __curjoin_entry_in_range(session, cextract->entry, &ikey, false); if (ret == WT_NOTFOUND) ret = 0; - else - cextract->ismember = 1; + else if (ret == 0) + cextract->ismember = true; return (ret); } @@ -659,10 +650,11 @@ __curjoin_entry_member(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, v = *key; if ((idx = entry->index) != NULL && idx->extractor != NULL) { + WT_CLEAR(extract_cursor); extract_cursor.iface = iface; extract_cursor.iface.session = &session->iface; extract_cursor.iface.key_format = idx->exkey_format; - extract_cursor.ismember = 0; + extract_cursor.ismember = false; extract_cursor.entry = entry; WT_ERR(idx->extractor->extract(idx->extractor, &session->iface, key, &v, &extract_cursor.iface)); @@ -715,8 +707,15 @@ __curjoin_next(WT_CURSOR *cursor) for (i = 0; i < cjoin->entries_next; i++) { ret = __curjoin_entry_member(session, cjoin, &cjoin->entries[i], skip_left); - if (ret == WT_NOTFOUND) + if (ret == WT_NOTFOUND) { + /* + * If this is compare=eq on our outer iterator, + * and we've moved past it, we're done. + */ + if (cjoin->iter->isequal && i == 0) + break; goto nextkey; + } skip_left = false; WT_ERR(ret); } @@ -783,12 +782,10 @@ __curjoin_close(WT_CURSOR *cursor) if (F_ISSET(entry, WT_CURJOIN_ENTRY_OWN_BLOOM)) WT_TRET(__wt_bloom_close(entry->bloom)); for (end = &entry->ends[0]; - end < &entry->ends[entry->ends_next]; end++) { + end < &entry->ends[entry->ends_next]; end++) F_CLR(end->cursor, WT_CURSTD_JOINED); - if (F_ISSET(end, WT_CURJOIN_END_OWN_KEY)) - __wt_free(session, end->key.data); - } __wt_free(session, entry->ends); + __wt_free(session, entry->repack_format); } if (cjoin->iter != NULL) @@ -891,22 +888,22 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, WT_INDEX *idx, WT_CURSOR *ref_cursor, uint8_t flags, uint8_t range, uint64_t count, uint32_t bloom_bit_count, uint32_t bloom_hash_count) { + WT_CURSOR_INDEX *cindex; + WT_CURSOR_JOIN_ENDPOINT *end, *newend; WT_CURSOR_JOIN_ENTRY *entry; WT_DECL_RET; - WT_CURSOR_JOIN_ENDPOINT *end, *newend; bool hasins, needbloom, range_eq; - u_int i, ins, nonbloom; + char *main_uri, *newformat; const char *raw_cfg[] = { WT_CONFIG_BASE( session, WT_SESSION_open_cursor), "raw", NULL }; - char *main_uri; - size_t namesize, newsize; + size_t len, newsize; + u_int i, ins, nonbloom; entry = NULL; hasins = needbloom = false; ins = 0; /* -Wuninitialized */ main_uri = NULL; nonbloom = 0; /* -Wuninitialized */ - namesize = strlen(cjoin->table->name); for (i = 0; i < cjoin->entries_next; i++) { if (cjoin->entries[i].index == idx) { @@ -982,13 +979,13 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, ((range & WT_CURJOIN_END_GT) != 0 || range_eq)) || (F_ISSET(end, WT_CURJOIN_END_LT) && ((range & WT_CURJOIN_END_LT) != 0 || range_eq)) || - (end->flags == WT_CURJOIN_END_EQ && + (WT_CURJOIN_END_RANGE(end) == WT_CURJOIN_END_EQ && (range & (WT_CURJOIN_END_LT | WT_CURJOIN_END_GT)) != 0)) WT_ERR_MSG(session, EINVAL, "join has overlapping ranges"); if (range == WT_CURJOIN_END_EQ && - end->flags == WT_CURJOIN_END_EQ && + WT_CURJOIN_END_RANGE(end) == WT_CURJOIN_END_EQ && !F_ISSET(entry, WT_CURJOIN_ENTRY_DISJUNCTION)) WT_ERR_MSG(session, EINVAL, "compare=eq can only be combined " @@ -1026,15 +1023,40 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin, F_SET(newend, range); /* Open the main file with a projection of the indexed columns. */ - if (entry->main == NULL && entry->index != NULL) { - namesize = strlen(cjoin->table->name); - newsize = namesize + entry->index->colconf.len + 1; + if (entry->main == NULL && idx != NULL) { + newsize = strlen(cjoin->table->name) + idx->colconf.len + 1; WT_ERR(__wt_calloc(session, 1, newsize, &main_uri)); snprintf(main_uri, newsize, "%s%.*s", - cjoin->table->name, (int)entry->index->colconf.len, - entry->index->colconf.str); + cjoin->table->name, (int)idx->colconf.len, + idx->colconf.str); WT_ERR(__wt_open_cursor(session, main_uri, (WT_CURSOR *)cjoin, raw_cfg, &entry->main)); + if (idx->extractor == NULL) { + /* + * Add no-op padding so trailing 'u' formats are not + * transformed to 'U'. This matches what happens in + * the index. We don't do this when we have an + * extractor, extractors already use the padding + * byte trick. + */ + len = strlen(entry->main->value_format) + 3; + WT_ERR(__wt_calloc(session, len, 1, &newformat)); + snprintf(newformat, len, "%s0x", + entry->main->value_format); + __wt_free(session, entry->main->value_format); + entry->main->value_format = newformat; + } + + /* + * When we are repacking index keys to remove the primary + * key, we never want to transform trailing 'u'. Use no-op + * padding to force this. + */ + cindex = (WT_CURSOR_INDEX *)ref_cursor; + len = strlen(cindex->iface.key_format) + 3; + WT_ERR(__wt_calloc(session, len, 1, &entry->repack_format)); + snprintf(entry->repack_format, len, "%s0x", + cindex->iface.key_format); } err: if (main_uri != NULL) diff --git a/src/include/cursor.h b/src/include/cursor.h index 7f7b5dceb79..f9bd20c8ba1 100644 --- a/src/include/cursor.h +++ b/src/include/cursor.h @@ -289,7 +289,8 @@ struct __wt_cursor_join_iter { WT_CURSOR_JOIN_ENTRY *entry; WT_CURSOR *cursor; WT_ITEM *curkey; - bool advance; + bool positioned; + bool isequal; /* advancing means we're done */ }; struct __wt_cursor_join_endpoint { @@ -302,14 +303,17 @@ struct __wt_cursor_join_endpoint { #define WT_CURJOIN_END_GT 0x04 /* include values > cursor */ #define WT_CURJOIN_END_GE (WT_CURJOIN_END_GT | WT_CURJOIN_END_EQ) #define WT_CURJOIN_END_LE (WT_CURJOIN_END_LT | WT_CURJOIN_END_EQ) -#define WT_CURJOIN_END_OWN_KEY 0x08 /* must free key's data */ uint8_t flags; /* range for this endpoint */ }; +#define WT_CURJOIN_END_RANGE(endp) \ + ((endp)->flags & \ + (WT_CURJOIN_END_GT | WT_CURJOIN_END_EQ | WT_CURJOIN_END_LT)) struct __wt_cursor_join_entry { WT_INDEX *index; WT_CURSOR *main; /* raw main table cursor */ WT_BLOOM *bloom; /* Bloom filter handle */ + char *repack_format; /* target format for repack */ uint32_t bloom_bit_count; /* bits per item in bloom */ uint32_t bloom_hash_count; /* hash functions in bloom */ uint64_t count; /* approx number of matches */ diff --git a/src/include/extern.h b/src/include/extern.h index 64d46a5a254..92ae968affd 100644 --- a/src/include/extern.h +++ b/src/include/extern.h @@ -553,7 +553,7 @@ extern int __wt_struct_size(WT_SESSION_IMPL *session, size_t *sizep, const char extern int __wt_struct_pack(WT_SESSION_IMPL *session, void *buffer, size_t size, const char *fmt, ...); extern int __wt_struct_unpack(WT_SESSION_IMPL *session, const void *buffer, size_t size, const char *fmt, ...); extern int __wt_struct_unpack_size(WT_SESSION_IMPL *session, const void *buffer, size_t size, const char *fmt, size_t *resultp); -extern int __wt_struct_repack(WT_SESSION_IMPL *session, const char *infmt, const char *outfmt, const WT_ITEM *inbuf, WT_ITEM *outbuf, void **reallocp); +extern int __wt_struct_repack(WT_SESSION_IMPL *session, const char *infmt, const char *outfmt, const WT_ITEM *inbuf, WT_ITEM *outbuf); extern int __wt_ovfl_discard_add(WT_SESSION_IMPL *session, WT_PAGE *page, WT_CELL *cell); extern void __wt_ovfl_discard_free(WT_SESSION_IMPL *session, WT_PAGE *page); extern int __wt_ovfl_reuse_search(WT_SESSION_IMPL *session, WT_PAGE *page, uint8_t **addrp, size_t *addr_sizep, const void *value, size_t value_size); diff --git a/src/packing/pack_impl.c b/src/packing/pack_impl.c index 0e3ed44ba6a..bd1c90525a6 100644 --- a/src/packing/pack_impl.c +++ b/src/packing/pack_impl.c @@ -144,33 +144,19 @@ __wt_struct_unpack_size(WT_SESSION_IMPL *session, */ int __wt_struct_repack(WT_SESSION_IMPL *session, const char *infmt, - const char *outfmt, const WT_ITEM *inbuf, WT_ITEM *outbuf, void **reallocp) + const char *outfmt, const WT_ITEM *inbuf, WT_ITEM *outbuf) { WT_DECL_PACK_VALUE(pvin); WT_DECL_PACK_VALUE(pvout); WT_DECL_RET; WT_PACK packin, packout; const uint8_t *before, *end, *p; - uint8_t *pout; - size_t len; const void *start; start = NULL; p = inbuf->data; end = p + inbuf->size; - /* - * Handle this non-contiguous case: 'U' -> 'u' at the end of the buf. - * The former case has the size embedded before the item, the latter - * does not. - */ - if ((len = strlen(outfmt)) > 1 && outfmt[len - 1] == 'u' && - strlen(infmt) > len && infmt[len - 1] == 'U') { - WT_ERR(__wt_realloc(session, NULL, inbuf->size, reallocp)); - pout = *reallocp; - } else - pout = NULL; - WT_ERR(__pack_init(session, &packout, outfmt)); WT_ERR(__pack_init(session, &packin, infmt)); @@ -178,22 +164,14 @@ __wt_struct_repack(WT_SESSION_IMPL *session, const char *infmt, while ((ret = __pack_next(&packout, &pvout)) == 0) { if (p >= end) WT_ERR(EINVAL); + if (pvout.type == 'x' && pvout.size == 0 && pvout.havesize) + continue; WT_ERR(__pack_next(&packin, &pvin)); before = p; WT_ERR(__unpack_read(session, &pvin, &p, (size_t)(end - p))); - if (pvout.type != pvin.type) { - if (pvout.type == 'u' && pvin.type == 'U') { - /* Skip the prefixed size, we don't need it */ - WT_ERR(__wt_struct_unpack_size(session, before, - (size_t)(end - before), "I", &len)); - before += len; - } else - WT_ERR(ENOTSUP); - } - if (pout != NULL) { - memcpy(pout, before, WT_PTRDIFF(p, before)); - pout += p - before; - } else if (start == NULL) + if (pvout.type != pvin.type) + WT_ERR(ENOTSUP); + if (start == NULL) start = before; } WT_ERR_NOTFOUND_OK(ret); @@ -201,13 +179,8 @@ __wt_struct_repack(WT_SESSION_IMPL *session, const char *infmt, /* Be paranoid - __pack_write should never overflow. */ WT_ASSERT(session, p <= end); - if (pout != NULL) { - outbuf->data = *reallocp; - outbuf->size = WT_PTRDIFF(pout, *reallocp); - } else { - outbuf->data = start; - outbuf->size = WT_PTRDIFF(p, start); - } + outbuf->data = start; + outbuf->size = WT_PTRDIFF(p, start); err: return (ret); }