Skip to content

Commit

Permalink
context.rs: Update index after hashing a new commit
Browse files Browse the repository at this point in the history
  • Loading branch information
sdleffler committed Oct 17, 2017
1 parent 545d0c9 commit a7092c4
Showing 1 changed file with 30 additions and 8 deletions.
38 changes: 30 additions & 8 deletions src/context.rs
Expand Up @@ -8,7 +8,7 @@ use chrono::prelude::*;
use futures::future::{self, Either};
use futures::prelude::*;
use futures::stream;
use futures::sync::mpsc::{self, Sender};
use futures::sync::mpsc::{self, Sender, Receiver};
use futures_cpupool::CpuPool;
use globset::GlobSet;
use memmap::{Mmap, Protection};
Expand Down Expand Up @@ -39,6 +39,9 @@ pub struct Context<'a, T: Trace, S: Store> {

marshal_tx: Sender<Hashed>,
writes: Box<Future<Item = (), Error = Error> + Send>,

index_tx: Sender<(PathBuf, ObjectHash)>,
index_rx: Receiver<(PathBuf, ObjectHash)>,
}


Expand Down Expand Up @@ -81,6 +84,7 @@ impl<'a, T: Trace, S: Store> Context<'a, T, S> {
io_pool: &CpuPool,
) -> Self {
let (marshal_tx, marshal_rx) = mpsc::channel(BATCH_FUTURE_BUFFER_SIZE);
let (index_tx, index_rx) = mpsc::channel(BATCH_FUTURE_BUFFER_SIZE);

let writes = {
let trace = trace.clone();
Expand Down Expand Up @@ -112,6 +116,9 @@ impl<'a, T: Trace, S: Store> Context<'a, T, S> {

marshal_tx,
writes,

index_tx,
index_rx,
}
}

Expand Down Expand Up @@ -170,7 +177,7 @@ impl<'a, T: Trace, S: Store> Context<'a, T, S> {
}

pub fn hash_commit(
&mut self,
&self,
include_opt: Option<&GlobSet>,
exclude_opt: Option<&GlobSet>,
parents: Vec<ObjectHash>,
Expand All @@ -184,7 +191,7 @@ impl<'a, T: Trace, S: Store> Context<'a, T, S> {
.filter(|&(path, entry)| {
let is_included = include_opt
.map(|include| include.is_match(path))
.unwrap_or(true);
.unwrap_or(false);
let is_excluded = exclude_opt
.map(|exclude| exclude.is_match(path))
.unwrap_or(false);
Expand All @@ -201,21 +208,28 @@ impl<'a, T: Trace, S: Store> Context<'a, T, S> {
Some(Cached::Unhashed) | None => {
let path = path.to_owned();
let chunk_stream = self.split_file(&path);
let index_tx = self.index_tx.clone();
let hash_future = self.hash_file(chunk_stream);

Either::B(hash_future.map(|object_hash| (path, Some(object_hash))))
Either::B(hash_future.and_then(|object_hash| {
index_tx
.send((path.clone(), object_hash))
.map(move |_| (path, Some(object_hash)))
.map_err(|_| Error::from_kind(ErrorKind::Absurd))
}))
}
}
});

let marshaller = marshaller.clone();
let store = self.store.clone();
let root = self.refs.head_hash().cloned();
let root = self.refs.head().cloned();
stream::futures_unordered(entries_iter)
.collect()
.and_then(move |ops| DirTree::delta(store, root, ops))
.and_then(move |dir_tree| marshaller.process_dir_tree(dir_tree))
};

let commit_future = subtree_future.and_then(move |subtree| {
marshaller.process(CommitObject {
subtree,
Expand All @@ -225,14 +239,22 @@ impl<'a, T: Trace, S: Store> Context<'a, T, S> {
})
});

Box::new(commit_future)
Box::new(self.marshal_pool.spawn(commit_future))
}

pub fn store(&self) -> &S {
&self.store
}

pub fn close(self) -> Box<Future<Item = (), Error = Error> + Send> {
self.writes
pub fn close(self) -> Box<Future<Item = (), Error = Error> + Send + 'a> {
let repository = self.repository;
let close_future = self.writes.join(
self.index_rx.map_err(|_| Error::from_kind(ErrorKind::Absurd)).for_each(move |(path, object_hash)| {
println!("\n\nCleaning entry: ({}, {})\n", path.display(), object_hash);
repository.index.clean(path, object_hash)
}),
).map(|((), ())| ());

Box::new(close_future)
}
}

0 comments on commit a7092c4

Please sign in to comment.