Skip to content

Commit

Permalink
dpif-netdev: Remove PMD latency on seq_mutex
Browse files Browse the repository at this point in the history
The PMD thread needs to keep processing RX queues in order
to achieve maximum throughput. It also needs to sweep emc
cache and quiesce which use seq_mutex. That mutex can
eventually block the PMD thread causing latency spikes and
affecting the throughput.

Since there is no requirement to run those tasks at a
specific time, this patch extends the seq API to allow tentative
(non-blocking) locking instead.

Reported-by: Karl Rister <krister@redhat.com>
Co-authored-by: Karl Rister <krister@redhat.com>
Signed-off-by: Flavio Leitner <fbl@redhat.com>
Signed-off-by: Daniele Di Proietto <diproiettod@vmware.com>
  • Loading branch information
2 people authored and ddiproietto committed Jul 8, 2016
1 parent d66e4a5 commit 9dede5c
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 8 deletions.
5 changes: 3 additions & 2 deletions lib/dpif-netdev.c
Expand Up @@ -2871,9 +2871,10 @@ pmd_thread_main(void *f_)

lc = 0;

emc_cache_slow_sweep(&pmd->flow_cache);
coverage_try_clear();
ovsrcu_quiesce();
if (!ovsrcu_try_quiesce()) {
emc_cache_slow_sweep(&pmd->flow_cache);
}

atomic_read_relaxed(&pmd->change_seq, &seq);
if (seq != port_seq) {
Expand Down
37 changes: 35 additions & 2 deletions lib/ovs-rcu.c
Expand Up @@ -15,6 +15,7 @@
*/

#include <config.h>
#include <errno.h>
#include "ovs-rcu.h"
#include "fatal-signal.h"
#include "guarded-list.h"
Expand Down Expand Up @@ -57,6 +58,7 @@ static struct guarded_list flushed_cbsets;
static struct seq *flushed_cbsets_seq;

static void ovsrcu_init_module(void);
static void ovsrcu_flush_cbset__(struct ovsrcu_perthread *, bool);
static void ovsrcu_flush_cbset(struct ovsrcu_perthread *);
static void ovsrcu_unregister__(struct ovsrcu_perthread *);
static bool ovsrcu_call_postponed(void);
Expand Down Expand Up @@ -151,6 +153,27 @@ ovsrcu_quiesce(void)
ovsrcu_quiesced();
}

/* Attempts to quiesce without blocking on the global seq mutex.  Returns 0
 * on success, or EBUSY when the mutex could not be taken; the caller may
 * simply try again on a later iteration. */
int
ovsrcu_try_quiesce(void)
{
    struct ovsrcu_perthread *perthread;

    ovs_assert(!single_threaded());
    perthread = ovsrcu_perthread_get();

    /* seq_try_lock() returns nonzero when the mutex is unavailable. */
    if (seq_try_lock()) {
        return EBUSY;
    }

    perthread->seqno = seq_read_protected(global_seqno);
    if (perthread->cbset) {
        ovsrcu_flush_cbset__(perthread, true);
    }
    seq_change_protected(global_seqno);
    seq_unlock();
    ovsrcu_quiesced();

    return 0;
}

bool
ovsrcu_is_quiescent(void)
{
Expand Down Expand Up @@ -292,18 +315,28 @@ ovsrcu_postpone_thread(void *arg OVS_UNUSED)
}

static void
ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread)
ovsrcu_flush_cbset__(struct ovsrcu_perthread *perthread, bool protected)
{
struct ovsrcu_cbset *cbset = perthread->cbset;

if (cbset) {
guarded_list_push_back(&flushed_cbsets, &cbset->list_node, SIZE_MAX);
perthread->cbset = NULL;

seq_change(flushed_cbsets_seq);
if (protected) {
seq_change_protected(flushed_cbsets_seq);
} else {
seq_change(flushed_cbsets_seq);
}
}
}

/* Moves 'perthread''s accumulated callback set, if any, to the global
 * flushed list.  Unprotected variant: the underlying notification uses
 * seq_change(), which takes the seq mutex internally, so the caller must
 * not already hold it. */
static void
ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread)
{
    ovsrcu_flush_cbset__(perthread, false);
}

