Skip to content

Commit

Permalink
Add cancellability to file manager load and related refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
izgzhen committed Aug 2, 2016
1 parent 7807895 commit 17ae38a
Show file tree
Hide file tree
Showing 12 changed files with 134 additions and 186 deletions.
6 changes: 0 additions & 6 deletions components/constellation/constellation.rs
Expand Up @@ -30,7 +30,6 @@ use msg::constellation_msg::{Key, KeyModifiers, KeyState, LoadData};
use msg::constellation_msg::{PipelineNamespace, PipelineNamespaceId, TraversalDirection};
use msg::constellation_msg::{SubpageId, WindowSizeType};
use net_traits::bluetooth_thread::BluetoothMethodMsg;
use net_traits::filemanager_thread::FileManagerThreadMsg;
use net_traits::image_cache_thread::ImageCacheThread;
use net_traits::storage_thread::StorageThreadMsg;
use net_traits::{self, ResourceThreads, IpcSend};
Expand Down Expand Up @@ -1042,11 +1041,6 @@ impl<Message, LTF, STF> Constellation<Message, LTF, STF>
warn!("Exit storage thread failed ({})", e);
}

debug!("Exiting file manager resource threads.");
if let Err(e) = self.public_resource_threads.send(FileManagerThreadMsg::Exit) {
warn!("Exit storage thread failed ({})", e);
}

