Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement a basic HTTP memory cache #4117

Closed
wants to merge 15 commits into from
Closed
Changes from 1 commit
Commits
File filter...
Filter file types
Jump to…
Jump to file
Failed to load files.

Always

Just for now

Fix incorrect revalidation logic that dropped the consumer channels o…

…n the floor.
  • Loading branch information
jdm committed Nov 27, 2014
commit bcb618384051fefa39f7b1f61030d2673af1c3c2
@@ -21,6 +21,7 @@ use http::status::Ok as StatusOk;
use std::collections::HashMap;
use std::comm::Sender;
use std::iter::Map;
use std::mem::swap;
use std::num::{Bounded, FromStrRadix};
use std::str::CharSplits;
use std::sync::{Arc, Mutex};
@@ -40,7 +41,6 @@ use url::Url;
//TODO: Range requests
//TODO: Revalidation rules for query strings
//TODO: Vary

This comment has been minimized.

Copy link
@pcwalton

pcwalton Dec 9, 2014

Contributor

Vary?

//TODO: Fix race between multiple revalidations of same entry

/// The key used to differentiate requests in the cache.
#[deriving(Clone, Hash, PartialEq, Eq)]
@@ -90,6 +90,7 @@ struct CachedResource {
body: Vec<u8>,
expires: Duration,
last_validated: Tm,
revalidating_consumers: Vec<Sender<LoadResponse>>,
}

/// A memory cache that tracks incomplete and complete responses, differentiated by
@@ -227,14 +228,20 @@ impl MemoryCache {
}
}

/// Mark a cached request as doomed. Any waiting consumers will immediately receive
/// an error message or a final body payload. The cache entry is immediately removed.
/// Process a revalidation that returned new content for an expired entry.
pub fn process_revalidation_failed(&mut self, key: &CacheKey) {
info!("recreating entry for {} (cache entry expired)", key.url);

This comment has been minimized.

Copy link
@pcwalton

pcwalton Dec 9, 2014

Contributor

Why info instead of debug? These seem like the kind of messages we'd like to compile out in debug builds…

let resource = self.complete_entries.remove(key).unwrap();
self.add_pending_cache_entry(key.clone(), resource.revalidating_consumers);
}

