Skip to content

Commit

Permalink
Merge pull request #2067 from wiredtiger/log-scalability
Browse files Browse the repository at this point in the history
WT-1989 Log scalability
  • Loading branch information
michaelcahill committed Jul 21, 2015
2 parents 72530cc + 85dc7d7 commit b10bff9
Show file tree
Hide file tree
Showing 9 changed files with 209 additions and 146 deletions.
1 change: 1 addition & 0 deletions dist/stat_data.py
Expand Up @@ -241,6 +241,7 @@ def __init__(self, name, desc, flags=''):
LogStat('log_writes', 'log write operations'),
LogStat('log_write_lsn', 'log server thread advances write LSN'),

LogStat('log_slot_coalesced', 'written slots coalesced'),
LogStat('log_slot_consolidated', 'logging bytes consolidated'),
LogStat('log_slot_closes', 'consolidated slot closures'),
LogStat('log_slot_joins', 'consolidated slot joins'),
Expand Down
172 changes: 113 additions & 59 deletions src/conn/conn_log.c
Expand Up @@ -392,100 +392,154 @@ typedef struct {
(entry1).lsn.offset < (entry2).lsn.offset))

/*
* __log_wrlsn_server --
* The log wrlsn server thread.
* __wt_log_wrlsn --
* Process written log slots and attempt to coalesce them if the LSNs
* are contiguous. Returns 1 if slots were freed, 0 if no slots were
* freed in the progress arg. Must be called with the log slot lock held.
*/
static WT_THREAD_RET
__log_wrlsn_server(void *arg)
int
__wt_log_wrlsn(WT_SESSION_IMPL *session, uint32_t *free_i, int *yield)
{
WT_CONNECTION_IMPL *conn;
WT_DECL_RET;
WT_LOG *log;
WT_LOG_WRLSN_ENTRY written[WT_SLOT_POOL];
WT_LOGSLOT *slot;
WT_SESSION_IMPL *session;
WT_LOGSLOT *coalescing, *slot;
size_t written_i;
uint32_t i, save_i;
int yield;

session = arg;
conn = S2C(session);
log = conn->log;
yield = 0;
while (F_ISSET(conn, WT_CONN_LOG_SERVER_RUN)) {
/*
* No need to use the log_slot_lock because the slot pool
* is statically allocated and any slot in the
* WT_LOG_SLOT_WRITTEN state is exclusively ours for now.
*/
i = 0;
written_i = 0;
coalescing = NULL;
written_i = 0;
i = 0;
if (free_i != NULL)
*free_i = WT_SLOT_POOL;

/*
* Walk the array once saving any slots that are in the
* WT_LOG_SLOT_WRITTEN state.
*/
while (i < WT_SLOT_POOL) {
save_i = i;
slot = &log->slot_pool[i++];
if (free_i != NULL && *free_i == WT_SLOT_POOL &&
slot->slot_state == WT_LOG_SLOT_FREE)
*free_i = save_i;
if (slot->slot_state != WT_LOG_SLOT_WRITTEN)
continue;
written[written_i].slot_index = save_i;
written[written_i++].lsn = slot->slot_release_lsn;
}
/*
* If we found any written slots process them. We sort them
* based on the release LSN, and then look for them in order.
*/
if (written_i > 0) {
/*
* Walk the array once saving any slots that are in the
* WT_LOG_SLOT_WRITTEN state.
* If wanted, reset the yield variable to indicate that we
* have found written slots.
*/
while (i < WT_SLOT_POOL) {
save_i = i;
slot = &log->slot_pool[i++];
if (slot->slot_state != WT_LOG_SLOT_WRITTEN)
continue;
written[written_i].slot_index = save_i;
written[written_i++].lsn = slot->slot_release_lsn;
}
if (yield != NULL)
*yield = 0;
WT_INSERTION_SORT(written, written_i,
WT_LOG_WRLSN_ENTRY, WT_WRLSN_ENTRY_CMP_LT);

/*
* If we found any written slots process them. We sort them
* based on the release LSN, and then look for them in order.
* We know the written array is sorted by LSN. Go
* through them either advancing write_lsn or coalesce
* contiguous ranges of written slots.
*/
if (written_i > 0) {
yield = 0;
WT_INSERTION_SORT(written, written_i,
WT_LOG_WRLSN_ENTRY, WT_WRLSN_ENTRY_CMP_LT);

/*
* We know the written array is sorted by LSN. Go
* through them either advancing write_lsn or stop
* as soon as one is not in order.
*/
for (i = 0; i < written_i; i++) {
if (WT_LOG_CMP(&log->write_lsn,
&written[i].lsn) != 0)
break;
for (i = 0; i < written_i; i++) {
slot = &log->slot_pool[written[i].slot_index];
if (coalescing != NULL) {
if (WT_LOG_CMP(&coalescing->slot_end_lsn,
&written[i].lsn) != 0) {
coalescing = slot;
continue;
}
/*
* If we get here we have a slot to coalesce
* and free.
*/
coalescing->slot_end_lsn = slot->slot_end_lsn;
WT_STAT_FAST_CONN_INCR(
session, log_slot_coalesced);
/*
* Copy the flag for later closing.
*/
if (F_ISSET(slot, WT_SLOT_CLOSEFH))
F_SET(coalescing, WT_SLOT_CLOSEFH);
} else {
/*
* If this written slot is not the next LSN,
* try to start coalescing with later slots.
*/
if (WT_LOG_CMP(
&log->write_lsn, &written[i].lsn) != 0) {
coalescing = slot;
continue;
}
/*
* If we get here we have a slot to process.
* Advance the LSN and process the slot.
*/
slot = &log->slot_pool[written[i].slot_index];
WT_ASSERT(session, WT_LOG_CMP(&written[i].lsn,
&slot->slot_release_lsn) == 0);
log->write_start_lsn = slot->slot_start_lsn;
log->write_lsn = slot->slot_end_lsn;
WT_ERR(__wt_cond_signal(session,
log->log_write_cond));
WT_RET(__wt_cond_signal(
session, log->log_write_cond));
WT_STAT_FAST_CONN_INCR(session, log_write_lsn);

/*
* Signal the close thread if needed.
*/
if (F_ISSET(slot, WT_SLOT_CLOSEFH))
WT_ERR(__wt_cond_signal(session,
conn->log_file_cond));
WT_ERR(__wt_log_slot_free(session, slot));
WT_RET(__wt_cond_signal(
session, conn->log_file_cond));
}
WT_RET(__wt_log_slot_free(session, slot));
if (free_i != NULL && *free_i == WT_SLOT_POOL &&
slot->slot_state == WT_LOG_SLOT_FREE)
*free_i = save_i;
}
/*
* If we saw a later write, we always want to yield because
* we know something is in progress.
*/
if (yield++ < 1000)
}
return (0);
}

/*
* __log_wrlsn_server --
* The log wrlsn server thread.
*/
static WT_THREAD_RET
__log_wrlsn_server(void *arg)
{
WT_CONNECTION_IMPL *conn;
WT_DECL_RET;
WT_LOG *log;
WT_SESSION_IMPL *session;
int locked, yield;

session = arg;
conn = S2C(session);
log = conn->log;
locked = yield = 0;
while (F_ISSET(conn, WT_CONN_LOG_SERVER_RUN)) {
__wt_spin_lock(session, &log->log_slot_lock);
locked = 1;
WT_ERR(__wt_log_wrlsn(session, NULL, &yield));
locked = 0;
__wt_spin_unlock(session, &log->log_slot_lock);
if (++yield < 1000)
__wt_yield();
else
/* Wait until the next event. */
WT_ERR(__wt_cond_wait(session,
conn->log_wrlsn_cond, 100000));
}

if (0)
if (0) {
err: __wt_err(session, ret, "log wrlsn server error");
}
if (locked)
__wt_spin_unlock(session, &log->log_slot_lock);
return (WT_THREAD_RET_VALUE);
}

Expand Down
1 change: 1 addition & 0 deletions src/include/extern.h
Expand Up @@ -237,6 +237,7 @@ extern int __wt_conn_dhandle_discard(WT_SESSION_IMPL *session);
extern int __wt_connection_init(WT_CONNECTION_IMPL *conn);
extern int __wt_connection_destroy(WT_CONNECTION_IMPL *conn);
extern int __wt_log_truncate_files( WT_SESSION_IMPL *session, WT_CURSOR *cursor, const char *cfg[]);
extern int __wt_log_wrlsn(WT_SESSION_IMPL *session, uint32_t *free_i, int *yield);
extern int __wt_logmgr_create(WT_SESSION_IMPL *session, const char *cfg[]);
extern int __wt_logmgr_open(WT_SESSION_IMPL *session);
extern int __wt_logmgr_destroy(WT_SESSION_IMPL *session);
Expand Down
3 changes: 1 addition & 2 deletions src/include/log.h
Expand Up @@ -158,10 +158,9 @@ typedef struct {
*/
#define WT_SLOT_ACTIVE 1
#define WT_SLOT_POOL 128
uint32_t pool_index; /* Global pool index */
WT_LOGSLOT *slot_array[WT_SLOT_ACTIVE]; /* Active slots */
WT_LOGSLOT slot_pool[WT_SLOT_POOL]; /* Pool of all slots */
uint32_t slot_buf_size; /* Buffer size for slots */
wt_off_t slot_buf_size; /* Buffer size for slots */

#define WT_LOG_FORCE_CONSOLIDATE 0x01 /* Disable direct writes */
uint32_t flags;
Expand Down
1 change: 1 addition & 0 deletions src/include/stat.h
Expand Up @@ -221,6 +221,7 @@ struct __wt_connection_stats {
WT_STATS log_scan_rereads;
WT_STATS log_scans;
WT_STATS log_slot_closes;
WT_STATS log_slot_coalesced;
WT_STATS log_slot_consolidated;
WT_STATS log_slot_joins;
WT_STATS log_slot_races;
Expand Down

0 comments on commit b10bff9

Please sign in to comment.