Skip to content

Commit

Permalink
Use write-through writes instead of Windows FlushViewOfFile/FlushFileBuf…
Browse files Browse the repository at this point in the history
…fers for sync commits in Windows
  • Loading branch information
kriszyp committed Apr 30, 2019
1 parent dd49334 commit 7ff525a
Showing 1 changed file with 135 additions and 64 deletions.
199 changes: 135 additions & 64 deletions dependencies/lmdb/libraries/liblmdb/mdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -1254,6 +1254,7 @@ struct MDB_env {
HANDLE me_fd; /**< The main data file */
HANDLE me_lfd; /**< The lock file */
HANDLE me_mfd; /**< For writing and syncing the meta pages */
HANDLE me_ovfd; /**< Overlapped/async with write-through file handle */
/** Failed to update the meta page. Probably an I/O error. */
#define MDB_FATAL_ERROR 0x80000000U
/** Some fields are initialized. */
Expand Down Expand Up @@ -2115,12 +2116,15 @@ mdb_page_dirty(MDB_txn *txn, MDB_page *mp)
{
MDB_ID2 mid;
int rc, (*insert)(MDB_ID2L, MDB_ID2 *);

if (txn->mt_flags & MDB_TXN_WRITEMAP) {
#ifndef _WIN32 /* With Windows we always write dirty pages with WriteFile,
* so we always want them ordered, but otherwise, we just use
* msync, we don't need the ordering and just append */
if (txn->mt_flags & MDB_TXN_WRITEMAP)
insert = mdb_mid2l_append;
} else {
else
#endif
insert = mdb_mid2l_insert;
}
insert = mdb_mid2l_insert;

This comment has been minimized.

Copy link
@hyc

hyc Apr 30, 2019

This appears to be a duplicated line.

mid.mid = mp->mp_pgno;
mid.mptr = mp;
rc = insert(txn->mt_u.dirty_list, &mid);
Expand Down Expand Up @@ -2509,7 +2513,11 @@ mdb_env_sync(MDB_env *env, int force)
int rc = 0;
if (env->me_flags & MDB_RDONLY)
return EACCES;
if (force || !F_ISSET(env->me_flags, MDB_NOSYNC)) {
if (force
#ifndef _WIN32 /* Sync is normally achieved in Windows by doing WRITE_THROUGH writes */
|| !(env->me_flags & MDB_NOSYNC)
#endif
) {
if (env->me_flags & MDB_WRITEMAP) {
int flags = ((env->me_flags & MDB_MAPASYNC) && !force)
? MS_ASYNC : MS_SYNC;
Expand Down Expand Up @@ -3302,17 +3310,27 @@ mdb_page_flush(MDB_txn *txn, int keep)
pgno_t pgno = 0;
MDB_page *dp = NULL;
#ifdef _WIN32
OVERLAPPED ov;
OVERLAPPED* ov = malloc((pagecount - keep) * sizeof(OVERLAPPED));

This comment has been minimized.

Copy link
@hyc

hyc Apr 30, 2019

There should be a check for malloc failure.

Probably this ought to just be pre-allocated based on the maximum number of dirty pages a txn allows.

MDB_page *wdp;
int async_i = 0;
HANDLE fd = (env->me_flags & MDB_NOSYNC) ? env->me_fd : env->me_ovfd;
#else
struct iovec iov[MDB_COMMIT_PAGES];
HANDLE fd = env->me_fd;
#endif
ssize_t wpos = 0, wsize = 0, wres;
size_t next_pos = 1; /* impossible pos, so pos != next_pos */
int n = 0;
#endif

j = i = keep;

if (env->me_flags & MDB_WRITEMAP) {
if (env->me_flags & MDB_WRITEMAP
#ifdef _WIN32
/* In windows, we still do writes to the file (with write-through enabled in sync mode),
* as this is faster than FlushViewOfFile/FlushFileBuffers */
&& (env->me_flags & MDB_NOSYNC)
#endif
) {
/* Clear dirty flags */
while (++i <= pagecount) {
dp = dl[i].mptr;
Expand All @@ -3335,6 +3353,9 @@ mdb_page_flush(MDB_txn *txn, int keep)
if (dp->mp_flags & (P_LOOSE|P_KEEP)) {
dp->mp_flags &= ~P_KEEP;
dl[i].mid = 0;
#ifdef _WIN32
ov[i].hEvent = NULL; /* denote skipped page*/
#endif
continue;
}
pgno = dl[i].mid;
Expand All @@ -3344,46 +3365,59 @@ mdb_page_flush(MDB_txn *txn, int keep)
size = psize;
if (IS_OVERFLOW(dp)) size *= dp->mp_pages;
}
#ifdef _WIN32
else break;

/* Windows actually supports scatter/gather I/O, but only on
* unbuffered file handles. Since we're relying on the OS page
* cache for all our data, that's self-defeating. So we just
* write pages one at a time. We use the ov structure to set
* the write offset, to at least save the overhead of a Seek
* system call.
*/
DPRINTF(("committing page %"Z"u", pgno));
memset(&ov, 0, sizeof(ov));
ov.Offset = pos & 0xffffffff;
ov.OffsetHigh = pos >> 16 >> 16;
if (!WriteFile(env->me_fd, dp, size, NULL, &ov)) {
rc = ErrCode();
DPRINTF(("WriteFile: %d", rc));
return rc;
}
#else
/* Write up to MDB_COMMIT_PAGES dirty pages at a time. */
if (pos!=next_pos || n==MDB_COMMIT_PAGES || wsize+size>MAX_WRITE) {
if (pos!=next_pos || n==MDB_COMMIT_PAGES || wsize+size>MAX_WRITE
#ifdef _WIN32 /* If writemap is enabled, consecutive page positions infer
* contiguous (mapped) memory.
* Otherwise force write pages one at a time.
* Windows actually supports scatter/gather I/O, but only on
* unbuffered file handles. Since we're relying on the OS page
* cache for all our data, that's self-defeating. So we just
* write pages one at a time. We use the ov structure to set
* the write offset, to at least save the overhead of a Seek
* system call.
*/
|| !(env->me_flags & MDB_WRITEMAP)
#endif
) {
if (n) {
retry_write:
/* Write previous page(s) */
DPRINTF(("committing page %"Z"u", pgno));
#ifdef _WIN32
memset(&ov[async_i], 0, sizeof(OVERLAPPED));
ov[async_i].Offset = wpos & 0xffffffff;
ov[async_i].OffsetHigh = wpos >> 16 >> 16;
if (!F_ISSET(env->me_flags, MDB_NOSYNC)) {
HANDLE event = CreateEvent(NULL, TRUE, FALSE, NULL);

This comment has been minimized.

Copy link
@hyc

hyc Apr 30, 2019

There needs to be a check for failure here.

ov[async_i].hEvent = event;
}
if (!WriteFile(fd, wdp, wsize, NULL, &ov[async_i])) {
rc = ErrCode();
if (rc != ERROR_IO_PENDING) {
DPRINTF(("WriteFile: %d", rc));
return rc;
}
} else {
fprintf(stderr, "WriteFile returned, no need for overlap results\n");
}
async_i++;
#else
#ifdef MDB_USE_PWRITEV
wres = pwritev(env->me_fd, iov, n, wpos);
wres = pwritev(fd, iov, n, wpos);
#else
if (n == 1) {
wres = pwrite(env->me_fd, iov[0].iov_base, wsize, wpos);
wres = pwrite(fd, iov[0].iov_base, wsize, wpos);
} else {
retry_seek:
if (lseek(env->me_fd, wpos, SEEK_SET) == -1) {
if (lseek(fd, wpos, SEEK_SET) == -1) {
rc = ErrCode();
if (rc == EINTR)
goto retry_seek;
DPRINTF(("lseek: %s", strerror(rc)));
return rc;
}
wres = writev(env->me_fd, iov, n);
wres = writev(fd, iov, n);
}
#endif
if (wres != wsize) {
Expand All @@ -3398,37 +3432,65 @@ mdb_page_flush(MDB_txn *txn, int keep)
}
return rc;
}
#endif /* _WIN32 */
n = 0;
}
if (i > pagecount)
break;
wpos = pos;
wsize = 0;
#ifdef _WIN32
wdp = dp;
}
#else
}
DPRINTF(("committing page %"Z"u", pgno));
next_pos = pos + size;
iov[n].iov_len = size;
iov[n].iov_base = (char *)dp;
#endif /* _WIN32 */
DPRINTF(("committing page %"Z"u", pgno));
next_pos = pos + size;
wsize += size;
n++;
#endif /* _WIN32 */
}

/* MIPS has cache coherency issues, this is a no-op everywhere else
* Note: for any size >= on-chip cache size, entire on-chip cache is
* flushed.
*/
CACHEFLUSH(env->me_map, txn->mt_next_pgno * env->me_psize, DCACHE);
#ifdef _WIN32
if (!F_ISSET(env->me_flags, MDB_NOSYNC)) {
/* Now wait for all the asynchronous/overlapped sync/write-through writes to complete.
* We start with the last one so that all the others should already be complete and
* we reduce thread suspend/resuming */
while (--async_i >= 0) {
if (ov[async_i].hEvent) {
if (!GetOverlappedResult(fd, &ov[async_i], &wres, TRUE)) {
CloseHandle(ov[async_i].hEvent);
rc = ErrCode();
return rc;
}
CloseHandle(ov[async_i].hEvent);
}
}

for (i = keep; ++i <= pagecount; ) {
dp = dl[i].mptr;
/* This is a page we skipped above */
if (!dl[i].mid) {
dl[++j] = dl[i];
dl[j].mid = dp->mp_pgno;
continue;
}
free(ov);
#endif /* _WIN32 */

if (!(env->me_flags & MDB_WRITEMAP)) {
/* Don't free pages when using writemap (can only get here in NOSYNC mode in Windows)
* MIPS has cache coherency issues, this is a no-op everywhere else
* Note: for any size >= on-chip cache size, entire on-chip cache is
* flushed.
*/
CACHEFLUSH(env->me_map, txn->mt_next_pgno * env->me_psize, DCACHE);

for (i = keep; ++i <= pagecount; ) {
dp = dl[i].mptr;
/* This is a page we skipped above */
if (!dl[i].mid) {
dl[++j] = dl[i];
dl[j].mid = dp->mp_pgno;
continue;
}
mdb_dpage_free(env, dp);
}
mdb_dpage_free(env, dp);
}

done:
Expand Down Expand Up @@ -3771,7 +3833,6 @@ mdb_env_init_meta(MDB_env *env, MDB_meta *meta)
if (len == -1 && ErrCode() == EINTR) continue; \
rc = (len >= 0); break; } while(1)
#endif

DPUTS("writing new meta page");

psize = env->me_psize;
Expand Down Expand Up @@ -3833,6 +3894,7 @@ mdb_env_write_meta(MDB_txn *txn)
if (mapsize < env->me_mapsize)
mapsize = env->me_mapsize;

#ifndef _WIN32 /* We don't want to ever use MSYNC/FlushViewOfFile in Windows */
if (flags & MDB_WRITEMAP) {
mp->mm_mapsize = mapsize;
mp->mm_dbs[FREE_DBI] = txn->mt_dbs[FREE_DBI];
Expand All @@ -3848,18 +3910,14 @@ mdb_env_write_meta(MDB_txn *txn)
unsigned meta_size = env->me_psize;
rc = (env->me_flags & MDB_MAPASYNC) ? MS_ASYNC : MS_SYNC;
ptr = (char *)mp - PAGEHDRSZ;
#ifndef _WIN32 /* POSIX msync() requires ptr = start of OS page */
r2 = (ptr - env->me_map) & (env->me_os_psize - 1);
ptr -= r2;
meta_size += r2;
#endif
if (MDB_MSYNC(ptr, meta_size, rc)) {
rc = ErrCode();
goto fail;
}
}
goto done;
}
#endif
metab.mm_txnid = mp->mm_txnid;
metab.mm_last_pg = mp->mm_last_pg;

Expand Down Expand Up @@ -4201,7 +4259,7 @@ mdb_fname_init(const char *path, unsigned envflags, MDB_name *fname)
/** File type, access mode etc. for #mdb_fopen() */
enum mdb_fopen_type {
#ifdef _WIN32
MDB_O_RDONLY, MDB_O_RDWR, MDB_O_META, MDB_O_COPY, MDB_O_LOCKS
MDB_O_RDONLY, MDB_O_RDWR, MDB_O_OVERLAPPED, MDB_O_META, MDB_O_COPY, MDB_O_LOCKS
#else
/* A comment in mdb_fopen() explains some O_* flag choices. */
MDB_O_RDONLY= O_RDONLY, /**< for RDONLY me_fd */
Expand Down Expand Up @@ -4262,6 +4320,11 @@ mdb_fopen(const MDB_env *env, MDB_name *fname,
disp = OPEN_ALWAYS;
attrs = FILE_ATTRIBUTE_NORMAL;
switch (which) {
case MDB_O_OVERLAPPED: /* for unbuffered asynchronous writes (write-through mode)*/
acc = GENERIC_WRITE;
disp = OPEN_EXISTING;
attrs = FILE_FLAG_OVERLAPPED|FILE_FLAG_WRITE_THROUGH;
break;
case MDB_O_RDONLY: /* read-only datafile */
acc = GENERIC_READ;
disp = OPEN_EXISTING;
Expand Down Expand Up @@ -4998,6 +5061,11 @@ mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mdb_mode_t mode
mode, &env->me_fd);
if (rc)
goto leave;
#ifdef _WIN32
rc = mdb_fopen(env, &fname, MDB_O_OVERLAPPED, mode, &env->me_ovfd);
if (rc)
goto leave;
#endif

if ((flags & (MDB_RDONLY|MDB_NOLOCK)) == MDB_RDONLY) {
rc = mdb_env_setup_locks(env, &fname, mode, &excl);
Expand All @@ -5006,14 +5074,13 @@ mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mdb_mode_t mode
}

if ((rc = mdb_env_open2(env)) == MDB_SUCCESS) {
if (!(flags & (MDB_RDONLY|MDB_WRITEMAP))) {
/* Synchronous fd for meta writes. Needed even with
* MDB_NOSYNC/MDB_NOMETASYNC, in case these get reset.
*/
rc = mdb_fopen(env, &fname, MDB_O_META, mode, &env->me_mfd);
if (rc)
goto leave;
}
/* Synchronous fd for meta writes. Needed even with
* MDB_NOSYNC/MDB_NOMETASYNC, in case these get reset.
*/
rc = mdb_fopen(env, &fname, MDB_O_META, mode, &env->me_mfd);
if (rc)
goto leave;

DPRINTF(("opened dbenv %p", (void *) env));
if (excl > 0) {
rc = mdb_env_share_locks(env, &excl);
Expand Down Expand Up @@ -5091,6 +5158,10 @@ mdb_env_close0(MDB_env *env, int excl)
}
if (env->me_mfd != INVALID_HANDLE_VALUE)
(void) close(env->me_mfd);
#ifdef _WIN32
if (env->me_ovfd != INVALID_HANDLE_VALUE)
(void) close(env->me_ovfd);
#endif
if (env->me_fd != INVALID_HANDLE_VALUE)
(void) close(env->me_fd);
if (env->me_txns) {
Expand Down

0 comments on commit 7ff525a

Please sign in to comment.