Skip to content

Commit

Permalink
parallelise rebuild-indexes
Browse files Browse the repository at this point in the history
  • Loading branch information
jamespharaoh committed Jan 24, 2017
1 parent d2b389c commit f1bcc24
Showing 1 changed file with 196 additions and 102 deletions.
298 changes: 196 additions & 102 deletions src/convert/rebuildindexes.rs
@@ -1,7 +1,15 @@
use std::collections::LinkedList;
use std::path::PathBuf;

use clap;

use futures::BoxFuture;
use futures::Future;

use futures_cpupool::CpuPool;

use num_cpus;

use output::Output;

use ::Repository;
Expand All @@ -12,144 +20,225 @@ use ::read::*;
use ::zbackup::data::*;
use ::zbackup::proto;

pub fn rebuild_indexes (
output: & Output,
arguments: & RebuildIndexesArguments,
) -> Result <bool, String> {

// open repository

let repository =
string_result_with_prefix (
|| format! (
"Error opening repository {}: ",
arguments.repository_path.to_string_lossy ()),
Repository::open (
& output,
Repository::default_config (),
& arguments.repository_path,
arguments.password_file_path.clone ()),
) ?;

// begin transaction

let mut temp_files =
TempFileManager::new (
output,
& arguments.repository_path,
None,
) ?;
/// State for one run of the rebuild-indexes command: the command arguments,
/// the opened repository, and the thread pool used to read bundle info in
/// parallel.
struct IndexRebuilder <'a> {
/// Borrowed command arguments (repository path, password file path,
/// bundles per index).
arguments: & 'a RebuildIndexesArguments,
/// Repository opened by `new`; cloned into each spawned bundle-read task.
repository: Repository,
/// Size of the thread pool, set by `new` to twice `num_cpus::get ()`; also
/// used as the cap on the number of bundle-read futures queued at once.
num_threads: usize,
/// Pool on which the `read_bundle_info` tasks are spawned.
cpu_pool: CpuPool,
}

// get list of bundle files
impl <'a> IndexRebuilder <'a> {

fn new (
output: & Output,
arguments: & 'a RebuildIndexesArguments,
) -> Result <IndexRebuilder <'a>, String> {

// open repository

let repository =
string_result_with_prefix (
|| format! (
"Error opening repository {}: ",
arguments.repository_path.to_string_lossy ()),
Repository::open (
& output,
Repository::default_config (),
& arguments.repository_path,
arguments.password_file_path.clone ()),
) ?;

let bundle_ids =
scan_bundle_files (
output,
& arguments.repository_path,
) ?;
// create thread pool

output.message_format (
format_args! (
"Found {} bundle files",
bundle_ids.len ()));
let num_threads =
num_cpus::get () * 2;

// rebuild indexes
let cpu_pool =
CpuPool::new (
num_threads);

let mut entries_buffer: Vec <IndexEntry> =
Vec::new ();
// return

let mut bundle_count: u64 = 0;
Ok (IndexRebuilder {
arguments: arguments,
repository: repository,
num_threads: num_threads,
cpu_pool: cpu_pool,
})

output.status (
"Rebuilding indexes");
}

fn rebuild_indexes (
& mut self,
output: & Output,
) -> Result <bool, String> {

for & bundle_id in bundle_ids.iter () {
// begin transaction

output.status_progress (
bundle_count,
bundle_ids.len () as u64);
let mut temp_files =
TempFileManager::new (
output,
& self.arguments.repository_path,
None,
) ?;

let bundle_path =
repository.bundle_path (
bundle_id);
// get list of bundle files

let bundle_info =
read_bundle_info (
bundle_path,
repository.encryption_key (),
let bundle_ids =
scan_bundle_files (
output,
& self.arguments.repository_path,
) ?;

let mut index_bundle_header =
proto::IndexBundleHeader::new ();
output.message_format (
format_args! (
"Found {} bundle files",
bundle_ids.len ()));

index_bundle_header.set_id (
bundle_id.to_vec ());
// rebuild indexes

entries_buffer.push (
(
index_bundle_header,
bundle_info,
)
);
let mut entries_buffer: Vec <IndexEntry> =
Vec::new ();

// write out a new index
let mut bundle_count: u64 = 0;
let bundle_total = bundle_ids.len () as u64;

if entries_buffer.len () as u64 == arguments.bundles_per_index {
output.status (
"Rebuilding indexes");

flush_index_entries (
& repository,
& mut temp_files,
& mut entries_buffer,
) ?;
type BundleFuture =
BoxFuture <(BundleId, proto::BundleInfo), String>;

let mut bundle_futures: LinkedList <BundleFuture> =
LinkedList::new ();

let mut bundle_ids_iter =
bundle_ids.into_iter ();

loop {

// start bundle load tasks

while bundle_futures.len () < self.num_threads {

if let Some (bundle_id) =
bundle_ids_iter.next () {

let repository =
self.repository.clone ();

bundle_futures.push_back (
self.cpu_pool.spawn_fn (move ||
read_bundle_info (
repository.bundle_path (
bundle_id),
repository.encryption_key (),
).map (|bundle_info|
(bundle_id, bundle_info)
)
).boxed ()
);

} else {
break;
}

}

// handle next bundle load

// TODO use select for better parallelism

if let Some (bundle_future) =
bundle_futures.pop_front () {

output.status_progress (
bundle_count,
bundle_total);

let (bundle_id, bundle_info) =
bundle_future.wait () ?;

let mut index_bundle_header =
proto::IndexBundleHeader::new ();

index_bundle_header.set_id (
bundle_id.to_vec ());

entries_buffer.push (
(
index_bundle_header,
bundle_info,
)
);

// write out a new index

if entries_buffer.len () as u64
== self.arguments.bundles_per_index {

flush_index_entries (
& self.repository,
& mut temp_files,
& mut entries_buffer,
) ?;

}

bundle_count += 1;

} else {
break;
}

}

bundle_count += 1;
// write out final index

}
if ! entries_buffer.is_empty () {

if ! entries_buffer.is_empty () {
flush_index_entries (
& self.repository,
& mut temp_files,
& mut entries_buffer,
) ?;

flush_index_entries (
& repository,
& mut temp_files,
& mut entries_buffer,
) ?;
}

}
output.status_done ();

output.status_done ();
// remove old indexes

// remove old indexes
let old_index_ids =
scan_index_files (
& self.arguments.repository_path,
) ?;

let old_index_ids =
scan_index_files (
& arguments.repository_path,
) ?;
output.message_format (
format_args! (
"Removing {} old index files",
old_index_ids.len ()));

output.message_format (
format_args! (
"Removing {} old index files",
old_index_ids.len ()));
for old_index_id in old_index_ids {

for old_index_id in old_index_ids {
temp_files.delete (
self.repository.index_path (
old_index_id));

temp_files.delete (
repository.index_path (
old_index_id));
}

}
// commit changes and return

// commit changes and return
output.status (
"Committing changes ...");

output.status (
"Committing changes ...");
temp_files.commit () ?;

temp_files.commit () ?;
output.status_done ();

output.status_done ();
Ok (true)

Ok (true)
}

}

Expand Down Expand Up @@ -225,7 +314,12 @@ command! (
},

action = |output, arguments| {
rebuild_indexes (output, arguments)
IndexRebuilder::new (
output,
arguments,
) ?.rebuild_indexes (
output,
)
},

);
Expand Down

0 comments on commit f1bcc24

Please sign in to comment.