Skip to content

Commit

Permalink
Merge pull request #2189 from wiredtiger/index-create-lsm3
Browse files Browse the repository at this point in the history
WT-147: Dynamic Index creation.  Use bulk=unordered
  • Loading branch information
michaelcahill committed Sep 22, 2015
2 parents 752b2b2 + 402d127 commit 959376c
Show file tree
Hide file tree
Showing 16 changed files with 473 additions and 96 deletions.
2 changes: 2 additions & 0 deletions dist/s_string.ok
Expand Up @@ -302,6 +302,7 @@ UnixLib
Unmap
UnmapViewOfFile
Unmarshall
Unordered
Uryyb
VARCHAR
VLDB
Expand Down Expand Up @@ -967,6 +968,7 @@ unmarshall
unmarshalled
unmerged
unmodify
unordered
unpackv
unpadded
unreferenced
Expand Down
33 changes: 25 additions & 8 deletions examples/c/ex_extractor.c
Expand Up @@ -37,6 +37,8 @@

#include <wiredtiger.h>

#define RET_OK(ret) ((ret) == 0 || (ret) == WT_NOTFOUND)

int add_extractor(WT_CONNECTION *conn);

static const char *home;
Expand Down Expand Up @@ -144,28 +146,43 @@ read_index(WT_SESSION *session)
WT_CURSOR *cursor;
int i, ret;
char *first_name, *last_name;
uint16_t term_end, term_start, year;
uint16_t rec_year, term_end, term_start, year;

