refactor(index): idiomatise various iterators (#1)
* chore(index): idiomatise variously

* chore(index): exploit hashset to dedupe entries

* chore(index): format
passcod authored and zkat committed Jul 1, 2019
1 parent 19929c5 commit ad74518
Showing 2 changed files with 72 additions and 67 deletions.
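A note on the dedupe technique in this commit: `SerializableEntry` gains key-only `PartialEq`/`Eq`/`Hash` impls (see the diff below), so collecting an iterator of entries into a `HashSet` silently drops repeated keys. A minimal standalone sketch of the idea, with a made-up `Item` type standing in for the crate's entry struct:

use std::collections::HashSet;
use std::hash::{Hash, Hasher};

// Hypothetical stand-in for SerializableEntry: equality and hashing
// look only at `key`, so two items with the same key count as duplicates.
#[derive(Debug)]
struct Item {
    key: String,
    value: u32,
}

impl PartialEq for Item {
    fn eq(&self, other: &Self) -> bool {
        self.key == other.key
    }
}

impl Eq for Item {}

impl Hash for Item {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.key.hash(state);
    }
}

fn main() {
    let items = vec![
        Item { key: "a".into(), value: 1 },
        Item { key: "a".into(), value: 2 }, // duplicate key, dropped
        Item { key: "b".into(), value: 3 },
    ];
    // HashSet::insert keeps the element already in the set on collision,
    // so the first occurrence of each key survives the collect.
    let deduped: HashSet<Item> = items.into_iter().collect();
    assert_eq!(deduped.len(), 2);
}

One subtlety: on a duplicate, `HashSet` keeps the first occurrence of a key, whereas the `HashMap::insert` loop it replaces kept the last.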
137 changes: 71 additions & 66 deletions src/index.rs
@@ -1,5 +1,6 @@
-use std::collections::hash_map::HashMap;
+use std::collections::HashSet;
 use std::fs::{self, OpenOptions};
+use std::hash::{Hash, Hasher};
 use std::io::{ErrorKind, Write};
 use std::path::{Path, PathBuf};
 use std::time::{SystemTime, UNIX_EPOCH};
@@ -36,7 +37,7 @@ pub struct Entry {
     pub metadata: Value,
 }
 
-#[derive(Deserialize, Serialize, PartialEq, Debug)]
+#[derive(Deserialize, Serialize, Debug)]
 struct SerializableEntry {
     key: String,
     integrity: Option<String>,
@@ -45,6 +46,20 @@ struct SerializableEntry {
     metadata: Value,
 }
 
+impl PartialEq for SerializableEntry {
+    fn eq(&self, other: &Self) -> bool {
+        self.key == other.key
+    }
+}
+
+impl Eq for SerializableEntry {}
+
+impl Hash for SerializableEntry {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.key.hash(state);
+    }
+}
+
 pub fn insert(cache: &Path, key: &str, opts: PutOpts) -> Result<Integrity, Error> {
     let bucket = bucket_path(&cache, &key);
     if let Some(path) = mkdirp::mkdirp(bucket.parent().unwrap())? {
@@ -57,16 +72,15 @@ pub fn insert(cache: &Path, key: &str, opts: PutOpts) -> Result<Integrity, Error
         size: opts.size.unwrap_or(0),
         metadata: opts.metadata.unwrap_or_else(|| json!(null)),
     })?;
-    let str = format!("\n{}\t{}", hash_entry(&stringified), stringified);
-    OpenOptions::new()
-        .create(true)
-        .append(true)
-        .open(&bucket)?
-        .write_all(&str.into_bytes())?;
+
+    let mut buck = OpenOptions::new().create(true).append(true).open(&bucket)?;
+
+    write!(buck, "\n{}\t{}", hash_entry(&stringified), stringified)?;
     chownr::chownr(&bucket, opts.uid, opts.gid)?;
     Ok(opts
         .sri
-        .unwrap_or_else(|| "sha1-deadbeef".parse::<Integrity>().unwrap()))
+        .or_else(|| "sha1-deadbeef".parse::<Integrity>().ok())
+        .unwrap())
 }
 
 pub fn find(cache: &Path, key: &str) -> Result<Option<Entry>, Error> {
@@ -109,40 +123,37 @@ pub fn delete(cache: &Path, key: &str) -> Result<(), Error> {
             uid: None,
             gid: None,
         },
-    )?;
-    Ok(())
+    )
+    .map(|_| ())
 }
 
 pub fn ls(cache: &Path) -> impl Iterator<Item = Result<Entry, Error>> {
-    let mut path = PathBuf::new();
-    path.push(cache);
-    path.push(format!("index-v{}", INDEX_VERSION));
-    WalkDir::new(path)
+    WalkDir::new(cache.join(format!("index-v{}", INDEX_VERSION)))
         .into_iter()
         .map(|bucket| {
            let bucket = bucket?;
            if bucket.file_type().is_dir() {
-                return Ok(core::iter::empty().collect::<Vec<Entry>>());
+                return Ok(Vec::new());
            }
-            let entries = bucket_entries(bucket.path())?;
-            let mut dedupe: HashMap<String, SerializableEntry> = HashMap::new();
-            for entry in entries {
-                dedupe.insert(entry.key.clone(), entry);
-            }
-            let iter = dedupe
+
+            Ok(bucket_entries(bucket.path())?
                 .into_iter()
-                .filter(|se| se.1.integrity.is_some())
-                .map(|se| {
-                    let se = se.1;
-                    Entry {
-                        key: se.key,
-                        integrity: se.integrity.unwrap().parse().unwrap(),
-                        time: se.time,
-                        size: se.size,
-                        metadata: se.metadata,
+                .collect::<HashSet<SerializableEntry>>()
+                .into_iter()
+                .filter_map(|se| {
+                    if let Some(i) = se.integrity {
+                        Some(Entry {
+                            key: se.key,
+                            integrity: i.parse().unwrap(),
+                            time: se.time,
+                            size: se.size,
+                            metadata: se.metadata,
+                        })
+                    } else {
+                        None
                    }
-                });
-            Ok(iter.collect::<Vec<Entry>>())
+                })
+                .collect())
        })
        .flat_map(|res| match res {
            Ok(it) => Left(it.into_iter().map(Ok)),
@@ -152,13 +163,11 @@ pub fn ls(cache: &Path) -> impl Iterator<Item = Result<Entry, Error>> {
 
 fn bucket_path(cache: &Path, key: &str) -> PathBuf {
     let hashed = hash_key(&key);
-    let mut path = PathBuf::new();
-    path.push(cache);
-    path.push(format!("index-v{}", INDEX_VERSION));
-    path.push(&hashed[0..2]);
-    path.push(&hashed[2..4]);
-    path.push(&hashed[4..]);
-    path
+    cache
+        .join(format!("index-v{}", INDEX_VERSION))
+        .join(&hashed[0..2])
+        .join(&hashed[2..4])
+        .join(&hashed[4..])
 }
 
 fn hash_key(key: &str) -> String {
@@ -181,33 +190,29 @@ fn now() -> u128 {
 }
 
 fn bucket_entries(bucket: &Path) -> Result<Vec<SerializableEntry>, Error> {
-    let lines = match fs::read_to_string(bucket) {
-        Ok(data) => Ok(data),
-        Err(ref e) if e.kind() == ErrorKind::NotFound => Ok(String::from("")),
-        err => err,
-    }?;
-    Ok(lines.split('\n').fold(vec![], |mut acc, entry: &str| {
-        if entry.is_empty() {
-            return acc;
-        }
-        let entry_str = match entry.split('\t').collect::<Vec<&str>>()[..] {
-            [hash, entry_str] => {
-                if hash_entry(entry_str) != hash {
-                    // Hash is no good! Corruption or malice? Doesn't matter!
-                    // EJECT EJECT
-                    return acc;
-                } else {
-                    entry_str
-                }
-            }
-            // Something's wrong with the entry. Abort.
-            _ => return acc,
-        };
-        if let Ok(entry) = serde_json::from_str::<SerializableEntry>(entry_str) {
-            acc.push(entry)
-        }
-        acc
-    }))
+    use std::io::{BufRead, BufReader};
+    fs::File::open(bucket)
+        .map(|file| {
+            BufReader::new(file)
+                .lines()
+                .filter_map(Result::ok)
+                .filter_map(|entry| {
+                    let entry_str = match entry.split('\t').collect::<Vec<&str>>()[..] {
+                        [hash, entry_str] if hash_entry(entry_str) == hash => entry_str,
+                        // Something's wrong with the entry. Abort.
+                        _ => return None,
+                    };
+                    serde_json::from_str::<SerializableEntry>(entry_str).ok()
+                })
+                .collect()
+        })
+        .or_else(|err| {
+            if err.kind() == ErrorKind::NotFound {
+                Ok(Vec::new())
+            } else {
+                Err(err.into())
+            }
+        })
 }
 
 #[cfg(test)]
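The rewritten `bucket_entries` above combines two behaviours in one `Result` chain: `or_else` maps `ErrorKind::NotFound` to an empty list (a missing bucket simply has no entries), while `filter_map` skips unreadable or unparseable lines instead of failing. A minimal sketch of the same pattern in isolation; the function name and the bare `&str` path are illustrative, not from the crate:

use std::fs::File;
use std::io::{BufRead, BufReader, ErrorKind};

// Read a file line by line, treating "file not found" as an empty result
// and propagating every other I/O error to the caller.
fn read_lines_or_empty(path: &str) -> std::io::Result<Vec<String>> {
    File::open(path)
        .map(|file| {
            BufReader::new(file)
                .lines()
                .filter_map(Result::ok) // drop lines that fail to read
                .collect()
        })
        .or_else(|err| {
            if err.kind() == ErrorKind::NotFound {
                Ok(Vec::new())
            } else {
                Err(err)
            }
        })
}

fn main() -> std::io::Result<()> {
    // A nonexistent path yields Ok(vec![]) instead of an error.
    assert!(read_lines_or_empty("no-such-file.txt")?.is_empty());
    Ok(())
}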
2 changes: 1 addition & 1 deletion src/lib.rs
@@ -4,8 +4,8 @@
 
 #![warn(missing_docs, missing_doc_code_examples)]
 
-pub use ssri::Algorithm;
 pub use serde_json::Value;
+pub use ssri::Algorithm;
 
 mod content;
 mod errors;
