Skip to content

Commit

Permalink
receiving articles update
Browse files Browse the repository at this point in the history
  • Loading branch information
miquels committed Oct 6, 2018
1 parent 328041a commit 2725df9
Show file tree
Hide file tree
Showing 11 changed files with 249 additions and 162 deletions.
20 changes: 2 additions & 18 deletions history/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,25 +94,9 @@ impl<'a> HCachePartition<'a> {
}

/// add a tentative entry to the history cache.
pub fn store_tentative(&mut self) {
pub fn store_tentative(&mut self, what: HistStatus) {
let he = HistEnt{
status: HistStatus::Tentative,
time: self.when as u64,
head_only: false,
location: None,
};
let inner = &mut *self.inner;
inner.insert(HCacheEnt{
histent: he,
hash: self.hash,
when: self.when,
});
}

/// like tentative, but when writing the article.
pub fn store_begin(&mut self) {
let he = HistEnt{
status: HistStatus::Writing,
status: what,
time: self.when as u64,
head_only: false,
location: None,
Expand Down
150 changes: 91 additions & 59 deletions history/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ use nntp_rs_spool as spool;
use cache::HCache;

const PRECOMMIT_MAX_AGE: u32 = 10;
const PRESTORE_MAX_AGE: u32 = 60;

type HistFuture = Future<Item=Option<HistEnt>, Error=io::Error> + Send;
type HistStatusFuture = Future<Item=Option<HistStatus>, Error=io::Error> + Send;

/// Functionality a backend history database needs to make available.
pub trait HistBackend: Send + Sync {
Expand Down Expand Up @@ -86,6 +87,13 @@ pub enum HistStatus {
Rejected,
}

/// Returned as error by some of the methods.
#[derive(Debug)]
pub enum HistError {
IoError(io::Error),
Status(HistStatus),
}

impl History {

/// Open history database.
Expand Down Expand Up @@ -115,38 +123,33 @@ impl History {
})
}

// Find an entry in the history database.
fn cache_lookup(&self, msgid: &str, check: bool, precommit: bool) -> Option<HistEnt> {
let mut partition = self.inner.cache.lock_partition(msgid);
// Do a lookup in the history cache. Just a wrapper around partition.lookup()
// that rewrites tentative cache entries that are too old to NotFound,
// and that returns a HistStatus instead of HistEnt.
fn cache_lookup(&self, partition: &mut cache::HCachePartition, msgid: &str) -> Option<HistStatus> {
if let Some((mut h, age)) = partition.lookup() {
match h.status {
HistStatus::Writing => {
if check {
// In the CHECK case, handle this as Tentative.
h.status = HistStatus::Tentative;
} else {
// Otherwise as "not found"
if age > PRESTORE_MAX_AGE {
// This should never happen, but if it does, log it,
// and invalidate the entry.
error!("cache_lookup: entry for {} in state HistStatus::Writing too old: {}s",
msgid, age);
h.status = HistStatus::NotFound;
}
Some(h)
Some(h.status)
},
HistStatus::Tentative => {
if age > PRECOMMIT_MAX_AGE {
// Not valid as tentative entry anymore, but we can
// interpret it as a negative cache entry.
h.status = HistStatus::NotFound;
if precommit {
partition.store_tentative();
}
}
Some(h)
Some(h.status)
},
_ => Some(h),
_ => Some(h.status),
}
} else {
if precommit {
partition.store_tentative();
}
None
}
}
Expand Down Expand Up @@ -176,69 +179,98 @@ impl History {
})
}

/// This is like `lookup_through_cache`, but it can return HistStatus::Tentative as well.
/// It will also put a Tentative entry in the history cache if we did not
/// have an entry for this message-id yet.
pub fn check(&self, msgid: &str) -> Box<HistFuture> {
/// The CHECK command.
///
/// Lookup a message-id through the cache and the history db. If not found
/// or invalid, mark the message-id in the cache as Tentative.
///
/// Returns:
/// - None: message-id not found
/// - HistStatus::Tentative - message-id already tentative in cache
/// - HistEnt with any other status - message-id already present
///
pub fn check(&self, msgid: &str) -> Box<HistStatusFuture> {
self.do_check(msgid, HistStatus::Tentative)
}

/// Article received. Call this before writing to history / spool.
///
/// Much like the check method, but succeeds when the cache entry
/// is Tentative, and sets the cache entry to status Writing.
///
/// Returns future::ok if we succesfully set the cache entry to state Writing,
/// future::err(HistError) otherwise.
pub fn store_begin(&self, msgid: &str) -> Box<Future<Item=(), Error=HistError> + Send> {
let fut = self.do_check(msgid, HistStatus::Writing)
.then(|res| {
match res {
Err(e) => future::err(HistError::IoError(e)),
Ok(h) => match h {
None|
Some(HistStatus::NotFound) => future::ok(()),
Some(s) => future::err(HistError::Status(s)),
}
}
});
Box::new(fut)
}

// Function that does the actual work for check / store_begin.
fn do_check(&self, msgid: &str, what: HistStatus) -> Box<HistStatusFuture> {

// First check the cache. HistStatus::NotFound means it WAS found in
// the cache as negative entry, so we do not need to go check
// the actual history db!
if let Some(he) = self.cache_lookup(msgid, true, false) {
let f = if he.status == HistStatus::NotFound {
None
} else {
Some(he)
loop {
let mut partition = self.inner.cache.lock_partition(msgid);
let f = match self.cache_lookup(&mut partition, msgid) {
Some(HistStatus::NotFound) => {
partition.store_tentative(what);
None
},
Some(HistStatus::Tentative) if what == HistStatus::Writing => {
partition.store_tentative(what);
None
},
Some(HistStatus::Writing) => Some(HistStatus::Tentative),
hs @ Some(_) => hs,
None => break,
};
return Box::new(future::ok(f));
}

// Check the actual history database.
// No cache entry. Check the actual history database.
let this = self.clone();
let msgid2 = msgid.to_string();
let f = self.lookup(msgid)
.map(move |he| {
match he {
Some(he) => Some(he),
Some(he) => Some(he.status),
None => {
// Not present. Try to put a tentative entry in the cache.
match this.cache_lookup(&msgid2, true, true) {
Some(he) => {
if he.status == HistStatus::NotFound {
None
} else {
Some(he)
}
let mut partition = this.inner.cache.lock_partition(&msgid2);
match this.cache_lookup(&mut partition, &msgid2) {
None|
Some(HistStatus::NotFound) => {
partition.store_tentative(what);
None
},
Some(HistStatus::Tentative) if what == HistStatus::Writing => {
partition.store_tentative(what);
None
},
None => None,
Some(HistStatus::Writing) => Some(HistStatus::Tentative),
hs @ Some(_) => hs,
}
}
},
}
});
Box::new(f)
}

/// We have received the article. Before we write it to the spool,
/// mark it in the cache with status "Writing".
pub fn store_begin(&self, msgid: &str) -> bool {
let mut partition = self.inner.cache.lock_partition(msgid);
if let Some((h, _age)) = partition.lookup() {
match h.status {
HistStatus::Present |
HistStatus::Writing |
HistStatus::Expired |
HistStatus::Rejected => return false,
HistStatus::Tentative |
HistStatus::NotFound => {},
}
}
partition.store_begin();
true
}

/// Done writing to the spool. Update the cache-entry and write-through
/// to the backend storage.
pub fn store_commit(&self, msgid: &str, he: HistEnt) -> impl Future<Item=bool, Error=io::Error> {
/// to the actual history database backend.
pub fn store_commit(&self, msgid: &str, he: HistEnt) -> impl Future<Item=(), Error=io::Error> {
{
let mut partition = self.inner.cache.lock_partition(msgid);
if partition.store_commit(he.clone()) == false {
Expand All @@ -249,7 +281,7 @@ impl History {
let msgid = msgid.to_string().into_bytes();
let f = self.inner.cpu_pool.spawn_fn(move || {
match inner.backend.store(&msgid, &he) {
Ok(()) => future::ok(true),
Ok(()) => future::ok(()),
Err(e) => future::err(e),
}
});
Expand Down
13 changes: 7 additions & 6 deletions server/src/arttype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
//! License: https://opensource.org/licenses/BSD-2-Clause
//!

use std::cmp;
use std::fmt::{self,Display};
use std::str::FromStr;

Expand Down Expand Up @@ -95,7 +94,7 @@ impl FromStr for ArtType {
"none" => ArtType::NONE,
"default" => ArtType::DEFAULT,
"control" => ArtType::CONTROL,
"cancel" => ArtType::MIME,
"cancel" => ArtType::CANCEL,
"binary" => ArtType::BINARY,
"binaries" => ArtType::BINARY,
"uuencode" => ArtType::UUENCODE,
Expand Down Expand Up @@ -154,18 +153,20 @@ impl ArtType {
}

fn tolower(b: u8) -> u8 {
if b >= 65 && b <= 97 {
if b >= b'A' && b <= b'Z' {
b + 32
} else {
b
}
}

fn lcmatch(b: &[u8], s: &str) -> bool {
if s.len() > b.len() {
return false;
}
let mut tmpbuf = [0u8; 64];
let lc = super::article::lowercase(b, &mut tmpbuf[..]);
let len = cmp::min(lc.len(), s.len());
lc == &s.as_bytes()[..len]
&lc[..s.len()] == s.as_bytes()
}

/// Arttype scanner.
Expand Down Expand Up @@ -248,7 +249,7 @@ impl ArtTypeScanner {
* Do some checks that could apply to headers and or body
*/
let first = tolower(line[0]);
if first == b'c' && tolower(line[0]) == b'o' {
if first == b'c' && tolower(line[1]) == b'o' {
if lcmatch(line, "content-type: text/html") {
self.arttype.arttype |= ArtType::HTML;
}
Expand Down
2 changes: 2 additions & 0 deletions server/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
pub enum ArtError {
/// after receiving article we already had it anyway.
PostDuplicate,
/// racing another article.
Deferred,
/// article is incomplete
ArtIncomplete,
/// if headers + body < 80 chars
Expand Down
6 changes: 5 additions & 1 deletion server/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@ pub fn incoming_reject(label: &str, art: &Article, error: ArtError) {
info!("{} - {} {} {} {:?}", label, art.msgid, art.len, art.arttype, error);
}

pub fn incoming_defer(label: &str, art: &Article, error: ArtError) {
info!("{} d {} {} {} {:?}", label, art.msgid, art.len, art.arttype, error);
}

pub fn incoming_accept(label: &str, art: &Article, peers: &[NewsPeer], wantpeers: &[u32]) {
// allocate string with peers in one go.
let len = wantpeers.iter().fold(0, |t, i| t + peers[*i as usize].label.len() + 1);
let mut s = String::with_capacity(len);
// push peers onto string, separated by space.
for idx in 0..wantpeers.len() {
s.push_str(&peers[idx as usize].label);
s.push_str(&peers[wantpeers[idx as usize] as usize].label);
if idx as usize + 1 < wantpeers.len() {
s.push(' ');
}
Expand Down
2 changes: 1 addition & 1 deletion server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ fn main() -> io::Result<()> {
println!("Listening on port {}", addr.port());

// install custom panic logger.
handle_panic();
//handle_panic();

// and start server.
let server = server::Server::new(hist, spool);
Expand Down
7 changes: 7 additions & 0 deletions server/src/newsfeeds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,13 @@ impl NewsPeer {
/// Check if this peer wants to have this article.
pub fn wants(&self, art: &Article, path: &[&str], newsgroups: &mut MatchList) -> bool {

// must be an actual outgoing feed.
if self.outhost.is_empty() && &self.label != "IFILTER" {
debug!("XXX {}.wants: outhost is empty", self.label);
return false;
}
debug!("XXX {}.wants: outhost is [{}]", self.label, self.outhost);

// check article type.
if !art.arttype.matches(&self.arttypes) {
return false;
Expand Down
Loading

0 comments on commit 2725df9

Please sign in to comment.