year = 0;
srand((unsigned int)getpid());
ret = session->open_cursor(
session, "index:presidents:term", NULL, NULL, &cursor);
/*
* Pick 10 random years and read the data.
*/
for (i = 0; i < 10; i++) {
for (i = 0; i < 10 && RET_OK(ret); i++) {
year = (uint16_t)((rand() % YEAR_SPAN) + YEAR_BASE);
printf("Year %d:\n", year);
cursor->set_key(cursor, year);
if ((ret = cursor->search(cursor)) == 0) {
if ((ret = cursor->search(cursor)) != 0)
break;
if ((ret = cursor->get_key(cursor, &rec_year)) != 0)
break;
if ((ret = cursor->get_value(cursor,
&last_name, &first_name, &term_start, &term_end)) != 0)
break;

/* Report all presidents that served during the chosen year */
while (term_start <= year &&
year <= term_end && year == rec_year) {
printf("\t%s %s\n", first_name, last_name);
if ((ret = cursor->next(cursor)) != 0)
break;
if ((ret = cursor->get_key(cursor, &rec_year)) != 0)
break;
if ((ret = cursor->get_value(cursor, &last_name,
&first_name, &term_start, &term_end)) != 0)
break;
printf("Year %d: %s %s\n", year, first_name, last_name);
continue;
}

fprintf(stderr, "Error %d for year %d\n", ret, year);
break;
}
if (!RET_OK(ret))
fprintf(stderr, "Error %d for year %d\n", ret, year);

ret = cursor->close(cursor);
return (ret);
}
Expand Down
11 changes: 11 additions & 0 deletions src/conn/conn_dhandle.c
Expand Up @@ -331,6 +331,17 @@ __wt_conn_btree_open(
F_SET(btree, LF_ISSET(WT_BTREE_SPECIAL_FLAGS));

WT_ERR(__wt_btree_open(session, cfg));

/*
* Bulk handles require true exclusive access, otherwise, handles
* marked as exclusive are allowed to be relocked by the same
* session.
*/
if (F_ISSET(dhandle, WT_DHANDLE_EXCLUSIVE) &&
!LF_ISSET(WT_BTREE_BULK)) {
dhandle->excl_session = session;
dhandle->excl_ref = 1;
}
F_SET(dhandle, WT_DHANDLE_OPEN);

/*
Expand Down
18 changes: 13 additions & 5 deletions src/cursor/cur_file.c
Expand Up @@ -492,6 +492,7 @@ __wt_curfile_open(WT_SESSION_IMPL *session, const char *uri,
int bitmap, bulk;
uint32_t flags;

bitmap = bulk = 0;
flags = 0;

WT_RET(__wt_config_gets_def(session, cfg, "bulk", 0, &cval));
Expand All @@ -502,7 +503,14 @@ __wt_curfile_open(WT_SESSION_IMPL *session, const char *uri,
bulk = (cval.val != 0);
} else if (WT_STRING_MATCH("bitmap", cval.str, cval.len))
bitmap = bulk = 1;
else
/*
* Unordered bulk insert is a special case used internally by
* index creation on existing tables. It doesn't enforce
* any special semantics at the file level. It primarily
* exists to avoid some locking problems with LSM trees and
* index creation.
*/
else if (!WT_STRING_MATCH("unordered", cval.str, cval.len))
WT_RET_MSG(session, EINVAL,
"Value for 'bulk' must be a boolean or 'bitmap'");

Expand All @@ -513,11 +521,11 @@ __wt_curfile_open(WT_SESSION_IMPL *session, const char *uri,
/* Get the handle and lock it while the cursor is using it. */
if (WT_PREFIX_MATCH(uri, "file:")) {
/*
* If we are opening a bulk cursor, get the handle while
* holding the checkpoint lock. This prevents a bulk cursor
* open failing with EBUSY due to a database-wide checkpoint.
* If we are opening exclusive, get the handle while holding
* the checkpoint lock. This prevents a bulk cursor open
* failing with EBUSY due to a database-wide checkpoint.
*/
if (bulk)
if (LF_ISSET(WT_DHANDLE_EXCLUSIVE))
WT_WITH_CHECKPOINT_LOCK(session, ret =
__wt_session_get_btree_ckpt(
session, uri, cfg, flags));
Expand Down
94 changes: 54 additions & 40 deletions src/cursor/cur_table.c
Expand Up @@ -71,11 +71,13 @@ __curextract_insert(WT_CURSOR *cursor) {
}

/*
* __apply_idx --
* Apply an operation to all indices of a table.
* __wt_apply_single_idx --
* Apply an operation to a single index of a table.
*/
static int
__apply_idx(WT_CURSOR_TABLE *ctable, size_t func_off, int skip_immutable) {
int
__wt_apply_single_idx(WT_SESSION_IMPL *session, WT_INDEX *idx,
WT_CURSOR *cur, WT_CURSOR_TABLE *ctable, int (*f)(WT_CURSOR *))
{
WT_CURSOR_STATIC_INIT(iface,
__wt_cursor_get_key, /* get-key */
__wt_cursor_get_value, /* get-value */
Expand All @@ -93,11 +95,49 @@ __apply_idx(WT_CURSOR_TABLE *ctable, size_t func_off, int skip_immutable) {
__wt_cursor_notsup, /* reconfigure */
__wt_cursor_notsup, /* remove */
__wt_cursor_notsup); /* close */
WT_CURSOR **cp;
WT_CURSOR_EXTRACTOR extract_cursor;
WT_DECL_RET;
WT_INDEX *idx;
WT_ITEM key, value;

if (idx->extractor) {
extract_cursor.iface = iface;
extract_cursor.iface.session = &session->iface;
extract_cursor.iface.key_format = idx->exkey_format;
extract_cursor.ctable = ctable;
extract_cursor.idxc = cur;
extract_cursor.f = f;

WT_RET(__wt_cursor_get_raw_key(&ctable->iface, &key));
WT_RET(
__wt_cursor_get_raw_value(&ctable->iface, &value));
ret = idx->extractor->extract(idx->extractor,
&session->iface, &key, &value,
&extract_cursor.iface);

__wt_buf_free(session, &extract_cursor.iface.key);
WT_RET(ret);
} else {
WT_RET(__wt_schema_project_merge(session,
ctable->cg_cursors,
idx->key_plan, idx->key_format, &cur->key));
/*
* The index key is now set and the value is empty
* (it starts clear and is never set).
*/
F_SET(cur, WT_CURSTD_KEY_EXT | WT_CURSTD_VALUE_EXT);
WT_RET(f(cur));
}
return (0);
}

/*
* __apply_idx --
* Apply an operation to all indices of a table.
*/
static int
__apply_idx(WT_CURSOR_TABLE *ctable, size_t func_off, int skip_immutable) {
WT_CURSOR **cp;
WT_INDEX *idx;
WT_SESSION_IMPL *session;
int (*f)(WT_CURSOR *);
u_int i;
Expand All @@ -111,34 +151,7 @@ __apply_idx(WT_CURSOR_TABLE *ctable, size_t func_off, int skip_immutable) {
continue;

f = *(int (**)(WT_CURSOR *))((uint8_t *)*cp + func_off);
if (idx->extractor) {
extract_cursor.iface = iface;
extract_cursor.iface.session = &session->iface;
extract_cursor.iface.key_format = idx->exkey_format;
extract_cursor.ctable = ctable;
extract_cursor.idxc = *cp;
extract_cursor.f = f;

WT_RET(__wt_cursor_get_raw_key(&ctable->iface, &key));
WT_RET(
__wt_cursor_get_raw_value(&ctable->iface, &value));
ret = idx->extractor->extract(idx->extractor,
&session->iface, &key, &value,
&extract_cursor.iface);

__wt_buf_free(session, &extract_cursor.iface.key);
WT_RET(ret);
} else {
WT_RET(__wt_schema_project_merge(session,
ctable->cg_cursors,
idx->key_plan, idx->key_format, &(*cp)->key));
/*
* The index key is now set and the value is empty
* (it starts clear and is never set).
*/
F_SET(*cp, WT_CURSTD_KEY_EXT | WT_CURSTD_VALUE_EXT);
WT_RET(f(*cp));
}
WT_RET(__wt_apply_single_idx(session, idx, *cp, ctable, f));
WT_RET((*cp)->reset(*cp));
}

Expand Down Expand Up @@ -711,12 +724,13 @@ __curtable_close(WT_CURSOR *cursor)
ctable = (WT_CURSOR_TABLE *)cursor;
CURSOR_API_CALL(cursor, session, close, NULL);

for (i = 0, cp = ctable->cg_cursors;
i < WT_COLGROUPS(ctable->table); i++, cp++)
if (*cp != NULL) {
WT_TRET((*cp)->close(*cp));
*cp = NULL;
}
if (ctable->cg_cursors != NULL)
for (i = 0, cp = ctable->cg_cursors;
i < WT_COLGROUPS(ctable->table); i++, cp++)
if (*cp != NULL) {
WT_TRET((*cp)->close(*cp));
*cp = NULL;
}

if (ctable->idx_cursors != NULL)
for (i = 0, cp = ctable->idx_cursors;
Expand Down
2 changes: 2 additions & 0 deletions src/include/dhandle.h
Expand Up @@ -49,7 +49,9 @@ struct __wt_data_handle {
*/
uint32_t session_ref; /* Sessions referencing this handle */
int32_t session_inuse; /* Sessions using this handle */
uint32_t excl_ref; /* Refs of handle by excl_session */
time_t timeofdeath; /* Use count went to 0 */
WT_SESSION_IMPL *excl_session; /* Session with exclusive use, if any */

uint64_t name_hash; /* Hash of name */
const char *name; /* Object name as a URI */
Expand Down
1 change: 1 addition & 0 deletions src/include/extern.h
Expand Up @@ -307,6 +307,7 @@ extern int __wt_cursor_equals(WT_CURSOR *cursor, WT_CURSOR *other, int *equalp);
extern int __wt_cursor_reconfigure(WT_CURSOR *cursor, const char *config);
extern int __wt_cursor_dup_position(WT_CURSOR *to_dup, WT_CURSOR *cursor);
extern int __wt_cursor_init(WT_CURSOR *cursor, const char *uri, WT_CURSOR *owner, const char *cfg[], WT_CURSOR **cursorp);
extern int __wt_apply_single_idx(WT_SESSION_IMPL *session, WT_INDEX *idx, WT_CURSOR *cur, WT_CURSOR_TABLE *ctable, int (*f)(WT_CURSOR *));
extern int __wt_curtable_get_key(WT_CURSOR *cursor, ...);
extern int __wt_curtable_get_value(WT_CURSOR *cursor, ...);
extern void __wt_curtable_set_key(WT_CURSOR *cursor, ...);
Expand Down
18 changes: 9 additions & 9 deletions src/lsm/lsm_cursor_bulk.c
Expand Up @@ -91,6 +91,7 @@ int
__wt_clsm_open_bulk(WT_CURSOR_LSM *clsm, const char *cfg[])
{
WT_CURSOR *cursor, *bulk_cursor;
WT_DECL_RET;
WT_LSM_TREE *lsm_tree;
WT_SESSION_IMPL *session;

Expand All @@ -106,17 +107,16 @@ __wt_clsm_open_bulk(WT_CURSOR_LSM *clsm, const char *cfg[])
cursor->insert = __clsm_insert_bulk;
cursor->close = __clsm_close_bulk;

/* Setup the first chunk in the tree. */
WT_RET(__wt_clsm_request_switch(clsm));
WT_RET(__wt_clsm_await_switch(clsm));

/*
* Grab and release the LSM tree lock to ensure that the first chunk
* has been fully created before proceeding. We have the LSM tree
* open exclusive, so that saves us from needing the lock generally.
* Setup the first chunk in the tree. This is the only time we switch
* without using the LSM worker threads, it's safe to do here since
* we have an exclusive lock on the LSM tree. We need to do this
* switch inline, since switch needs a schema lock and online index
* creation opens a bulk cursor while holding the schema lock.
*/
WT_RET(__wt_lsm_tree_readlock(session, lsm_tree));
WT_RET(__wt_lsm_tree_readunlock(session, lsm_tree));
WT_WITH_SCHEMA_LOCK(session,
ret = __wt_lsm_tree_switch(session, lsm_tree));
WT_RET(ret);

/*
* Open a bulk cursor on the first chunk, it's not a regular LSM chunk
Expand Down
46 changes: 45 additions & 1 deletion src/schema/schema_create.c
Expand Up @@ -321,6 +321,47 @@ __wt_schema_index_source(WT_SESSION_IMPL *session,
return (0);
}

/*
* __fill_index --
* Fill the index from the current contents of the table.
*/
static int
__fill_index(WT_SESSION_IMPL *session, WT_TABLE *table, WT_INDEX *idx)
{
WT_DECL_RET;
WT_CURSOR *tcur, *icur;
WT_SESSION *wt_session;

wt_session = &session->iface;
tcur = NULL;
icur = NULL;
WT_RET(__wt_schema_open_colgroups(session, table));

/*
* If the column groups have not been completely created,
* there cannot be data inserted yet, and we're done.
*/
if (!table->cg_complete)
return (0);

WT_ERR(wt_session->open_cursor(wt_session,
idx->source, NULL, "bulk=unordered", &icur));
WT_ERR(wt_session->open_cursor(wt_session,
table->name, NULL, "readonly", &tcur));

while ((ret = tcur->next(tcur)) == 0)
WT_ERR(__wt_apply_single_idx(session, idx,
icur, (WT_CURSOR_TABLE *)tcur, icur->insert));

WT_ERR_NOTFOUND_OK(ret);
err:
if (icur)
WT_TRET(icur->close(icur));
if (tcur)
WT_TRET(tcur->close(tcur));
return (ret);
}

/*
* __create_index --
* Create an index.
Expand All @@ -333,6 +374,7 @@ __create_index(WT_SESSION_IMPL *session,
WT_CONFIG_ITEM ckey, cval, icols, kval;
WT_DECL_PACK_VALUE(pv);
WT_DECL_RET;
WT_INDEX *idx;
WT_ITEM confbuf, extra_cols, fmt, namebuf;
WT_PACK pack;
WT_TABLE *table;
Expand Down Expand Up @@ -495,7 +537,9 @@ __create_index(WT_SESSION_IMPL *session,

/* Make sure that the configuration is valid. */
WT_ERR(__wt_schema_open_index(
session, table, idxname, strlen(idxname), NULL));
session, table, idxname, strlen(idxname), &idx));

WT_ERR(__fill_index(session, table, idx));

err: __wt_free(session, idxconf);
__wt_free(session, sourceconf);
Expand Down

0 comments on commit 959376c

Please sign in to comment.