Skip to content

Commit

Permalink
Add Websocket task source
Browse files Browse the repository at this point in the history
According to the doc: https://html.spec.whatwg.org/multipage/web-sockets.html#network

The task source for all tasks queued in the websocket section are the
websocket task source, so this commit also updates those references to
use the appropriate one.
  • Loading branch information
Agustin Chiappe Berrini committed Sep 8, 2018
1 parent f7630da commit 5dd6e21
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 13 deletions.
13 changes: 13 additions & 0 deletions components/script/dom/globalscope.rs
Expand Up @@ -59,6 +59,7 @@ use task_source::file_reading::FileReadingTaskSource;
use task_source::networking::NetworkingTaskSource;
use task_source::performance_timeline::PerformanceTimelineTaskSource;
use task_source::remote_event::RemoteEventTaskSource;
use task_source::websocket::WebsocketTaskSource;
use time::{Timespec, get_time};
use timers::{IsInterval, OneshotTimerCallback, OneshotTimerHandle};
use timers::{OneshotTimers, TimerCallback};
Expand Down Expand Up @@ -430,6 +431,18 @@ impl GlobalScope {
unreachable!();
}

/// `ScriptChan` to send messages to the websocket task source of
/// this global scope.
pub fn websocket_task_source(&self) -> WebsocketTaskSource {
if let Some(window) = self.downcast::<Window>() {
return window.websocket_task_source();
}
if let Some(worker) = self.downcast::<WorkerGlobalScope>() {
return worker.websocket_task_source();
}
unreachable!();
}

