Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions webgraph/examples/bench_sort_pairs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ use anyhow::Result;
use clap::Parser;
use dsi_bitstream::traits::BitRead;
use dsi_bitstream::traits::BitWrite;
use dsi_bitstream::traits::Endianness;
use dsi_bitstream::traits::{Endianness, BE};
use dsi_progress_logger::prelude::{ProgressLog, ProgressLogger};
use rand::rngs::SmallRng;
use rand::RngCore;
use rand::SeedableRng;
use tempfile::Builder;
use webgraph::prelude::*;
use webgraph::utils::gaps::GapsCodec;

#[derive(Parser, Debug)]
#[command(about = "Tests the merge speed of SortPairs", long_about = None)]
Expand Down Expand Up @@ -62,11 +63,10 @@ pub fn main() -> Result<()> {
let dir = Builder::new().prefix("bench_sort_pairs").tempdir()?;

if args.labeled {
let mut sp = SortPairs::<Mock, Mock>::new_labeled(
let mut sp = SortPairs::new_labeled(
MemoryUsage::BatchSize(args.batch),
dir.path(),
Mock(),
Mock(),
GapsCodec::<BE, _, _>::new(Mock(), Mock()),
)?;

let mut r = SmallRng::seed_from_u64(0);
Expand Down
3 changes: 1 addition & 2 deletions webgraph/examples/bench_unit_transpose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ where
let mut iter = Left(transform::transpose_labeled(
&unit,
MemoryUsage::BatchSize(10_000_000),
(),
(),
DefaultBatchCodec::default(),
)?)
.iter();
while let Some((x, s)) = iter.next() {
Expand Down
6 changes: 3 additions & 3 deletions webgraph/src/transform/perm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*/

use crate::graphs::arc_list_graph;
use crate::prelude::sort_pairs::{BatchIterator, KMergeIters};
use crate::prelude::sort_pairs::KMergeIters;
use crate::prelude::*;
use anyhow::{ensure, Context, Result};
use dsi_progress_logger::prelude::*;
Expand All @@ -26,7 +26,7 @@ pub fn permute(
graph: &impl SequentialGraph,
perm: &impl SliceByValue<Value = usize>,
memory_usage: MemoryUsage,
) -> Result<Left<arc_list_graph::ArcListGraph<KMergeIters<BatchIterator<()>, ()>>>> {
) -> Result<Left<arc_list_graph::ArcListGraph<KMergeIters<CodecIter<DefaultBatchCodec>, ()>>>> {
ensure!(
perm.len() == graph.num_nodes(),
"The given permutation has {} values and thus it's incompatible with a graph with {} nodes.",
Expand Down Expand Up @@ -79,7 +79,7 @@ pub fn permute_split<S, P>(
perm: &P,
memory_usage: MemoryUsage,
threads: &ThreadPool,
) -> Result<Left<arc_list_graph::ArcListGraph<KMergeIters<BatchIterator<()>, ()>>>>
) -> Result<Left<arc_list_graph::ArcListGraph<KMergeIters<CodecIter<DefaultBatchCodec>, ()>>>>
where
S: SequentialGraph + SplitLabeling,
P: SliceByValue<Value = usize> + Send + Sync + Clone,
Expand Down
19 changes: 14 additions & 5 deletions webgraph/src/transform/simplify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use crate::graphs::{
};
use crate::labels::Left;
use crate::traits::{LenderIntoIter, SequentialGraph, SortedIterator, SortedLender, SplitLabeling};
use crate::utils::sort_pairs::{BatchIterator, KMergeIters, SortPairs};
use crate::utils::MemoryUsage;
use crate::utils::sort_pairs::{KMergeIters, SortPairs};
use crate::utils::{CodecIter, DefaultBatchCodec, MemoryUsage};
use anyhow::{Context, Result};
use dsi_progress_logger::prelude::*;
use itertools::Itertools;
Expand All @@ -32,7 +32,10 @@ pub fn simplify_sorted<G: SequentialGraph>(
memory_usage: MemoryUsage,
) -> Result<
NoSelfLoopsGraph<
UnionGraph<G, Left<arc_list_graph::ArcListGraph<KMergeIters<BatchIterator<()>, ()>>>>,
UnionGraph<
G,
Left<arc_list_graph::ArcListGraph<KMergeIters<CodecIter<DefaultBatchCodec>, ()>>>,
>,
>,
>
where
Expand Down Expand Up @@ -100,7 +103,13 @@ pub fn simplify_split<S>(
graph: &S,
memory_usage: MemoryUsage,
threads: &ThreadPool,
) -> Result<Left<arc_list_graph::ArcListGraph<itertools::Dedup<KMergeIters<BatchIterator<()>, ()>>>>>
) -> Result<
Left<
arc_list_graph::ArcListGraph<
itertools::Dedup<KMergeIters<CodecIter<DefaultBatchCodec>, ()>>,
>,
>,
>
where
S: SequentialGraph + SplitLabeling,
{
Expand Down Expand Up @@ -142,7 +151,7 @@ where

// get a graph on the sorted data
log::debug!("Waiting for threads to finish");
let edges: KMergeIters<BatchIterator> = rx.iter().sum();
let edges: KMergeIters<CodecIter<DefaultBatchCodec>> = rx.iter().sum();
let edges = edges.dedup();
log::debug!("All threads finished");
let sorted = arc_list_graph::ArcListGraph::new_labeled(graph.num_nodes(), edges);
Expand Down
62 changes: 25 additions & 37 deletions webgraph/src/transform/transpose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@

use crate::graphs::arc_list_graph;
use crate::prelude::proj::Left;
use crate::prelude::sort_pairs::{BatchIterator, BitReader, BitWriter, KMergeIters};
use crate::prelude::{
BitDeserializer, BitSerializer, LabeledSequentialGraph, SequentialGraph, SortPairs,
};
use crate::prelude::sort_pairs::KMergeIters;
use crate::prelude::{LabeledSequentialGraph, SequentialGraph, SortPairs};
use crate::traits::graph::UnitLabelGraph;
use crate::traits::{NodeLabelsLender, SplitLabeling};
use crate::utils::{MemoryUsage, ParSortIters, SplitIters};
use crate::utils::{
BatchCodec, CodecIter, DefaultBatchCodec, MemoryUsage, ParSortIters, SplitIters,
};
use anyhow::Result;
use dsi_bitstream::traits::NE;
use dsi_progress_logger::prelude::*;
use lender::prelude::*;
use tempfile::Builder;
Expand All @@ -25,21 +24,18 @@ use tempfile::Builder;
///
/// For the meaning of the additional parameters, see
/// [`SortPairs`](crate::prelude::sort_pairs::SortPairs).
pub fn transpose_labeled<
S: BitSerializer<NE, BitWriter> + Clone,
D: BitDeserializer<NE, BitReader> + Clone + 'static,
>(
graph: &impl LabeledSequentialGraph<S::SerType>,
#[allow(clippy::type_complexity)]
pub fn transpose_labeled<C: BatchCodec>(
graph: &impl LabeledSequentialGraph<C::Label>,
memory_usage: MemoryUsage,
serializer: S,
deserializer: D,
) -> Result<arc_list_graph::ArcListGraph<KMergeIters<BatchIterator<D>, D::DeserType>>>
batch_codec: C,
) -> Result<arc_list_graph::ArcListGraph<KMergeIters<CodecIter<C>, C::Label>>>
where
S::SerType: Send + Sync + Copy,
D::DeserType: Clone + Copy,
C::Label: Clone + 'static,
CodecIter<C>: Clone + Send + Sync,
{
let dir = Builder::new().prefix("transpose_").tempdir()?;
let mut sorted = SortPairs::new_labeled(memory_usage, dir.path(), serializer, deserializer)?;
let mut sorted = SortPairs::new_labeled(memory_usage, dir.path(), batch_codec)?;

let mut pl = progress_logger![
item_name = "node",
Expand Down Expand Up @@ -69,12 +65,11 @@ where
pub fn transpose(
graph: impl SequentialGraph,
memory_usage: MemoryUsage,
) -> Result<Left<arc_list_graph::ArcListGraph<KMergeIters<BatchIterator<()>, ()>>>> {
) -> Result<Left<arc_list_graph::ArcListGraph<KMergeIters<CodecIter<DefaultBatchCodec>, ()>>>> {
Ok(Left(transpose_labeled(
&UnitLabelGraph(graph),
memory_usage,
(),
(),
DefaultBatchCodec::default(),
)?))
}

Expand All @@ -90,32 +85,28 @@ pub fn transpose(
pub fn transpose_labeled_split<
'graph,
G: 'graph
+ LabeledSequentialGraph<S::SerType>
+ LabeledSequentialGraph<C::Label>
+ for<'a> SplitLabeling<
SplitLender<'a>: for<'b> NodeLabelsLender<
'b,
Label: crate::traits::Pair<Left = usize, Right = S::SerType> + Copy,
Label: crate::traits::Pair<Left = usize, Right = C::Label> + Copy,
IntoIterator: IntoIterator<IntoIter: Send + Sync>,
> + Send
+ Sync,
IntoIterator<'a>: IntoIterator<IntoIter: Send + Sync>,
>,
S: BitSerializer<NE, BitWriter> + Clone + Send + Sync + 'graph,
D: BitDeserializer<NE, BitReader, DeserType: Clone + Send + Sync> + Clone + Send + Sync + 'static,
C: BatchCodec + 'graph,
>(
graph: &'graph G,
memory_usage: MemoryUsage,
serializer: S,
deserializer: D,
batch_codec: C,
) -> Result<
SplitIters<
impl IntoIterator<Item = ((usize, usize), D::DeserType), IntoIter: Send + Sync>
+ use<'graph, G, S, D>,
impl IntoIterator<Item = ((usize, usize), C::Label), IntoIter: Send + Sync> + use<'graph, G, C>,
>,
>
where
S::SerType: Send + Sync + Copy,
D::DeserType: Clone + Copy,
CodecIter<C>: Clone + Send + Sync,
{
let par_sort_iters = ParSortIters::new(graph.num_nodes())?.memory_usage(memory_usage);
let parts = num_cpus::get();
Expand All @@ -126,11 +117,7 @@ where
.map(|iter| iter.into_labeled_pairs().map(|((a, b), l)| ((b, a), l)))
.collect();

par_sort_iters.try_sort_labeled::<S, D, std::convert::Infallible>(
&serializer,
deserializer,
pairs,
)
par_sort_iters.try_sort_labeled::<C, std::convert::Infallible>(batch_codec, pairs)
}

/// Returns a [`SplitIters`] structure representing the
Expand Down Expand Up @@ -170,8 +157,9 @@ pub fn transpose_split<
.map(|iter| iter.into_pairs().map(|(src, dst)| ((dst, src), ())))
.collect();

let SplitIters { boundaries, iters } =
par_sort_iters.try_sort_labeled::<(), (), std::convert::Infallible>(&(), (), pairs)?;
let batch_codec = DefaultBatchCodec::default();
let SplitIters { boundaries, iters } = par_sort_iters
.try_sort_labeled::<DefaultBatchCodec, std::convert::Infallible>(batch_codec, pairs)?;

Ok(SplitIters {
boundaries,
Expand Down
Loading