Fix #806: Reduce the number of IPIs during work_queue processing.
aleksostapenko committed Sep 28, 2018
1 parent 04829a4 commit 5577d2a
Showing 4 changed files with 58 additions and 22 deletions.
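
The change gates cross-CPU wakeups behind a per-queue flag, TFW_QUEUE_B_IPI: producers raise an IPI only while the bit is set, and a consumer clears the bit for the duration of its run, re-arming it only once its queue drains, with a full barrier and a size re-check so a concurrent push cannot be lost. The sketch below is a minimal userspace analogue of that handshake, not code from this commit: the names are hypothetical, C11 atomics stand in for set_bit()/clear_bit() and smp_mb__after_atomic(), kick() stands in for tfw_raise_softirq(), and the ring is single-producer/single-consumer with no overrun check, unlike the real multi-producer TfwRBQueue with its push tickets.

    /* ipi_suppress.c: userspace analogue of the TFW_QUEUE_B_IPI handshake.
     * Hypothetical example; SPSC ring with no overrun check, for brevity. */
    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stdio.h>

    #define QSZ 64

    static long ring[QSZ];
    static atomic_long head, tail;          /* producer/consumer indices */
    static atomic_bool need_kick = true;    /* TFW_QUEUE_B_IPI analogue */

    /* Producer: publish an item; "kick" (IPI) only if the consumer asked. */
    static void push(long v, void (*kick)(void))
    {
            long h = atomic_load(&head);

            ring[h % QSZ] = v;
            atomic_store(&head, h + 1);
            if (atomic_load(&need_kick))
                    kick();                 /* tfw_raise_softirq() analogue */
    }

    /* Consumer: drain, re-arm kicks, then re-check to close the race. */
    static void drain(void (*process)(long))
    {
            /* The real code clears the bit in the IPI handler instead. */
            atomic_store(&need_kick, false);
            for (;;) {
                    long t = atomic_load(&tail);

                    while (t < atomic_load(&head)) {
                            process(ring[t % QSZ]);
                            atomic_store(&tail, ++t);
                    }
                    atomic_store(&need_kick, true); /* looks empty: re-arm */
                    /* smp_mb__after_atomic() analogue: order the re-arm
                     * against the size re-check so an item pushed after the
                     * last pop cannot be stranded without a kick. */
                    atomic_thread_fence(memory_order_seq_cst);
                    if (atomic_load(&tail) == atomic_load(&head))
                            return;                 /* truly idle */
                    atomic_store(&need_kick, false);/* late item: keep going */
            }
    }

    static void no_kick(void) { }
    static void print_item(long v) { printf("%ld\n", v); }

    int main(void)
    {
            for (long i = 0; i < 5; i++)
                    push(i, no_kick);
            drain(print_item);              /* prints 0 through 4 */
            return 0;
    }

The same double-check appears at both consumer sites below: tfw_wq_tasklet() re-checks tfw_wq_size(), and ss_tx_action() re-checks ss_close_q_sz().
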
17 changes: 14 additions & 3 deletions tempesta_fw/cache.c
@@ -1673,7 +1673,7 @@ static void
 tfw_cache_ipi(struct irq_work *work)
 {
 	TfwWorkTasklet *ct = container_of(work, TfwWorkTasklet, ipi_work);
-
+	clear_bit(TFW_QUEUE_B_IPI, &ct->wq.flags);
 	tasklet_schedule(&ct->tasklet);
 }
 
@@ -1726,7 +1726,6 @@ tfw_cache_process(TfwHttpMsg *msg, tfw_http_cache_cb_t action)
 	TFW_DBG2("Cache: schedule tasklet w/ work: to_cpu=%d from_cpu=%d"
 		 " msg=%p key=%lx\n", cpu, smp_processor_id(),
 		 cw.msg, key);
-
 	if (tfw_wq_push(&ct->wq, &cw, cpu, &ct->ipi_work, tfw_cache_ipi)) {
 		TFW_WARN("Cache work queue overrun: [%s]\n",
 			 resp ? "response" : "request");
@@ -1743,10 +1742,22 @@ static void
 tfw_wq_tasklet(unsigned long data)
 {
 	TfwWorkTasklet *ct = (TfwWorkTasklet *)data;
+	TfwRBQueue *wq = &ct->wq;
 	TfwCWork cw;
 
-	while (!tfw_wq_pop(&ct->wq, &cw))
+	while (!tfw_wq_pop(wq, &cw))
 		tfw_cache_do_action(cw.msg, cw.action);
+
+	set_bit(TFW_QUEUE_B_IPI, &wq->flags);
+
+	smp_mb__after_atomic();
+
+	if (!tfw_wq_size(wq))
+		return;
+
+	clear_bit(TFW_QUEUE_B_IPI, &wq->flags);
+
+	tasklet_schedule(&ct->tasklet);
 }
 
 /**
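
The unlocked flag makes the order of operations in tfw_wq_tasklet() above load-bearing: set_bit() must be separated from the tfw_wq_size() re-check by smp_mb__after_atomic(), or the tasklet can miss the last item. An illustrative interleaving of the lost wakeup the re-check closes (a sketch, not program output):

    consumer (tasklet)                      producer (tfw_wq_push())
    ------------------                      ------------------------
    tfw_wq_pop() -> queue empty
                                            __tfw_wq_push() enqueues an item
                                            test_bit(TFW_QUEUE_B_IPI) -> clear,
                                              so no IPI is raised
    set_bit(TFW_QUEUE_B_IPI)
    smp_mb__after_atomic()
    tfw_wq_size() -> 1
    clear_bit(); tasklet_schedule()

Without the barrier and the re-check, the tasklet could re-arm the bit yet read a stale empty size and return, leaving the freshly pushed item waiting for an IPI that was never sent.
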
35 changes: 26 additions & 9 deletions tempesta_fw/sock.c
@@ -187,6 +187,8 @@ do { \
 static void
 ss_ipi(struct irq_work *work)
 {
+	TfwRBQueue *wq = &per_cpu(si_wq, smp_processor_id());
+	clear_bit(TFW_QUEUE_B_IPI, &wq->flags);
 	raise_softirq(NET_TX_SOFTIRQ);
 }
 
@@ -200,6 +202,7 @@ ss_turnstile_push(long ticket, SsWork *sw, int cpu)
 {
 	struct irq_work *iw = &per_cpu(ipi_work, cpu);
 	SsCloseBacklog *cb = &per_cpu(close_backlog, cpu);
+	TfwRBQueue *wq = &per_cpu(si_wq, cpu);
 	SsCblNode *cn;
 
 	cn = kmem_cache_alloc(ss_cbacklog_cache, GFP_ATOMIC);
@@ -214,7 +217,8 @@ ss_turnstile_push(long ticket, SsWork *sw, int cpu)
 	cb->turn = ticket;
 	spin_unlock_bh(&cb->lock);
 
-	tfw_raise_softirq(cpu, iw, ss_ipi);
+	if (test_bit(TFW_QUEUE_B_IPI, &wq->flags))
+		tfw_raise_softirq(cpu, iw, ss_ipi);
 
 	return 0;
 }
@@ -254,9 +258,8 @@ ss_wq_push(SsWork *sw, int cpu)
 }
 
 static int
-ss_wq_pop(SsWork *sw, long *ticket)
+ss_wq_pop(TfwRBQueue *wq, SsWork *sw, long *ticket)
 {
-	TfwRBQueue *wq = this_cpu_ptr(&si_wq);
 	SsCloseBacklog *cb = this_cpu_ptr(&close_backlog);
 
 	/*
@@ -1308,17 +1311,18 @@ static void
 ss_tx_action(void)
 {
 	SsWork sw;
-	int budget;
 	struct sk_buff *skb;
+	TfwRBQueue *wq = this_cpu_ptr(&si_wq);
 	long ticket = 0;
+	int budget;
 
 	/*
 	 * @budget limits the loop to prevent live lock on constantly arriving
 	 * new items. We use some small integer as a lower bound to catch just
 	 * arriving items.
 	 */
 	budget = max(10UL, ss_close_q_sz());
-	while ((!ss_active() || budget--) && !ss_wq_pop(&sw, &ticket)) {
+	while ((!ss_active() || budget--) && !ss_wq_pop(wq, &sw, &ticket)) {
 		struct sock *sk = sw.sk;
 
 		bh_lock_sock(sk);
@@ -1358,11 +1362,24 @@ ss_tx_action(void)
 
 	/*
 	 * Rearm softirq for local CPU if there are more jobs to do.
-	 * ss_synchronize() is responsible for raising the softirq if there are
-	 * more jobs in the work queue or the backlog.
+	 * If all jobs are finished, and the work queue and backlog
+	 * are empty, then enable IPI generation by producers (it is
+	 * disabled in the ss_ipi() handler).
+	 * ss_synchronize() is responsible for raising the softirq
+	 * if there are more jobs in the work queue or the backlog.
 	 */
-	if (!budget)
-		raise_softirq(NET_TX_SOFTIRQ);
+	if (budget) {
+		set_bit(TFW_QUEUE_B_IPI, &wq->flags);
+
+		smp_mb__after_atomic();
+
+		if (!ss_close_q_sz())
+			return;
+
+		clear_bit(TFW_QUEUE_B_IPI, &wq->flags);
+	}
+
+	raise_softirq(NET_TX_SOFTIRQ);
 }
 
 /*
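
The ss_tx_action() epilogue mirrors tfw_wq_tasklet() but adds the budget: re-arming TFW_QUEUE_B_IPI and re-checking ss_close_q_sz() behind the barrier closes the same lost-wakeup window, while any path that still sees pending work ends in raise_softirq(NET_TX_SOFTIRQ), so the softirq reschedules itself with producer IPIs still suppressed. The budget itself keeps its old role of bounding a single pass: with ss_close_q_sz() returning, say, 3, budget starts at max(10UL, 3) = 10, so a burst of just-arrived items is still drained in one pass instead of waiting for another wakeup.
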
1 change: 1 addition & 0 deletions tempesta_fw/work_queue.c
@@ -49,6 +49,7 @@ tfw_wq_init(TfwRBQueue *q, int node)
 	q->last_head = 0;
 	atomic64_set(&q->head, 0);
 	atomic64_set(&q->tail, 0);
+	set_bit(TFW_QUEUE_B_IPI, &q->flags);
 
 	q->array = kmalloc_node(QSZ * WQ_ITEM_SZ, GFP_KERNEL, node);
 	if (!q->array) {
27 changes: 17 additions & 10 deletions tempesta_fw/work_queue.h
@@ -38,13 +38,28 @@ typedef struct {
 	long last_head;
 	atomic64_t head ____cacheline_aligned;
 	atomic64_t tail ____cacheline_aligned;
+	unsigned long flags;
 } TfwRBQueue;
 
+enum {
+	/* Enable IPI generation. */
+	TFW_QUEUE_B_IPI = 0
+};
+
 int tfw_wq_init(TfwRBQueue *wq, int node);
 void tfw_wq_destroy(TfwRBQueue *wq);
 long __tfw_wq_push(TfwRBQueue *wq, void *ptr);
 int tfw_wq_pop_ticket(TfwRBQueue *wq, void *buf, long *ticket);
 
+static inline int
+tfw_wq_size(TfwRBQueue *q)
+{
+	long t = atomic64_read(&q->tail);
+	long h = atomic64_read(&q->head);
+
+	return t > h ? 0 : h - t;
+}
+
 static inline void
 tfw_raise_softirq(int cpu, struct irq_work *work,
 		  void (*local_cpu_cb)(struct irq_work *))
@@ -63,7 +78,8 @@ tfw_wq_push(TfwRBQueue *q, void *ptr, int cpu, struct irq_work *work,
 	if (unlikely(ticket))
 		return ticket;
 
-	tfw_raise_softirq(cpu, work, local_cpu_cb);
+	if (test_bit(TFW_QUEUE_B_IPI, &q->flags))
+		tfw_raise_softirq(cpu, work, local_cpu_cb);
 
 	return 0;
 }
@@ -74,13 +90,4 @@ tfw_wq_pop(TfwRBQueue *wq, void *buf)
 	return tfw_wq_pop_ticket(wq, buf, NULL);
 }
 
-static inline int
-tfw_wq_size(TfwRBQueue *q)
-{
-	long t = atomic64_read(&q->tail);
-	long h = atomic64_read(&q->head);
-
-	return t > h ? 0 : h - t;
-}
-
 #endif /* __TFW_WORK_QUEUE_H__ */
