Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement a basic HTTP memory cache #4117

Closed
wants to merge 15 commits into from
Closed
Changes from 1 commit
Commits
File filter...
Filter file types
Jump to…
Jump to file
Failed to load files.

Always

Just for now

Prev

Address review comments.

  • Loading branch information
jdm committed Dec 16, 2014
commit f3074f5719d916a0c6cce88690de100c816c75fb
@@ -5,7 +5,8 @@
#![deny(missing_docs)]

//! A non-validating memory cache that only evicts expired entries and grows
//! without bound.
//! without bound. Implements the logic specified in http://tools.ietf.org/html/rfc7234
//! and http://tools.ietf.org/html/rfc7232.

use http_loader::send_error_direct;
use resource_task::{Metadata, ProgressMsg, LoadResponse, LoadData, Payload, Done, start_sending_opt};
@@ -21,7 +22,7 @@ use http::status::Ok as StatusOk;
use std::collections::HashMap;
use std::comm::Sender;
use std::iter::Map;
use std::mem::swap;
use std::mem;
use std::num::{Bounded, FromStrRadix};
use std::str::CharSplits;
use std::sync::{Arc, Mutex};
@@ -40,7 +41,7 @@ use url::Url;
//TODO: Last-Modified
//TODO: Range requests
//TODO: Revalidation rules for query strings
//TODO: Vary
//TODO: Vary header