static void
ovsrcu_unregister__(struct ovsrcu_perthread *perthread)
{
Expand Down
1 change: 1 addition & 0 deletions lib/ovs-rcu.h
Expand Up @@ -217,6 +217,7 @@ void ovsrcu_postpone__(void (*function)(void *aux), void *aux);
void ovsrcu_quiesce_start(void);
void ovsrcu_quiesce_end(void);
void ovsrcu_quiesce(void);
int ovsrcu_try_quiesce(void);
bool ovsrcu_is_quiescent(void);

/* Synchronization. Waits for all non-quiescent threads to quiesce at least
Expand Down
49 changes: 45 additions & 4 deletions lib/seq.c
Expand Up @@ -100,20 +100,61 @@ seq_destroy(struct seq *seq)
ovs_mutex_unlock(&seq_mutex);
}

/* Attempts to acquire 'seq_mutex' without blocking.  Returns 0 if the mutex
 * was acquired (the caller must eventually release it with seq_unlock()),
 * or a nonzero errno value (typically EBUSY) when it is already held. */
int
seq_try_lock(void)
{
    return ovs_mutex_trylock(&seq_mutex);
}

/* Acquires 'seq_mutex', the global mutex protecting all seq objects,
 * blocking until it is available.  Release it with seq_unlock(). */
void
seq_lock(void)
    OVS_ACQUIRES(seq_mutex)
{
    ovs_mutex_lock(&seq_mutex);
}

/* Releases 'seq_mutex', previously acquired with seq_lock() or a
 * successful seq_try_lock(). */
void
seq_unlock(void)
    OVS_RELEASES(seq_mutex)
{
    ovs_mutex_unlock(&seq_mutex);
}

/* Increments 'seq''s sequence number, waking up any threads that are waiting
* on 'seq'. */
void
seq_change(struct seq *seq)
OVS_EXCLUDED(seq_mutex)
seq_change_protected(struct seq *seq)
OVS_REQUIRES(seq_mutex)
{
COVERAGE_INC(seq_change);

ovs_mutex_lock(&seq_mutex);
seq->value = seq_next++;
seq_wake_waiters(seq);
}

/* Increments 'seq''s sequence number, waking up any threads that are waiting
 * on 'seq'.  Takes and releases 'seq_mutex' internally; callers that already
 * hold the mutex must use seq_change_protected() instead. */
void
seq_change(struct seq *seq)
    OVS_EXCLUDED(seq_mutex)
{
    ovs_mutex_lock(&seq_mutex);
    seq_change_protected(seq);
    ovs_mutex_unlock(&seq_mutex);
}

/* Returns 'seq''s current sequence number (which could change as soon as the
 * caller drops the mutex).  The caller must hold 'seq_mutex', e.g. via
 * seq_lock() or a successful seq_try_lock(); use seq_read() otherwise. */
uint64_t
seq_read_protected(const struct seq *seq)
    OVS_REQUIRES(seq_mutex)
{
    return seq->value;
}

/* Returns 'seq''s current sequence number (which could change immediately).
*
* seq_read() and seq_wait() can be used together to yield a race-free wakeup
Expand All @@ -126,7 +167,7 @@ seq_read(const struct seq *seq)
uint64_t value;

ovs_mutex_lock(&seq_mutex);
value = seq->value;
value = seq_read_protected(seq);
ovs_mutex_unlock(&seq_mutex);

return value;
Expand Down
5 changes: 5 additions & 0 deletions lib/seq.h
Expand Up @@ -121,9 +121,14 @@
struct seq *seq_create(void);
void seq_destroy(struct seq *);
void seq_change(struct seq *);
void seq_change_protected(struct seq *);
void seq_lock(void);
int seq_try_lock(void);
void seq_unlock(void);

/* For observers. */
uint64_t seq_read(const struct seq *);
uint64_t seq_read_protected(const struct seq *);

void seq_wait_at(const struct seq *, uint64_t value, const char *where);
#define seq_wait(seq, value) seq_wait_at(seq, value, OVS_SOURCE_LOCATOR)
Expand Down

0 comments on commit 9dede5c

Please sign in to comment.