debug!("Exiting bluetooth thread.");
if let Err(e) = self.bluetooth_thread.send(BluetoothMethodMsg::Exit) {
warn!("Exit bluetooth thread failed ({})", e);
Expand Down
22 changes: 10 additions & 12 deletions components/net/blob_loader.rs
Expand Up @@ -2,10 +2,11 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */

use filemanager_thread::{FileManager, UIProvider};
use hyper::header::{DispositionType, ContentDisposition, DispositionParam};
use hyper::header::{Headers, ContentType, ContentLength, Charset};
use hyper::http::RawStatus;
use ipc_channel::ipc::{self, IpcSender};
use ipc_channel::ipc;
use mime::{Mime, Attr};
use mime_classifier::MimeClassifier;
use net_traits::ProgressMsg::{Payload, Done};
Expand All @@ -22,29 +23,26 @@ use util::thread::spawn_named;
// TODO: Check on GET
// https://w3c.github.io/FileAPI/#requestResponseModel

pub fn factory(filemanager_chan: IpcSender<FileManagerThreadMsg>)
-> Box<FnBox(LoadData,
LoadConsumer,
Arc<MimeClassifier>,
CancellationListener) + Send> {
pub fn factory<UI: 'static + UIProvider>(filemanager: Arc<FileManager<UI>>)
-> Box<FnBox(LoadData, LoadConsumer, Arc<MimeClassifier>, CancellationListener) + Send> {
box move |load_data: LoadData, start_chan, classifier, cancel_listener| {
spawn_named(format!("blob loader for {}", load_data.url), move || {
load_blob(load_data, start_chan, classifier, filemanager_chan, cancel_listener);
load_blob(load_data, start_chan, classifier, filemanager, cancel_listener);
})
}
}

fn load_blob(load_data: LoadData, start_chan: LoadConsumer,
fn load_blob<UI: 'static + UIProvider>
(load_data: LoadData, start_chan: LoadConsumer,
classifier: Arc<MimeClassifier>,
filemanager_chan: IpcSender<FileManagerThreadMsg>,
// XXX(izgzhen): we should utilize cancel_listener, filed in #12589
_cancel_listener: CancellationListener) {
filemanager: Arc<FileManager<UI>>,
cancel_listener: CancellationListener) {
let (chan, recv) = ipc::channel().unwrap();
if let Ok((id, origin, _fragment)) = parse_blob_url(&load_data.url.clone()) {
let id = SelectedFileId(id.simple().to_string());
let check_url_validity = true;
let msg = FileManagerThreadMsg::ReadFile(chan, id, check_url_validity, origin);
let _ = filemanager_chan.send(msg);
let _ = filemanager.handle(msg, Some(cancel_listener));

// Receive first chunk
match recv.recv().unwrap() {
Expand Down
170 changes: 81 additions & 89 deletions components/net/filemanager_thread.rs
Expand Up @@ -2,12 +2,13 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */

use ipc_channel::ipc::{self, IpcReceiver, IpcSender};
use ipc_channel::ipc::IpcSender;
use mime_guess::guess_mime_type_opt;
use net_traits::blob_url_store::{BlobBuf, BlobURLStoreError};
use net_traits::filemanager_thread::{FileManagerThreadMsg, FileManagerResult, FilterPattern, FileOrigin};
use net_traits::filemanager_thread::{SelectedFile, RelativePos, FileManagerThreadError};
use net_traits::filemanager_thread::{SelectedFileId, ReadFileProgress};
use resource_thread::CancellationListener;
use std::collections::HashMap;
use std::fs::File;
use std::io::{Read, Seek, SeekFrom};
Expand All @@ -22,10 +23,6 @@ use util::prefs::PREFS;
use util::thread::spawn_named;
use uuid::Uuid;

pub trait FileManagerThreadFactory<UI: 'static + UIProvider> {
fn new(&'static UI) -> Self;
}

/// Trait that provider of file-dialog UI should implement.
/// It will be used to initialize a generic FileManager.
/// For example, we can choose a dummy UI for testing purpose.
Expand Down Expand Up @@ -79,19 +76,6 @@ impl UIProvider for TFDProvider {
}
}

impl<UI: 'static + UIProvider> FileManagerThreadFactory<UI> for IpcSender<FileManagerThreadMsg> {
/// Create a FileManagerThread
fn new(ui: &'static UI) -> IpcSender<FileManagerThreadMsg> {
let (chan, recv) = ipc::channel().unwrap();

spawn_named("FileManager".to_owned(), move || {
FileManager::new(recv, ui).start();
});

chan
}
}

/// FileManagerStore's entry
struct FileStoreEntry {
/// Origin of the entry's "creator"
Expand Down Expand Up @@ -127,83 +111,79 @@ enum FileImpl {
Sliced(Uuid, RelativePos),
}

struct FileManager<UI: 'static + UIProvider> {
receiver: IpcReceiver<FileManagerThreadMsg>,
pub struct FileManager<UI: 'static + UIProvider> {
store: Arc<FileManagerStore<UI>>,
}

impl<UI: 'static + UIProvider> FileManager<UI> {
fn new(recv: IpcReceiver<FileManagerThreadMsg>, ui: &'static UI) -> FileManager<UI> {
pub fn new(ui: &'static UI) -> FileManager<UI> {
FileManager {
receiver: recv,
store: Arc::new(FileManagerStore::new(ui)),
}
}

/// Start the file manager event loop
fn start(&mut self) {
loop {
let store = self.store.clone();
match self.receiver.recv().unwrap() {
FileManagerThreadMsg::SelectFile(filter, sender, origin, opt_test_path) => {
spawn_named("select file".to_owned(), move || {
store.select_file(filter, sender, origin, opt_test_path);
});
}
FileManagerThreadMsg::SelectFiles(filter, sender, origin, opt_test_paths) => {
spawn_named("select files".to_owned(), move || {
store.select_files(filter, sender, origin, opt_test_paths);
})
}
FileManagerThreadMsg::ReadFile(sender, id, check_url_validity, origin) => {
spawn_named("read file".to_owned(), move || {
if let Err(e) = store.try_read_file(sender.clone(), id, check_url_validity, origin) {
let _ = sender.send(Err(FileManagerThreadError::BlobURLStoreError(e)));
}
})
}
FileManagerThreadMsg::PromoteMemory(blob_buf, set_valid, sender, origin) => {
spawn_named("transfer memory".to_owned(), move || {
store.promote_memory(blob_buf, set_valid, sender, origin);
/// Message handler
pub fn handle(&self, msg: FileManagerThreadMsg, cancel_listener: Option<CancellationListener>) {
let store = self.store.clone();
match msg {
FileManagerThreadMsg::SelectFile(filter, sender, origin, opt_test_path) => {
spawn_named("select file".to_owned(), move || {
store.select_file(filter, sender, origin, opt_test_path);
});
}
FileManagerThreadMsg::SelectFiles(filter, sender, origin, opt_test_paths) => {
spawn_named("select files".to_owned(), move || {
store.select_files(filter, sender, origin, opt_test_paths);
})
}
FileManagerThreadMsg::ReadFile(sender, id, check_url_validity, origin) => {
spawn_named("read file".to_owned(), move || {
if let Err(e) = store.try_read_file(sender.clone(), id, check_url_validity,
origin, cancel_listener) {
let _ = sender.send(Err(FileManagerThreadError::BlobURLStoreError(e)));
}
})
}
FileManagerThreadMsg::PromoteMemory(blob_buf, set_valid, sender, origin) => {
spawn_named("transfer memory".to_owned(), move || {
store.promote_memory(blob_buf, set_valid, sender, origin);
})
}
FileManagerThreadMsg::AddSlicedURLEntry(id, rel_pos, sender, origin) =>{
spawn_named("add sliced URL entry".to_owned(), move || {
store.add_sliced_url_entry(id, rel_pos, sender, origin);
})
}
FileManagerThreadMsg::DecRef(id, origin, sender) => {
if let Ok(id) = Uuid::parse_str(&id.0) {
spawn_named("dec ref".to_owned(), move || {
// Since it is simple DecRef (possibly caused by close/drop),
// unset_url_validity is false
let _ = sender.send(store.dec_ref(&id, &origin));
})
} else {
let _ = sender.send(Err(BlobURLStoreError::InvalidFileID));
}
FileManagerThreadMsg::AddSlicedURLEntry(id, rel_pos, sender, origin) =>{
spawn_named("add sliced URL entry".to_owned(), move || {
store.add_sliced_url_entry(id, rel_pos, sender, origin);
}
FileManagerThreadMsg::RevokeBlobURL(id, origin, sender) => {
if let Ok(id) = Uuid::parse_str(&id.0) {
spawn_named("revoke blob url".to_owned(), move || {
// Since it is revocation, unset_url_validity is true
let _ = sender.send(store.set_blob_url_validity(false, &id, &origin));
})
} else {
let _ = sender.send(Err(BlobURLStoreError::InvalidFileID));
}
FileManagerThreadMsg::RevokeBlobURL(id, origin, sender) => {
if let Ok(id) = Uuid::parse_str(&id.0) {
spawn_named("revoke blob url".to_owned(), move || {
// Since it is revocation, unset_url_validity is true
let _ = sender.send(store.set_blob_url_validity(false, &id, &origin));
})
} else {
let _ = sender.send(Err(BlobURLStoreError::InvalidFileID));
}
}
FileManagerThreadMsg::DecRef(id, origin, sender) => {
if let Ok(id) = Uuid::parse_str(&id.0) {
spawn_named("dec ref".to_owned(), move || {
// Since it is simple DecRef (possibly caused by close/drop),
// unset_url_validity is false
let _ = sender.send(store.dec_ref(&id, &origin));
})
} else {
let _ = sender.send(Err(BlobURLStoreError::InvalidFileID));
}
}
FileManagerThreadMsg::ActivateBlobURL(id, sender, origin) => {
if let Ok(id) = Uuid::parse_str(&id.0) {
spawn_named("activate blob url".to_owned(), move || {
let _ = sender.send(store.set_blob_url_validity(true, &id, &origin));
});
} else {
let _ = sender.send(Err(BlobURLStoreError::InvalidFileID));
}
}
FileManagerThreadMsg::ActivateBlobURL(id, sender, origin) => {
if let Ok(id) = Uuid::parse_str(&id.0) {
spawn_named("activate blob url".to_owned(), move || {
let _ = sender.send(store.set_blob_url_validity(true, &id, &origin));
});
} else {
let _ = sender.send(Err(BlobURLStoreError::InvalidFileID));
}
FileManagerThreadMsg::Exit => break,
};
}
}
}
}
Expand Down Expand Up @@ -405,7 +385,8 @@ impl <UI: 'static + UIProvider> FileManagerStore<UI> {

fn get_blob_buf(&self, sender: IpcSender<FileManagerResult<ReadFileProgress>>,
id: &Uuid, origin_in: &FileOrigin, rel_pos: RelativePos,
check_url_validity: bool) -> Result<(), BlobURLStoreError> {
check_url_validity: bool,
cancel_listener: Option<CancellationListener>) -> Result<(), BlobURLStoreError> {
let file_impl = try!(self.get_impl(id, origin_in, check_url_validity));
match file_impl {
FileImpl::Memory(buf) => {
Expand Down Expand Up @@ -437,7 +418,7 @@ impl <UI: 'static + UIProvider> FileManagerStore<UI> {
let range = rel_pos.to_abs_range(metadata.size as usize);

let mut file = try!(File::open(&metadata.path)
.map_err(|e| BlobURLStoreError::External(e.to_string())));
.map_err(|e| BlobURLStoreError::External(e.to_string())));
let seeked_start = try!(file.seek(SeekFrom::Start(range.start as u64))
.map_err(|e| BlobURLStoreError::External(e.to_string())));

Expand All @@ -447,7 +428,8 @@ impl <UI: 'static + UIProvider> FileManagerStore<UI> {
None => "".to_string(),
};

chunked_read(sender, &mut file, range.len(), opt_filename, type_string);
chunked_read(sender, &mut file, range.len(), opt_filename,
type_string, cancel_listener);
Ok(())
} else {
Err(BlobURLStoreError::InvalidEntry)
Expand All @@ -456,17 +438,20 @@ impl <UI: 'static + UIProvider> FileManagerStore<UI> {
FileImpl::Sliced(parent_id, inner_rel_pos) => {
// Next time we don't need to check validity since
// we have already done that for requesting URL if necessary
self.get_blob_buf(sender, &parent_id, origin_in, rel_pos.slice_inner(&inner_rel_pos), false)
self.get_blob_buf(sender, &parent_id, origin_in,
rel_pos.slice_inner(&inner_rel_pos), false,
cancel_listener)
}
}
}

// Convenient wrapper over get_blob_buf
fn try_read_file(&self, sender: IpcSender<FileManagerResult<ReadFileProgress>>,
id: SelectedFileId, check_url_validity: bool,
origin_in: FileOrigin) -> Result<(), BlobURLStoreError> {
id: SelectedFileId, check_url_validity: bool, origin_in: FileOrigin,
cancel_listener: Option<CancellationListener>) -> Result<(), BlobURLStoreError> {
let id = try!(Uuid::parse_str(&id.0).map_err(|_| BlobURLStoreError::InvalidFileID));
self.get_blob_buf(sender, &id, &origin_in, RelativePos::full_range(), check_url_validity)
self.get_blob_buf(sender, &id, &origin_in, RelativePos::full_range(),
check_url_validity, cancel_listener)
}

fn dec_ref(&self, id: &Uuid, origin_in: &FileOrigin) -> Result<(), BlobURLStoreError> {
Expand Down Expand Up @@ -580,7 +565,8 @@ fn select_files_pref_enabled() -> bool {
const CHUNK_SIZE: usize = 8192;

fn chunked_read(sender: IpcSender<FileManagerResult<ReadFileProgress>>,
file: &mut File, size: usize, opt_filename: Option<String>, type_string: String) {
file: &mut File, size: usize, opt_filename: Option<String>,
type_string: String, cancel_listener: Option<CancellationListener>) {
// First chunk
let mut buf = vec![0; CHUNK_SIZE];
match file.read(&mut buf) {
Expand All @@ -602,6 +588,12 @@ fn chunked_read(sender: IpcSender<FileManagerResult<ReadFileProgress>>,

// Send the remaining chunks
loop {
if let Some(ref listener) = cancel_listener.as_ref() {
if listener.is_cancelled() {
break;
}
}

let mut buf = vec![0; CHUNK_SIZE];
match file.read(&mut buf) {
Ok(0) => {
Expand Down

0 comments on commit 17ae38a

Please sign in to comment.