Skip to content

Commit

Permalink
eventdev: fix inconsistency in queue config
Browse files Browse the repository at this point in the history
With the current scheme of event queue configuration the cfg schedule
type macros (RTE_EVENT_QUEUE_CFG_*_ONLY) are inconsistent with the
event schedule type (RTE_SCHED_TYPE_*) this requires unnecessary
conversion between the fastpath and slowpath API's while scheduling
events or configuring event queues.

This patch aims to fix such inconsistency by using event schedule
types (RTE_SCHED_TYPE_*) for event queue configuration.

This patch also fixes example/eventdev_pipeline_sw_pmd as it doesn't
convert RTE_EVENT_QUEUE_CFG_*_ONLY to RTE_SCHED_TYPE_* which leads to
improper events being enqueued to the eventdev.

Fixes: adb5d54 ("examples/eventdev_pipeline_sw_pmd: add sample app")

Signed-off-by: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
Acked-by: Harry van Haaren <harry.van.haaren@intel.com>
  • Loading branch information
PavanNikhilesh authored and tmonjalo committed Oct 26, 2017
1 parent bc0d4e6 commit 13370a3
Show file tree
Hide file tree
Showing 10 changed files with 60 additions and 121 deletions.
21 changes: 0 additions & 21 deletions app/test-eventdev/evt_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,25 +92,4 @@ evt_has_all_types_queue(uint8_t dev_id)
true : false;
}

static inline uint32_t
evt_sched_type2queue_cfg(uint8_t sched_type)
{
uint32_t ret;

switch (sched_type) {
case RTE_SCHED_TYPE_ATOMIC:
ret = RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY;
break;
case RTE_SCHED_TYPE_ORDERED:
ret = RTE_EVENT_QUEUE_CFG_ORDERED_ONLY;
break;
case RTE_SCHED_TYPE_PARALLEL:
ret = RTE_EVENT_QUEUE_CFG_PARALLEL_ONLY;
break;
default:
rte_panic("Invalid sched_type %d\n", sched_type);
}
return ret;
}

