Skip to content

Commit

Permalink
Auto merge of #21325 - gterzian:crossbeam_integration, r=jdm,SimonSapin
Browse files Browse the repository at this point in the history
Replace mpsc with crossbeam-channel

Follow up on #19515

---

Selecting over multiple channels in `std::sync::mpsc` is not stable and likely never will be:

rust-lang/rust#27800 (comment)
> It seems the only thing keeping `mpsc_select` around is Servo.

crossbeam-channel is designed specifically to replace `std::sync::mpsc` and fix many of its shortcomings:
https://github.com/stjepang/rfcs-crossbeam/blob/channel/text/2017-11-09-channel.md

This is to be landed together with servo/ipc-channel#183.

<!-- Reviewable:start -->
---
This change is [<img src="https://reviewable.io/review_button.svg" height="34" align="absmiddle" alt="Reviewable"/>](https://reviewable.io/reviews/servo/servo/21325)
<!-- Reviewable:end -->
  • Loading branch information
bors-servo committed Aug 30, 2018
2 parents 6f81ed4 + 4ebaa13 commit 17a9414
Show file tree
Hide file tree
Showing 93 changed files with 635 additions and 374 deletions.
173 changes: 133 additions & 40 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion components/bluetooth/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ bitflags = "1.0"
bluetooth_traits = {path = "../bluetooth_traits"}
device = {git = "https://github.com/servo/devices", features = ["bluetooth-test"]}
embedder_traits = {path = "../embedder_traits"}
ipc-channel = "0.10"
ipc-channel = "0.11"
log = "0.4"
servo_config = {path = "../config"}
servo_rand = {path = "../rand"}
Expand Down
2 changes: 1 addition & 1 deletion components/bluetooth_traits/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ name = "bluetooth_traits"
path = "lib.rs"

[dependencies]
ipc-channel = "0.10"
ipc-channel = "0.11"
regex = "1.0"
serde = "1.0"
embedder_traits = { path = "../embedder_traits" }
Expand Down
2 changes: 1 addition & 1 deletion components/canvas/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ cssparser = "0.24"
euclid = "0.19"
fnv = "1.0"
gleam = "0.6"
ipc-channel = "0.10"
ipc-channel = "0.11"
log = "0.4"
num-traits = "0.2"
offscreen_gl_context = {version = "0.21", features = ["serde", "osmesa"]}
Expand Down
2 changes: 1 addition & 1 deletion components/canvas_traits/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ path = "lib.rs"
[dependencies]
cssparser = "0.24.0"
euclid = "0.19"
ipc-channel = "0.10"
ipc-channel = "0.11"
gleam = "0.6"
lazy_static = "1"
malloc_size_of = { path = "../malloc_size_of" }
Expand Down
21 changes: 21 additions & 0 deletions components/channel/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[package]
name = "servo_channel"
version = "0.0.1"
authors = ["The Servo Project Developers"]
license = "MPL-2.0"
publish = false

[lib]
name = "servo_channel"
path = "lib.rs"
test = false
doctest = false

[dependencies]
crossbeam-channel = "0.2"
ipc-channel = "0.11"
serde = "1.0"

[[test]]
name = "main"
path = "tests/disconnect.rs"
163 changes: 163 additions & 0 deletions components/channel/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */

extern crate crossbeam_channel;
extern crate ipc_channel;
extern crate serde;

pub mod base_channel {
pub use crossbeam_channel::*;
}
// Needed to re-export the select macro.
pub use crossbeam_channel::*;

use ipc_channel::ipc::IpcReceiver;
use ipc_channel::router::ROUTER;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};


pub fn route_ipc_receiver_to_new_servo_receiver<T>(ipc_receiver: IpcReceiver<T>) -> Receiver<T>
where
T: for<'de> Deserialize<'de> + Serialize + Send + 'static
{
let (servo_sender, servo_receiver) = channel();
ROUTER.add_route(
ipc_receiver.to_opaque(),
Box::new(move |message| {
drop(servo_sender.send(message.to::<T>().unwrap()))
}),
);
servo_receiver
}

pub fn route_ipc_receiver_to_new_servo_sender<T>(ipc_receiver: IpcReceiver<T>, servo_sender: Sender<T>)
where
T: for<'de> Deserialize<'de> + Serialize + Send + 'static
{
ROUTER.add_route(
ipc_receiver.to_opaque(),
Box::new(move |message| {
drop(servo_sender.send(message.to::<T>().unwrap()))
}),
)
}

pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let (base_sender, base_receiver) = crossbeam_channel::unbounded::<T>();
let is_disconnected = Arc::new(AtomicBool::new(false));
(Sender::new(base_sender, is_disconnected.clone()),
Receiver::new(base_receiver, is_disconnected))
}

#[derive(Debug, PartialEq)]
pub enum ChannelError {
ChannelClosedError
}

pub struct Receiver<T> {
receiver: crossbeam_channel::Receiver<T>,
is_disconnected: Arc<AtomicBool>,
}

impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
self.is_disconnected.store(true, Ordering::SeqCst);
}
}

impl<T> Clone for Receiver<T> {
fn clone(&self) -> Self {
Receiver {
receiver: self.receiver.clone(),
is_disconnected: self.is_disconnected.clone(),
}
}
}

impl<T> Receiver<T> {
pub fn new(receiver: crossbeam_channel::Receiver<T>, is_disconnected: Arc<AtomicBool>) -> Receiver<T> {
Receiver {
receiver,
is_disconnected,
}
}

pub fn recv(&self) -> Option<T> {
self.receiver.recv()
}

pub fn try_recv(&self) -> Option<T> {
self.receiver.try_recv()
}

pub fn len(&self) -> usize {
self.receiver.len()
}

pub fn select(&self) -> &crossbeam_channel::Receiver<T> {
&self.receiver
}
}

impl<T> Iterator for Receiver<T> {
type Item = T;

fn next(&mut self) -> Option<Self::Item> {
self.receiver.recv()
}
}

impl<'a, T> IntoIterator for &'a Receiver<T> {
type Item = T;
type IntoIter = crossbeam_channel::Receiver<T>;

fn into_iter(self) -> Self::IntoIter {
self.receiver.clone()
}
}

pub struct Sender<T> {
sender: crossbeam_channel::Sender<T>,
is_disconnected: Arc<AtomicBool>,
}

impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
Sender {
sender: self.sender.clone(),
is_disconnected: self.is_disconnected.clone(),
}
}
}

impl<T> Sender<T> {
pub fn new(sender: crossbeam_channel::Sender<T>, is_disconnected: Arc<AtomicBool>) -> Sender<T> {
Sender {
sender,
is_disconnected,
}
}

pub fn send(&self, msg: T) -> Result<(), ChannelError> {
if self.is_disconnected.load(Ordering::SeqCst) {
Err(ChannelError::ChannelClosedError)
} else {
Ok(self.sender.send(msg))
}
}

pub fn len(&self) -> usize {
self.sender.len()
}

pub fn select(&self) -> Option<&crossbeam_channel::Sender<T>> {
if self.is_disconnected.load(Ordering::SeqCst) {
None
} else {
Some(&self.sender)
}
}
}
31 changes: 31 additions & 0 deletions components/channel/tests/disconnect.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */

#[macro_use]
extern crate servo_channel;

use servo_channel::{channel, ChannelError};

#[test]
fn send_after_receiver_dropped() {
let (sender, receiver) = channel();
drop(receiver);
assert_eq!(sender.send(1), Err(ChannelError::ChannelClosedError));
let sent = select! {
send(sender.select(), 1) => true,
default => false
};
assert_eq!(sent, false);
}

#[test]
fn send_with_receiver_connected() {
let (sender, _receiver) = channel();
assert_eq!(sender.send(1), Ok(()));
let sent = select! {
send(sender.select(), 1) => true,
default => false
};
assert_eq!(sent, true);
}
3 changes: 2 additions & 1 deletion components/compositing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ euclid = "0.19"
gfx_traits = {path = "../gfx_traits"}
gleam = {version = "0.6", optional = true}
image = "0.19"
ipc-channel = "0.10"
ipc-channel = "0.11"
libc = "0.2"
log = "0.4"
msg = {path = "../msg"}
net_traits = {path = "../net_traits"}
profile_traits = {path = "../profile_traits"}
script_traits = {path = "../script_traits"}
servo_channel = {path = "../channel"}
servo_config = {path = "../config"}
servo_geometry = {path = "../geometry"}
servo_url = {path = "../url"}
Expand Down
Loading

0 comments on commit 17a9414

Please sign in to comment.