Join GitHub today
GitHub is home to over 50 million developers working together to host and review code, manage projects, and build software together.
Sign upAdd in-process mpsc-channels based implementation #7
Conversation
| if h.id() == id { | ||
| r_index = index; | ||
| r_id = self.receiver_ids[index] as i64; | ||
| //println!("r_id {}", r_index); |
This comment has been minimized.
This comment has been minimized.
| { | ||
| match self.sender.borrow().send(MpscChannelMessage(data.to_vec(), ports, shared_memory_regions)) { | ||
| Err(_) => { Err(MpscError::ChannelClosedError) }, | ||
| Ok(_) => { Ok(()) }, |
This comment has been minimized.
This comment has been minimized.
| } | ||
|
|
||
| pub fn add(&mut self, receiver: MpscReceiver) -> Result<i64,MpscError> { | ||
| self.last_index = self.last_index + 1; |
This comment has been minimized.
This comment has been minimized.
|
|
||
| let receivers = &mut self.receivers; | ||
| match receivers[r_index].recv() { | ||
| Ok((data, channels, shmems)) => return Ok(vec![MpscSelectionResult::DataReceived(r_id, data, channels, shmems)]), |
This comment has been minimized.
This comment has been minimized.
pcwalton
Sep 29, 2015
Collaborator
nit: I don't think you need the return here or on any of the other branches
| Ok((data, channels, shmems)) => return Ok(vec![MpscSelectionResult::DataReceived(r_id, data, channels, shmems)]), | ||
| Err(err) => { | ||
| match err { | ||
| MpscError::ChannelClosedError => { |
This comment has been minimized.
This comment has been minimized.
pcwalton
Sep 29, 2015
Collaborator
Could just be folded into the match above: Err(MpscError::ChannelClosedError) => { ... } err => err
| } | ||
| } | ||
|
|
||
| fn signal_acquisition(&self) -> () { |
This comment has been minimized.
This comment has been minimized.
|
|
||
| fn signal_acquisition(&self) -> () { | ||
| let mut notified = self.lock.lock().unwrap(); | ||
| *notified = true; |
This comment has been minimized.
This comment has been minimized.
pcwalton
Sep 29, 2015
Collaborator
Is there a reason why this lock/condvar code can't be just an MPSC Mutex<Receiver<()>>/Sender<()> pair? That is, clients just lock the receiver and wait for a ping from the sender. This code isn't performance critical and I find channels easier to think about than condvars. (Maybe it's just from writing a lot of Servo code though.)
This comment has been minimized.
This comment has been minimized.
vvuk
Sep 30, 2015
Author
Contributor
In theory I'd need the sender to also block until the receiver actually finishes receiving. But maybe that's not actually necessary in this case, since the actual connection setup can't really fail...
| let r = self.receiver.borrow(); | ||
| match r.as_ref().unwrap().recv() { | ||
| Ok(MpscChannelMessage(d,c,s)) => Ok((d, | ||
| c.into_iter().map(|x| OpaqueMpscChannel::new(x)).collect(), |
This comment has been minimized.
This comment has been minimized.
| for r in &self.receivers { | ||
| let inner_r = mem::replace(&mut *r.receiver.borrow_mut(), None); | ||
| receivers.push(inner_r); | ||
| } |
This comment has been minimized.
This comment has been minimized.
pcwalton
Sep 29, 2015
Collaborator
Instead of swapping out each receiver individually, is it possible to do something like let receivers = mem::replace(&mut self.receivers, vec![]).into_iter().map(Some).collect()?
This comment has been minimized.
This comment has been minimized.
vvuk
Sep 30, 2015
Author
Contributor
I don't think so, at least not something that simple... the loop I have is reaching replacing the MpscReceiver's RefCell<Option<>> member value's with None, and pulling out the inner mpsc::Receiver. It's not just replacing the self.receivers array values themselves (they don't change).
I could probably write it as a map that does a mem::replace, but I don't know that it would be any clearer.
|
Okay, converted to using a mpsc::channel() instead of a mutex/condvar (and renamed ONE_SHOT_SENDERS to ONE_SHOT_SERVERS, derp). Though now I'm actually confused as to how this is working at all -- OneShotServer::accept() does a get on the ONE_SHOT_SERVERS hashmap, and then calls accept() which calls recv(), which should block. MpscSender::connect() does a remove on the ONE_SHOT_SERVERS hashmap and calls send() to poke the server. But the server part should be blocked in recv() inside accept()... so who owns the ServerRecord? |
|
I'm confused—are you asking who owns the memory for the |
|
Yeah, I guess I'm getting a copy during the get(). This should be good to go. |
Add in-process mpsc-channels based implementation
vvuk commentedSep 22, 2015
This PR adds an in-process-only implementation on top of mpsc channels. It uses this as the default on windows until we finish a real windows implementation.