Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge functionality of WebsocketConnect into Fetch #18871

Merged
merged 1 commit into from Oct 25, 2017
Merged
Changes from all commits
Commits
File filter...
Filter file types
Jump to…
Jump to file
Failed to load files.

Always

Just for now

Merge functionality of WebsocketConnect into Fetch

  • Loading branch information
KiChjang committed Oct 25, 2017
commit 99f9696a24ece562f74831dbde957af1149eb9eb
@@ -11,7 +11,7 @@ use ipc_channel::ipc;
use ipc_channel::router::ROUTER;
use msg::constellation_msg::PipelineId;
use net::http_loader::{set_default_accept, set_default_accept_language};
use net_traits::{CoreResourceMsg, FetchMetadata, FetchResponseMsg};
use net_traits::{CoreResourceMsg, FetchChannels, FetchMetadata, FetchResponseMsg};
use net_traits::{IpcSend, NetworkError, ResourceThreads};
use net_traits::request::{Destination, RequestInit};
use net_traits::response::ResponseInit;
@@ -64,7 +64,7 @@ impl NetworkListener {

CoreResourceMsg::Fetch(
listener.req_init.clone(),
ipc_sender)
FetchChannels::ResponseMsg(ipc_sender))
}
};

@@ -16,9 +16,9 @@ use http_loader::{HttpState, http_redirect_fetch};
use hyper_serde::Serde;
use ipc_channel::ipc::{self, IpcReceiver, IpcReceiverSet, IpcSender};
use net_traits::{CookieSource, CoreResourceThread};
use net_traits::{CoreResourceMsg, FetchResponseMsg};
use net_traits::{CoreResourceMsg, FetchChannels, FetchResponseMsg};
use net_traits::{CustomResponseMediator, ResourceId};
use net_traits::{ResourceThreads, WebSocketCommunicate, WebSocketConnectData};
use net_traits::{ResourceThreads, WebSocketDomAction, WebSocketNetworkEvent};
use net_traits::request::{Request, RequestInit};
use net_traits::response::{Response, ResponseInit};
use net_traits::storage_thread::StorageThreadMsg;
@@ -155,12 +155,16 @@ impl ResourceChannelManager {
msg: CoreResourceMsg,
http_state: &Arc<HttpState>) -> bool {
match msg {
CoreResourceMsg::Fetch(req_init, sender) =>
self.resource_manager.fetch(req_init, None, sender, http_state),
CoreResourceMsg::Fetch(req_init, channels) => {
match channels {
FetchChannels::ResponseMsg(sender) =>
self.resource_manager.fetch(req_init, None, sender, http_state),
FetchChannels::WebSocket { event_sender, action_receiver } =>
self.resource_manager.websocket_connect(req_init, event_sender, action_receiver, http_state),
}
}
CoreResourceMsg::FetchRedirect(req_init, res_init, sender) =>
self.resource_manager.fetch(req_init, Some(res_init), sender, http_state),
CoreResourceMsg::WebsocketConnect(connect, connect_data) =>
self.resource_manager.websocket_connect(connect, connect_data, http_state),
CoreResourceMsg::SetCookieForUrl(request, cookie, source) =>
self.resource_manager.set_cookie_for_url(&request, cookie.into_inner(), source, http_state),
CoreResourceMsg::SetCookiesForUrl(request, cookies, source) => {
@@ -360,10 +364,13 @@ impl CoreResourceManager {
}).expect("Thread spawning failed");
}

fn websocket_connect(&self,
connect: WebSocketCommunicate,
connect_data: WebSocketConnectData,
http_state: &Arc<HttpState>) {
websocket_loader::init(connect, connect_data, http_state.clone());
fn websocket_connect(
&self,
request: RequestInit,
event_sender: IpcSender<WebSocketNetworkEvent>,
action_receiver: IpcReceiver<WebSocketDomAction>,
http_state: &Arc<HttpState>
) {
websocket_loader::init(request, event_sender, action_receiver, http_state.clone());
}
}
@@ -15,9 +15,10 @@ use hyper::method::Method;
use hyper::net::HttpStream;
use hyper::status::StatusCode;
use hyper::version::HttpVersion;
use net_traits::{CookieSource, MessageData, NetworkError, WebSocketCommunicate, WebSocketConnectData};
use ipc_channel::ipc::{IpcReceiver, IpcSender};
use net_traits::{CookieSource, MessageData, NetworkError};
use net_traits::{WebSocketDomAction, WebSocketNetworkEvent};
use net_traits::request::Destination;
use net_traits::request::{Destination, RequestInit, RequestMode};
use servo_url::ServoUrl;
use std::ascii::AsciiExt;
use std::io::{self, Write};
@@ -32,22 +33,22 @@ use websocket::message::Type as MessageType;
use websocket::receiver::Receiver;
use websocket::sender::Sender;

pub fn init(connect: WebSocketCommunicate,
connect_data: WebSocketConnectData,
http_state: Arc<HttpState>) {
thread::Builder::new().name(format!("WebSocket connection to {}", connect_data.resource_url)).spawn(move || {
let channel = establish_a_websocket_connection(connect_data.resource_url,
connect_data.origin,
connect_data.protocols,
&http_state);
pub fn init(
req_init: RequestInit,
resource_event_sender: IpcSender<WebSocketNetworkEvent>,
dom_action_receiver: IpcReceiver<WebSocketDomAction>,
http_state: Arc<HttpState>
) {
thread::Builder::new().name(format!("WebSocket connection to {}", req_init.url)).spawn(move || {
let channel = establish_a_websocket_connection(req_init, &http_state);
let (ws_sender, mut receiver) = match channel {
Ok((protocol_in_use, sender, receiver)) => {
let _ = connect.event_sender.send(WebSocketNetworkEvent::ConnectionEstablished { protocol_in_use });
let _ = resource_event_sender.send(WebSocketNetworkEvent::ConnectionEstablished { protocol_in_use });
(sender, receiver)
},
Err(e) => {
debug!("Failed to establish a WebSocket connection: {:?}", e);
let _ = connect.event_sender.send(WebSocketNetworkEvent::Fail);
let _ = resource_event_sender.send(WebSocketNetworkEvent::Fail);
return;
}

@@ -58,7 +59,6 @@ pub fn init(connect: WebSocketCommunicate,

let initiated_close_incoming = initiated_close.clone();
let ws_sender_incoming = ws_sender.clone();
let resource_event_sender = connect.event_sender;
thread::spawn(move || {
for message in receiver.incoming_messages() {
let message: Message = match message {
@@ -92,7 +92,7 @@ pub fn init(connect: WebSocketCommunicate,
}
});

while let Ok(dom_action) = connect.action_receiver.recv() {
while let Ok(dom_action) = dom_action_receiver.recv() {
match dom_action {
WebSocketDomAction::SendMessage(MessageData::Text(data)) => {
ws_sender.lock().unwrap().send_message(&Message::text(data)).unwrap();
@@ -146,14 +146,15 @@ fn obtain_a_websocket_connection(url: &ServoUrl) -> Result<Stream, NetworkError>
}

// https://fetch.spec.whatwg.org/#concept-websocket-establish
fn establish_a_websocket_connection(resource_url: ServoUrl,
origin: String,
protocols: Vec<String>,
http_state: &HttpState)
-> Result<(Option<String>,
Sender<Stream>,
Receiver<Stream>),
NetworkError> {
fn establish_a_websocket_connection(
req_init: RequestInit,
http_state: &HttpState
) -> Result<(Option<String>, Sender<Stream>, Receiver<Stream>), NetworkError>
{
let protocols = match req_init.mode {
RequestMode::WebSocket { protocols } => protocols.clone(),
_ => panic!("Received a RequestInit with a non-websocket mode in websocket_loader"),
};
// Steps 1 is not really applicable here, given we don't exactly go
// through the same infrastructure as the Fetch spec.

@@ -184,7 +185,7 @@ fn establish_a_websocket_connection(resource_url: ServoUrl,
// TODO: handle permessage-deflate extension.

// Step 11 and network error check from step 12.
let response = fetch(resource_url, origin, headers, http_state)?;
let response = fetch(req_init.url, req_init.origin.ascii_serialization(), headers, http_state)?;

// Step 12, the status code check.
if response.status != StatusCode::SwitchingProtocols {
@@ -340,25 +340,20 @@ pub enum WebSocketNetworkEvent {
}

#[derive(Deserialize, Serialize)]
pub struct WebSocketCommunicate {
pub event_sender: IpcSender<WebSocketNetworkEvent>,
pub action_receiver: IpcReceiver<WebSocketDomAction>,
}

#[derive(Deserialize, Serialize)]
pub struct WebSocketConnectData {
pub resource_url: ServoUrl,
pub origin: String,
pub protocols: Vec<String>,
/// IPC channels to communicate with the script thread about network or DOM events.
pub enum FetchChannels {
ResponseMsg(IpcSender<FetchResponseMsg>),
WebSocket {
event_sender: IpcSender<WebSocketNetworkEvent>,
action_receiver: IpcReceiver<WebSocketDomAction>,
}
}

#[derive(Deserialize, Serialize)]
pub enum CoreResourceMsg {
Fetch(RequestInit, IpcSender<FetchResponseMsg>),
Fetch(RequestInit, FetchChannels),
/// Initiate a fetch in response to processing a redirection
FetchRedirect(RequestInit, ResponseInit, IpcSender<FetchResponseMsg>),
/// Try to make a websocket connection to a URL.
WebsocketConnect(WebSocketCommunicate, WebSocketConnectData),
/// Store a cookie for a given originating URL
SetCookieForUrl(ServoUrl, Serde<Cookie<'static>>, CookieSource),
/// Store a set of cookies for a given originating URL
@@ -387,7 +382,8 @@ pub fn fetch_async<F>(request: RequestInit, core_resource_thread: &CoreResourceT
let (action_sender, action_receiver) = ipc::channel().unwrap();
ROUTER.add_route(action_receiver.to_opaque(),
Box::new(move |message| f(message.to().unwrap())));
core_resource_thread.send(CoreResourceMsg::Fetch(request, action_sender)).unwrap();
core_resource_thread.send(
CoreResourceMsg::Fetch(request, FetchChannels::ResponseMsg(action_sender))).unwrap();
}

#[derive(Clone, Deserialize, MallocSizeOf, Serialize)]
@@ -481,7 +477,8 @@ pub fn load_whole_resource(request: RequestInit,
core_resource_thread: &CoreResourceThread)
-> Result<(Metadata, Vec<u8>), NetworkError> {
let (action_sender, action_receiver) = ipc::channel().unwrap();
core_resource_thread.send(CoreResourceMsg::Fetch(request, action_sender)).unwrap();
core_resource_thread.send(
CoreResourceMsg::Fetch(request, FetchChannels::ResponseMsg(action_sender))).unwrap();

let mut buf = vec![];
let mut metadata = None;
@@ -69,13 +69,13 @@ pub enum Referrer {
}

/// A [request mode](https://fetch.spec.whatwg.org/#concept-request-mode)
#[derive(Clone, Copy, Deserialize, MallocSizeOf, PartialEq, Serialize)]
#[derive(Clone, Deserialize, MallocSizeOf, PartialEq, Serialize)]
pub enum RequestMode {
Navigate,
SameOrigin,
NoCors,
CorsMode,
WebSocket
WebSocket { protocols: Vec<String> }
}

/// Request [credentials mode](https://fetch.spec.whatwg.org/#concept-request-credentials-mode)
@@ -9,7 +9,8 @@
use dom::bindings::root::Dom;
use dom::document::Document;
use ipc_channel::ipc::IpcSender;
use net_traits::{CoreResourceMsg, FetchResponseMsg, ResourceThreads, IpcSend};
use net_traits::{CoreResourceMsg, FetchChannels, FetchResponseMsg};
use net_traits::{ResourceThreads, IpcSend};
use net_traits::request::RequestInit;
use servo_url::ServoUrl;
use std::thread;
@@ -124,7 +125,8 @@ impl DocumentLoader {
pub fn fetch_async_background(&self,
request: RequestInit,
fetch_target: IpcSender<FetchResponseMsg>) {
self.resource_threads.sender().send(CoreResourceMsg::Fetch(request, fetch_target)).unwrap();
self.resource_threads.sender().send(
CoreResourceMsg::Fetch(request, FetchChannels::ResponseMsg(fetch_target))).unwrap();
}

/// Mark an in-progress network request complete.
@@ -23,7 +23,8 @@ use js::conversions::ToJSValConvertible;
use js::jsapi::JSAutoCompartment;
use js::jsval::UndefinedValue;
use mime::{Mime, TopLevel, SubLevel};
use net_traits::{CoreResourceMsg, FetchMetadata, FetchResponseMsg, FetchResponseListener, NetworkError};
use net_traits::{CoreResourceMsg, FetchChannels, FetchMetadata};
use net_traits::{FetchResponseMsg, FetchResponseListener, NetworkError};
use net_traits::request::{CacheMode, CorsSettings, CredentialsMode};
use net_traits::request::{RequestInit, RequestMode};
use network_listener::{NetworkListener, PreInvoke};
@@ -489,7 +490,8 @@ impl EventSource {
ROUTER.add_route(action_receiver.to_opaque(), Box::new(move |message| {
listener.notify_fetch(message.to().unwrap());
}));
global.core_resource_thread().send(CoreResourceMsg::Fetch(request, action_sender)).unwrap();
global.core_resource_thread().send(
CoreResourceMsg::Fetch(request, FetchChannels::ResponseMsg(action_sender))).unwrap();
// Step 13
Ok(ev)
}
@@ -552,6 +554,7 @@ impl EventSourceTimeoutCallback {
request.headers.set(LastEventId(String::from(event_source.last_event_id.borrow().clone())));
}
// Step 5.4
global.core_resource_thread().send(CoreResourceMsg::Fetch(request, self.action_sender)).unwrap();
global.core_resource_thread().send(
CoreResourceMsg::Fetch(request, FetchChannels::ResponseMsg(self.action_sender))).unwrap();
}
}
@@ -549,7 +549,7 @@ impl RequestMethods for Request {

// https://fetch.spec.whatwg.org/#dom-request-mode
fn Mode(&self) -> RequestMode {
self.request.borrow().mode.into()
self.request.borrow().mode.clone().into()

This comment has been minimized.

@avadacatavra

avadacatavra Oct 24, 2017

Contributor

why is the clone needed?

This comment has been minimized.

@KiChjang

KiChjang Oct 24, 2017

Author Member

RequestMode used to implement Copy, but since I've added the WebSocket(Vec<String>) variant, it cannot implement Copy, hence this clone.

}

// https://fetch.spec.whatwg.org/#dom-request-credentials
@@ -758,7 +758,8 @@ impl Into<RequestMode> for NetTraitsRequestMode {
NetTraitsRequestMode::SameOrigin => RequestMode::Same_origin,
NetTraitsRequestMode::NoCors => RequestMode::No_cors,
NetTraitsRequestMode::CorsMode => RequestMode::Cors,
NetTraitsRequestMode::WebSocket => unreachable!("Websocket request mode should never be exposed to Dom"),
NetTraitsRequestMode::WebSocket { .. } =>
unreachable!("Websocket request mode should never be exposed to Dom"),
}
}
}
@@ -20,15 +20,15 @@ use dom::event::{Event, EventBubbles, EventCancelable};
use dom::eventtarget::EventTarget;
use dom::globalscope::GlobalScope;
use dom::messageevent::MessageEvent;
use dom::urlhelper::UrlHelper;
use dom_struct::dom_struct;
use ipc_channel::ipc::{self, IpcReceiver, IpcSender};
use js::jsapi::JSAutoCompartment;
use js::jsval::UndefinedValue;
use js::typedarray::{ArrayBuffer, CreateWith};
use net_traits::{WebSocketCommunicate, WebSocketConnectData, WebSocketDomAction, WebSocketNetworkEvent};
use net_traits::CoreResourceMsg::WebsocketConnect;
use net_traits::{CoreResourceMsg, FetchChannels};
use net_traits::{WebSocketDomAction, WebSocketNetworkEvent};
use net_traits::MessageData;
use net_traits::request::{RequestInit, RequestMode};
use script_runtime::CommonScriptMsg;
use script_runtime::ScriptThreadEventCategory::WebSocketEvent;
use servo_url::ServoUrl;
@@ -176,12 +176,6 @@ impl WebSocket {
let ws = WebSocket::new(global, url_record.clone());
let address = Trusted::new(&*ws);

let connect_data = WebSocketConnectData {
resource_url: url_record,
origin: UrlHelper::Origin(&global.get_url()).0,
protocols: protocols,
};

// Create the interface for communication with the resource thread
let (dom_action_sender, resource_action_receiver):
(IpcSender<WebSocketDomAction>,
@@ -190,13 +184,18 @@ impl WebSocket {
(IpcSender<WebSocketNetworkEvent>,
IpcReceiver<WebSocketNetworkEvent>) = ipc::channel().unwrap();

let connect = WebSocketCommunicate {
// Step 8.
let request = RequestInit {
url: url_record,
origin: global.origin().immutable().clone(),
mode: RequestMode::WebSocket { protocols },
..RequestInit::default()
};
let channels = FetchChannels::WebSocket {
event_sender: resource_event_sender,
action_receiver: resource_action_receiver,
};

// Step 8.
let _ = global.core_resource_thread().send(WebsocketConnect(connect, connect_data));
let _ = global.core_resource_thread().send(CoreResourceMsg::Fetch(request, channels));

*ws.sender.borrow_mut() = Some(dom_action_sender);

@@ -52,7 +52,7 @@ use ipc_channel::router::ROUTER;
use js::jsapi::{Heap, JSContext, JS_ParseJSON};
use js::jsapi::JS_ClearPendingException;
use js::jsval::{JSVal, NullValue, UndefinedValue};
use net_traits::{FetchMetadata, FilteredMetadata};
use net_traits::{FetchChannels, FetchMetadata, FilteredMetadata};
use net_traits::{FetchResponseListener, NetworkError, ReferrerPolicy};
use net_traits::CoreResourceMsg::Fetch;
use net_traits::request::{CredentialsMode, Destination, RequestInit, RequestMode};
@@ -266,7 +266,8 @@ impl XMLHttpRequest {
ROUTER.add_route(action_receiver.to_opaque(), Box::new(move |message| {
listener.notify_fetch(message.to().unwrap());
}));
global.core_resource_thread().send(Fetch(init, action_sender)).unwrap();
global.core_resource_thread().send(
Fetch(init, FetchChannels::ResponseMsg(action_sender))).unwrap();
}
}

ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.