Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement MessagePort and MessageChannel #16622

Closed
wants to merge 15 commits into from
Closed
Changes from 1 commit
Commits
File filter...
Filter file types
Jump to…
Jump to file
Failed to load files.

Always

Just for now

Use ReentrantMutex instead for MessagePortInternal

  • Loading branch information
KiChjang committed Jun 9, 2019
commit 25fa53a7545cf54d17dc6a42f2596756926edcd6

Some generated files are not rendered by default. Learn more.

@@ -99,6 +99,7 @@ servo_config = {path = "../config"}
servo_geometry = {path = "../geometry" }
servo-media = {git = "https://github.com/servo/media"}
servo_rand = {path = "../rand"}
servo_remutex = {path = "../remutex"}
servo_url = {path = "../url"}
smallvec = { version = "0.6", features = ["std", "union"] }
style = {path = "../style", features = ["servo"]}
@@ -21,12 +21,13 @@ use crate::task_source::port_message::PortMessageQueue;
use js::jsapi::{JSContext, JSStructuredCloneReader, JSObject, JSTracer, MutableHandleObject};
use js::jsval::UndefinedValue;
use js::rust::{CustomAutoRooterGuard, HandleValue};
use servo_remutex::ReentrantMutex;
use std::cell::{Cell, RefCell};
use std::collections::VecDeque;
use std::mem;
use std::os::raw;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::sync::Arc;

