Implement a basic HTTP memory cache #4117

Closed
wants to merge 15 commits into from
Document the cache. Refactor incomplete entries to lessen Option-itis.

jdm committed Nov 27, 2014
commit 9815deb70344f205543e378aac2b1d8fb15b44ca
@@ -2,6 +2,9 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */

//! A non-validating memory cache that only evicts expired entries and grows
//! without bound.

use http_loader::send_error_direct;
use resource_task::{Metadata, ProgressMsg, LoadResponse, LoadData, Payload, Done, start_sending_opt};

@@ -31,6 +34,7 @@ use url::Url;
//TODO: Use If-Modified-Since, Etag, etc.
//TODO: Doom incomplete entries

/// The key used to differentiate requests in the cache.
#[deriving(Clone, Hash, PartialEq, Eq)]
pub struct CacheKey {
url: Url,
@@ -53,52 +57,74 @@ impl CacheKey {
}
}

/// The list of consumers waiting on this request's response.
enum PendingConsumers {
/// Consumers awaiting the initial response metadata
AwaitingHeaders(Vec<Sender<LoadResponse>>),
AwaitingBody(Vec<Sender<ProgressMsg>>),
/// Consumers awaiting the remaining response body
AwaitingBody(Metadata, Vec<u8>, Vec<Sender<ProgressMsg>>),

pcwalton Dec 9, 2014

Contributor

What is the Vec<u8>? I'd document that in the comment.

}
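
One way to address the review question above is to name the fields of the body-waiting state. The following is a minimal sketch in present-day Rust, not the dialect used in this diff. `Metadata` and `ProgressMsg` are simplified stand-ins, the header-waiting channels carry `Metadata` instead of `LoadResponse`, and `body_so_far` and `buffered_len` are hypothetical names. Later hunks append each payload to the `Vec<u8>`, which suggests it holds the body bytes received so far.

```rust
use std::sync::mpsc::Sender;

/// Hypothetical stand-in for the response metadata type.
#[derive(Clone, Debug, PartialEq)]
pub struct Metadata {
    pub content_type: Option<String>,
}

/// Hypothetical stand-in for the body progress messages.
#[derive(Clone, Debug, PartialEq)]
pub enum ProgressMsg {
    Payload(Vec<u8>),
    Done,
}

/// The consumers waiting on a request's response.
pub enum PendingConsumers {
    /// Consumers awaiting the initial response metadata.
    AwaitingHeaders(Vec<Sender<Metadata>>),
    /// Consumers awaiting the remaining response body.
    AwaitingBody {
        /// Metadata already delivered to every consumer.
        metadata: Metadata,
        /// Body bytes received so far, kept so late consumers can catch up.
        body_so_far: Vec<u8>,
        /// Channels that receive each subsequent payload.
        consumers: Vec<Sender<ProgressMsg>>,
    },
}

impl PendingConsumers {
    /// Number of body bytes buffered so far (zero before headers arrive).
    pub fn buffered_len(&self) -> usize {
        match self {
            PendingConsumers::AwaitingHeaders(_) => 0,
            PendingConsumers::AwaitingBody { body_so_far, .. } => body_so_far.len(),
        }
    }
}
```

Named fields make each element self-documenting, at the cost of longer patterns at every `match` site.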

/// An unfulfilled request representing both the consumers waiting for the initial
/// metadata and the subsequent response body. If doomed, the entry will be removed
/// after the final payload.
struct PendingResource {
metadata: Option<Metadata>,
body: Vec<u8>,
consumers: PendingConsumers,
expires: Duration,
doomed: bool,
}

/// A complete cached resource.
struct CachedResource {
metadata: Metadata,
body: Vec<u8>,
expires: Duration,
}

/// A memory cache that tracks incomplete and complete responses, differentiated by
/// the initial request.
pub struct MemoryCache {
/// Complete cached responses.
complete_entries: HashMap<CacheKey, CachedResource>,
/// Incomplete cached responses.
pending_entries: HashMap<CacheKey, PendingResource>,
/// The time at which this cache was created for use by expiry checks.
base_time: Timespec,
}

/// Abstraction over the concept of a single target for HTTP response messages.
pub enum ResourceResponseTarget {
/// A response is being streamed into the cache.
CachedPendingResource(CacheKey, Arc<Mutex<MemoryCache>>),
/// A response is being streamed directly to a consumer and skipping the cache.
UncachedPendingResource(Sender<LoadResponse>),
}

/// Abstraction over the concept of a single target for HTTP response payload messages.
pub enum ResourceProgressTarget {
/// A response is being streamed into the cache.
CachedInProgressResource(CacheKey, Arc<Mutex<MemoryCache>>),
/// A response is being streamed directly to a consumer and skipping the cache.
UncachedInProgressResource(Sender<ProgressMsg>),
}

/// The result of matching a request against an HTTP cache.
pub enum CacheOperationResult {
/// The request cannot be cached for a given reason.
Uncacheable(&'static str),
/// The request is in the cache and the response data is forthcoming.
CachedContentPending,
/// The request is not present in the cache but will be cached with the given key.
NewCacheEntry(CacheKey),
}

/// Tokenize a header value.
fn split_header(header: &str) -> Map<&str, &str, CharSplits<char>> {
header.split(',')
.map(|v| v.trim())
}
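
For readers on current Rust, the same tokenizer can be sketched as follows. The `Map<&str, &str, CharSplits<char>>` return type above belongs to the 2014 standard library, and `impl Iterator` is used here in its place. The logic is unchanged: split on commas and trim each token.

```rust
/// Tokenize a comma-separated header value, trimming whitespace around each token.
fn split_header<'a>(header: &'a str) -> impl Iterator<Item = &'a str> {
    header.split(',').map(|v| v.trim())
}
```

Note that an empty header value yields a single empty token, so callers may want to filter those out.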

/// Determine if a given response is cacheable based on the initial metadata received.

pcwalton Dec 9, 2014

Contributor

What spec are you using here? Might want to cite it for posterity.

pcwalton Dec 9, 2014

Contributor

(also so that I can review it) :)

fn response_is_cacheable(metadata: &Metadata) -> bool {
if metadata.status != StatusOk {
return false;
@@ -134,6 +160,7 @@ fn response_is_cacheable(metadata: &Metadata) -> bool {
return true;
}

/// Determine the expiry date of the given response.
fn get_response_expiry(metadata: &Metadata) -> Duration {
metadata.headers.as_ref().and_then(|headers| {
headers.cache_control.as_ref().and_then(|cache_control| {
@@ -157,6 +184,7 @@ fn get_response_expiry(metadata: &Metadata) -> Duration {
}

impl MemoryCache {
/// Create a new memory cache instance.
pub fn new() -> MemoryCache {
MemoryCache {
complete_entries: HashMap::new(),
@@ -165,6 +193,8 @@ impl MemoryCache {
}
}

/// Mark a cached request as doomed. Any waiting consumers will immediately receive
/// an error message or a final body payload. The cache entry is immediately removed.
pub fn doom_request(&mut self, key: &CacheKey, err: String) {
info!("dooming entry for {}", key.url);
let resource = self.pending_entries.remove(key).unwrap();
@@ -174,14 +204,18 @@ impl MemoryCache {
send_error_direct(key.url.clone(), err.clone(), consumer.clone());
}
}
AwaitingBody(ref consumers) => {
AwaitingBody(_, _, ref consumers) => {
for consumer in consumers.iter() {
let _ = consumer.send_opt(Done(Ok(())));
}
}
}
}
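
The dooming behaviour documented above can be sketched in present-day Rust as follows. This is a simplified model, not the code in this diff. `Notice` and `Waiting` are hypothetical stand-ins for the real message and consumer types, keys are plain strings, and a missing key is ignored here, whereas the diff unwraps and would panic.

```rust
use std::collections::HashMap;
use std::sync::mpsc::Sender;

/// Hypothetical message type standing in for `LoadResponse`/`ProgressMsg`.
#[derive(Debug, PartialEq)]
pub enum Notice {
    /// Sent to consumers that never saw headers.
    Failed(String),
    /// Sent to consumers that were already receiving a body.
    Done,
}

/// Hypothetical stand-in for `PendingConsumers`.
pub enum Waiting {
    ForHeaders(Vec<Sender<Notice>>),
    ForBody(Vec<Sender<Notice>>),
}

/// Remove the pending entry and tell every waiting consumer that no more data is coming.
pub fn doom_request(pending: &mut HashMap<String, Waiting>, key: &str, err: &str) {
    match pending.remove(key) {
        Some(Waiting::ForHeaders(consumers)) => {
            for consumer in consumers {
                // Send failures are ignored: the consumer may already be gone.
                let _ = consumer.send(Notice::Failed(err.to_string()));
            }
        }
        Some(Waiting::ForBody(consumers)) => {
            for consumer in consumers {
                let _ = consumer.send(Notice::Done);
            }
        }
        None => {}
    }
}
```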

/// Handle the initial response metadata for an incomplete cached request.
/// If the response should not be cached, the entry will be doomed and any
/// subsequent requests will not see the cached request. All waiting consumers
/// will see the new metadata.
pub fn process_metadata(&mut self, key: &CacheKey, metadata: Metadata) {
info!("storing metadata for {}", key.url);
let resource = self.pending_entries.get_mut(key).unwrap();
@@ -194,24 +228,25 @@ impl MemoryCache {
.map(|chan| chan.unwrap())
.collect();
}
AwaitingBody(_) => panic!("obtained headers for {} but awaiting body?", key.url)
AwaitingBody(..) => panic!("obtained headers for {} but awaiting body?", key.url)
}

if !response_is_cacheable(&metadata) {
resource.doomed = true;
}

resource.expires = get_response_expiry(&metadata);
resource.metadata = Some(metadata);
resource.consumers = AwaitingBody(chans);
resource.consumers = AwaitingBody(metadata, vec!(), chans);
}

/// Handle a response body payload for an incomplete cached response.
/// All waiting consumers will see the new payload addition.
pub fn process_payload(&mut self, key: &CacheKey, payload: Vec<u8>) {
info!("storing partial response for {}", key.url);
let resource = self.pending_entries.get_mut(key).unwrap();
resource.body.push_all(payload.as_slice());
match resource.consumers {
AwaitingBody(ref consumers) => {
AwaitingBody(_, ref mut body, ref consumers) => {
body.push_all(payload.as_slice());
for consumer in consumers.iter() {
//FIXME: maybe remove consumer on failure to avoid extra clones?
let _ = consumer.send_opt(Payload(payload.clone()));
@@ -221,12 +256,15 @@ impl MemoryCache {
}
}

/// Handle a response body final payload for an incomplete cached response.
/// All waiting consumers will see the new message. If the cache entry is
/// doomed, it will not be transferred to the set of complete cache entries.
pub fn process_done(&mut self, key: &CacheKey) {
info!("finished fetching {}", key.url);
let resource = self.pending_entries.remove(key).unwrap();
match resource.consumers {
AwaitingHeaders(_) => panic!("saw Done for {} but awaiting headers?", key.url),
AwaitingBody(ref consumers) => {
AwaitingBody(_, _, ref consumers) => {
for consumer in consumers.iter() {
let _ = consumer.send_opt(Done(Ok(())));
}
@@ -238,14 +276,26 @@ impl MemoryCache {
return;
}

let (metadata, body) = match resource.consumers {
AwaitingBody(metadata, body, _) => (metadata, body),
_ => panic!("expected consumer list awaiting bodies"),
};

let complete = CachedResource {
metadata: resource.metadata.unwrap(),
body: resource.body,
metadata: metadata,
body: body,
expires: resource.expires,
};
self.complete_entries.insert(key.clone(), complete);
}
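
The payload and completion steps above can be illustrated with a toy model in present-day Rust. It is a sketch under simplifying assumptions, not the code in this diff. `ToyCache`, `Pending`, `start`, and `cached_body` are hypothetical names, keys are plain strings, metadata and expiry are left out, and missing keys are ignored instead of panicking.

```rust
use std::collections::HashMap;
use std::sync::mpsc::Sender;

/// Simplified stand-in for the body progress messages.
#[derive(Clone, Debug, PartialEq)]
pub enum ProgressMsg {
    Payload(Vec<u8>),
    Done,
}

/// A pending entry whose headers have already been processed.
struct Pending {
    body: Vec<u8>,
    consumers: Vec<Sender<ProgressMsg>>,
    doomed: bool,
}

#[derive(Default)]
pub struct ToyCache {
    complete: HashMap<String, Vec<u8>>,
    pending: HashMap<String, Pending>,
}

impl ToyCache {
    /// Register a pending entry; an uncacheable response starts out doomed.
    pub fn start(&mut self, key: &str, consumer: Sender<ProgressMsg>, cacheable: bool) {
        let entry = Pending { body: Vec::new(), consumers: vec![consumer], doomed: !cacheable };
        self.pending.insert(key.to_string(), entry);
    }

    /// Append a payload to the buffered body and fan it out to every consumer.
    pub fn process_payload(&mut self, key: &str, payload: &[u8]) {
        if let Some(entry) = self.pending.get_mut(key) {
            entry.body.extend_from_slice(payload);
            for consumer in &entry.consumers {
                let _ = consumer.send(ProgressMsg::Payload(payload.to_vec()));
            }
        }
    }

    /// Notify consumers, then promote the entry to the complete set unless it is doomed.
    pub fn process_done(&mut self, key: &str) {
        if let Some(entry) = self.pending.remove(key) {
            for consumer in &entry.consumers {
                let _ = consumer.send(ProgressMsg::Done);
            }
            if !entry.doomed {
                self.complete.insert(key.to_string(), entry.body);
            }
        }
    }

    /// Look up a completed body, if one was stored.
    pub fn cached_body(&self, key: &str) -> Option<&[u8]> {
        self.complete.get(key).map(|body| body.as_slice())
    }
}
```

A doomed entry still streams every payload to its consumers. It is only skipped at the final promotion step.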

/// Match a new request against the set of incomplete and complete cached requests.
/// If the request matches an existing, non-doomed entry, any existing response data will
/// be synchronously streamed to the consumer. If the request does not match but can be
/// cached, a new cache entry will be created and the request will be responsible for
/// notifying the cache of the subsequent HTTP response. If the request does not match
/// and cannot be cached, the request is responsible for handling its own response and
/// consumer.
pub fn process_pending_request(&mut self, load_data: &LoadData, start_chan: Sender<LoadResponse>)
-> CacheOperationResult {
if load_data.method != Get {
@@ -286,10 +336,9 @@ impl MemoryCache {
}
}
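
The three outcomes described in the doc comment for `process_pending_request` can be sketched in present-day Rust as below. This is a hedged illustration and not the elided body of the function. `ToyCache` and `finish` are hypothetical, keys are plain URL strings, the method is passed as a string, and doomed entries and expiry checks are omitted. The wording of the `Uncacheable` reason is an assumption.

```rust
use std::collections::{HashMap, HashSet};

/// Mirrors the diff's enum, with a `String` key standing in for `CacheKey`.
#[derive(Debug, PartialEq)]
pub enum CacheOperationResult {
    Uncacheable(&'static str),
    CachedContentPending,
    NewCacheEntry(String),
}

#[derive(Default)]
pub struct ToyCache {
    complete: HashMap<String, Vec<u8>>,
    pending: HashSet<String>,
}

impl ToyCache {
    /// Decide how a new request relates to the cache.
    pub fn process_pending_request(&mut self, method: &str, url: &str) -> CacheOperationResult {
        if method != "GET" {
            // Assumed reason string; the diff only shows the method check itself.
            return CacheOperationResult::Uncacheable("only GET requests are cached");
        }
        if self.complete.contains_key(url) || self.pending.contains(url) {
            // The real cache would stream stored data to the consumer here.
            return CacheOperationResult::CachedContentPending;
        }
        self.pending.insert(url.to_string());
        CacheOperationResult::NewCacheEntry(url.to_string())
    }

    /// Hypothetical helper: move a pending entry to the complete set.
    pub fn finish(&mut self, url: &str, body: Vec<u8>) {
        if self.pending.remove(url) {
            self.complete.insert(url.to_string(), body);
        }
    }
}
```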

/// Add a new pending request to the set of incomplete cache entries.
fn add_pending_cache_entry(&mut self, key: CacheKey, start_chan: Sender<LoadResponse>) {
let resource = PendingResource {
metadata: None,
body: vec!(),
consumers: AwaitingHeaders(vec!(start_chan)),
expires: MAX,
doomed: false,
@@ -298,6 +347,7 @@ impl MemoryCache {
self.pending_entries.insert(key, resource);
}

/// Synchronously send the entire cached response body to the given consumer.
fn send_complete_entry(&self, key: CacheKey, start_chan: Sender<LoadResponse>) {
info!("returning full cache body for {}", key.url);
let resource = self.complete_entries.get(&key).unwrap();
@@ -311,40 +361,33 @@ impl MemoryCache {
}
}

/// Synchronously send all partial stored response data for a cached request to the
/// given consumer.
fn send_partial_entry(&mut self, key: CacheKey, start_chan: Sender<LoadResponse>) {
info!("returning partial cache data for {}", key.url);

let resource = self.pending_entries.get_mut(&key).unwrap();

let metadata = resource.metadata.clone();
match metadata {
Some(metadata) => {
match resource.consumers {
AwaitingHeaders(ref mut consumers) => {
consumers.push(start_chan);
}
AwaitingBody(ref metadata, ref body, ref mut consumers) => {
info!("headers available for {}", key.url);
let progress_chan = start_sending_opt(start_chan, metadata);
let progress_chan = start_sending_opt(start_chan, metadata.clone());
match progress_chan {
Ok(chan) => {
match resource.consumers {
AwaitingHeaders(_) =>
panic!("cached metadata, but consumers awaiting headers"),
AwaitingBody(ref mut consumers) => consumers.push(chan.clone()),
}
consumers.push(chan.clone());

if !resource.body.is_empty() {
if !body.is_empty() {
info!("partial body available for {}", key.url);
let _ = chan.send_opt(Payload(resource.body.clone()));
let _ = chan.send_opt(Payload(body.clone()));
}
}

Err(_) => ()
}
}

None => {
match resource.consumers {
AwaitingHeaders(ref mut consumers) => consumers.push(start_chan),
AwaitingBody(_) => panic!("no cached metadata, but consumers awaiting body"),
}
}
}
}
}
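
The catch-up behaviour of `send_partial_entry` for a consumer that joins mid-download can be sketched in present-day Rust as follows. This is a simplified model, not the code in this diff. `PendingEntry`, `join`, and `push` are hypothetical names, headers are assumed to have arrived already, and the sketch creates the channel itself where the diff obtains one from `start_sending_opt`.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Simplified stand-in for the body progress messages.
#[derive(Clone, Debug, PartialEq)]
pub enum ProgressMsg {
    Payload(Vec<u8>),
}

/// A pending entry that is already past the header stage.
pub struct PendingEntry {
    body: Vec<u8>,
    consumers: Vec<Sender<ProgressMsg>>,
}

impl PendingEntry {
    pub fn new() -> Self {
        PendingEntry { body: Vec::new(), consumers: Vec::new() }
    }

    /// Attach a consumer: replay any buffered bytes, then subscribe it to future payloads.
    pub fn join(&mut self) -> Receiver<ProgressMsg> {
        let (tx, rx) = channel();
        if !self.body.is_empty() {
            let _ = tx.send(ProgressMsg::Payload(self.body.clone()));
        }
        self.consumers.push(tx);
        rx
    }

    /// Buffer a new payload and forward it to every subscribed consumer.
    pub fn push(&mut self, payload: &[u8]) {
        self.body.extend_from_slice(payload);
        for consumer in &self.consumers {
            let _ = consumer.send(ProgressMsg::Payload(payload.to_vec()));
        }
    }
}
```

A late consumer receives the buffered body as one payload followed by the live payloads, so its chunk boundaries can differ from those seen by a consumer that joined earlier.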