Skip to content

Commit

Permalink
Move more code under mpool feature
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Apr 8, 2024
1 parent ec8eb23 commit d5ced80
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 14 deletions.
4 changes: 4 additions & 0 deletions ntex-bytes/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.1.27] (2024-04-08)

* Move more code under mpool feature

## [0.1.26] (2024-04-04)

* Make memory pools optional
Expand Down
2 changes: 1 addition & 1 deletion ntex-bytes/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-bytes"
version = "0.1.26"
version = "0.1.27"
authors = ["Nikolay Kim <fafhrd91@gmail.com>", "Carl Lerche <me@carllerche.com>"]
description = "Types and traits for working with bytes (bytes crate fork)"
documentation = "https://docs.rs/ntex-bytes"
Expand Down
4 changes: 4 additions & 0 deletions ntex-bytes/src/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1592,6 +1592,7 @@ impl BytesMut {
self.chunk().iter()
}

#[cfg(feature = "mpool")]
pub(crate) fn move_to_pool(&mut self, pool: PoolRef) {
self.inner.move_to_pool(pool);
}
Expand Down Expand Up @@ -2417,6 +2418,7 @@ impl BytesVec {
self.chunk().iter()
}

#[cfg(feature = "mpool")]
pub(crate) fn move_to_pool(&mut self, pool: PoolRef) {
self.inner.move_to_pool(pool);
}
Expand Down Expand Up @@ -2686,6 +2688,7 @@ impl InnerVec {
}
}

#[cfg(feature = "mpool")]
#[inline]
fn move_to_pool(&mut self, pool: PoolRef) {
unsafe {
Expand Down Expand Up @@ -3102,6 +3105,7 @@ impl Inner {
}
}

#[cfg(feature = "mpool")]
#[inline]
fn move_to_pool(&mut self, pool: PoolRef) {
let kind = self.kind();
Expand Down
42 changes: 29 additions & 13 deletions ntex-bytes/src/pool.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
#![allow(clippy::type_complexity)]
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering::Relaxed};
#[cfg(feature = "mpool")]
use std::sync::atomic::AtomicBool;
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
use std::task::{Context, Poll};
use std::{cell::Cell, cell::RefCell, fmt, future::Future, pin::Pin, ptr, rc::Rc};

#[cfg(feature = "mpool")]
use futures_core::task::__internal::AtomicWaker;

use crate::{BufMut, BytesMut, BytesVec};
Expand Down Expand Up @@ -35,7 +38,9 @@ bitflags::bitflags! {

struct MemoryPool {
id: PoolId,
#[cfg(feature = "mpool")]
waker: AtomicWaker,
#[cfg(feature = "mpool")]
waker_alive: AtomicBool,
#[cfg(feature = "mpool")]
waiters: RefCell<mpool::Waiters>,
Expand Down Expand Up @@ -188,13 +193,15 @@ impl PoolRef {
}

#[inline]
pub fn move_in(self, buf: &mut BytesMut) {
buf.move_to_pool(self);
pub fn move_in(self, _buf: &mut BytesMut) {
#[cfg(feature = "mpool")]
_buf.move_to_pool(self);
}

#[inline]
pub fn move_vec_in(self, buf: &mut BytesVec) {
buf.move_to_pool(self);
pub fn move_vec_in(self, _buf: &mut BytesVec) {
#[cfg(feature = "mpool")]
_buf.move_to_pool(self);
}

#[inline]
Expand Down Expand Up @@ -361,21 +368,28 @@ impl PoolRef {
}

#[inline]
pub(crate) fn acquire(self, size: usize) {
let prev = self.0.size.fetch_add(size, Relaxed);
if self.0.waker_alive.load(Relaxed) {
self.wake_driver(prev + size)
pub(crate) fn acquire(self, _size: usize) {
#[cfg(feature = "mpool")]
{
let prev = self.0.size.fetch_add(_size, Relaxed);
if self.0.waker_alive.load(Relaxed) {
self.wake_driver(prev + _size)

Check warning on line 376 in ntex-bytes/src/pool.rs

View check run for this annotation

Codecov / codecov/patch

ntex-bytes/src/pool.rs#L376

Added line #L376 was not covered by tests
}
}
}

#[inline]
pub(crate) fn release(self, size: usize) {
let prev = self.0.size.fetch_sub(size, Relaxed);
if self.0.waker_alive.load(Relaxed) {
self.wake_driver(prev - size)
pub(crate) fn release(self, _size: usize) {
#[cfg(feature = "mpool")]
{
let prev = self.0.size.fetch_sub(_size, Relaxed);
if self.0.waker_alive.load(Relaxed) {
self.wake_driver(prev - _size)
}
}
}

#[cfg(feature = "mpool")]
fn wake_driver(self, allocated: usize) {
let l = self.0.window_l.get();
let h = self.0.window_h.get();
Expand Down Expand Up @@ -428,7 +442,9 @@ impl MemoryPool {
fn create(id: PoolId) -> &'static MemoryPool {
Box::leak(Box::new(MemoryPool {
id,
#[cfg(feature = "mpool")]
waker: AtomicWaker::new(),
#[cfg(feature = "mpool")]
waker_alive: AtomicBool::new(false),
#[cfg(feature = "mpool")]
waiters: RefCell::new(mpool::Waiters::new()),
Expand Down

0 comments on commit d5ced80

Please sign in to comment.