Permalink
Browse files

journal: asynchronous journal_file_set_offline()

This adds a wait flag to journal_file_set_offline(), when false the offline is
performed asynchronously in a separate thread.

When wait is true, if an asynchronous offline is already in-progress it is
restarted and waited for.  Otherwise the offline is performed synchronously
without the use of a thread.

journal_file_set_online() cancels or waits for the asynchronous offline to
complete if in-flight, depending on where in the offline process the thread
happens to be.  If the thread is in the fsync() phase, it is cancelled and
waiting is unnecessary.  Otherwise, the thread is joined before proceeding.

A new offline_state member is added to JournalFile which is used via
atomic operations for communicating between the offline thread and the
journal_file_set_{offline,online}() functions.
  • Loading branch information...
vcaputo committed Feb 12, 2016
1 parent 69a3a6f commit ac2e41f5103ce2c679089c4f8fb6be61d7caec07
Showing with 208 additions and 24 deletions.
  1. +192 −21 src/journal/journal-file.c
  2. +14 −1 src/journal/journal-file.h
  3. +2 −2 src/journal/journald-server.c
@@ -20,6 +20,7 @@
#include <errno.h>
#include <fcntl.h>
#include <linux/fs.h>
#include <pthread.h>
#include <stddef.h>
#include <sys/mman.h>
#include <sys/statvfs.h>
@@ -86,33 +87,127 @@
/* The mmap context to use for the header we pick as one above the last defined typed */
#define CONTEXT_HEADER _OBJECT_TYPE_MAX

static int journal_file_set_online(JournalFile *f) {
/* This may be called from a separate thread to prevent blocking the caller for the duration of fsync().
* As a result we use atomic operations on f->offline_state for inter-thread communications with
* journal_file_set_offline() and journal_file_set_online(). */
static void journal_file_set_offline_internal(JournalFile *f) {
assert(f);
assert(f->fd >= 0);
assert(f->header);

if (!f->writable)
return -EPERM;
for (;;) {
switch (f->offline_state) {
case OFFLINE_CANCEL:
if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_CANCEL, OFFLINE_DONE))
continue;
return;

if (!(f->fd >= 0 && f->header))
return -EINVAL;
case OFFLINE_AGAIN_FROM_SYNCING:
if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_AGAIN_FROM_SYNCING, OFFLINE_SYNCING))
continue;
break;

case OFFLINE_AGAIN_FROM_OFFLINING:
if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_AGAIN_FROM_OFFLINING, OFFLINE_SYNCING))
continue;
break;

case OFFLINE_SYNCING:
(void) fsync(f->fd);

if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_SYNCING, OFFLINE_OFFLINING))
continue;

f->header->state = STATE_OFFLINE;
(void) fsync(f->fd);
break;

case OFFLINE_OFFLINING:
if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_OFFLINING, OFFLINE_DONE))
continue;
/* fall through */

case OFFLINE_DONE:
return;

case OFFLINE_JOINED:
log_debug("OFFLINE_JOINED unexpected offline state for journal_file_set_offline_internal()");
return;
}
}
}

static void * journal_file_set_offline_thread(void *arg) {
JournalFile *f = arg;

journal_file_set_offline_internal(f);

return NULL;
}

static int journal_file_set_offline_thread_join(JournalFile *f) {
int r;

assert(f);

if (f->offline_state == OFFLINE_JOINED)
return 0;

r = pthread_join(f->offline_thread, NULL);
if (r)
return -r;

f->offline_state = OFFLINE_JOINED;

if (mmap_cache_got_sigbus(f->mmap, f->fd))
return -EIO;

switch (f->header->state) {
case STATE_ONLINE:
return 0;
return 0;
}

case STATE_OFFLINE:
f->header->state = STATE_ONLINE;
(void) fsync(f->fd);
return 0;
/* Trigger a restart if the offline thread is mid-flight in a restartable state. */
static bool journal_file_set_offline_try_restart(JournalFile *f) {
for (;;) {
switch (f->offline_state) {
case OFFLINE_AGAIN_FROM_SYNCING:
case OFFLINE_AGAIN_FROM_OFFLINING:
return true;

case OFFLINE_CANCEL:
if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_CANCEL, OFFLINE_AGAIN_FROM_SYNCING))
continue;
return true;

case OFFLINE_SYNCING:
if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_SYNCING, OFFLINE_AGAIN_FROM_SYNCING))
continue;
return true;

case OFFLINE_OFFLINING:
if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_OFFLINING, OFFLINE_AGAIN_FROM_OFFLINING))
continue;
return true;

default:
return -EINVAL;
return false;
}
}
}

