Skip to content

Commit

Permalink
Check multi-packet receive queue caps
Browse files Browse the repository at this point in the history
- Bail out earlier if the device caps indicate no support
- Clamp the stride and WQE sizes to the supported size range
- Make mcdump actually use 2MB chunks again, to match the comment (MPRQ
  had accidentally made it use 4MB chunks).
  • Loading branch information
bmerry committed Jan 14, 2019
1 parent 030dcbc commit 2e2a44c
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 9 deletions.
3 changes: 3 additions & 0 deletions include/spead2/common_ibv.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ class rdma_cm_id_t : public std::unique_ptr<rdma_cm_id, detail::rdma_cm_id_delet

void bind_addr(const boost::asio::ip::address &addr);
/**
 * Query device attributes by calling ibv_query_device on this ID's verbs
 * context. The ID must be non-null (checked with an assertion); a non-zero
 * status from the query is reported through throw_errno.
 */
ibv_device_attr query_device() const;
#if SPEAD2_USE_IBV_EXP
/**
 * Experimental-verbs counterpart of query_device: calls ibv_exp_query_device
 * on this ID's verbs context and returns the filled-in attribute structure.
 * Callers should test the returned comp_mask before trusting optional
 * sections such as mp_rq_caps. A non-zero status from the query is reported
 * through throw_errno. Only declared when SPEAD2_USE_IBV_EXP is enabled.
 */
ibv_exp_device_attr exp_query_device() const;
#endif
};

/* This class is not intended to be used for anything. However, the mlx5 driver
Expand Down
17 changes: 16 additions & 1 deletion src/common_ibv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,27 @@ ibv_device_attr rdma_cm_id_t::query_device() const
{
assert(get());
ibv_device_attr attr;
std::memset(&attr, 0, sizeof(attr));
int status = ibv_query_device(get()->verbs, &attr);
if (status != 0)
throw_errno("ibv_query_device failed", status);
return attr;
}

#if SPEAD2_USE_IBV_EXP
/* Fetch the experimental-verbs device attributes for this ID.
 *
 * The structure is zero-filled before the call and comp_mask is set to
 * IBV_EXP_DEVICE_ATTR_RESERVED - 1 (presumably every attribute bit defined
 * by the installed headers - confirm against the verbs documentation).
 * A non-zero status from ibv_exp_query_device is handed to throw_errno.
 */
ibv_exp_device_attr rdma_cm_id_t::exp_query_device() const
{
    assert(get());

    ibv_exp_device_attr device_attr;
    std::memset(&device_attr, 0, sizeof(device_attr));
    device_attr.comp_mask = IBV_EXP_DEVICE_ATTR_RESERVED - 1;

    const int err = ibv_exp_query_device(get()->verbs, &device_attr);
    if (err != 0)
        throw_errno("ibv_exp_query_device failed", err);

    return device_attr;
}
#endif

ibv_context_t::ibv_context_t(struct ibv_device *device)
{
ibv_context *ctx = ibv_open_device(device);
Expand Down Expand Up @@ -588,7 +603,7 @@ ibv_exp_wq_t::ibv_exp_wq_t(const rdma_cm_id_t &cm_id, ibv_exp_wq_init_attr *attr
void ibv_exp_wq_t::modify(ibv_exp_wq_state state)
{
ibv_exp_wq_attr wq_attr;
memset(&wq_attr, 0, sizeof(wq_attr));
std::memset(&wq_attr, 0, sizeof(wq_attr));
wq_attr.wq_state = IBV_EXP_WQS_RDY;
wq_attr.attr_mask = IBV_EXP_WQ_ATTR_STATE;
int status = ibv_exp_modify_wq(get(), &wq_attr);
Expand Down
27 changes: 23 additions & 4 deletions src/mcdump.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1063,18 +1063,37 @@ capture_mprq::capture_mprq(const options &opts)
wq.modify(IBV_EXP_WQS_RDY);
}

/* Restrict x to the interval [low, high].
 *
 * The lower bound is applied first and the upper bound second, so if the
 * bounds are inverted (low > high) the result is always high.
 */
static int clamp(int x, int low, int high)
{
    const int lifted = std::max(x, low);
    return std::min(lifted, high);
}

chunking_scheme capture_mprq::sizes(const options &opts, const spead2::rdma_cm_id_t &cm_id)
{
// TODO: adapt these to the hardware and the requested buffer size
std::size_t log_stride_bytes = 6; // 64 bytes
std::size_t log_strides_per_chunk = 16; // 2MB chunks
ibv_exp_device_attr attr = cm_id.exp_query_device();
if (!(attr.comp_mask & IBV_EXP_DEVICE_ATTR_MP_RQ)
|| !(attr.mp_rq_caps.supported_qps & IBV_EXP_MP_RQ_SUP_TYPE_WQ_RQ))
throw std::system_error(std::make_error_code(std::errc::not_supported),
"device does not support multi-packet receive queues");

/* TODO: adapt these to the requested buffer size e.g. if a very large
* buffer is requested, might need to increase the stride size to avoid
* running out of CQEs.
*/
std::size_t log_stride_bytes =
clamp(6,
attr.mp_rq_caps.min_single_stride_log_num_of_bytes,
attr.mp_rq_caps.max_single_stride_log_num_of_bytes); // 64 bytes
std::size_t log_strides_per_chunk =
clamp(21 - log_stride_bytes,
attr.mp_rq_caps.min_single_wqe_log_num_of_strides,
attr.mp_rq_caps.max_single_wqe_log_num_of_strides); // 2MB chunks
std::size_t max_records = 1 << log_strides_per_chunk;
std::size_t chunk_size = max_records << log_stride_bytes;
std::size_t n_chunks = opts.net_buffer / chunk_size;
if (n_chunks == 0)
n_chunks = 1;

ibv_device_attr attr = cm_id.query_device();
unsigned int device_chunks = std::min(attr.max_qp_wr, attr.max_mr);

bool reduced = false;
Expand Down
23 changes: 19 additions & 4 deletions src/recv_udp_ibv_mprq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ udp_ibv_mprq_reader::poll_result udp_ibv_mprq_reader::poll_once(stream_base::add
return poll_result::partial;
}

/* Limit x to lie between low and high (inclusive).
 *
 * low is enforced before high, which means high takes precedence when the
 * two bounds are inconsistent (low > high).
 */
static int clamp(int x, int low, int high)
{
    const int at_least_low = std::max(x, low);
    return std::min(at_least_low, high);
}

udp_ibv_mprq_reader::udp_ibv_mprq_reader(
stream &owner,
const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
Expand All @@ -146,20 +151,30 @@ udp_ibv_mprq_reader::udp_ibv_mprq_reader(
: udp_ibv_reader_base<udp_ibv_mprq_reader>(
owner, endpoints, interface_address, max_size, comp_vector, max_poll)
{
ibv_exp_device_attr device_attr = cm_id.exp_query_device();
if (!(device_attr.comp_mask & IBV_EXP_DEVICE_ATTR_MP_RQ)
|| !(device_attr.mp_rq_caps.supported_qps & IBV_EXP_MP_RQ_SUP_TYPE_WQ_RQ))
throw std::system_error(std::make_error_code(std::errc::not_supported),
"device does not support multi-packet receive queues");

ibv_exp_res_domain_init_attr res_domain_attr;
memset(&res_domain_attr, 0, sizeof(res_domain_attr));
res_domain_attr.comp_mask = IBV_EXP_RES_DOMAIN_THREAD_MODEL | IBV_EXP_RES_DOMAIN_MSG_MODEL;
res_domain_attr.thread_model = IBV_EXP_THREAD_UNSAFE;
res_domain_attr.msg_model = IBV_EXP_MSG_HIGH_BW;
res_domain = ibv_exp_res_domain_t(cm_id, &res_domain_attr);

ibv_device_attr device_attr = cm_id.query_device();

// TODO: adjust stride parameters based on device info
ibv_exp_wq_init_attr wq_attr;
memset(&wq_attr, 0, sizeof(wq_attr));
wq_attr.mp_rq.single_stride_log_num_of_bytes = 6; // 64 bytes per stride
wq_attr.mp_rq.single_wqe_log_num_of_strides = 14; // 1MB per WQE
wq_attr.mp_rq.single_stride_log_num_of_bytes =
clamp(6,
device_attr.mp_rq_caps.min_single_stride_log_num_of_bytes,
device_attr.mp_rq_caps.max_single_stride_log_num_of_bytes); // 64 bytes per stride
wq_attr.mp_rq.single_wqe_log_num_of_strides =
clamp(20 - wq_attr.mp_rq.single_stride_log_num_of_bytes,
device_attr.mp_rq_caps.min_single_wqe_log_num_of_strides,
device_attr.mp_rq_caps.max_single_wqe_log_num_of_strides); // 1MB per WQE
int log_wqe_size = wq_attr.mp_rq.single_stride_log_num_of_bytes + wq_attr.mp_rq.single_wqe_log_num_of_strides;
wqe_size = std::size_t(1) << log_wqe_size;
if (buffer_size < 2 * wqe_size)
Expand Down

0 comments on commit 2e2a44c

Please sign in to comment.