Implementation of the join operation #63

Merged
merged 9 commits into rust-threadpool:master Jul 12, 2017

Conversation

@dns2utf8 (Member) commented Jul 7, 2017

It restricts the operation to observers only.

Because std::sync::mpsc::channel does not offer a way to query the queue length, I added another counter field: stored_jobs_counter.
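
For illustration, a minimal sketch of that approach, assuming the channel is wrapped together with an atomic counter so the queue length can be read at any time. The CountedQueue type and its methods are hypothetical, not code from this PR:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

// Hypothetical wrapper: std::sync::mpsc offers no len(), so we count ourselves.
struct CountedQueue<T> {
    tx: Sender<T>,
    rx: Arc<Mutex<Receiver<T>>>,
    stored_jobs_counter: AtomicUsize,
}

impl<T> CountedQueue<T> {
    fn new() -> Self {
        let (tx, rx) = channel();
        CountedQueue {
            tx,
            rx: Arc::new(Mutex::new(rx)),
            stored_jobs_counter: AtomicUsize::new(0),
        }
    }

    fn push(&self, job: T) {
        // Increment before sending so the counter never under-reports.
        self.stored_jobs_counter.fetch_add(1, Ordering::SeqCst);
        self.tx.send(job).expect("receiver alive");
    }

    fn pop(&self) -> T {
        let job = self.rx.lock().unwrap().recv().expect("sender alive");
        self.stored_jobs_counter.fetch_sub(1, Ordering::SeqCst);
        job
    }

    fn len(&self) -> usize {
        self.stored_jobs_counter.load(Ordering::SeqCst)
    }
}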

@frewsxcv (Collaborator) commented Jul 7, 2017

What if we use a Condvar for this?

https://doc.rust-lang.org/std/sync/struct.Condvar.html

Seems like we could store a Condvar in the ThreadPool, and if the user calls join, we call wait on the Condvar, and every time the ThreadPool finishes, we call notify_all.
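
A rough sketch of that shape, with illustrative names only (JoinSignal and its methods are not the crate's actual API): join() loops on the predicate to survive spurious wakeups, and a worker notifies once nothing is left.

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Condvar, Mutex};

// Illustrative sketch of the Condvar idea, not the crate's actual layout.
struct JoinSignal {
    remaining: AtomicUsize, // queued + active jobs
    lock: Mutex<()>,        // pairs with the Condvar for the wait/notify handshake
    empty_condvar: Condvar,
}

impl JoinSignal {
    // Called by join(): block until no jobs remain.
    fn wait_empty(&self) {
        let mut guard = self.lock.lock().unwrap();
        // Loop so a spurious wakeup re-checks the predicate.
        while self.remaining.load(Ordering::SeqCst) > 0 {
            guard = self.empty_condvar.wait(guard).unwrap();
        }
    }

    // Called by a worker after it finishes a job.
    fn job_done(&self) {
        if self.remaining.fetch_sub(1, Ordering::SeqCst) == 1 {
            // The last job just finished: wake every joiner.
            let _guard = self.lock.lock().unwrap();
            self.empty_condvar.notify_all();
        }
    }
}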

@dns2utf8 (Member, Author) commented Jul 8, 2017

Good point, we trade polling for a little memory. I am working on it.

@dns2utf8 (Member, Author) commented Jul 8, 2017

The implementation could be optimized further by moving more/all of the data to ThreadPoolSharedData.

While thinking about the problem I wondered: what if the pool were used to synchronize other threads, as in the example below?
It currently does not work, and it would require the Sync flag:

// Assuming this lives in the crate's own test module, where ThreadPool is in
// scope (the orphan rule requires the impl below to be in the defining crate).
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, sleep};
use std::time::Duration;

unsafe impl Sync for ThreadPool { }

#[test]
fn test_multi_join() {
    let pool = Arc::new(ThreadPool::new_with_name("multi join test", 8));
    let test_count = Arc::new(AtomicUsize::new(0));

    let pool_r0 = pool.clone();
    let pool_r1 = pool.clone();
    let test_count_r0 = test_count.clone();
    let test_count_r1 = test_count.clone();

    let t0 = thread::spawn(move || {
        for _ in 0..21 {
            let test_count = test_count_r0.clone();
            pool_r0.execute(move || {
                sleep(Duration::from_secs(2));
                test_count.fetch_add(1, Ordering::Release);
            });
        }
        pool_r0.join();
        assert_eq!(42, test_count_r0.load(Ordering::Acquire));
    });

    let t1 = thread::spawn(move || {
        for _ in 0..21 {
            let test_count = test_count_r1.clone();
            pool_r1.execute(move || {
                sleep(Duration::from_secs(2));
                test_count.fetch_add(1, Ordering::Release);
            });
        }
        pool_r1.join();
        assert_eq!(42, test_count_r1.load(Ordering::Acquire));
    });

    // Each spawned thread submits 21 jobs; the assertions expect all 42.
    pool.join();
    assert_eq!(42, test_count.load(Ordering::Acquire));

    t0.join().unwrap();
    t1.join().unwrap();
}

@frewsxcv (Collaborator) commented Jul 8, 2017

> The implementation could be optimized further by moving more/all of the data to ThreadPoolSharedData.

Is it possible you could pull this change out into a separate pull request? We can merge that first without this join method. Then we can rebase this join pull request off those changes once those merge.

> Open interface new_with_name to accept anything convertable into String

Regarding this commit, see my response in #58

@dns2utf8 (Member, Author) commented Jul 8, 2017

Well, it is going to take some time. I decided to create the shared data struct only after implementing the join, when I realized how many times I would have to pass around six counters for every operation and handle their inconsistent names. Let me see what I can do.

dns2utf8 added three commits to dns2utf8/rust-threadpool that referenced this pull request Jul 8, 2017
dns2utf8 changed the title from "This is a naive implementation of the join operation" to "Implementation of the join operation" Jul 8, 2017
@dns2utf8 (Member, Author) commented Jul 8, 2017

I created two new branches for the PRs with one commit each. It was simpler than doing some git magic 😉

@dns2utf8 (Member, Author) commented Jul 9, 2017

Yes, let me replace them quickly. As mentioned by @bblancha, this should solve #66 as well.

@dns2utf8 (Member, Author) commented Jul 9, 2017

I added support for a pool where all threads panic. This would have led to a stall until at least one job had succeeded.

Some of the sleeps you mentioned are measuring the active_count. Joining the pool would cause the number to be zero, so I left them as they were.

Do you see any unhandled cases that we should test for?

@frewsxcv (Collaborator) left a review:

thanks for this! added a few comments/questions

lib.rs Outdated
@@ -207,6 +226,9 @@ impl ThreadPool {
        let rx = Arc::new(Mutex::new(rx));

        let shared_data = Arc::new(ThreadPoolSharedData {
            empty_condvar: Condvar::new(),
            empty_trigger: Mutex::new(false),
            stored_jobs_counter: AtomicUsize::new(0),
@frewsxcv (Collaborator):

Can we call this queued_count? I think that better parallels the name active_count.

@dns2utf8 (Member, Author):

sure

@@ -250,6 +273,8 @@ impl ThreadPool {
/// sleep(Duration::from_secs(5));
/// });
/// }
///
/// // wait for the pool to start working
@frewsxcv (Collaborator):

👍

lib.rs Outdated
@@ -310,6 +333,43 @@ impl ThreadPool {
}
}
}

/// Block the current thread until all jobs in the pool are completed.
/// Once waiting for the pool to complete you can no longer add new jobs, &mut self ensures that.
@frewsxcv (Collaborator):

Does this need to be &mut self? If the user wants to add more jobs to the threadpool, shouldn't they be allowed to?

@dns2utf8 (Member, Author):

The reason I did it in the first place is that, without the mut, many threads could join at the same time.

Since the join is blocking, the current thread cannot send new jobs anyway.
Accessing the pool from many threads requires the Sync flag. But I have to reason about that some more, because there is an edge case; see this test.

lib.rs Outdated

/// Notify all observers joining this pool if there is no more work to do.
fn no_work_notify_all(&self) {
if self.has_work() == false {
@frewsxcv (Collaborator):

if !self.has_work()

lib.rs Outdated
/// Once waiting for the pool to complete you can no longer add new jobs, &mut self ensures that.
///
/// ```
/// # use threadpool::ThreadPool;
@frewsxcv (Collaborator):

I think we should uncomment this so we have a complete example a user can just copy/paste

@dns2utf8 (Member, Author):

👍

lib.rs Outdated
@@ -128,6 +128,7 @@ impl<'a> Drop for Sentinel<'a> {
            self.shared_data.active_count.fetch_sub(1, Ordering::SeqCst);
            if panicking() {
                self.shared_data.panic_count.fetch_add(1, Ordering::SeqCst);
                self.shared_data.no_work_notify_all();
@frewsxcv (Collaborator):

If this line was moved outside this if panicking() conditional, could we get rid of this line? Would that be the same thing?

@dns2utf8 (Member, Author):

No, because a worker thread keeps working on jobs for as long as possible, reducing the expensive create/cleanup cycle of threads.
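
A self-contained sketch of that reasoning, assuming (as the discussion suggests) that the normal path notifies inside the worker loop after each finished job, so only the unwinding path needs the notification in Drop. Names follow the discussion but are not the crate's code verbatim:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// Hypothetical shared state; the PR uses a Mutex<bool> trigger, () is enough here.
struct Shared {
    active_count: AtomicUsize,
    panic_count: AtomicUsize,
    empty_trigger: Mutex<()>,
    empty_condvar: Condvar,
}

impl Shared {
    fn no_work_notify_all(&self) {
        if self.active_count.load(Ordering::SeqCst) == 0 {
            let _guard = self.empty_trigger.lock().unwrap();
            self.empty_condvar.notify_all();
        }
    }
}

// Sentinel that cleans up when a worker unwinds from a panicking job.
struct Sentinel {
    shared: Arc<Shared>,
}

impl Drop for Sentinel {
    fn drop(&mut self) {
        self.shared.active_count.fetch_sub(1, Ordering::SeqCst);
        if thread::panicking() {
            self.shared.panic_count.fetch_add(1, Ordering::SeqCst);
            // Normal completion is assumed to notify inside the worker loop
            // after each job; a panic skips that code, so notify here instead.
            self.shared.no_work_notify_all();
        }
    }
}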

@dns2utf8 (Member, Author) commented Jul 9, 2017

I added tests for the new cases. The PR is currently mergeable up to 254fa9d.

@dns2utf8 (Member, Author) commented:

I found a very nasty deadlock with fn test_multi_join(). The reason was that some threads could end up in an infinite loop because they did not exit the false-positive loop fast enough. Since testing for the Condvar requires the thread to acquire the Mutex shared_data.empty_trigger, they would block until the thread was joinable.

While debugging I added .field("queued_count", &self.queued_count()) to the debug output and exposed queued_count like the other counters with a function.

What do you think?
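
Roughly what those two additions could look like, with the surrounding types heavily abbreviated (a sketch, not the PR's exact code):

use std::fmt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Abbreviated stand-ins for the real types, just to show the two additions.
struct ThreadPoolSharedData {
    queued_count: AtomicUsize,
    active_count: AtomicUsize,
}

struct ThreadPool {
    shared_data: Arc<ThreadPoolSharedData>,
}

impl ThreadPool {
    /// Number of jobs waiting in the queue, exposed like the other counters.
    pub fn queued_count(&self) -> usize {
        self.shared_data.queued_count.load(Ordering::SeqCst)
    }

    pub fn active_count(&self) -> usize {
        self.shared_data.active_count.load(Ordering::SeqCst)
    }
}

impl fmt::Debug for ThreadPool {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("ThreadPool")
            .field("queued_count", &self.queued_count())
            .field("active_count", &self.active_count())
            .finish()
    }
}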

dns2utf8 mentioned this pull request Jul 11, 2017
@frewsxcv (Collaborator) commented:

LGTM, thanks for doing this!

frewsxcv merged commit 0824dce into rust-threadpool:master Jul 12, 2017
dns2utf8 deleted the join branch July 12, 2017 06:20
@dns2utf8 (Member, Author) commented:

It is a pleasure solving the hard problems 😄