/// Evaluate JS code on this global scope.
pub fn evaluate_js_on_global_with_result(
&self, code: &str, rval: MutableHandleValue) -> bool {
Expand Down
21 changes: 9 additions & 12 deletions components/script/dom/websocket.rs
Expand Up @@ -39,8 +39,8 @@ use std::cell::Cell;
use std::ptr;
use std::thread;
use task::{TaskOnce, TaskCanceller};
use task_source::{TaskSource, TaskSourceName};
use task_source::networking::NetworkingTaskSource;
use task_source::TaskSource;
use task_source::websocket::WebsocketTaskSource;

#[derive(Clone, Copy, Debug, JSTraceable, MallocSizeOf, PartialEq)]
enum WebSocketRequestState {
Expand Down Expand Up @@ -70,7 +70,7 @@ mod close_code {

pub fn close_the_websocket_connection(
address: Trusted<WebSocket>,
task_source: &NetworkingTaskSource,
task_source: &WebsocketTaskSource,
canceller: &TaskCanceller,
code: Option<u16>,
reason: String,
Expand All @@ -86,7 +86,7 @@ pub fn close_the_websocket_connection(

pub fn fail_the_websocket_connection(
address: Trusted<WebSocket>,
task_source: &NetworkingTaskSource,
task_source: &WebsocketTaskSource,
canceller: &TaskCanceller,
) {
let close_task = CloseTask {
Expand Down Expand Up @@ -199,11 +199,8 @@ impl WebSocket {
};
let _ = global.core_resource_thread().send(CoreResourceMsg::Fetch(request, channels));

// TODO: use a dedicated task source,
// https://html.spec.whatwg.org/multipage/#websocket-task-source
// When making the switch, also update the task_canceller call.
let task_source = global.networking_task_source();
let canceller = global.task_canceller(TaskSourceName::Networking);
let task_source = global.websocket_task_source();
let canceller = global.task_canceller(WebsocketTaskSource::NAME);
thread::spawn(move || {
while let Ok(event) = dom_event_receiver.recv() {
match event {
Expand Down Expand Up @@ -273,7 +270,7 @@ impl WebSocket {
WebSocketEvent,
task,
Some(pipeline_id),
TaskSourceName::Networking,
WebsocketTaskSource::NAME,
))
.unwrap();
}
Expand Down Expand Up @@ -407,10 +404,10 @@ impl WebSocketMethods for WebSocket {
// TODO: use a dedicated task source,
// https://html.spec.whatwg.org/multipage/#websocket-task-source
// When making the switch, also update the task_canceller call.
let task_source = self.global().networking_task_source();
let task_source = self.global().websocket_task_source();
fail_the_websocket_connection(address,
&task_source,
&self.global().task_canceller(TaskSourceName::Networking));
&self.global().task_canceller(WebsocketTaskSource::NAME));
}
WebSocketRequestState::Open => {
self.ready_state.set(WebSocketRequestState::Closing);
Expand Down
9 changes: 9 additions & 0 deletions components/script/dom/window.rs
Expand Up @@ -128,6 +128,7 @@ use task_source::networking::NetworkingTaskSource;
use task_source::performance_timeline::PerformanceTimelineTaskSource;
use task_source::remote_event::RemoteEventTaskSource;
use task_source::user_interaction::UserInteractionTaskSource;
use task_source::websocket::WebsocketTaskSource;
use time;
use timers::{IsInterval, TimerCallback};
use url::Position;
Expand Down Expand Up @@ -186,6 +187,8 @@ pub struct Window {
performance_timeline_task_source: PerformanceTimelineTaskSource,
#[ignore_malloc_size_of = "task sources are hard"]
remote_event_task_source: RemoteEventTaskSource,
#[ignore_malloc_size_of = "task sources are hard"]
websocket_task_source: WebsocketTaskSource,
navigator: MutNullableDom<Navigator>,
#[ignore_malloc_size_of = "Arc"]
image_cache: Arc<ImageCache>,
Expand Down Expand Up @@ -376,6 +379,10 @@ impl Window {
self.remote_event_task_source.clone()
}

pub fn websocket_task_source(&self) -> WebsocketTaskSource {
self.websocket_task_source.clone()
}

pub fn main_thread_script_chan(&self) -> &Sender<MainThreadScriptMsg> {
&self.script_chan.0
}
Expand Down Expand Up @@ -1904,6 +1911,7 @@ impl Window {
file_reading_task_source: FileReadingTaskSource,
performance_timeline_task_source: PerformanceTimelineTaskSource,
remote_event_task_source: RemoteEventTaskSource,
websocket_task_source: WebsocketTaskSource,
image_cache_chan: Sender<ImageCacheMsg>,
image_cache: Arc<ImageCache>,
resource_threads: ResourceThreads,
Expand Down Expand Up @@ -1957,6 +1965,7 @@ impl Window {
file_reading_task_source,
performance_timeline_task_source,
remote_event_task_source,
websocket_task_source,
image_cache_chan,
image_cache,
navigator: Default::default(),
Expand Down
5 changes: 5 additions & 0 deletions components/script/dom/workerglobalscope.rs
Expand Up @@ -47,6 +47,7 @@ use task_source::file_reading::FileReadingTaskSource;
use task_source::networking::NetworkingTaskSource;
use task_source::performance_timeline::PerformanceTimelineTaskSource;
use task_source::remote_event::RemoteEventTaskSource;
use task_source::websocket::WebsocketTaskSource;
use time::precise_time_ns;
use timers::{IsInterval, TimerCallback};

Expand Down Expand Up @@ -386,6 +387,10 @@ impl WorkerGlobalScope {
RemoteEventTaskSource(self.script_chan(), self.pipeline_id())
}

pub fn websocket_task_source(&self) -> WebsocketTaskSource {
WebsocketTaskSource(self.script_chan(), self.pipeline_id())
}

pub fn new_script_pair(&self) -> (Box<ScriptChan + Send>, Box<ScriptPort + Send>) {
let dedicated = self.downcast::<DedicatedWorkerGlobalScope>();
if let Some(dedicated) = dedicated {
Expand Down
6 changes: 6 additions & 0 deletions components/script/script_thread.rs
Expand Up @@ -125,6 +125,7 @@ use task_source::networking::NetworkingTaskSource;
use task_source::performance_timeline::PerformanceTimelineTaskSource;
use task_source::remote_event::RemoteEventTaskSource;
use task_source::user_interaction::UserInteractionTaskSource;
use task_source::websocket::WebsocketTaskSource;
use time::{get_time, precise_time_ns, Tm};
use url::Position;
use url::percent_encoding::percent_decode;
Expand Down Expand Up @@ -1896,6 +1897,10 @@ impl ScriptThread {
RemoteEventTaskSource(self.remote_event_task_sender.clone(), pipeline_id)
}

pub fn websocket_task_source(&self, pipeline_id: PipelineId) -> WebsocketTaskSource {
WebsocketTaskSource(self.remote_event_task_sender.clone(), pipeline_id)
}

/// Handles a request for the window title.
fn handle_get_title_msg(&self, pipeline_id: PipelineId) {
let document = match { self.documents.borrow().find_document(pipeline_id) } {
Expand Down Expand Up @@ -2220,6 +2225,7 @@ impl ScriptThread {
self.file_reading_task_source(incomplete.pipeline_id),
self.performance_timeline_task_source(incomplete.pipeline_id).clone(),
self.remote_event_task_source(incomplete.pipeline_id),
self.websocket_task_source(incomplete.pipeline_id),
self.image_cache_channel.clone(),
self.image_cache.clone(),
self.resource_threads.clone(),
Expand Down
4 changes: 3 additions & 1 deletion components/script/task_source/mod.rs
Expand Up @@ -10,6 +10,7 @@ pub mod networking;
pub mod performance_timeline;
pub mod remote_event;
pub mod user_interaction;
pub mod websocket;

use dom::globalscope::GlobalScope;
use enum_iterator::IntoEnumIterator;
Expand All @@ -28,7 +29,8 @@ pub enum TaskSourceName {
Networking,
PerformanceTimeline,
UserInteraction,
RemoteEvent
RemoteEvent,
Websocket,
}

impl TaskSourceName {
Expand Down
37 changes: 37 additions & 0 deletions components/script/task_source/websocket.rs
@@ -0,0 +1,37 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */

use msg::constellation_msg::PipelineId;
use script_runtime::{CommonScriptMsg, ScriptChan, ScriptThreadEventCategory};
use task::{TaskCanceller, TaskOnce};
use task_source::{TaskSource, TaskSourceName};

#[derive(JSTraceable)]
pub struct WebsocketTaskSource(pub Box<ScriptChan + Send + 'static>, pub PipelineId);

impl Clone for WebsocketTaskSource {
fn clone(&self) -> WebsocketTaskSource {
WebsocketTaskSource(self.0.clone(), self.1.clone())
}
}

impl TaskSource for WebsocketTaskSource {
const NAME: TaskSourceName = TaskSourceName::Websocket;

fn queue_with_canceller<T>(
&self,
task: T,
canceller: &TaskCanceller,
) -> Result<(), ()>
where
T: TaskOnce + 'static,
{
self.0.send(CommonScriptMsg::Task(
ScriptThreadEventCategory::NetworkEvent,
Box::new(canceller.wrap_task(task)),
Some(self.1),
WebsocketTaskSource::NAME,
))
}
}

0 comments on commit 5dd6e21

Please sign in to comment.