Skip to content

Commit

Permalink
Merge pull request #183 from dwijnand/fix-stream-caching
Browse files Browse the repository at this point in the history
Fix & test that conditional requests work with pages
  • Loading branch information
softprops committed Jan 4, 2019
2 parents 16b3ab9 + d2867eb commit 8d4827c
Show file tree
Hide file tree
Showing 5 changed files with 267 additions and 24 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ script:
- travis_wait cargo test
- cargo run --example rate_limit
- cargo build --features httpcache --example conditional_requests
- cargo test --features httpcache --doc http_cache
- cargo test --features httpcache --test conditional_requests

# Cache `cargo install`ed tools, but don't cache the project's `target`
# directory (which ends up over-caching and filling all disk space!)
Expand Down
113 changes: 93 additions & 20 deletions src/http_cache.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
//! Implements <https://tools.ietf.org/html/rfc7232> Conditional Requests

use std;
use std::collections::hash_map::DefaultHasher;
use std::ffi::OsStr;
use std::fmt::Debug;
use std::fs;
use std::hash::{Hash, Hasher};
use std::io;
use std::path::{Path, PathBuf};

Expand All @@ -15,9 +17,16 @@ use {Error, Result};
pub type BoxedHttpCache = Box<HttpCache + Send>;

/// A cache of HTTP responses keyed by URI, supporting RFC 7232 conditional
/// requests: a body is stored alongside its etag and, for paginated
/// responses, the `Link: rel="next"` URL.
pub trait HttpCache: HttpCacheClone + Debug {
    /// Store a response body, its etag, and the optional next-page link for `uri`.
    fn cache_response(
        &self,
        uri: &str,
        body: &[u8],
        etag: &[u8],
        next_link: &Option<String>,
    ) -> Result<()>;
    /// Retrieve the cached etag for `uri`; `Err` on a miss.
    fn lookup_etag(&self, uri: &str) -> Result<String>;
    /// Retrieve the cached body for `uri`; `Err` on a miss.
    fn lookup_body(&self, uri: &str) -> Result<String>;
    /// Retrieve the cached next-page link for `uri`, `Ok(None)` when the
    /// cached response was the last (or only) page.
    fn lookup_next_link(&self, uri: &str) -> Result<Option<String>>;
}

