Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mpz-common): multi-threaded executor #136

Merged
merged 9 commits into from
May 29, 2024

Conversation

sinui0
Copy link
Collaborator

@sinui0 sinui0 commented May 15, 2024

This PR implements a multi-threaded executor which utilizes a multiplexed connection to support forking.

It is currently limited to executing 2 futures concurrently, but we can expand this as needed.

EDIT: I've gone all the way and added full forking support which can be accessed via the queue method on Context. This can be used to distribute an arbitrary number of tasks across a configurable number of logical threads.

@sinui0 sinui0 marked this pull request as ready for review May 19, 2024 22:16
@sinui0 sinui0 requested review from th4s and themighty1 May 19, 2024 22:16
Copy link
Member

@th4s th4s left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice 👍

crates/mpz-common/src/executor/mt.rs Outdated Show resolved Hide resolved
@sinui0 sinui0 requested a review from th4s May 23, 2024 01:27
Copy link
Member

@th4s th4s left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice 👍, some comments

crates/mpz-common/src/queue.rs Outdated Show resolved Hide resolved
let task_count = self.queue.len();

let mut lanes: Vec<_> = (0..lane_count)
.map(|_| Vec::with_capacity((task_count / lane_count) + (task_count % lane_count)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.map(|_| Vec::with_capacity((task_count / lane_count) + (task_count % lane_count)))
.map(|_| Vec::with_capacity((task_count / lane_count) + 1))

One more should be enough, right?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be + (task_count % lane_count != 0)

crates/mpz-common/src/queue/round_robin.rs Outdated Show resolved Hide resolved
Copy link
Collaborator

@themighty1 themighty1 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gw


assert_eq!(results_b, vec![0, 1]);
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a test to make sure things run concurrently

    use std::time::{Duration, Instant};
    // Need to enable tokio time feature

    // Tests that the tasks run concurrently.
    #[tokio::test]
    async fn test_mt_executor_concurrent() {
        let ((mux_a, fut_a), (mux_b, fut_b)) = test_yamux_pair_framed(1024, Bincode);

        tokio::spawn(async move {
            futures::try_join!(fut_a.into_future(), fut_b.into_future()).unwrap();
        });

        let mut exec_a = MTExecutor::new(mux_a, 8);
        let mut exec_b = MTExecutor::new(mux_b, 8);

        let (mut ctx_a, mut ctx_b) =
            futures::try_join!(exec_a.new_thread(), exec_b.new_thread()).unwrap();

        let mut queue_a = ctx_a.queue().await.unwrap();
        let mut queue_b = ctx_b.queue().await.unwrap();

        let start = Instant::now();
        let timeout = Duration::from_millis(100);

        for i in 0..8 {
            queue_a.push(move |ctx| {
                let i = i;
                let timeout = timeout;
                Box::pin(async move {
                    tokio::time::sleep(timeout).await;
                    ctx.io_mut().send(i as u8).await.unwrap();
                })
            });
            queue_b.push(|ctx| Box::pin(async { ctx.io_mut().expect_next::<u8>().await.unwrap() }));
        }

        let (_, results_b) = futures::try_join!(queue_a.wait(), queue_b.wait()).unwrap();

        let elapsed = Instant::now().duration_since(start);

        // The overall latency should be approximately that of a single task.
        assert!(elapsed < timeout + Duration::from_millis(50));

        assert_eq!(results_b, vec![0, 1, 2, 3, 4, 5, 6, 7]);
    }

let task_count = self.queue.len();

let mut lanes: Vec<_> = (0..lane_count)
.map(|_| Vec::with_capacity((task_count / lane_count) + (task_count % lane_count)))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be + (task_count % lane_count != 0)

@sinui0
Copy link
Collaborator Author

sinui0 commented May 28, 2024

I'm going to just remove this queue stuff and stick with the other changes. Will proceed with merge as scope will be a subset. Will open another PR if/when we need this functionality

@sinui0
Copy link
Collaborator Author

sinui0 commented May 28, 2024

Sorry for changing scope after review. I also identified a bug which I've fixed.

@sinui0 sinui0 requested review from th4s and themighty1 May 28, 2024 18:03
crates/mpz-common/src/executor/mt.rs Show resolved Hide resolved
@sinui0 sinui0 merged commit 5cb1aec into threading-refactor May 29, 2024
@sinui0 sinui0 deleted the feat/mt-executor branch May 29, 2024 16:54
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants