Skip to content

Commit

Permalink
Merge pull request #2487 from wiredtiger/wt-2382-join-collator
Browse files Browse the repository at this point in the history
WT-2382 Resolve 'u' vs. 'U' format mismatches for cursor joins in collators/extractors.
  • Loading branch information
ddanderson committed Feb 11, 2016
2 parents 6733005 + 7962cd7 commit 3bc10f5
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 92 deletions.
130 changes: 76 additions & 54 deletions src/cursor/cur_join.c
Expand Up @@ -53,7 +53,9 @@ __curjoin_entry_iter_init(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
iter->session = session;
iter->entry = entry;
iter->cursor = newcur;
iter->advance = false;
iter->positioned = false;
iter->isequal = (entry->ends_next == 1 &&
WT_CURJOIN_END_RANGE(&entry->ends[0]) == WT_CURJOIN_END_EQ);
*iterp = iter;

if (0) {
Expand Down Expand Up @@ -101,10 +103,10 @@ __curjoin_entry_iter_next(WT_CURSOR_JOIN_ITER *iter, WT_ITEM *primkey,
WT_SESSION_IMPL *session;
uint64_t r;

if (iter->advance)
if (iter->positioned)
WT_ERR(iter->cursor->next(iter->cursor));
else
iter->advance = true;
iter->positioned = true;

session = iter->session;
cjoin = iter->cjoin;
Expand Down Expand Up @@ -143,11 +145,11 @@ __curjoin_entry_iter_reset(WT_CURSOR_JOIN_ITER *iter)
{
WT_DECL_RET;

if (iter->advance) {
if (iter->positioned) {
WT_ERR(iter->cursor->reset(iter->cursor));
WT_ERR(__wt_cursor_dup_position(
iter->cjoin->entries[0].ends[0].cursor, iter->cursor));
iter->advance = false;
iter->positioned = false;
iter->entry->stats.actual_count = 0;
}

Expand All @@ -162,7 +164,7 @@ err: return (ret);
/*
 * __curjoin_entry_iter_ready --
 *	Return the join iterator's boolean state flag. Elsewhere in this
 *	listing the same flag is set to true by the iterator's "next" routine
 *	on its first call (instead of advancing the cursor) and cleared by
 *	the "reset" routine, so a true result means the iterator has been
 *	started and not reset since.
 *
 * NOTE(review): this listing looks like a rendered diff with the +/-
 * markers stripped. The first return appears to be the pre-change line
 * (flag named "advance") and the second its replacement (flag renamed to
 * "positioned"). As literally written only the first return can execute;
 * confirm against the real file before relying on this block.
 */
static bool
__curjoin_entry_iter_ready(WT_CURSOR_JOIN_ITER *iter)
{
return (iter->advance);
return (iter->positioned);
}

/*
Expand Down Expand Up @@ -255,18 +257,16 @@ __curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
WT_CURSOR_JOIN_ENDPOINT *end, *endmax;
WT_DECL_RET;
WT_DECL_ITEM(uribuf);
WT_ITEM curkey, curvalue, *k;
WT_ITEM curkey, curvalue;
WT_TABLE *maintable;
const char *raw_cfg[] = { WT_CONFIG_BASE(
session, WT_SESSION_open_cursor), "raw", NULL };
const char *mainkey_str, *p;
void *allocbuf;
size_t mainkey_len, size;
u_int i;
int cmp, skip;

c = NULL;
allocbuf = NULL;
skip = 0;

if (entry->index != NULL) {
Expand Down Expand Up @@ -305,26 +305,23 @@ __curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
if ((end = &entry->ends[0]) < endmax &&
F_ISSET(end, WT_CURJOIN_END_GE)) {
WT_ERR(__wt_cursor_dup_position(end->cursor, c));
if (end->flags == WT_CURJOIN_END_GE)
if (WT_CURJOIN_END_RANGE(end) == WT_CURJOIN_END_GE)
skip = 1;
}
collator = (entry->index == NULL) ? NULL : entry->index->collator;
while (ret == 0) {
WT_ERR(c->get_key(c, &curkey));
if (entry->index != NULL) {
cindex = (WT_CURSOR_INDEX *)c;
if (cindex->index->extractor == NULL) {
/*
* Repack so it's comparable to the
* reference endpoints.
*/
k = &cindex->child->key;
WT_ERR(__wt_struct_repack(session,
cindex->child->key_format,
entry->main->value_format, k, &curkey,
&allocbuf));
} else
curkey = cindex->child->key;
/*
* Repack so it's comparable to the
* reference endpoints.
*/
WT_ERR(__wt_struct_repack(session,
cindex->child->key_format,
(entry->repack_format != NULL ?
entry->repack_format : cindex->iface.key_format),
&cindex->child->key, &curkey));
}
for (end = &entry->ends[skip]; end < endmax; end++) {
WT_ERR(__wt_compare(session, collator, &curkey,
Expand Down Expand Up @@ -361,7 +358,6 @@ __curjoin_init_bloom(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
err: if (c != NULL)
WT_TRET(c->close(c));
__wt_scr_free(session, &uribuf);
__wt_free(session, allocbuf);
return (ret);
}

Expand All @@ -378,19 +374,16 @@ __curjoin_endpoint_init_key(WT_SESSION_IMPL *session,
WT_DECL_RET;
WT_ITEM *k;
uint64_t r;
void *allocbuf;

allocbuf = NULL;
if ((cursor = endpoint->cursor) != NULL) {
if (entry->index != NULL) {
/* Extract and save the index's logical key. */
cindex = (WT_CURSOR_INDEX *)endpoint->cursor;
WT_ERR(__wt_struct_repack(session,
cindex->child->key_format,
cindex->iface.key_format,
&cindex->child->key, &endpoint->key, &allocbuf));
if (allocbuf != NULL)
F_SET(endpoint, WT_CURJOIN_END_OWN_KEY);
(entry->repack_format != NULL ?
entry->repack_format : cindex->iface.key_format),
&cindex->child->key, &endpoint->key));
} else {
k = &((WT_CURSOR_TABLE *)cursor)->cg_cursors[0]->key;
if (WT_CURSOR_RECNO(cursor)) {
Expand All @@ -404,10 +397,8 @@ __curjoin_endpoint_init_key(WT_SESSION_IMPL *session,
endpoint->key = *k;
}
}
if (0) {
err: __wt_free(session, allocbuf);
}
return (ret);

err: return (ret);
}

/*
Expand Down Expand Up @@ -548,7 +539,7 @@ err: return (ret);
/*
 * WT_CURJOIN_EXTRACTOR --
 *	Wrapper structure used when a join entry's index has a custom
 *	extractor: its embedded cursor interface is what gets passed to the
 *	extractor's extract call, and the insert callback uses the remaining
 *	fields to test each extracted key against the join entry.
 *
 * NOTE(review): this listing looks like a rendered diff with the +/-
 * markers stripped. The two "ismember" lines appear to be the old (int) and
 * new (bool) declarations of one field, not two fields; as literally written
 * the duplicate member would not compile. Confirm against the real file.
 */
typedef struct {
/* Cursor interface handed to the extractor. */
WT_CURSOR iface;
/* Join entry whose range the extracted key is checked against. */
WT_CURSOR_JOIN_ENTRY *entry;
/* Set by the insert callback when an extracted key is found in range. */
int ismember;
bool ismember;
} WT_CURJOIN_EXTRACTOR;

/*
Expand Down Expand Up @@ -584,8 +575,8 @@ __curjoin_extract_insert(WT_CURSOR *cursor) {
ret = __curjoin_entry_in_range(session, cextract->entry, &ikey, false);
if (ret == WT_NOTFOUND)
ret = 0;
else
cextract->ismember = 1;
else if (ret == 0)
cextract->ismember = true;

return (ret);
}
Expand Down Expand Up @@ -659,10 +650,11 @@ __curjoin_entry_member(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
v = *key;

if ((idx = entry->index) != NULL && idx->extractor != NULL) {
WT_CLEAR(extract_cursor);
extract_cursor.iface = iface;
extract_cursor.iface.session = &session->iface;
extract_cursor.iface.key_format = idx->exkey_format;
extract_cursor.ismember = 0;
extract_cursor.ismember = false;
extract_cursor.entry = entry;
WT_ERR(idx->extractor->extract(idx->extractor,
&session->iface, key, &v, &extract_cursor.iface));
Expand Down Expand Up @@ -715,8 +707,15 @@ __curjoin_next(WT_CURSOR *cursor)
for (i = 0; i < cjoin->entries_next; i++) {
ret = __curjoin_entry_member(session, cjoin,
&cjoin->entries[i], skip_left);
if (ret == WT_NOTFOUND)
if (ret == WT_NOTFOUND) {
/*
* If this is compare=eq on our outer iterator,
* and we've moved past it, we're done.
*/
if (cjoin->iter->isequal && i == 0)
break;
goto nextkey;
}
skip_left = false;
WT_ERR(ret);
}
Expand Down Expand Up @@ -783,12 +782,10 @@ __curjoin_close(WT_CURSOR *cursor)
if (F_ISSET(entry, WT_CURJOIN_ENTRY_OWN_BLOOM))
WT_TRET(__wt_bloom_close(entry->bloom));
for (end = &entry->ends[0];
end < &entry->ends[entry->ends_next]; end++) {
end < &entry->ends[entry->ends_next]; end++)
F_CLR(end->cursor, WT_CURSTD_JOINED);
if (F_ISSET(end, WT_CURJOIN_END_OWN_KEY))
__wt_free(session, end->key.data);
}
__wt_free(session, entry->ends);
__wt_free(session, entry->repack_format);
}

if (cjoin->iter != NULL)
Expand Down Expand Up @@ -891,22 +888,22 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
WT_INDEX *idx, WT_CURSOR *ref_cursor, uint8_t flags, uint8_t range,
uint64_t count, uint32_t bloom_bit_count, uint32_t bloom_hash_count)
{
WT_CURSOR_INDEX *cindex;
WT_CURSOR_JOIN_ENDPOINT *end, *newend;
WT_CURSOR_JOIN_ENTRY *entry;
WT_DECL_RET;
WT_CURSOR_JOIN_ENDPOINT *end, *newend;
bool hasins, needbloom, range_eq;
u_int i, ins, nonbloom;
char *main_uri, *newformat;
const char *raw_cfg[] = { WT_CONFIG_BASE(
session, WT_SESSION_open_cursor), "raw", NULL };
char *main_uri;
size_t namesize, newsize;
size_t len, newsize;
u_int i, ins, nonbloom;

entry = NULL;
hasins = needbloom = false;
ins = 0; /* -Wuninitialized */
main_uri = NULL;
nonbloom = 0; /* -Wuninitialized */
namesize = strlen(cjoin->table->name);

for (i = 0; i < cjoin->entries_next; i++) {
if (cjoin->entries[i].index == idx) {
Expand Down Expand Up @@ -982,13 +979,13 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
((range & WT_CURJOIN_END_GT) != 0 || range_eq)) ||
(F_ISSET(end, WT_CURJOIN_END_LT) &&
((range & WT_CURJOIN_END_LT) != 0 || range_eq)) ||
(end->flags == WT_CURJOIN_END_EQ &&
(WT_CURJOIN_END_RANGE(end) == WT_CURJOIN_END_EQ &&
(range & (WT_CURJOIN_END_LT | WT_CURJOIN_END_GT))
!= 0))
WT_ERR_MSG(session, EINVAL,
"join has overlapping ranges");
if (range == WT_CURJOIN_END_EQ &&
end->flags == WT_CURJOIN_END_EQ &&
WT_CURJOIN_END_RANGE(end) == WT_CURJOIN_END_EQ &&
!F_ISSET(entry, WT_CURJOIN_ENTRY_DISJUNCTION))
WT_ERR_MSG(session, EINVAL,
"compare=eq can only be combined "
Expand Down Expand Up @@ -1026,15 +1023,40 @@ __wt_curjoin_join(WT_SESSION_IMPL *session, WT_CURSOR_JOIN *cjoin,
F_SET(newend, range);

/* Open the main file with a projection of the indexed columns. */
if (entry->main == NULL && entry->index != NULL) {
namesize = strlen(cjoin->table->name);
newsize = namesize + entry->index->colconf.len + 1;
if (entry->main == NULL && idx != NULL) {
newsize = strlen(cjoin->table->name) + idx->colconf.len + 1;
WT_ERR(__wt_calloc(session, 1, newsize, &main_uri));
snprintf(main_uri, newsize, "%s%.*s",
cjoin->table->name, (int)entry->index->colconf.len,
entry->index->colconf.str);
cjoin->table->name, (int)idx->colconf.len,
idx->colconf.str);
WT_ERR(__wt_open_cursor(session, main_uri,
(WT_CURSOR *)cjoin, raw_cfg, &entry->main));
if (idx->extractor == NULL) {
/*
* Add no-op padding so trailing 'u' formats are not
* transformed to 'U'. This matches what happens in
* the index. We don't do this when we have an
* extractor because extractors already use the
* padding byte trick.
*/
len = strlen(entry->main->value_format) + 3;
WT_ERR(__wt_calloc(session, len, 1, &newformat));
snprintf(newformat, len, "%s0x",
entry->main->value_format);
__wt_free(session, entry->main->value_format);
entry->main->value_format = newformat;
}

/*
* When we are repacking index keys to remove the primary
* key, we never want to transform trailing 'u'. Use no-op
* padding to force this.
*/
cindex = (WT_CURSOR_INDEX *)ref_cursor;
len = strlen(cindex->iface.key_format) + 3;
WT_ERR(__wt_calloc(session, len, 1, &entry->repack_format));
snprintf(entry->repack_format, len, "%s0x",
cindex->iface.key_format);
}

err: if (main_uri != NULL)
Expand Down
8 changes: 6 additions & 2 deletions src/include/cursor.h
Expand Up @@ -289,7 +289,8 @@ struct __wt_cursor_join_iter {
WT_CURSOR_JOIN_ENTRY *entry;
WT_CURSOR *cursor;
WT_ITEM *curkey;
bool advance;
bool positioned;
bool isequal; /* advancing means we're done */
};

struct __wt_cursor_join_endpoint {
Expand All @@ -302,14 +303,17 @@ struct __wt_cursor_join_endpoint {
#define WT_CURJOIN_END_GT 0x04 /* include values > cursor */
#define WT_CURJOIN_END_GE (WT_CURJOIN_END_GT | WT_CURJOIN_END_EQ)
#define WT_CURJOIN_END_LE (WT_CURJOIN_END_LT | WT_CURJOIN_END_EQ)
#define WT_CURJOIN_END_OWN_KEY 0x08 /* must free key's data */
uint8_t flags; /* range for this endpoint */
};
/*
 * WT_CURJOIN_END_RANGE --
 *	Yield only the range-comparison bits (less-than, equal, greater-than)
 *	of an endpoint's flags, masking off any other bits that may be set, so
 *	the result can be compared directly against the WT_CURJOIN_END_*
 *	range values.
 */
#define WT_CURJOIN_END_RANGE(endp) \
((WT_CURJOIN_END_LT | WT_CURJOIN_END_EQ | WT_CURJOIN_END_GT) & \
(endp)->flags)

struct __wt_cursor_join_entry {
WT_INDEX *index;
WT_CURSOR *main; /* raw main table cursor */
WT_BLOOM *bloom; /* Bloom filter handle */
char *repack_format; /* target format for repack */
uint32_t bloom_bit_count; /* bits per item in bloom */
uint32_t bloom_hash_count; /* hash functions in bloom */
uint64_t count; /* approx number of matches */
Expand Down
2 changes: 1 addition & 1 deletion src/include/extern.h
Expand Up @@ -553,7 +553,7 @@ extern int __wt_struct_size(WT_SESSION_IMPL *session, size_t *sizep, const char
extern int __wt_struct_pack(WT_SESSION_IMPL *session, void *buffer, size_t size, const char *fmt, ...);
extern int __wt_struct_unpack(WT_SESSION_IMPL *session, const void *buffer, size_t size, const char *fmt, ...);
extern int __wt_struct_unpack_size(WT_SESSION_IMPL *session, const void *buffer, size_t size, const char *fmt, size_t *resultp);
extern int __wt_struct_repack(WT_SESSION_IMPL *session, const char *infmt, const char *outfmt, const WT_ITEM *inbuf, WT_ITEM *outbuf, void **reallocp);
extern int __wt_struct_repack(WT_SESSION_IMPL *session, const char *infmt, const char *outfmt, const WT_ITEM *inbuf, WT_ITEM *outbuf);
extern int __wt_ovfl_discard_add(WT_SESSION_IMPL *session, WT_PAGE *page, WT_CELL *cell);
extern void __wt_ovfl_discard_free(WT_SESSION_IMPL *session, WT_PAGE *page);
extern int __wt_ovfl_reuse_search(WT_SESSION_IMPL *session, WT_PAGE *page, uint8_t **addrp, size_t *addr_sizep, const void *value, size_t value_size);
Expand Down

0 comments on commit 3bc10f5

Please sign in to comment.