Skip to content

Commit 7448a43

Browse files
committed
Gracefully shutdown and cleanup after handling 3 requests.
1 parent efc319f commit 7448a43

File tree

2 files changed

+42
-12
lines changed

2 files changed

+42
-12
lines changed

c20_web_server/src/lib.rs

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ use std::{sync::{mpsc, Arc, Mutex}, thread};
33

44
pub struct ThreadPool{
55
workers: Vec<Worker>,
6-
sender: mpsc::Sender<Job>,
6+
// Option<> to wrap sender so that ownership can be taken out sometime later.
7+
sender: Option<mpsc::Sender<Job>>,
78
}
89

910
impl ThreadPool{
@@ -17,34 +18,57 @@ impl ThreadPool{
1718
workers.push(Worker::new(id, Arc::clone(&receiver)));
1819

1920
}
20-
ThreadPool {workers, sender}
21+
ThreadPool {workers, sender:Some(sender)}
2122
}
2223

2324
pub fn execute<F>(&self, f:F)
2425
where
2526
F: FnOnce() + Send + 'static,
2627
{
2728
let job = Box::new(f);
28-
self.sender.send(job).unwrap();
2929

30+
self.sender.as_ref().expect("Fail to unwrap self.sender").send(job).unwrap();
3031
}
3132
}
3233

34+
impl Drop for ThreadPool {
35+
fn drop(&mut self) {
36+
//drop() can handle Option<T>?
37+
drop(self.sender.take());
38+
for worker in &mut self.workers {
39+
eprintln!("Shutting down work {}", worker.id);
40+
if let Some(thread) = worker.thread.take(){
41+
thread.join().unwrap();
42+
}
43+
}
44+
}
45+
}
46+
3347
type Job = Box<dyn FnOnce() + Send + 'static>;
3448

3549
struct Worker {
3650
id: usize,
37-
thread: thread::JoinHandle<()>,
51+
// Option<> to wrap thread so that ownership can be taken out sometime later.
52+
thread: Option<thread::JoinHandle<()>>,
3853
}
3954

4055
impl Worker {
4156
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>> ) -> Worker {
4257
eprintln!("Worker {id} constructed and spawned a thread that loops forever over a receiver.");
4358
let thread = thread::spawn(move || loop {
44-
let job = receiver.lock().unwrap().recv().unwrap();
45-
eprintln!("Worker {id} got a job and is to execute it.");
46-
job();
59+
let mesg = receiver.lock().unwrap().recv();
60+
match mesg {
61+
Ok(job) => {
62+
eprintln!("Worker {id} got a job and is to execute it.");
63+
job();
64+
}
65+
Err(err) => {
66+
eprintln!("Worker {id} received an error {:?} and is shutting down.", err);
67+
break;
68+
}
69+
}
4770
});
48-
Worker {id, thread}
71+
// specify the field name for thread as it is Some(thread), not simply thread.
72+
Worker {id, thread:Some(thread)}
4973
}
5074
}

c20_web_server/src/main.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,25 @@ fn handle_connection(mut stream: TcpStream) {
2424
};
2525
let contents = fs::read_to_string(filename).unwrap();
2626
let length = contents.len();
27-
let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
27+
let response = format!(
28+
"{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
2829
stream.write_all(response.as_bytes()).unwrap();
30+
// flush the buffer
31+
stream.flush().unwrap();
2932
eprintln!(" Done.");
3033
}
3134

3235
fn main() {
3336
// [0; 3] is [0,0,0];
3437
//println!("{:#?}", &[0; 3]);
35-
println!("Listening at TCP 127.0.0.1:7878.");
38+
let num_threads = 4;
39+
let num_requests = 3;
40+
println!("Listening at TCP 127.0.0.1:7878 with {num_threads} threads.");
3641
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
37-
let pool = ThreadPool::new(4);
42+
let pool = ThreadPool::new(num_threads);
43+
println!("Will gracefully exit after handling {num_requests} requests.");
3844

39-
for stream in listener.incoming() {
45+
for stream in listener.incoming().take(num_requests) {
4046
let stream = stream.unwrap();
4147

4248
pool.execute(|| {

0 commit comments

Comments
 (0)