#endif /* _EVT_COMMON_*/
4 changes: 2 additions & 2 deletions app/test-eventdev/test_order_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ order_queue_eventdev_setup(struct evt_test *test, struct evt_options *opt)
/* q0 (ordered queue) configuration */
struct rte_event_queue_conf q0_ordered_conf = {
.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
.event_queue_cfg = RTE_EVENT_QUEUE_CFG_ORDERED_ONLY,
.schedule_type = RTE_SCHED_TYPE_ORDERED,
.nb_atomic_flows = opt->nb_flows,
.nb_atomic_order_sequences = opt->nb_flows,
};
Expand All @@ -177,7 +177,7 @@ order_queue_eventdev_setup(struct evt_test *test, struct evt_options *opt)
/* q1 (atomic queue) configuration */
struct rte_event_queue_conf q1_atomic_conf = {
.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
.event_queue_cfg = RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY,
.schedule_type = RTE_SCHED_TYPE_ATOMIC,
.nb_atomic_flows = opt->nb_flows,
.nb_atomic_order_sequences = opt->nb_flows,
};
Expand Down
4 changes: 2 additions & 2 deletions app/test-eventdev/test_perf_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@ perf_queue_eventdev_setup(struct evt_test *test, struct evt_options *opt)
};
/* queue configurations */
for (queue = 0; queue < perf_queue_nb_event_queues(opt); queue++) {
q_conf.event_queue_cfg = evt_sched_type2queue_cfg
(opt->sched_type_list[queue % nb_stages]);
q_conf.schedule_type =
(opt->sched_type_list[queue % nb_stages]);

if (opt->q_priority) {
uint8_t stage_pos = queue % nb_stages;
Expand Down
4 changes: 2 additions & 2 deletions drivers/event/dpaa2/dpaa2_eventdev.c
Original file line number Diff line number Diff line change
Expand Up @@ -396,8 +396,8 @@ dpaa2_eventdev_queue_def_conf(struct rte_eventdev *dev, uint8_t queue_id,
RTE_SET_USED(queue_conf);

queue_conf->nb_atomic_flows = DPAA2_EVENT_QUEUE_ATOMIC_FLOWS;
queue_conf->event_queue_cfg = RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY |
RTE_EVENT_QUEUE_CFG_PARALLEL_ONLY;
queue_conf->schedule_type = RTE_SCHED_TYPE_ATOMIC |
RTE_SCHED_TYPE_PARALLEL;
queue_conf->priority = RTE_EVENT_DEV_PRIORITY_NORMAL;
}

Expand Down
28 changes: 7 additions & 21 deletions drivers/event/sw/sw_evdev.c
Original file line number Diff line number Diff line change
Expand Up @@ -345,28 +345,14 @@ sw_queue_setup(struct rte_eventdev *dev, uint8_t queue_id,
{
int type;

/* SINGLE_LINK can be OR-ed with other types, so handle first */
type = conf->schedule_type;

if (RTE_EVENT_QUEUE_CFG_SINGLE_LINK & conf->event_queue_cfg) {
type = SW_SCHED_TYPE_DIRECT;
} else {
switch (conf->event_queue_cfg) {
case RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY:
type = RTE_SCHED_TYPE_ATOMIC;
break;
case RTE_EVENT_QUEUE_CFG_ORDERED_ONLY:
type = RTE_SCHED_TYPE_ORDERED;
break;
case RTE_EVENT_QUEUE_CFG_PARALLEL_ONLY:
type = RTE_SCHED_TYPE_PARALLEL;
break;
case RTE_EVENT_QUEUE_CFG_ALL_TYPES:
SW_LOG_ERR("QUEUE_CFG_ALL_TYPES not supported\n");
return -ENOTSUP;
default:
SW_LOG_ERR("Unknown queue type %d requested\n",
conf->event_queue_cfg);
return -EINVAL;
}
} else if (RTE_EVENT_QUEUE_CFG_ALL_TYPES
& conf->event_queue_cfg) {
SW_LOG_ERR("QUEUE_CFG_ALL_TYPES not supported\n");
return -ENOTSUP;
}

struct sw_evdev *sw = sw_pmd_priv(dev);
Expand Down Expand Up @@ -400,7 +386,7 @@ sw_queue_def_conf(struct rte_eventdev *dev, uint8_t queue_id,
static const struct rte_event_queue_conf default_conf = {
.nb_atomic_flows = 4096,
.nb_atomic_order_sequences = 1,
.event_queue_cfg = RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY,
.schedule_type = RTE_SCHED_TYPE_ATOMIC,
.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
};

Expand Down
18 changes: 9 additions & 9 deletions examples/eventdev_pipeline_sw_pmd/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ struct config_data {
static struct config_data cdata = {
.num_packets = (1L << 25), /* do ~32M packets */
.num_fids = 512,
.queue_type = RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY,
.queue_type = RTE_SCHED_TYPE_ATOMIC,
.next_qid = {-1},
.qid = {-1},
.num_stages = 1,
Expand Down Expand Up @@ -490,10 +490,10 @@ parse_app_args(int argc, char **argv)
cdata.enable_queue_priorities = 1;
break;
case 'o':
cdata.queue_type = RTE_EVENT_QUEUE_CFG_ORDERED_ONLY;
cdata.queue_type = RTE_SCHED_TYPE_ORDERED;
break;
case 'p':
cdata.queue_type = RTE_EVENT_QUEUE_CFG_PARALLEL_ONLY;
cdata.queue_type = RTE_SCHED_TYPE_PARALLEL;
break;
case 'q':
cdata.quiet = 1;
Expand Down Expand Up @@ -684,7 +684,7 @@ setup_eventdev(struct prod_data *prod_data,
.new_event_threshold = 4096,
};
struct rte_event_queue_conf wkr_q_conf = {
.event_queue_cfg = cdata.queue_type,
.schedule_type = cdata.queue_type,
.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
.nb_atomic_flows = 1024,
.nb_atomic_order_sequences = 1024,
Expand Down Expand Up @@ -751,11 +751,11 @@ setup_eventdev(struct prod_data *prod_data,
}

const char *type_str = "Atomic";
switch (wkr_q_conf.event_queue_cfg) {
case RTE_EVENT_QUEUE_CFG_ORDERED_ONLY:
switch (wkr_q_conf.schedule_type) {
case RTE_SCHED_TYPE_ORDERED:
type_str = "Ordered";
break;
case RTE_EVENT_QUEUE_CFG_PARALLEL_ONLY:
case RTE_SCHED_TYPE_PARALLEL:
type_str = "Parallel";
break;
}
Expand Down Expand Up @@ -907,9 +907,9 @@ main(int argc, char **argv)
printf("\tworkers: %u\n", cdata.num_workers);
printf("\tpackets: %"PRIi64"\n", cdata.num_packets);
printf("\tQueue-prio: %u\n", cdata.enable_queue_priorities);
if (cdata.queue_type == RTE_EVENT_QUEUE_CFG_ORDERED_ONLY)
if (cdata.queue_type == RTE_SCHED_TYPE_ORDERED)
printf("\tqid0 type: ordered\n");
if (cdata.queue_type == RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY)
if (cdata.queue_type == RTE_SCHED_TYPE_ATOMIC)
printf("\tqid0 type: atomic\n");
printf("\tCores available: %u\n", rte_lcore_count());
printf("\tCores used: %u\n", cores_needed);
Expand Down
20 changes: 8 additions & 12 deletions lib/librte_eventdev/rte_eventdev.c
Original file line number Diff line number Diff line change
Expand Up @@ -517,13 +517,11 @@ is_valid_atomic_queue_conf(const struct rte_event_queue_conf *queue_conf)
{
if (queue_conf &&
!(queue_conf->event_queue_cfg &
RTE_EVENT_QUEUE_CFG_SINGLE_LINK) && (
RTE_EVENT_QUEUE_CFG_SINGLE_LINK) &&
((queue_conf->event_queue_cfg &
RTE_EVENT_QUEUE_CFG_TYPE_MASK)
== RTE_EVENT_QUEUE_CFG_ALL_TYPES) ||
((queue_conf->event_queue_cfg &
RTE_EVENT_QUEUE_CFG_TYPE_MASK)
== RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY)
RTE_EVENT_QUEUE_CFG_ALL_TYPES) ||
(queue_conf->schedule_type
== RTE_SCHED_TYPE_ATOMIC)
))
return 1;
else
Expand All @@ -535,13 +533,11 @@ is_valid_ordered_queue_conf(const struct rte_event_queue_conf *queue_conf)
{
if (queue_conf &&
!(queue_conf->event_queue_cfg &
RTE_EVENT_QUEUE_CFG_SINGLE_LINK) && (
((queue_conf->event_queue_cfg &
RTE_EVENT_QUEUE_CFG_TYPE_MASK)
== RTE_EVENT_QUEUE_CFG_ALL_TYPES) ||
RTE_EVENT_QUEUE_CFG_SINGLE_LINK) &&
((queue_conf->event_queue_cfg &
RTE_EVENT_QUEUE_CFG_TYPE_MASK)
== RTE_EVENT_QUEUE_CFG_ORDERED_ONLY)
RTE_EVENT_QUEUE_CFG_ALL_TYPES) ||
(queue_conf->schedule_type
== RTE_SCHED_TYPE_ORDERED)
))
return 1;
else
Expand Down
54 changes: 17 additions & 37 deletions lib/librte_eventdev/rte_eventdev.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,9 +270,9 @@ struct rte_mbuf; /* we just use mbuf pointers; no need to include rte_mbuf.h */
#define RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES (1ULL << 3)
/**< Event device is capable of enqueuing events of any type to any queue.
* If this capability is not set, the queue only supports events of the
* *RTE_EVENT_QUEUE_CFG_* type that it was created with.
* *RTE_SCHED_TYPE_* type that it was created with.
*
* @see RTE_EVENT_QUEUE_CFG_* values
* @see RTE_SCHED_TYPE_* values
*/
#define RTE_EVENT_DEV_CAP_BURST_MODE (1ULL << 4)
/**< Event device is capable of operating in burst mode for enqueue(forward,
Expand Down Expand Up @@ -515,39 +515,13 @@ rte_event_dev_configure(uint8_t dev_id,
/* Event queue specific APIs */

/* Event queue configuration bitmap flags */
#define RTE_EVENT_QUEUE_CFG_TYPE_MASK (3ULL << 0)
/**< Mask for event queue schedule type configuration request */
#define RTE_EVENT_QUEUE_CFG_ALL_TYPES (0ULL << 0)
#define RTE_EVENT_QUEUE_CFG_ALL_TYPES (1ULL << 0)
/**< Allow ATOMIC,ORDERED,PARALLEL schedule type enqueue
*
* @see RTE_SCHED_TYPE_ORDERED, RTE_SCHED_TYPE_ATOMIC, RTE_SCHED_TYPE_PARALLEL
* @see rte_event_enqueue_burst()
*/
#define RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY (1ULL << 0)
/**< Allow only ATOMIC schedule type enqueue
*
* The rte_event_enqueue_burst() result is undefined if the queue configured
* with ATOMIC only and sched_type != RTE_SCHED_TYPE_ATOMIC
*
* @see RTE_SCHED_TYPE_ATOMIC, rte_event_enqueue_burst()
*/
#define RTE_EVENT_QUEUE_CFG_ORDERED_ONLY (2ULL << 0)
/**< Allow only ORDERED schedule type enqueue
*
* The rte_event_enqueue_burst() result is undefined if the queue configured
* with ORDERED only and sched_type != RTE_SCHED_TYPE_ORDERED
*
* @see RTE_SCHED_TYPE_ORDERED, rte_event_enqueue_burst()
*/
#define RTE_EVENT_QUEUE_CFG_PARALLEL_ONLY (3ULL << 0)
/**< Allow only PARALLEL schedule type enqueue
*
* The rte_event_enqueue_burst() result is undefined if the queue configured
* with PARALLEL only and sched_type != RTE_SCHED_TYPE_PARALLEL
*
* @see RTE_SCHED_TYPE_PARALLEL, rte_event_enqueue_burst()
*/
#define RTE_EVENT_QUEUE_CFG_SINGLE_LINK (1ULL << 2)
#define RTE_EVENT_QUEUE_CFG_SINGLE_LINK (1ULL << 1)
/**< This event queue links only to a single event port.
*
* @see rte_event_port_setup(), rte_event_port_link()
Expand All @@ -558,8 +532,8 @@ struct rte_event_queue_conf {
uint32_t nb_atomic_flows;
/**< The maximum number of active flows this queue can track at any
* given time. If the queue is configured for atomic scheduling (by
* applying the RTE_EVENT_QUEUE_CFG_ALL_TYPES or
* RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY flags to event_queue_cfg), then the
* applying the RTE_EVENT_QUEUE_CFG_ALL_TYPES flag to event_queue_cfg
* or RTE_SCHED_TYPE_ATOMIC flag to schedule_type), then the
* value must be in the range of [1, nb_event_queue_flows], which was
* previously provided in rte_event_dev_configure().
*/
Expand All @@ -572,12 +546,18 @@ struct rte_event_queue_conf {
* event will be returned from dequeue until one or more entries are
* freed up/released.
* If the queue is configured for ordered scheduling (by applying the
* RTE_EVENT_QUEUE_CFG_ALL_TYPES or RTE_EVENT_QUEUE_CFG_ORDERED_ONLY
* flags to event_queue_cfg), then the value must be in the range of
* [1, nb_event_queue_flows], which was previously supplied to
* rte_event_dev_configure().
* RTE_EVENT_QUEUE_CFG_ALL_TYPES flag to event_queue_cfg or
* RTE_SCHED_TYPE_ORDERED flag to schedule_type), then the value must
* be in the range of [1, nb_event_queue_flows], which was
* previously supplied to rte_event_dev_configure().
*/
uint32_t event_queue_cfg;
/**< Queue cfg flags(EVENT_QUEUE_CFG_) */
uint8_t schedule_type;
/**< Queue schedule type(RTE_SCHED_TYPE_*).
* Valid when RTE_EVENT_QUEUE_CFG_ALL_TYPES bit is not set in
* event_queue_cfg.
*/
uint32_t event_queue_cfg; /**< Queue cfg flags(EVENT_QUEUE_CFG_) */
uint8_t priority;
/**< Priority for this event queue relative to other event queues.
* The requested priority should in the range of
Expand Down
12 changes: 5 additions & 7 deletions test/test/test_eventdev.c
Original file line number Diff line number Diff line change
Expand Up @@ -300,15 +300,13 @@ test_eventdev_queue_setup(void)
/* Negative cases */
ret = rte_event_queue_default_conf_get(TEST_DEV_ID, 0, &qconf);
TEST_ASSERT_SUCCESS(ret, "Failed to get queue0 info");
qconf.event_queue_cfg = (RTE_EVENT_QUEUE_CFG_ALL_TYPES &
RTE_EVENT_QUEUE_CFG_TYPE_MASK);
qconf.event_queue_cfg = RTE_EVENT_QUEUE_CFG_ALL_TYPES;
qconf.nb_atomic_flows = info.max_event_queue_flows + 1;
ret = rte_event_queue_setup(TEST_DEV_ID, 0, &qconf);
TEST_ASSERT(ret == -EINVAL, "Expected -EINVAL, %d", ret);

qconf.nb_atomic_flows = info.max_event_queue_flows;
qconf.event_queue_cfg = (RTE_EVENT_QUEUE_CFG_ORDERED_ONLY &
RTE_EVENT_QUEUE_CFG_TYPE_MASK);
qconf.schedule_type = RTE_SCHED_TYPE_ORDERED;
qconf.nb_atomic_order_sequences = info.max_event_queue_flows + 1;
ret = rte_event_queue_setup(TEST_DEV_ID, 0, &qconf);
TEST_ASSERT(ret == -EINVAL, "Expected -EINVAL, %d", ret);
Expand Down Expand Up @@ -423,7 +421,7 @@ test_eventdev_queue_attr_nb_atomic_flows(void)
/* Assume PMD doesn't support atomic flows, return early */
return -ENOTSUP;

qconf.event_queue_cfg = RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY;
qconf.schedule_type = RTE_SCHED_TYPE_ATOMIC;

for (i = 0; i < (int)queue_count; i++) {
ret = rte_event_queue_setup(TEST_DEV_ID, i, &qconf);
Expand Down Expand Up @@ -466,7 +464,7 @@ test_eventdev_queue_attr_nb_atomic_order_sequences(void)
/* Assume PMD doesn't support reordering */
return -ENOTSUP;

qconf.event_queue_cfg = RTE_EVENT_QUEUE_CFG_ORDERED_ONLY;
qconf.schedule_type = RTE_SCHED_TYPE_ORDERED;

for (i = 0; i < (int)queue_count; i++) {
ret = rte_event_queue_setup(TEST_DEV_ID, i, &qconf);
Expand Down Expand Up @@ -507,7 +505,7 @@ test_eventdev_queue_attr_event_queue_cfg(void)
ret = rte_event_queue_default_conf_get(TEST_DEV_ID, 0, &qconf);
TEST_ASSERT_SUCCESS(ret, "Failed to get queue0 def conf");

qconf.event_queue_cfg = RTE_EVENT_QUEUE_CFG_PARALLEL_ONLY;
qconf.event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_LINK;

for (i = 0; i < (int)queue_count; i++) {
ret = rte_event_queue_setup(TEST_DEV_ID, i, &qconf);
Expand Down
Loading

0 comments on commit 13370a3

Please sign in to comment.