Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Restarting fetch client every now and then (#4399)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomusdrw authored and arkpar committed Feb 3, 2017
1 parent 630b728 commit 70efdae
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 23 deletions.
92 changes: 81 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion util/fetch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
futures = "0.1"
futures-cpupool = "0.1"
parking_lot = "0.3"
log = "0.3"
reqwest = "0.2"
reqwest = "0.4"
mime = "0.2"
clippy = { version = "0.0.90", optional = true}

Expand Down
58 changes: 47 additions & 11 deletions util/fetch/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@

//! Fetching

use std::{io, fmt};
use std::{io, fmt, time};
use std::sync::Arc;
use std::sync::atomic::{self, AtomicBool};

use futures::{self, BoxFuture, Future};
use futures_cpupool::{CpuPool, CpuFuture};
use mime::{self, Mime};
use parking_lot::RwLock;
use reqwest;

#[derive(Default, Debug, Clone)]
Expand Down Expand Up @@ -71,24 +72,52 @@ pub trait Fetch: Clone + Send + Sync + 'static {
fn close(self) where Self: Sized {}
}

#[derive(Clone)]
const CLIENT_TIMEOUT_SECONDS: u64 = 5;

pub struct Client {
client: Arc<reqwest::Client>,
client: RwLock<(time::Instant, Arc<reqwest::Client>)>,
pool: CpuPool,
limit: Option<usize>,
}

impl Clone for Client {
fn clone(&self) -> Self {
let (ref time, ref client) = *self.client.read();
Client {
client: RwLock::new((time.clone(), client.clone())),
pool: self.pool.clone(),
limit: self.limit.clone(),
}
}
}

impl Client {
fn with_limit(limit: Option<usize>) -> Result<Self, Error> {
fn new_client() -> Result<Arc<reqwest::Client>, Error> {
let mut client = reqwest::Client::new()?;
client.redirect(reqwest::RedirectPolicy::limited(5));
Ok(Arc::new(client))
}

fn with_limit(limit: Option<usize>) -> Result<Self, Error> {
Ok(Client {
client: Arc::new(client),
client: RwLock::new((time::Instant::now(), Self::new_client()?)),
pool: CpuPool::new(4),
limit: limit,
})
}

fn client(&self) -> Result<Arc<reqwest::Client>, Error> {
{
let (ref time, ref client) = *self.client.read();
if time.elapsed() < time::Duration::from_secs(CLIENT_TIMEOUT_SECONDS) {
return Ok(client.clone());
}
}

let client = Self::new_client()?;
*self.client.write() = (time::Instant::now(), client.clone());
Ok(client)
}
}

impl Fetch for Client {
Expand All @@ -108,12 +137,19 @@ impl Fetch for Client {
fn fetch_with_abort(&self, url: &str, abort: Abort) -> Self::Result {
debug!(target: "fetch", "Fetching from: {:?}", url);

self.pool.spawn(FetchTask {
url: url.into(),
client: self.client.clone(),
limit: self.limit,
abort: abort,
})
match self.client() {
Ok(client) => {
self.pool.spawn(FetchTask {
url: url.into(),
client: client,
limit: self.limit,
abort: abort,
})
},
Err(err) => {
self.pool.spawn(futures::future::err(err))
},
}
}
}

Expand Down
1 change: 1 addition & 0 deletions util/fetch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ extern crate log;

extern crate futures;
extern crate futures_cpupool;
extern crate parking_lot;
extern crate reqwest;

pub extern crate mime;
Expand Down

0 comments on commit 70efdae

Please sign in to comment.