/// The key used to differentiate requests in the cache.
#[deriving(Clone, Hash, PartialEq, Eq)]
@@ -70,7 +71,7 @@ impl CacheKey {
enum PendingConsumers {
/// Consumers awaiting the initial response metadata
AwaitingHeaders(Vec<Sender<LoadResponse>>),
/// Consumers awaiting the remaining response body
/// Consumers awaiting the remaining response body. Incomplete body stored as Vec<u8>.
AwaitingBody(Metadata, Vec<u8>, Vec<Sender<ProgressMsg>>),

This comment has been minimized.

Copy link
@pcwalton

pcwalton Dec 9, 2014

Contributor

What is the Vec<u8>? I'd document that in the comment.

}

@@ -80,8 +81,8 @@ enum PendingConsumers {
struct PendingResource {
consumers: PendingConsumers,
expires: Duration,
doomed: bool,
last_validated: Tm,

This comment has been minimized.

Copy link
@pcwalton

pcwalton Dec 9, 2014

Contributor

nit: if you switch the order of last_validated and doomed you may shave off a few bytes in the struct

doomed: bool,
}

/// A complete cached resource.
@@ -152,6 +153,7 @@ fn any_token_matches(header: &str, tokens: &[&str]) -> bool {
}

/// Determine if a given response is cacheable based on the initial metadata received.

This comment has been minimized.

Copy link
@pcwalton

pcwalton Dec 9, 2014

Contributor

What spec are you using here? Might want to cite it for posterity.

This comment has been minimized.

Copy link
@pcwalton

pcwalton Dec 9, 2014

Contributor

(also so that I can review it) :)

/// Based on http://tools.ietf.org/html/rfc7234#section-5
fn response_is_cacheable(metadata: &Metadata) -> bool {
if metadata.status != StatusOk {
return false;
@@ -184,11 +186,12 @@ fn response_is_cacheable(metadata: &Metadata) -> bool {
}

/// Determine the expiry date of the given response headers.

This comment has been minimized.

Copy link
@pcwalton

pcwalton Dec 9, 2014

Contributor

Can you add "returns a far future date if the response headers do not expire" for clarity?

/// Returns a far-future date if the response does not expire.
fn get_response_expiry_from_headers(headers: &ResponseHeaderCollection) -> Duration {
headers.cache_control.as_ref().and_then(|cache_control| {
for token in split_header(cache_control[]) {
let mut parts = token.split('=');
if parts.next().unwrap() == "max-age" {
if parts.next() == Some("max-age") {
return parts.next()
.and_then(|val| FromStrRadix::from_str_radix(val, 10))
.map(|secs| Duration::seconds(secs));
@@ -212,6 +215,7 @@ fn get_response_expiry_from_headers(headers: &ResponseHeaderCollection) -> Durat
}

/// Determine the expiry date of the given response.

This comment has been minimized.

Copy link
@pcwalton

pcwalton Dec 9, 2014

Contributor

As above for this comment.

/// Returns a far-future date if this response does not expire.
fn get_response_expiry(metadata: &Metadata) -> Duration {
metadata.headers.as_ref().map(|headers| {
get_response_expiry_from_headers(headers)
@@ -230,7 +234,7 @@ impl MemoryCache {

/// Process a revalidation that returned new content for an expired entry.
pub fn process_revalidation_failed(&mut self, key: &CacheKey) {
info!("recreating entry for {} (cache entry expired)", key.url);
debug!("recreating entry for {} (cache entry expired)", key.url);
let resource = self.complete_entries.remove(key).unwrap();
self.add_pending_cache_entry(key.clone(), resource.revalidating_consumers);
}
@@ -239,7 +243,7 @@ impl MemoryCache {
/// receive an error message or a final body payload. The cache entry is immediately
/// removed.
pub fn doom_request(&mut self, key: &CacheKey, err: String) {
info!("dooming entry for {} ({})", key.url, err);
debug!("dooming entry for {} ({})", key.url, err);

assert!(!self.complete_entries.contains_key(key));

@@ -261,13 +265,11 @@ impl MemoryCache {
/// Handle a 304 response to a revalidation request. Updates the cached response
/// metadata with any new expiration data.
pub fn process_not_modified(&mut self, key: &CacheKey, headers: &ResponseHeaderCollection) {
info!("updating metadata for {}", key.url);
debug!("updating metadata for {}", key.url);
let resource = self.complete_entries.get_mut(key).unwrap();
resource.expires = get_response_expiry_from_headers(headers);

let mut consumers = vec!();
swap(&mut consumers, &mut resource.revalidating_consumers);
for consumer in consumers.into_iter() {
for consumer in mem::replace(&mut resource.revalidating_consumers, vec!()).into_iter() {
MemoryCache::send_complete_resource(resource, consumer);
}
}
@@ -277,7 +279,7 @@ impl MemoryCache {
/// subsequent requests will not see the cached request. All waiting consumers
/// will see the new metadata.
pub fn process_metadata(&mut self, key: &CacheKey, metadata: Metadata) {
info!("storing metadata for {}", key.url);
debug!("storing metadata for {}", key.url);
let resource = self.pending_entries.get_mut(key).unwrap();
let chans: Vec<Sender<ProgressMsg>>;
match resource.consumers {
@@ -303,7 +305,7 @@ impl MemoryCache {
/// Handle a response body payload for an incomplete cached response.
/// All waiting consumers will see the new payload addition.
pub fn process_payload(&mut self, key: &CacheKey, payload: Vec<u8>) {
info!("storing partial response for {}", key.url);
debug!("storing partial response for {}", key.url);
let resource = self.pending_entries.get_mut(key).unwrap();
match resource.consumers {
AwaitingBody(_, ref mut body, ref consumers) => {
@@ -321,7 +323,7 @@ impl MemoryCache {
/// All waiting consumers will see the new message. If the cache entry is
/// doomed, it will not be transferred to the set of complete cache entries.
pub fn process_done(&mut self, key: &CacheKey) {
info!("finished fetching {}", key.url);
debug!("finished fetching {}", key.url);
let resource = self.pending_entries.remove(key).unwrap();
match resource.consumers {
AwaitingHeaders(_) => panic!("saw Done for {} but awaiting headers?", key.url),
@@ -333,7 +335,7 @@ impl MemoryCache {
}

if resource.doomed {
info!("completing dooming of {}", key.url);
debug!("completing dooming of {}", key.url);
return;
}

@@ -383,7 +385,7 @@ impl MemoryCache {
match self.complete_entries.get_mut(&key) {
Some(resource) => {
if self.base_time + resource.expires < time::now().to_timespec() {
info!("entry for {} has expired", key.url());
debug!("entry for {} has expired", key.url());
let expiry = time::at(self.base_time + resource.expires);
return revalidate(resource, &key, start_chan, ExpiryDate(expiry));
}
@@ -395,15 +397,15 @@ impl MemoryCache {
}).unwrap_or(false);

if must_revalidate {
info!("entry for {} must be revalidated", key.url());
debug!("entry for {} must be revalidated", key.url());
let last_validated = resource.last_validated;
return revalidate(resource, &key, start_chan, ExpiryDate(last_validated));
}

let etag = resource.metadata.headers.as_ref().and_then(|headers| headers.etag.clone());
match etag {
Some(etag) => {
info!("entry for {} has an Etag", key.url());
debug!("entry for {} has an Etag", key.url());
return revalidate(resource, &key, start_chan, Etag(etag.clone()));
}
None => ()
@@ -443,7 +445,7 @@ impl MemoryCache {
last_validated: time::now(),
doomed: false,
};
info!("creating cache entry for {}", key.url);
debug!("creating cache entry for {}", key.url);
self.pending_entries.insert(key, resource);
}

@@ -461,15 +463,15 @@ impl MemoryCache {

/// Synchronously send the entire cached response body to the given consumer.
fn send_complete_entry(&self, key: CacheKey, start_chan: Sender<LoadResponse>) {
info!("returning full cache body for {}", key.url);
debug!("returning full cache body for {}", key.url);
let resource = self.complete_entries.get(&key).unwrap();
MemoryCache::send_complete_resource(resource, start_chan)
}

/// Synchronously send all partial stored response data for a cached request to the
/// given consumer.
fn send_partial_entry(&mut self, key: CacheKey, start_chan: Sender<LoadResponse>) {
info!("returning partial cache data for {}", key.url);
debug!("returning partial cache data for {}", key.url);

let resource = self.pending_entries.get_mut(&key).unwrap();

@@ -478,14 +480,14 @@ impl MemoryCache {
consumers.push(start_chan);
}
AwaitingBody(ref metadata, ref body, ref mut consumers) => {
info!("headers available for {}", key.url);
debug!("headers available for {}", key.url);
let progress_chan = start_sending_opt(start_chan, metadata.clone());
match progress_chan {
Ok(chan) => {
consumers.push(chan.clone());

if !body.is_empty() {
info!("partial body available for {}", key.url);
debug!("partial body available for {}", key.url);
let _ = chan.send_opt(Payload(body.clone()));
}
}
@@ -18,6 +18,8 @@ use std::io::Reader;
use servo_util::task::spawn_named;
use url::Url;

//FIXME: it would be nice to reduce the numbers of procs here, but it's hard to make a consistent
// interface with the other loaders that don't need the cache.
pub fn factory<'a>(cache: Arc<Mutex<MemoryCache>>)
-> proc(load_data: LoadData, start_chan: Sender<LoadResponse>): 'a {
proc(load_data: LoadData, start_chan: Sender<LoadResponse>) {
@@ -68,7 +70,7 @@ fn load(mut load_data: LoadData, start_chan: Sender<LoadResponse>, cache: Arc<Mu
let mut url = load_data.url.clone();
let mut redirected_to = HashSet::new();

info!("checking cache for {}", url);
debug!("checking cache for {}", url);
let cache_result = {
let mut cache = cache.lock();
cache.process_pending_request(&load_data, start_chan.clone())
@@ -90,16 +92,16 @@ fn load(mut load_data: LoadData, start_chan: Sender<LoadResponse>, cache: Arc<Mu

let start_chan = match cache_result {
Uncacheable(reason) => {
info!("request for {} can't be cached: {}", url, reason);
debug!("request for {} can't be cached: {}", url, reason);
UncachedPendingResource(start_chan)
}
CachedContentPending => return,
NewCacheEntry(key) => {
info!("new cache entry for {}", url);
debug!("new cache entry for {}", url);
CachedPendingResource(key, cache)
}
Revalidate(key, _) => {
info!("revalidating {}", url);
debug!("revalidating {}", url);
CachedPendingResource(key, cache)
}
};
@@ -293,8 +293,5 @@ pub fn parse_http_timestamp(timestamp: &str) -> Option<Tm> {
}

// ANSI C's asctime() format
match strptime(timestamp, "%c") {
Ok(t) => Some(t),
Err(_) => None,
}
strptime(timestamp, "%c").ok()
}
@@ -52,17 +52,15 @@ fn run_http_server(source_dir: String) -> (Sender<ServerMsg>, u16) {
let (tx, rx) = channel();
let (port_sender, port_receiver) = channel();
task::spawn(proc() {
let mut prc = match Command::new("python")
let mut prc = Command::new("python")
.args(["../httpserver.py"])
.stdin(Ignored)
.stdout(CreatePipe(false, true))
.stderr(Ignored)
.cwd(&Path::new(source_dir))
.spawn()
{
Ok(p) => p,
_ => panic!("Unable to spawn server."),
};
.ok()
.expect("Unable to spawn server.");

let mut bytes = vec!();
loop {
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.