int journal_file_set_offline(JournalFile *f) {
/* Sets a journal offline.
*
* If wait is false then an offline is dispatched in a separate thread for a
* subsequent journal_file_set_offline() or journal_file_set_online() of the
* same journal to synchronize with.
*
* If wait is true, then either an existing offline thread will be restarted
* and joined, or if none exists the offline is simply performed in this
* context without involving another thread.
*/
int journal_file_set_offline(JournalFile *f, bool wait) {
bool restarted;
int r;

assert(f);

if (!f->writable)
@@ -124,19 +219,95 @@ int journal_file_set_offline(JournalFile *f) {
if (f->header->state != STATE_ONLINE)
return 0;

(void) fsync(f->fd);
/* Restart an in-flight offline thread and wait if needed, or join a lingering done one. */
restarted = journal_file_set_offline_try_restart(f);
if ((restarted && wait) || !restarted) {
r = journal_file_set_offline_thread_join(f);
if (r < 0)
return r;
}

if (mmap_cache_got_sigbus(f->mmap, f->fd))
return -EIO;
if (restarted)
return 0;

/* Initiate a new offline. */
f->offline_state = OFFLINE_SYNCING;

if (wait) /* Without using a thread if waiting. */
journal_file_set_offline_internal(f);
else {
r = pthread_create(&f->offline_thread, NULL, journal_file_set_offline_thread, f);
if (r > 0)
return -r;
}

return 0;
}

static int journal_file_set_online(JournalFile *f) {
bool joined = false;

assert(f);

if (!f->writable)
return -EPERM;

if (!(f->fd >= 0 && f->header))
return -EINVAL;

f->header->state = STATE_OFFLINE;
while (!joined) {
switch (f->offline_state) {
case OFFLINE_JOINED:
/* No offline thread, no need to wait. */
joined = true;
break;

case OFFLINE_SYNCING:
if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_SYNCING, OFFLINE_CANCEL))
continue;
/* Canceled syncing prior to offlining, no need to wait. */
break;

case OFFLINE_AGAIN_FROM_SYNCING:
if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_AGAIN_FROM_SYNCING, OFFLINE_CANCEL))
continue;
/* Canceled restart from syncing, no need to wait. */
break;

case OFFLINE_AGAIN_FROM_OFFLINING:
if (!__sync_bool_compare_and_swap(&f->offline_state, OFFLINE_AGAIN_FROM_OFFLINING, OFFLINE_CANCEL))
continue;
/* Canceled restart from offlining, must wait for offlining to complete however. */

/* fall through to wait */
default: {
int r;

r = journal_file_set_offline_thread_join(f);
if (r < 0)
return r;

joined = true;
break;
}
}
}

if (mmap_cache_got_sigbus(f->mmap, f->fd))
return -EIO;

(void) fsync(f->fd);
switch (f->header->state) {
case STATE_ONLINE:
return 0;

return 0;
case STATE_OFFLINE:
f->header->state = STATE_ONLINE;
(void) fsync(f->fd);
return 0;

default:
return -EINVAL;
}
}

JournalFile* journal_file_close(JournalFile *f) {
@@ -159,7 +330,7 @@ JournalFile* journal_file_close(JournalFile *f) {
sd_event_source_unref(f->post_change_timer);
}

journal_file_set_offline(f);
journal_file_set_offline(f, true);

if (f->mmap && f->fd >= 0)
mmap_cache_close_fd(f->mmap, f->fd);
@@ -63,6 +63,16 @@ typedef enum LocationType {
LOCATION_SEEK
} LocationType;

typedef enum OfflineState {
OFFLINE_JOINED,
OFFLINE_SYNCING,
OFFLINE_OFFLINING,
OFFLINE_CANCEL,
OFFLINE_AGAIN_FROM_SYNCING,
OFFLINE_AGAIN_FROM_OFFLINING,
OFFLINE_DONE
} OfflineState;

typedef struct JournalFile {
int fd;

@@ -105,6 +115,9 @@ typedef struct JournalFile {

OrderedHashmap *chain_cache;

pthread_t offline_thread;
volatile OfflineState offline_state;

#if defined(HAVE_XZ) || defined(HAVE_LZ4)
void *compress_buffer;
size_t compress_buffer_size;
@@ -139,7 +152,7 @@ int journal_file_open(
JournalFile *template,
JournalFile **ret);

int journal_file_set_offline(JournalFile *f);
int journal_file_set_offline(JournalFile *f, bool wait);
JournalFile* journal_file_close(JournalFile *j);

int journal_file_open_reliably(
@@ -372,13 +372,13 @@ void server_sync(Server *s) {
int r;

if (s->system_journal) {
r = journal_file_set_offline(s->system_journal);
r = journal_file_set_offline(s->system_journal, false);
if (r < 0)
log_warning_errno(r, "Failed to sync system journal, ignoring: %m");
}

ORDERED_HASHMAP_FOREACH(f, s->user_journals, i) {
r = journal_file_set_offline(f);
r = journal_file_set_offline(f, false);
if (r < 0)
log_warning_errno(r, "Failed to sync user journal, ignoring: %m");
}

0 comments on commit ac2e41f

Please sign in to comment.