Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Restarting fetch client every now and then #4399

Merged
merged 1 commit into from
Feb 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 81 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion util/fetch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
futures = "0.1"
futures-cpupool = "0.1"
parking_lot = "0.3"
log = "0.3"
reqwest = "0.2"
reqwest = "0.4"
mime = "0.2"
clippy = { version = "0.0.90", optional = true}

Expand Down
58 changes: 47 additions & 11 deletions util/fetch/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@

//! Fetching

use std::{io, fmt};
use std::{io, fmt, time};
use std::sync::Arc;
use std::sync::atomic::{self, AtomicBool};

use futures::{self, BoxFuture, Future};
use futures_cpupool::{CpuPool, CpuFuture};
use mime::{self, Mime};
use parking_lot::RwLock;
use reqwest;

#[derive(Default, Debug, Clone)]
Expand Down Expand Up @@ -73,24 +74,52 @@ pub trait Fetch: Clone + Send + Sync + 'static {
fn close(self) where Self: Sized {}
}

#[derive(Clone)]
const CLIENT_TIMEOUT_SECONDS: u64 = 5;

pub struct Client {
client: Arc<reqwest::Client>,
client: RwLock<(time::Instant, Arc<reqwest::Client>)>,
pool: CpuPool,
limit: Option<usize>,
}

impl Clone for Client {
fn clone(&self) -> Self {
let (ref time, ref client) = *self.client.read();
Client {
client: RwLock::new((time.clone(), client.clone())),
pool: self.pool.clone(),
limit: self.limit.clone(),
}
}
}

impl Client {
fn with_limit(limit: Option<usize>) -> Result<Self, Error> {
fn new_client() -> Result<Arc<reqwest::Client>, Error> {
let mut client = reqwest::Client::new()?;
client.redirect(reqwest::RedirectPolicy::limited(5));
Ok(Arc::new(client))
}

fn with_limit(limit: Option<usize>) -> Result<Self, Error> {
Ok(Client {
client: Arc::new(client),
client: RwLock::new((time::Instant::now(), Self::new_client()?)),
pool: CpuPool::new(4),
limit: limit,
})
}

fn client(&self) -> Result<Arc<reqwest::Client>, Error> {
{
let (ref time, ref client) = *self.client.read();
if time.elapsed() < time::Duration::from_secs(CLIENT_TIMEOUT_SECONDS) {
return Ok(client.clone());
}
}

let client = Self::new_client()?;
*self.client.write() = (time::Instant::now(), client.clone());
Ok(client)
}
}

impl Fetch for Client {
Expand All @@ -112,12 +141,19 @@ impl Fetch for Client {
fn fetch_with_abort(&self, url: &str, abort: Abort) -> Self::Result {
debug!(target: "fetch", "Fetching from: {:?}", url);

self.pool.spawn(FetchTask {
url: url.into(),
client: self.client.clone(),
limit: self.limit,
abort: abort,
})
match self.client() {
Ok(client) => {
self.pool.spawn(FetchTask {
url: url.into(),
client: client,
limit: self.limit,
abort: abort,
})
},
Err(err) => {
self.pool.spawn(futures::future::err(err))
},
}
}
}

Expand Down
1 change: 1 addition & 0 deletions util/fetch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ extern crate log;

extern crate futures;
extern crate futures_cpupool;
extern crate parking_lot;
extern crate reqwest;

pub extern crate mime;
Expand Down