// FIXME: This is wrong, we need to figure out a better way of collecting message port objects per transfer
thread_local! {
@@ -38,35 +39,35 @@ struct PortMessageTask {
}

pub struct MessagePortInternal {
dom_port: Option<Trusted<MessagePort>>,
origin: String,
port_message_queue: PortMessageQueue,
enabled: bool,
has_been_shipped: bool,
entangled_port: Option<Arc<Mutex<MessagePortInternal>>>,
pending_port_messages: VecDeque<PortMessageTask>,
dom_port: RefCell<Option<Trusted<MessagePort>>>,
origin: RefCell<String>,
port_message_queue: RefCell<PortMessageQueue>,
enabled: Cell<bool>,
has_been_shipped: Cell<bool>,
entangled_port: RefCell<Option<Arc<ReentrantMutex<MessagePortInternal>>>>,
pending_port_messages: RefCell<VecDeque<PortMessageTask>>,
}

impl MessagePortInternal {
fn new(port_message_queue: PortMessageQueue, origin: String) -> MessagePortInternal {
MessagePortInternal {
dom_port: None,
origin,
port_message_queue,
enabled: false,
has_been_shipped: false,
entangled_port: None,
pending_port_messages: VecDeque::new(),
dom_port: RefCell::new(None),
origin: RefCell::new(origin),
port_message_queue: RefCell::new(port_message_queue),
enabled: Cell::new(false),
has_been_shipped: Cell::new(false),
entangled_port: RefCell::new(None),
pending_port_messages: RefCell::new(VecDeque::new()),
}
}

/// <https://html.spec.whatwg.org/multipage/#dom-messageport-postmessage>
// Step 7 substeps
#[allow(unrooted_must_root)]
fn process_pending_port_messages(&mut self) {
if let Some(task) = self.pending_port_messages.pop_front() {
fn process_pending_port_messages(&self) {
if let Some(task) = self.pending_port_messages.borrow_mut().pop_front() {
// Substep 1
let final_target_port = self.dom_port.as_ref().unwrap().root();
let final_target_port = self.dom_port.borrow().as_ref().unwrap().root();

// Substep 2
let target_global = final_target_port.global();
@@ -91,7 +92,7 @@ impl MessagePortInternal {
final_target_port.upcast(),
&target_global,
message_clone.handle(),
Some(&self.origin),
Some(&self.origin.borrow()),
None,
new_ports,
);
@@ -106,7 +107,7 @@ pub struct MessagePort {
eventtarget: EventTarget,
detached: Cell<bool>,
#[ignore_malloc_size_of = "Defined in std"]
message_port_internal: Arc<Mutex<MessagePortInternal>>,
message_port_internal: Arc<ReentrantMutex<MessagePortInternal>>,
}

#[allow(unsafe_code)]
@@ -133,20 +134,20 @@ impl MessagePort {
eventtarget: EventTarget::new_inherited(),
detached: Cell::new(false),
message_port_internal: Arc::new(
Mutex::new(
ReentrantMutex::new(
MessagePortInternal::new(global.port_message_queue().clone(), origin)
)
),
}
}

fn new_transferred(
message_port_internal: Arc<Mutex<MessagePortInternal>>,
message_port_internal: Arc<ReentrantMutex<MessagePortInternal>>,
origin: String,
) -> MessagePort {
{
let mut internal = message_port_internal.lock().unwrap();
internal.origin = origin;
let internal = message_port_internal.lock().unwrap();
*internal.origin.borrow_mut() = origin;
}

MessagePort {
@@ -161,27 +162,27 @@ impl MessagePort {
let origin = owner.origin().immutable().ascii_serialization();
let message_port = reflect_dom_object(Box::new(MessagePort::new_inherited(owner, origin)), owner, Wrap);
{
let mut internal = message_port.message_port_internal.lock().unwrap();
internal.dom_port = Some(Trusted::new(&*message_port));
let internal = message_port.message_port_internal.lock().unwrap();
*internal.dom_port.borrow_mut() = Some(Trusted::new(&*message_port));
}
message_port
}

/// <https://html.spec.whatwg.org/multipage/#entangle>
pub fn entangle(&self, other: &MessagePort) {
{
let mut internal = self.message_port_internal.lock().unwrap();
internal.entangled_port = Some(other.message_port_internal.clone());
let internal = self.message_port_internal.lock().unwrap();
*internal.entangled_port.borrow_mut() = Some(other.message_port_internal.clone());
}
let mut internal = other.message_port_internal.lock().unwrap();
internal.entangled_port = Some(self.message_port_internal.clone());
let internal = other.message_port_internal.lock().unwrap();
*internal.entangled_port.borrow_mut() = Some(self.message_port_internal.clone());
}

/// <https://html.spec.whatwg.org/multipage/#dom-messageport-postmessage>
// Step 7 substeps
fn process_pending_port_messages(&self) {
if self.detached.get() { return; }
let mut internal = self.message_port_internal.lock().unwrap();
let internal = self.message_port_internal.lock().unwrap();
internal.process_pending_port_messages();
}
}
@@ -196,16 +197,16 @@ impl Transferable for MessagePort {
extra_data: *mut u64
) -> bool {
{
let mut internal = self.message_port_internal.lock().unwrap();
let internal = self.message_port_internal.lock().unwrap();
// Step 1
internal.has_been_shipped = true;
internal.has_been_shipped.set(true);

// Step 3
if let Some(ref other_port) = internal.entangled_port {
let mut entangled_internal = other_port.lock().unwrap();
if let Some(ref other_port) = *internal.entangled_port.borrow() {
let entangled_internal = other_port.lock().unwrap();
// Substep 1
entangled_internal.has_been_shipped = true;
}
entangled_internal.has_been_shipped.set(true);
}; // This line MUST contain a semicolon, due to the strict drop check rule
}

unsafe {
@@ -230,22 +231,22 @@ impl Transferable for MessagePort {
) -> bool {
let owner = unsafe { GlobalScope::from_context(cx) };

let internal = unsafe { Arc::from_raw(content as *const Mutex<MessagePortInternal>) };
let internal = unsafe { Arc::from_raw(content as *const ReentrantMutex<MessagePortInternal>) };
let value = MessagePort::new_transferred(internal, owner.origin().immutable().ascii_serialization());

// Step 2
let message_port = reflect_dom_object(Box::new(value), &*owner, Wrap);

{
let mut internal = message_port.message_port_internal.lock().unwrap();
let internal = message_port.message_port_internal.lock().unwrap();

// Step 1
internal.has_been_shipped = true;
internal.has_been_shipped.set(true);

let dom_port = Trusted::new(&*message_port);
internal.enabled = false;
internal.dom_port = Some(dom_port);
internal.port_message_queue = owner.port_message_queue().clone();
internal.enabled.set(false);
*internal.dom_port.borrow_mut() = Some(dom_port);
*internal.port_message_queue.borrow_mut() = owner.port_message_queue().clone();
}
return_object.set(message_port.reflector().rootable().get());
TRANSFERRED_MESSAGE_PORTS.with(|list| {
@@ -280,7 +281,7 @@ impl MessagePortMethods for MessagePort {
if self.detached.get() { return Ok(()); }
let internal = self.message_port_internal.lock().unwrap();
// Step 1
let target_port = &internal.entangled_port;
let target_port = internal.entangled_port.borrow();

// Step 3
let mut doomed = false;
@@ -325,14 +326,14 @@ impl MessagePortMethods for MessagePort {

{
let target_port = target_port.as_ref().unwrap();
let mut target_internal = target_port.lock().unwrap();
target_internal.pending_port_messages.push_back(task);
let target_internal = target_port.lock().unwrap();
target_internal.pending_port_messages.borrow_mut().push_back(task);

if target_internal.enabled {
if target_internal.enabled.get() {
let target_port = target_port.clone();
let _ = target_internal.port_message_queue.queue(
let _ = target_internal.port_message_queue.borrow().queue(
task!(process_pending_port_messages: move || {
let mut internal = target_port.lock().unwrap();
let internal = target_port.lock().unwrap();
internal.process_pending_port_messages();
}),
&self.global()
@@ -346,12 +347,13 @@ impl MessagePortMethods for MessagePort {
/// <https://html.spec.whatwg.org/multipage/#dom-messageport-start>
fn Start(&self) {
let len = {
let mut internal = self.message_port_internal.lock().unwrap();
if internal.enabled {
let internal = self.message_port_internal.lock().unwrap();
if internal.enabled.get() {
return;
}
internal.enabled = true;
internal.pending_port_messages.len()
internal.enabled.set(true);
let messages = internal.pending_port_messages.borrow();
messages.len()
};

let global = self.global();
@@ -370,13 +372,14 @@ impl MessagePortMethods for MessagePort {
/// <https://html.spec.whatwg.org/multipage/#dom-messageport-close>
fn Close(&self) {
let maybe_port = {
let mut internal = self.message_port_internal.lock().unwrap();
internal.entangled_port.take()
let internal = self.message_port_internal.lock().unwrap();
let mut maybe_port = internal.entangled_port.borrow_mut();
maybe_port.take()
};

if let Some(other) = maybe_port {
let mut other_internal = other.lock().unwrap();
other_internal.entangled_port = None;
let other_internal = other.lock().unwrap();
*other_internal.entangled_port.borrow_mut() = None;
}
}

ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.