impl HttpCache {
Expand All @@ -28,7 +37,7 @@ impl HttpCache {
/// Default on-disk cache rooted at `~/.hubcaps/cache`.
///
/// # Panics
///
/// Panics if no home directory can be determined.
pub fn in_home_dir() -> BoxedHttpCache {
    let mut dir = dirs::home_dir().expect("Expected a home dir");
    dir.push(".hubcaps/cache");
    Box::new(FileBasedCache::new(dir))
}
}

Expand All @@ -42,7 +51,7 @@ impl Clone for BoxedHttpCache {
/// A cache that stores nothing: every lookup is a miss, so every request
/// goes out unconditionally (no `If-None-Match` header is ever sent).
pub struct NoCache;

impl HttpCache for NoCache {
fn cache_body_and_etag(&self, _: &str, _: &[u8], _: &[u8]) -> Result<()> {
fn cache_response(&self, _: &str, _: &[u8], _: &[u8], _: &Option<String>) -> Result<()> {
Ok(())
}
fn lookup_etag(&self, _uri: &str) -> Result<String> {
Expand All @@ -51,22 +60,43 @@ impl HttpCache for NoCache {
/// Nothing is ever cached, so a body lookup is always a miss.
fn lookup_body(&self, _uri: &str) -> Result<String> {
    no_read("No body cached")
}
/// Nothing is ever cached, so a next-link lookup is always a miss.
/// NOTE(review): `FileBasedCache` reports an absent next link as
/// `Ok(None)` while this returns `Err`; this path looks unreachable for
/// `NoCache` (the etag lookup misses first, so the cached-response branch
/// is never taken) — confirm callers agree.
fn lookup_next_link(&self, _uri: &str) -> Result<Option<String>> {
    no_read("No next link cached")
}
}

/// On-disk response cache: bodies, etags, and next-page links live as
/// sibling files under the `root` directory.
#[derive(Clone, Debug)]
pub struct FileBasedCache {
    root: PathBuf,
}

impl FileBasedCache {
    /// Build a cache rooted at the given directory (created lazily on
    /// first write).
    #[doc(hidden)] // public for integration testing only
    pub fn new<P: Into<PathBuf>>(root: P) -> FileBasedCache {
        let root = root.into();
        FileBasedCache { root }
    }
}

impl HttpCache for FileBasedCache {
fn cache_body_and_etag(&self, uri: &str, body: &[u8], etag: &[u8]) -> Result<()> {
fn cache_response(
&self,
uri: &str,
body: &[u8],
etag: &[u8],
next_link: &Option<String>,
) -> Result<()> {
let mut path = cache_path(&self.root, &uri, "json");
trace!("caching body at path: {}", path.display());
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)?;
}
fs::write(&path, body)?;
path.set_extension("etag");
fs::write(&path, etag)?;
if let Some(next_link) = next_link {
path.set_extension("next_link");
fs::write(&path, next_link)?;
}
Ok(())
}

Expand All @@ -77,34 +107,58 @@ impl HttpCache for FileBasedCache {
/// Read the cached response body (the `.json` file) for `uri`;
/// `Err` when nothing has been cached for that URI.
fn lookup_body(&self, uri: &str) -> Result<String> {
    read_to_string(cache_path(&self.root, uri, "json"))
}

/// Read the cached next-page link (the `.next_link` file) for `uri`;
/// `Ok(None)` when the cached response had no further pages.
fn lookup_next_link(&self, uri: &str) -> Result<Option<String>> {
    let path = cache_path(&self.root, uri, "next_link");
    match path.exists() {
        true => read_to_string(path).map(Some),
        false => Ok(None),
    }
}
}

/// cache_path("https://api.github.com/users/dwijnand/repos", "json") ==>
/// ~/.hubcaps/cache/v1/https/api.github.com/users/dwijnand/repos.json
fn cache_path<S: AsRef<OsStr>>(dir: &Path, uri: &str, extension: S) -> PathBuf {
/// Construct the cache path for the given URI and extension, from an initial directory.
///
/// # Examples
///
/// ```
/// # use std::path::PathBuf;
/// # use hubcaps::http_cache::cache_path;
/// assert_eq!(
/// cache_path(&PathBuf::from("/home/.hubcaps/cache"), "https://api.github.com/users/dwijnand/repos", "json"),
/// PathBuf::from("/home/.hubcaps/cache/v1/https/api.github.com/users/dwijnand/repos.json"),
/// );
/// assert_eq!(
/// cache_path(&PathBuf::from("/home/.hubcaps/cache"), "https://api.github.com/users/dwijnand/repos?page=2", "json"),
/// PathBuf::from("/home/.hubcaps/cache/v1/https/api.github.com/users/dwijnand/repos/6dd58bde8abb0869.json"),
/// );
/// assert_eq!(
/// cache_path(&PathBuf::from("/home/.hubcaps/cache"), "https://api.github.com/users/dwijnand/repos?page=2&per_page=5", "json"),
/// PathBuf::from("/home/.hubcaps/cache/v1/https/api.github.com/users/dwijnand/repos/d862dcd2d85cebca.json"),
/// );
/// ```
#[doc(hidden)] // public for doc testing only
pub fn cache_path<S: AsRef<OsStr>>(dir: &Path, uri: &str, extension: S) -> PathBuf {
let uri = uri.parse::<Uri>().expect("Expected a URI");
let mut path = dir.to_path_buf();
path.push("v1");
path.push(uri.scheme_part().expect("Expected a URI scheme").as_ref()); // https
path.push(
uri.authority_part()
.expect("Expected a URI authority")
.as_ref(),
); // api.github.com
path.push(
Path::new(uri.path()) // /users/dwijnand/repos
.strip_prefix("/")
.expect("Expected URI path to start with /"),
);
path.set_extension(extension);
path.push(uri.scheme_part().expect("no URI scheme").as_str()); // https
path.push(uri.authority_part().expect("no URI authority").as_str()); // api.github.com
path.push(Path::new(&uri.path()[1..])); // users/dwijnand/repos
if let Some(query) = uri.query() {
path.push(hash1(query, DefaultHasher::new())); // fa269019d5035d5f
}
path.set_extension(extension); // .json
path
}

/// Read a cache file into a `String`, tracing the path and converting any
/// `io::Error` (e.g. file-not-found on a cache miss) into this crate's `Error`.
fn read_to_string<P: AsRef<Path>>(path: P) -> Result<String> {
    trace!("reading path: {}", path.as_ref().display());
    fs::read_to_string(path).map_err(Error::from)
}

fn no_read<E: Into<Box<std::error::Error + Send + Sync>>>(error: E) -> Result<String> {
fn no_read<T, E: Into<Box<std::error::Error + Send + Sync>>>(error: E) -> Result<T> {
Err(Error::from(io::Error::new(io::ErrorKind::NotFound, error)))
}

Expand All @@ -124,3 +178,22 @@ where
Box::new(self.clone())
}
}

/// Hash `x` with the supplied hasher and render the 64-bit digest as a
/// 16-character zero-padded hex string (used to key query strings in
/// cache paths).
fn hash1<A: Hash, H: Hasher>(x: A, mut hasher: H) -> String {
    x.hash(&mut hasher);
    u64_to_padded_hex(hasher.finish())
}

/// Construct a 0-padded hex string from a u64.
///
/// The result is always exactly 16 lowercase hex digits.
///
/// # Examples
///
/// ```
/// # use hubcaps::http_cache::u64_to_padded_hex;
/// assert_eq!(u64_to_padded_hex(0), "0000000000000000");
/// assert_eq!(u64_to_padded_hex(u64::max_value()), "ffffffffffffffff");
/// ```
#[doc(hidden)] // public for doc testing only
pub fn u64_to_padded_hex(x: u64) -> String {
    let hex = format!("{:x}", x);
    // Left-pad with zeros to the full 16-digit width of a u64.
    format!("{:0>16}", hex)
}
25 changes: 21 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,16 @@ use hyper::header::{ACCEPT, AUTHORIZATION, ETAG, LINK, LOCATION, USER_AGENT};
use hyper::{Body, Client, Method, Request, StatusCode, Uri};
#[cfg(feature = "tls")]
use hyper_tls::HttpsConnector;
#[cfg(feature = "httpcache")]
use hyperx::header::LinkValue;
use hyperx::header::{qitem, Link, RelationType};
use mime::Mime;
use serde::de::DeserializeOwned;
use url::Url;

#[doc(hidden)] // public for doc testing and integration testing only
#[cfg(feature = "httpcache")]
mod http_cache;
pub mod http_cache;
#[macro_use]
mod macros; // expose json! macro to child modules
pub mod activity;
Expand Down Expand Up @@ -800,10 +803,12 @@ where
#[cfg(feature = "httpcache")]
{
if let Some(etag) = etag {
if let Err(e) = instance2.http_cache.cache_body_and_etag(
let next_link = link.as_ref().and_then(|l| next_link(&l));
if let Err(e) = instance2.http_cache.cache_response(
&uri3,
&response_body,
&etag,
&next_link,
) {
// failing to cache isn't fatal, so just log & swallow the error
debug!("Failed to cache body & etag: {}", e);
Expand All @@ -824,8 +829,20 @@ where
.map_err(Error::from)
.and_then(|body| {
serde_json::from_str::<Out>(&body)
.map(|out| (link, out))
.map_err(|error| ErrorKind::Codec(error).into())
.map_err(Error::from)
.and_then(|out| {
let link = match link {
Some(link) => Ok(Some(link)),
None => instance2
.http_cache
.lookup_next_link(&uri3)
.map(|next_link| next_link.map(|next| {
let next = LinkValue::new(next).push_rel(RelationType::Next);
Link::new(vec![next])
}))
};
link.map(|link| (link, out))
})
})
}
#[cfg(not(feature = "httpcache"))]
Expand Down
103 changes: 103 additions & 0 deletions tests/conditional_requests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
extern crate pretty_env_logger;
extern crate futures;
extern crate hubcaps;
extern crate hyper;
extern crate hyper_tls;
#[cfg(feature = "httpcache")]
#[macro_use]
extern crate log;
extern crate tokio;

#[cfg(feature = "httpcache")]
use std::env;

#[cfg(feature = "httpcache")]
use futures::{future, Stream};
#[cfg(feature = "httpcache")]
use hyper::Client;
#[cfg(feature = "httpcache")]
use hyper_tls::HttpsConnector;
#[cfg(feature = "httpcache")]
use tokio::runtime::Runtime;

#[cfg(feature = "httpcache")]
use hubcaps::http_cache::FileBasedCache;
#[cfg(feature = "httpcache")]
use hubcaps::repositories::UserRepoListOptions;
#[cfg(feature = "httpcache")]
use hubcaps::{Credentials, Error, Github, Result};

#[cfg(feature = "httpcache")]
mod testkit;

/// Integration test: a paginated listing fetched through the HTTP cache must
/// yield the same item count as an uncached fetch, and re-fetching via the
/// cache must not consume additional rate-limit quota (304 responses are
/// not counted by GitHub).
#[test]
#[cfg(feature = "httpcache")]
fn compare_counts() -> Result<()> {
    pretty_env_logger::init();

    let mut rt = Runtime::new()?;

    // GitHub requires a User-Agent on every request, e.g. "hubcaps/0.5.0".
    let agent = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
    let credentials = match env::var("GITHUB_TOKEN").ok() {
        Some(token) => Some(Credentials::Token(token)),
        None => {
            if env::var("CI") == Ok(String::from("true")) {
                // Unauthenticated requests in CI would hit rate limits,
                // so skip rather than fail.
                println!("No GITHUB_TOKEN env var in CI, skipping test");
                return Ok(());
            } else {
                None
            }
        }
    };
    let owner = "octocat";
    let per_page = 5;
    let repo_list_options = UserRepoListOptions::builder().per_page(per_page).build();

    info!("first get the total count of repos, without caching");

    let github = Github::new(agent, credentials.clone());
    let repos = github.user_repos(owner).iter(&repo_list_options);
    let total_count = rt.block_on(repos.fold(0, |acc, _repo| future::ok::<_, Error>(acc + 1)))?;

    // octocat currently has 8 repos, so we set per_page to 5 to get 2 pages
    // but if octocat ends up having less than 5 repos, it'll be just one page
    // and therefore nullify this test, so we sanity check
    assert!(
        total_count > per_page,
        "test sanity check failed, total_count: {}, per_page: {}",
        total_count,
        per_page,
    );

    info!("then get the total count with a cache");

    let host = "https://api.github.com";
    let client = Client::builder().build(HttpsConnector::new(4).unwrap());
    // Isolated cache dir so state from earlier runs can't leak in.
    let cache_path = testkit::test_home().join(".hubcaps/cache");
    let http_cache = Box::new(FileBasedCache::new(cache_path));
    let github: Github<_> = Github::custom(host, agent, credentials, client, http_cache);

    info!("first populate the cache");

    let repos = github.user_repos(owner).iter(&repo_list_options);
    let count1 = rt.block_on(repos.fold(0, |acc, _repo| future::ok::<_, Error>(acc + 1)))?;
    let status1 = rt.block_on(github.rate_limit().get())?;

    info!("then retrieve via the cache");

    let repos = github.user_repos(owner).iter(&repo_list_options);
    let count2 = rt.block_on(repos.fold(0, |acc, _repo| future::ok::<_, Error>(acc + 1)))?;
    let status2 = rt.block_on(github.rate_limit().get())?;

    info!("and compare the counts");

    assert_eq!(count1, count2);

    info!("and while we're at it, compare that caching mitigates rate limiting");

    // Conditional requests answered from the cache (304) should not count
    // against the core quota, so "remaining" must be unchanged.
    let rem1 = status1.resources.core.remaining;
    let rem2 = status2.resources.core.remaining;
    assert_eq!(rem1, rem2);

    Ok(())
}
Loading

0 comments on commit 8d4827c

Please sign in to comment.