Enable OF-compatible queue handling #79

Open: wants to merge 2 commits into master
2 changes: 2 additions & 0 deletions mk/make_dpdk.sh
@@ -81,6 +81,8 @@ edit_dpdk_config CONFIG_RTE_LIBRTE_PMD_PCAP=n $NEWCONFIG
edit_dpdk_config CONFIG_RTE_APP_TEST=n $NEWCONFIG
edit_dpdk_config CONFIG_RTE_TEST_PMD=n $NEWCONFIG
edit_dpdk_config CONFIG_RTE_IXGBE_INC_VECTOR=n $NEWCONFIG
edit_dpdk_config CONFIG_RTE_SCHED_COLLECT_STATS=y $NEWCONFIG
edit_dpdk_config CONFIG_RTE_SCHED_RED=y $NEWCONFIG

@ynkjm commented on May 9, 2016:


By default in DPDK, RED-marked packets are not dropped, so the following line seems necessary as well:

edit_dpdk_config CONFIG_RTE_SCHED_RED=y $NEWCONFIG
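For illustration, a hedged sketch (not part of this PR or comment; the member names assume the DPDK rte_sched/rte_red layout this tree builds against): with RTE_SCHED_RED compiled in, struct rte_sched_port_params gains a red_params table whose thresholds drive the drop decision.

#include <rte_sched.h>
#include <rte_red.h>
#include <rte_meter.h>

/* Hedged sketch: fill the RED thresholds that only exist in
 * struct rte_sched_port_params when RTE_SCHED_RED is defined.
 * The numbers are placeholders, not tuned values. */
static void
example_red_params(struct rte_sched_port_params *pp) {
  uint32_t tc, color;
  for (tc = 0; tc < RTE_SCHED_TRAFFIC_CLASSES_PER_PIPE; tc++) {
    for (color = 0; color < e_RTE_METER_COLORS; color++) {
      pp->red_params[tc][color].min_th   = 32;   /* queue length where dropping starts */
      pp->red_params[tc][color].max_th   = 128;  /* everything above is dropped        */
      pp->red_params[tc][color].maxp_inv = 10;   /* max drop probability = 1/10        */
      pp->red_params[tc][color].wq_log2  = 9;    /* EWMA weight for the average queue  */
    }
  }
}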

${MAKE} T=${NEW_TARGET} config && ${MAKE} && \
RTE_SDK="${TOPDIR}/${DPDKDIR}" \
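For context, a hedged sketch (not part of this PR) of what CONFIG_RTE_SCHED_COLLECT_STATS=y buys: librte_sched compiles in per-queue counters that can be read back with rte_sched_queue_read_stats().

#include <stdio.h>
#include <rte_sched.h>

/* Hedged sketch against the DPDK API assumed by this tree: dump the
 * counters that CONFIG_RTE_SCHED_COLLECT_STATS=y enables for one queue. */
static void
example_dump_queue_stats(struct rte_sched_port *port, uint32_t queue_id) {
  struct rte_sched_queue_stats stats;
  uint16_t qlen = 0;

  if (rte_sched_queue_read_stats(port, queue_id, &stats, &qlen) == 0) {
    printf("queue %u: %u pkts, %u dropped, current length %u\n",
           queue_id, stats.n_pkts, stats.n_pkts_dropped, qlen);
  }
}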
24 changes: 19 additions & 5 deletions src/agent/ofp_queue_config_handler.c
@@ -115,6 +115,7 @@ s_queue_prop_encode(struct pbuf *pbuf,
res = LAGOPUS_RESULT_OUT_OF_RANGE;
break;
}

return res;
}

@@ -176,28 +177,41 @@ ofp_queue_get_config_reply_create(struct channel *channel,
struct ofp_header *xid_header) {
lagopus_result_t res = LAGOPUS_RESULT_ANY_FAILURES;
struct ofp_queue_get_config_reply reply;
const size_t packet_size = sizeof(struct ofp_queue_get_config_reply);

/* check params */
if (channel != NULL && pbuf != NULL &&
packet_queue_list != NULL && xid_header != NULL) {
/* alloc pbuf */
*pbuf = channel_pbuf_list_get(channel, packet_size);
*pbuf = channel_pbuf_list_get(channel, OFP_PACKET_MAX_SIZE);
pbuf_plen_set(*pbuf, pbuf_size_get(*pbuf));

if (*pbuf != NULL) {
/* set data. */
memset(&reply, 0, packet_size);
memset(&reply, 0, sizeof(reply));
ofp_header_set(&reply.header,
channel_version_get(channel),
OFPT_QUEUE_GET_CONFIG_REPLY,
(uint16_t)packet_size,
0, // length set in ofp_header_length_set()
xid_header->xid);
reply.port = port;

/* Encode message. */
pbuf_plen_set(*pbuf, packet_size);
res = ofp_queue_get_config_reply_encode(*pbuf, &reply);
if (res >= 0) {
res = s_packet_queue_list_encode(*pbuf, packet_queue_list);
if (res == LAGOPUS_RESULT_OK) {
uint16_t length = 0;
res = pbuf_length_get(*pbuf, &length);
if (res == LAGOPUS_RESULT_OK) {
res = ofp_header_length_set(*pbuf, length);
} else {
lagopus_msg_warning("FAILED (%s).\n",
lagopus_error_get_string(res));
}
} else {
lagopus_msg_warning("FAILED (%s).\n",
lagopus_error_get_string(res));
}
} else {
lagopus_msg_warning("FAILED : ofp_queue_get_config_reply_encode (%s).\n",
lagopus_error_get_string(res));
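The change above writes the header with length 0 and patches it via ofp_header_length_set() once the variable-length queue list has been encoded, since the final message size is unknown up front. A minimal sketch of the patching step (an illustration with a hypothetical helper; the real ofp_header_length_set() lives elsewhere in the agent):

#include <stdint.h>
#include <string.h>
#include <arpa/inet.h>

/* Hedged sketch, not Lagopus source: in the OpenFlow wire format the
 * header is {version, type, length, xid}, so the 16-bit big-endian
 * length field sits at byte offset 2 of the encoded message. */
static void
example_patch_ofp_length(uint8_t *msg_head, uint16_t total_len) {
  uint16_t be_len = htons(total_len);
  memcpy(msg_head + 2, &be_len, sizeof(be_len));
}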
80 changes: 50 additions & 30 deletions src/dataplane/dpdk/dpdk_io.c
@@ -482,6 +482,41 @@ app_lcore_io_rx_flush(struct app_lcore_params_io *lp, uint32_t n_workers) {
}
}

/**
* Move packets through the QoS scheduler, if one is configured.
* Return the number of packets ready to be transmitted.
*/
static inline uint32_t
schedule_tx_packets(struct interface *ifp, struct app_mbuf_array *outbufs) {
struct lagopus_packet *pkt;
struct rte_mbuf *mbuf;
uint32_t qidx, color;

if (ifp == NULL || ifp->sched_port == NULL) {
return outbufs->n_mbufs;
}

for (uint32_t i = 0; i < outbufs->n_mbufs; ++i) {
mbuf = outbufs->array[i];
pkt = (struct lagopus_packet*)(mbuf->buf_addr + APP_DEFAULT_MBUF_LOCALDATA_OFFSET);
qidx = (uint32_t)dpdk_interface_queue_id_to_index(ifp, pkt->queue_id);
color = rte_meter_trtcm_color_blind_check(&ifp->ifqueue.meters[qidx], rte_rdtsc(), OS_M_PKTLEN(mbuf));
rte_sched_port_pkt_write(mbuf, 0, 0, qidx, 0, color);
}

// TODO: rethink TX processing to enable appropriate scheduling.
// Under heavy load we should dequeue fewer packets than we enqueue;
// otherwise we just move low-priority packets to the back of the burst
// and give no advantage to high-priority packets of the next burst.

rte_sched_port_enqueue(ifp->sched_port, outbufs->array, outbufs->n_mbufs);
outbufs->n_mbufs = (uint32_t)rte_sched_port_dequeue(
ifp->sched_port, outbufs->array, APP_MBUF_ARRAY_SIZE
);

return outbufs->n_mbufs;
}
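One way to act on the TODO above (an illustrative sketch, not part of the change): cap the dequeue at the number of packets the scheduler actually accepted in this burst, so that under congestion low-priority packets stay queued instead of being re-emitted at the back of the same burst.

#include <rte_sched.h>
#include <rte_mbuf.h>

/* Hedged sketch, not part of this PR: dequeue no more packets than the
 * scheduler accepted; anything held back waits for a later burst or for
 * the flush path. */
static inline uint32_t
schedule_tx_packets_capped(struct rte_sched_port *sp,
                           struct rte_mbuf **pkts, uint32_t n_pkts) {
  int n_in = rte_sched_port_enqueue(sp, pkts, n_pkts);
  return (uint32_t)rte_sched_port_dequeue(sp, pkts, (uint32_t)n_in);
}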

/**
* Dequeue mbufs from output queue and send to ethernet port.
* This function is called from I/O (Output) thread.
@@ -535,31 +570,11 @@ app_lcore_io_tx(struct app_lcore_params_io *lp,
lp->tx.mbuf_out_flush[port] = 1;
continue;
}

ifp = dpdk_interface_lookup(port);
if (ifp != NULL && ifp->sched_port != NULL) {
struct lagopus_packet *pkt;
struct rte_mbuf *m;
int qidx, color;

for (i = 0; i < n_mbufs; i++) {
m = lp->tx.mbuf_out[port].array[i];
pkt = (struct lagopus_packet *)
(m->buf_addr + APP_DEFAULT_MBUF_LOCALDATA_OFFSET);
if (unlikely(pkt->queue_id != 0)) {
qidx = dpdk_interface_queue_id_to_index(ifp, pkt->queue_id);
color = rte_meter_trtcm_color_blind_check(&ifp->ifqueue.meters[qidx],
rte_rdtsc(),
OS_M_PKTLEN(m));
rte_sched_port_pkt_write(m, 0, 0, 0, qidx, color);
}
}
n_mbufs = rte_sched_port_enqueue(ifp->sched_port,
lp->tx.mbuf_out[port].array,
n_mbufs);
n_mbufs = rte_sched_port_dequeue(ifp->sched_port,
lp->tx.mbuf_out[port].array,
n_mbufs);
}
lp->tx.mbuf_out[port].n_mbufs = n_mbufs;
n_mbufs = schedule_tx_packets(ifp, &lp->tx.mbuf_out[port]);

DPRINTF("send %d pkts\n", n_mbufs);
n_pkts = rte_eth_tx_burst(port,
0,
@@ -601,24 +616,29 @@ app_lcore_io_tx_flush(struct app_lcore_params_io *lp, __UNUSED void *arg) {
uint8_t portid, i;

for (i = 0; i < lp->tx.n_nic_ports; i++) {
uint32_t n_pkts;
struct interface *ifp;
uint32_t n_mbufs, n_pkts;

portid = lp->tx.nic_ports[i];
if (likely((lp->tx.mbuf_out_flush[portid] == 0) ||
(lp->tx.mbuf_out[portid].n_mbufs == 0))) {
continue;
}

DPRINTF("flush: send %d pkts\n", lp->tx.mbuf_out[portid].n_mbufs);
ifp = dpdk_interface_lookup(portid);
n_mbufs = schedule_tx_packets(ifp, &lp->tx.mbuf_out[portid]);

DPRINTF("flush: send %d pkts\n", n_mbufs);
n_pkts = rte_eth_tx_burst(portid,
0,
lp->tx.mbuf_out[portid].array,
(uint16_t)lp->tx.mbuf_out[portid].n_mbufs);
DPRINTF("flus: sent %d pkts\n", n_pkts);
(uint16_t)n_mbufs);

DPRINTF("flush: sent %d pkts\n", n_pkts);

if (unlikely(n_pkts < lp->tx.mbuf_out[portid].n_mbufs)) {
if (unlikely(n_pkts < n_mbufs)) {
uint32_t k;
for (k = n_pkts; k < lp->tx.mbuf_out[portid].n_mbufs; k ++) {
for (k = n_pkts; k < n_mbufs; k ++) {
struct rte_mbuf *pkt_to_free = lp->tx.mbuf_out[portid].array[k];
rte_pktmbuf_free(pkt_to_free);
}
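Worth noting about the flush refactor (an observation, with a hypothetical helper for illustration): because the scheduler can retain packets across bursts, the flush path must also run schedule_tx_packets(); a port that goes quiet would otherwise strand whatever the scheduler still holds. A standalone drain would reduce to:

#include <rte_sched.h>
#include <rte_mbuf.h>

/* Hedged sketch with a hypothetical helper: pull out packets the scheduler
 * retained from earlier bursts, even when nothing new was enqueued. */
static inline uint32_t
example_drain_sched(struct rte_sched_port *sp,
                    struct rte_mbuf **out, uint32_t max_pkts) {
  if (sp == NULL) {
    return 0;
  }
  return (uint32_t)rte_sched_port_dequeue(sp, out, max_pkts);
}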