/// Mark an incomplete cached request as doomed. Any waiting consumers will immediately
/// receive an error message or a final body payload. The cache entry is immediately
/// removed.
pub fn doom_request(&mut self, key: &CacheKey, err: String) {
info!("dooming entry for {} ({})", key.url, err);
match self.complete_entries.remove(key) {
Some(_) => return,
None => (),
}

assert!(!self.complete_entries.contains_key(key));

let resource = self.pending_entries.remove(key).unwrap();
match resource.consumers {
@@ -257,6 +264,12 @@ impl MemoryCache {
info!("updating metadata for {}", key.url);
let resource = self.complete_entries.get_mut(key).unwrap();
resource.expires = get_response_expiry_from_headers(headers);

let mut consumers = vec!();
swap(&mut consumers, &mut resource.revalidating_consumers);

This comment has been minimized.

Copy link
@pcwalton

pcwalton Dec 9, 2014

Contributor

I usually use mem::replace for this kind of thing, as it's shorter. You could even eliminate the temporary if you wanted.

for consumer in consumers.into_iter() {
MemoryCache::send_complete_resource(resource, consumer);
}
}

/// Handle the initial response metadata for an incomplete cached request.
@@ -334,6 +347,7 @@ impl MemoryCache {
body: body,
expires: resource.expires,
last_validated: resource.last_validated,
revalidating_consumers: vec!(),
};
self.complete_entries.insert(key.clone(), complete);
}
@@ -347,16 +361,31 @@ impl MemoryCache {
/// consumer.
pub fn process_pending_request(&mut self, load_data: &LoadData, start_chan: Sender<LoadResponse>)
-> CacheOperationResult {
fn revalidate(resource: &mut CachedResource,
key: &CacheKey,
start_chan: Sender<LoadResponse>,
method: RevalidationMethod) -> CacheOperationResult {
// Ensure that at most one revalidation is taking place at a time for a
// cached resource.
resource.revalidating_consumers.push(start_chan);
if resource.revalidating_consumers.len() > 1 {
CachedContentPending
} else {
Revalidate(key.clone(), method)
}
}

if load_data.method != Get {
return Uncacheable("Only GET requests can be cached.");
}

let key = CacheKey::new(load_data.clone());
match self.complete_entries.get(&key) {
match self.complete_entries.get_mut(&key) {
Some(resource) => {
if self.base_time + resource.expires < time::now().to_timespec() {
info!("entry for {} has expired", key.url());
return Revalidate(key, ExpiryDate(time::at(self.base_time + resource.expires)));
let expiry = time::at(self.base_time + resource.expires);
return revalidate(resource, &key, start_chan, ExpiryDate(expiry));
}

let must_revalidate = resource.metadata.headers.as_ref().and_then(|headers| {
@@ -367,34 +396,38 @@ impl MemoryCache {

if must_revalidate {
info!("entry for {} must be revalidated", key.url());
return Revalidate(key, ExpiryDate(resource.last_validated));
let last_validated = resource.last_validated;
return revalidate(resource, &key, start_chan, ExpiryDate(last_validated));
}

match resource.metadata.headers.as_ref().and_then(|headers| headers.etag.as_ref()) {
let etag = resource.metadata.headers.as_ref().and_then(|headers| headers.etag.clone());
match etag {
Some(etag) => {
info!("entry for {} has an Etag", key.url());
return Revalidate(key, Etag(etag.clone()));
return revalidate(resource, &key, start_chan, Etag(etag.clone()));
}
None => ()
}

//TODO: Revalidate once per session for response with no explicit expiry

self.send_complete_entry(key, start_chan);
return CachedContentPending;
}

None => ()
}

if self.complete_entries.contains_key(&key) {
self.send_complete_entry(key, start_chan);
return CachedContentPending;
}

let new_entry = match self.pending_entries.get(&key) {
Some(resource) if resource.doomed => return Uncacheable("Cache entry already doomed"),
Some(_) => false,
None => true,
};

if new_entry {
self.add_pending_cache_entry(key.clone(), start_chan);
self.add_pending_cache_entry(key.clone(), vec!(start_chan));
NewCacheEntry(key)
} else {
self.send_partial_entry(key, start_chan);
@@ -403,9 +436,9 @@ impl MemoryCache {
}

/// Add a new pending request to the set of incomplete cache entries.
fn add_pending_cache_entry(&mut self, key: CacheKey, start_chan: Sender<LoadResponse>) {
fn add_pending_cache_entry(&mut self, key: CacheKey, consumers: Vec<Sender<LoadResponse>>) {
let resource = PendingResource {
consumers: AwaitingHeaders(vec!(start_chan)),
consumers: AwaitingHeaders(consumers),
expires: MAX,
last_validated: time::now(),
doomed: false,
@@ -415,9 +448,7 @@ impl MemoryCache {
}

/// Synchronously send the entire cached response body to the given consumer.
fn send_complete_entry(&self, key: CacheKey, start_chan: Sender<LoadResponse>) {
info!("returning full cache body for {}", key.url);
let resource = self.complete_entries.get(&key).unwrap();
fn send_complete_resource(resource: &CachedResource, start_chan: Sender<LoadResponse>) {
let progress_chan = start_sending_opt(start_chan, resource.metadata.clone());
match progress_chan {
Ok(chan) => {
@@ -428,6 +459,13 @@ impl MemoryCache {
}
}

/// Synchronously send the entire cached response body to the given consumer.
fn send_complete_entry(&self, key: CacheKey, start_chan: Sender<LoadResponse>) {
info!("returning full cache body for {}", key.url);
let resource = self.complete_entries.get(&key).unwrap();
MemoryCache::send_complete_resource(resource, start_chan)
}

/// Synchronously send all partial stored response data for a cached request to the
/// given consumer.
fn send_partial_entry(&mut self, key: CacheKey, start_chan: Sender<LoadResponse>) {
@@ -189,7 +189,7 @@ fn load(mut load_data: LoadData, start_chan: Sender<LoadResponse>, cache: Arc<Mu
return;
}

cache.doom_request(key, "cache entry expired".to_string());
cache.process_revalidation_failed(key);
}

if 3 == (response.status.code() / 100) {
@@ -22,7 +22,7 @@ function _pass(s, m) {

function _printer(opstr, op) {
return function (a, b, msg) {
let f = op(a,b) ? _pass : _fail;
var f = op(a,b) ? _pass : _fail;
if (!msg) msg = "";
f(a + " " + opstr + " " + b, msg);
};
@@ -0,0 +1,2 @@
<html>
</html>
@@ -0,0 +1,2 @@
200
Cache-Control: must-revalidate
@@ -0,0 +1,14 @@
<html>
<head>
<script src="harness.js"></script>
<script src="netharness.js"></script>
</head>
<body>
<script>
reset_stats();
fetch('resources/helper_must_revalidate.html');
fetch('resources/helper_must_revalidate.html');
assert_requests_made('resources/helper_must_revalidate.html', 1);
</script>
</body>
</html>
@@ -41,10 +41,34 @@ def do_GET(self):
self.wfile.write(body)
return

requests[self.path] += 1
header_list = []
status = None

path = self.translate_path(self.path)
headers = path + '^headers'

if os.path.isfile(headers):
try:
h = open(headers, 'rb')
except IOError:
self.send_error(404, "Header file not found")
return

header_lines = h.readlines()
status = int(header_lines[0])
for header in header_lines[1:]:
parts = map(lambda x: x.strip(), header.split(':'))
header_list += [parts]

if self.headers.get('If-Modified-Since'):
self.send_response(304)
self.end_headers()
return

if not status or status == 200:
requests[self.path] += 1

if status or header_list:
ctype = self.guess_type(path)
try:
# Always read in binary mode. Opening files in text mode may cause
@@ -56,23 +80,14 @@ def do_GET(self):
return

try:
try:
h = open(headers, 'rb')
except IOError:
self.send_error(404, "Header file not found")
return

header_lines = h.readlines()

self.send_response(int(header_lines[0]))
self.send_response(status or 200)
self.send_header("Content-type", ctype)
fs = os.fstat(f.fileno())
self.send_header("Content-Length", str(fs[6]))
self.send_header("Last-Modified", self.date_time_string(fs.st_mtime))

for header in header_lines[1:]:
parts = map(lambda x: x.strip(), header.split(':'))
self.send_header(parts[0], parts[1])
for header in header_list:
self.send_header(header[0], header[1])

self.end_headers()

ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.