From 74bc9cbdd2d218cfea01267178df50b330891734 Mon Sep 17 00:00:00 2001 From: Tommaso Fontana Date: Sat, 11 Oct 2025 14:54:41 +0200 Subject: [PATCH 1/4] Introduce BatchCodec to substitute BatchIterator --- webgraph/examples/bench_sort_pairs.rs | 9 +- webgraph/examples/bench_unit_transpose.rs | 3 +- webgraph/src/transform/perm.rs | 6 +- webgraph/src/transform/simplify.rs | 19 +- webgraph/src/transform/transpose.rs | 62 ++-- webgraph/src/utils/batch_codec/gaps.rs | 254 +++++++++++++ .../src/utils/batch_codec/grouped_gaps.rs | 300 +++++++++++++++ webgraph/src/utils/batch_codec/mod.rs | 126 +++++++ webgraph/src/utils/mod.rs | 3 + webgraph/src/utils/par_sort_iters.rs | 105 ++---- webgraph/src/utils/par_sort_pairs.rs | 122 +++--- webgraph/src/utils/sort_pairs.rs | 348 +++--------------- webgraph/tests/test_transpose.rs | 19 +- 13 files changed, 880 insertions(+), 496 deletions(-) create mode 100644 webgraph/src/utils/batch_codec/gaps.rs create mode 100644 webgraph/src/utils/batch_codec/grouped_gaps.rs create mode 100644 webgraph/src/utils/batch_codec/mod.rs diff --git a/webgraph/examples/bench_sort_pairs.rs b/webgraph/examples/bench_sort_pairs.rs index 4974afdc..2ad3236e 100644 --- a/webgraph/examples/bench_sort_pairs.rs +++ b/webgraph/examples/bench_sort_pairs.rs @@ -18,6 +18,7 @@ use rand::RngCore; use rand::SeedableRng; use tempfile::Builder; use webgraph::prelude::*; +use webgraph::utils::gaps::GapsCodec; #[derive(Parser, Debug)] #[command(about = "Tests the merge speed of SortPairs", long_about = None)] @@ -62,11 +63,13 @@ pub fn main() -> Result<()> { let dir = Builder::new().prefix("bench_sort_pairs").tempdir()?; if args.labeled { - let mut sp = SortPairs::::new_labeled( + let mut sp = SortPairs::new_labeled( MemoryUsage::BatchSize(args.batch), dir.path(), - Mock(), - Mock(), + GapsCodec::<_, _> { + serializer: Mock(), + deserializer: Mock(), + }, )?; let mut r = SmallRng::seed_from_u64(0); diff --git a/webgraph/examples/bench_unit_transpose.rs b/webgraph/examples/bench_unit_transpose.rs index 7b4ea9ad..3a7f332e 100644 --- a/webgraph/examples/bench_unit_transpose.rs +++ b/webgraph/examples/bench_unit_transpose.rs @@ -78,8 +78,7 @@ where let mut iter = Left(transform::transpose_labeled( &unit, MemoryUsage::BatchSize(10_000_000), - (), - (), + DefaultBatchCodec::default(), )?) .iter(); while let Some((x, s)) = iter.next() { diff --git a/webgraph/src/transform/perm.rs b/webgraph/src/transform/perm.rs index 1ab8c262..49a14572 100644 --- a/webgraph/src/transform/perm.rs +++ b/webgraph/src/transform/perm.rs @@ -5,7 +5,7 @@ */ use crate::graphs::arc_list_graph; -use crate::prelude::sort_pairs::{BatchIterator, KMergeIters}; +use crate::prelude::sort_pairs::KMergeIters; use crate::prelude::*; use anyhow::{ensure, Context, Result}; use dsi_progress_logger::prelude::*; @@ -26,7 +26,7 @@ pub fn permute( graph: &impl SequentialGraph, perm: &impl SliceByValue, memory_usage: MemoryUsage, -) -> Result, ()>>>> { +) -> Result, ()>>>> { ensure!( perm.len() == graph.num_nodes(), "The given permutation has {} values and thus it's incompatible with a graph with {} nodes.", @@ -79,7 +79,7 @@ pub fn permute_split( perm: &P, memory_usage: MemoryUsage, threads: &ThreadPool, -) -> Result, ()>>>> +) -> Result, ()>>>> where S: SequentialGraph + SplitLabeling, P: SliceByValue + Send + Sync + Clone, diff --git a/webgraph/src/transform/simplify.rs b/webgraph/src/transform/simplify.rs index c46989ec..89f018dd 100644 --- a/webgraph/src/transform/simplify.rs +++ b/webgraph/src/transform/simplify.rs @@ -9,8 +9,8 @@ use crate::graphs::{ }; use crate::labels::Left; use crate::traits::{LenderIntoIter, SequentialGraph, SortedIterator, SortedLender, SplitLabeling}; -use crate::utils::sort_pairs::{BatchIterator, KMergeIters, SortPairs}; -use crate::utils::MemoryUsage; +use crate::utils::sort_pairs::{KMergeIters, SortPairs}; +use crate::utils::{CodecIter, DefaultBatchCodec, MemoryUsage}; use anyhow::{Context, Result}; use dsi_progress_logger::prelude::*; use itertools::Itertools; @@ -32,7 +32,10 @@ pub fn simplify_sorted( memory_usage: MemoryUsage, ) -> Result< NoSelfLoopsGraph< - UnionGraph, ()>>>>, + UnionGraph< + G, + Left, ()>>>, + >, >, > where @@ -100,7 +103,13 @@ pub fn simplify_split( graph: &S, memory_usage: MemoryUsage, threads: &ThreadPool, -) -> Result, ()>>>>> +) -> Result< + Left< + arc_list_graph::ArcListGraph< + itertools::Dedup, ()>>, + >, + >, +> where S: SequentialGraph + SplitLabeling, { @@ -142,7 +151,7 @@ where // get a graph on the sorted data log::debug!("Waiting for threads to finish"); - let edges: KMergeIters = rx.iter().sum(); + let edges: KMergeIters> = rx.iter().sum(); let edges = edges.dedup(); log::debug!("All threads finished"); let sorted = arc_list_graph::ArcListGraph::new_labeled(graph.num_nodes(), edges); diff --git a/webgraph/src/transform/transpose.rs b/webgraph/src/transform/transpose.rs index 106a0d02..edeb9ada 100644 --- a/webgraph/src/transform/transpose.rs +++ b/webgraph/src/transform/transpose.rs @@ -7,15 +7,14 @@ use crate::graphs::arc_list_graph; use crate::prelude::proj::Left; -use crate::prelude::sort_pairs::{BatchIterator, BitReader, BitWriter, KMergeIters}; -use crate::prelude::{ - BitDeserializer, BitSerializer, LabeledSequentialGraph, SequentialGraph, SortPairs, -}; +use crate::prelude::sort_pairs::KMergeIters; +use crate::prelude::{LabeledSequentialGraph, SequentialGraph, SortPairs}; use crate::traits::graph::UnitLabelGraph; use crate::traits::{NodeLabelsLender, SplitLabeling}; -use crate::utils::{MemoryUsage, ParSortIters, SplitIters}; +use crate::utils::{ + BatchCodec, CodecIter, DefaultBatchCodec, MemoryUsage, ParSortIters, SplitIters, +}; use anyhow::Result; -use dsi_bitstream::traits::NE; use dsi_progress_logger::prelude::*; use lender::prelude::*; use tempfile::Builder; @@ -25,21 +24,18 @@ use tempfile::Builder; /// /// For the meaning of the additional parameters, see /// [`SortPairs`](crate::prelude::sort_pairs::SortPairs). -pub fn transpose_labeled< - S: BitSerializer + Clone, - D: BitDeserializer + Clone + 'static, ->( - graph: &impl LabeledSequentialGraph, +#[allow(clippy::type_complexity)] +pub fn transpose_labeled( + graph: &impl LabeledSequentialGraph, memory_usage: MemoryUsage, - serializer: S, - deserializer: D, -) -> Result, D::DeserType>>> + batch_codec: C, +) -> Result, C::Label>>> where - S::SerType: Send + Sync + Copy, - D::DeserType: Clone + Copy, + C::Label: Clone + 'static, + CodecIter: Clone + Send + Sync, { let dir = Builder::new().prefix("transpose_").tempdir()?; - let mut sorted = SortPairs::new_labeled(memory_usage, dir.path(), serializer, deserializer)?; + let mut sorted = SortPairs::new_labeled(memory_usage, dir.path(), batch_codec)?; let mut pl = progress_logger![ item_name = "node", @@ -69,12 +65,11 @@ where pub fn transpose( graph: impl SequentialGraph, memory_usage: MemoryUsage, -) -> Result, ()>>>> { +) -> Result, ()>>>> { Ok(Left(transpose_labeled( &UnitLabelGraph(graph), memory_usage, - (), - (), + DefaultBatchCodec::default(), )?)) } @@ -90,32 +85,28 @@ pub fn transpose( pub fn transpose_labeled_split< 'graph, G: 'graph - + LabeledSequentialGraph + + LabeledSequentialGraph + for<'a> SplitLabeling< SplitLender<'a>: for<'b> NodeLabelsLender< 'b, - Label: crate::traits::Pair + Copy, + Label: crate::traits::Pair + Copy, IntoIterator: IntoIterator, > + Send + Sync, IntoIterator<'a>: IntoIterator, >, - S: BitSerializer + Clone + Send + Sync + 'graph, - D: BitDeserializer + Clone + Send + Sync + 'static, + C: BatchCodec + 'graph, >( graph: &'graph G, memory_usage: MemoryUsage, - serializer: S, - deserializer: D, + batch_codec: C, ) -> Result< SplitIters< - impl IntoIterator - + use<'graph, G, S, D>, + impl IntoIterator + use<'graph, G, C>, >, > where - S::SerType: Send + Sync + Copy, - D::DeserType: Clone + Copy, + CodecIter: Clone + Send + Sync, { let par_sort_iters = ParSortIters::new(graph.num_nodes())?.memory_usage(memory_usage); let parts = num_cpus::get(); @@ -126,11 +117,7 @@ where .map(|iter| iter.into_labeled_pairs().map(|((a, b), l)| ((b, a), l))) .collect(); - par_sort_iters.try_sort_labeled::( - &serializer, - deserializer, - pairs, - ) + par_sort_iters.try_sort_labeled::(batch_codec, pairs) } /// Returns a [`SplitIters`] structure representing the @@ -170,8 +157,9 @@ pub fn transpose_split< .map(|iter| iter.into_pairs().map(|(src, dst)| ((dst, src), ()))) .collect(); - let SplitIters { boundaries, iters } = - par_sort_iters.try_sort_labeled::<(), (), std::convert::Infallible>(&(), (), pairs)?; + let batch_codec = DefaultBatchCodec::default(); + let SplitIters { boundaries, iters } = par_sort_iters + .try_sort_labeled::(batch_codec, pairs)?; Ok(SplitIters { boundaries, diff --git a/webgraph/src/utils/batch_codec/gaps.rs b/webgraph/src/utils/batch_codec/gaps.rs new file mode 100644 index 00000000..0a06a57d --- /dev/null +++ b/webgraph/src/utils/batch_codec/gaps.rs @@ -0,0 +1,254 @@ +/* + * SPDX-FileCopyrightText: 2023 Inria + * SPDX-FileCopyrightText: 2023 Sebastiano Vigna + * SPDX-FileCopyrightText: 2025 Tommaso Fontana + * + * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later + */ + +use super::{BitReader, BitWriter}; +use crate::traits::SortedIterator; +use crate::utils::{ArcMmapHelper, MmapHelper, Triple}; +use crate::{ + traits::{BitDeserializer, BitSerializer}, + utils::BatchCodec, +}; + +use std::sync::Arc; + +use anyhow::{Context, Result}; +use dsi_bitstream::prelude::*; +use mmap_rs::MmapFlags; +use rdst::*; + +#[derive(Clone, Debug, Default)] +/// A codec for encoding and decoding batches of triples using gap compression. +/// +/// This codec encodes triples of the form `(src, dst, label)` by encoding the +/// gaps between consecutive sources and destinations using a specified code. +/// +/// ## Type Parameters +/// - `S`: Serializer for the labels, implementing [`BitSerializer`] for the label type. +/// - `D`: Deserializer for the labels, implementing [`BitDeserializer`] for the label type. +/// - `SRC_CODE`: Code used for encoding source gaps (default: gamma). +/// - `DST_CODE`: Code used for encoding destination gaps (default: gamma). +/// +/// ## Fields +/// - `serializer`: The label serializer. +/// - `deserializer`: The label deserializer. +/// +/// ## Encoding Format +/// 1. The batch length is written using delta coding. +/// 2. For each group of triples with the same source: +/// - The gap from the previous source is encoded. +/// - The gap from the previous destination is encoded. +/// - The label is serialized. +/// +/// The bit deserializer must be [`Clone`] because we need one for each +/// [`GapsIterator`], and there are possible scenarios in which the +/// deserializer might be stateful. +/// +/// ## Choosing the codes +/// +/// These are the top 10 codes for src and dst gaps when transposing `enwiki-2024`. +/// ```ignore +/// Src codes: +/// Code: Unary Size: 179553432 +/// Code: Golomb(1) Size: 179553432 +/// Code: Rice(0) Size: 179553432 +/// Code: Gamma Size: 185374984 +/// Code: Zeta(1) Size: 185374984 +/// Code: ExpGolomb(0) Size: 185374984 +/// Code: Omega Size: 185439656 +/// Code: Delta Size: 191544794 +/// Code: Golomb(2) Size: 345986198 +/// Code: Rice(1) Size: 345986198 +/// Dst codes: +/// Code: Pi(2) Size: 2063880685 +/// Code: Pi(3) Size: 2074138948 +/// Code: Zeta(3) Size: 2122730298 +/// Code: Zeta(4) Size: 2123948774 +/// Code: Zeta(5) Size: 2169131998 +/// Code: Pi(4) Size: 2176097847 +/// Code: Zeta(2) Size: 2226573622 +/// Code: Zeta(6) Size: 2237680403 +/// Code: Delta Size: 2272691460 +/// Code: Zeta(7) Size: 2305354857 +/// ``` +/// +/// So the best combination is `Unary` for src gaps and `Pi(2)` for dst gaps. +/// But, `Unary` can behave poorly if the distribution of your data changes, +/// therefore the recommended default is `Gamma` for src gaps and `Delta` for +/// dst gaps as they are universal codes. +pub struct GapsCodec< + S: BitSerializer = (), + D: BitDeserializer + Clone = (), + const SRC_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA }, + const DST_CODE: usize = { dsi_bitstream::dispatch::code_consts::DELTA }, +> { + /// Serializer for the labels + pub serializer: S, + /// Deserializer for the labels + pub deserializer: D, +} + +impl BatchCodec + for GapsCodec +where + S: BitSerializer + Send + Sync, + D: BitDeserializer + Send + Sync + Clone, + S::SerType: Send + Sync + Copy + 'static + core::fmt::Debug, // needed by radix sort +{ + type Label = S::SerType; + type DecodedBatch = GapsIterator; + + fn encode_batch( + &self, + path: impl AsRef, + batch: &mut [((usize, usize), Self::Label)], + ) -> Result { + let start = std::time::Instant::now(); + Triple::cast_batch_mut(batch).radix_sort_unstable(); + log::debug!("Sorted {} arcs in {:?}", batch.len(), start.elapsed()); + self.encode_sorted_batch(path, batch) + } + + fn encode_sorted_batch( + &self, + path: impl AsRef, + batch: &[((usize, usize), Self::Label)], + ) -> Result { + debug_assert!(Triple::cast_batch(batch).is_sorted()); + // create a batch file where to dump + let file_path = path.as_ref(); + let file = std::io::BufWriter::with_capacity( + 1 << 16, + std::fs::File::create(file_path).with_context(|| { + format!( + "Could not create BatchIterator temporary file {}", + file_path.display() + ) + })?, + ); + // create a bitstream to write to the file + let mut stream = >::new(>::new(file)); + + // prefix the stream with the length of the batch + // we use a delta code since it'll be a big number most of the time + stream + .write_delta(batch.len() as u64) + .context("Could not write length")?; + + // dump the triples to the bitstream + let (mut prev_src, mut prev_dst) = (0, 0); + let mut written_bits = 0; + for ((src, dst), label) in batch.iter() { + // write the source gap as gamma + written_bits += ConstCode:: + .write(&mut stream, (src - prev_src) as u64) + .with_context(|| format!("Could not write {src} after {prev_src}"))?; + if *src != prev_src { + // Reset prev_y + prev_dst = 0; + } + // write the destination gap as gamma + written_bits += ConstCode:: + .write(&mut stream, (dst - prev_dst) as u64) + .with_context(|| format!("Could not write {dst} after {prev_dst}"))?; + // write the label + written_bits += self + .serializer + .serialize(label, &mut stream) + .context("Could not serialize label")?; + (prev_src, prev_dst) = (*src, *dst); + } + // flush the stream and reset the buffer + written_bits += stream.flush().context("Could not flush stream")?; + + Ok(written_bits) + } + + fn decode_batch(&self, path: impl AsRef) -> Result { + // open the file + let mut stream = >::new(MemWordReader::new(ArcMmapHelper(Arc::new( + MmapHelper::mmap( + path.as_ref(), + MmapFlags::TRANSPARENT_HUGE_PAGES | MmapFlags::SEQUENTIAL, + ) + .with_context(|| format!("Could not mmap {}", path.as_ref().display()))?, + )))); + + // read the length of the batch (first value in the stream) + let len = stream.read_delta().context("Could not read length")? as usize; + + // create the iterator + Ok(GapsIterator { + deserializer: self.deserializer.clone(), + stream, + len, + current: 0, + prev_src: 0, + prev_dst: 0, + }) + } +} + +#[derive(Clone, Debug)] +/// An iterator over triples encoded with gaps, this is returned by [`GapsCodec`]. +pub struct GapsIterator< + D: BitDeserializer = (), + const SRC_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA }, + const DST_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA }, +> { + /// Deserializer for the labels + deserializer: D, + /// Bitstream to read from + stream: BitReader, + /// Length of the iterator (number of triples) + len: usize, + /// Current position in the iterator + current: usize, + /// Previous source node + prev_src: usize, + /// Previous destination node + prev_dst: usize, +} + +unsafe impl, const SRC_CODE: usize, const DST_CODE: usize> + SortedIterator for GapsIterator +{ +} + +impl, const SRC_CODE: usize, const DST_CODE: usize> Iterator + for GapsIterator +{ + type Item = ((usize, usize), D::DeserType); + + fn next(&mut self) -> Option { + if self.current >= self.len { + return None; + } + let src_gap = ConstCode::.read(&mut self.stream).ok()?; + let dst_gap = ConstCode::.read(&mut self.stream).ok()?; + let label = self.deserializer.deserialize(&mut self.stream).ok()?; + self.prev_src += src_gap as usize; + if src_gap != 0 { + self.prev_dst = 0; + } + self.prev_dst += dst_gap as usize; + self.current += 1; + Some(((self.prev_src, self.prev_dst), label)) + } + + fn size_hint(&self) -> (usize, Option) { + (self.len(), Some(self.len())) + } +} + +impl, const SRC_CODE: usize, const DST_CODE: usize> + ExactSizeIterator for GapsIterator +{ + fn len(&self) -> usize { + self.len - self.current + } +} diff --git a/webgraph/src/utils/batch_codec/grouped_gaps.rs b/webgraph/src/utils/batch_codec/grouped_gaps.rs new file mode 100644 index 00000000..8c70d0d4 --- /dev/null +++ b/webgraph/src/utils/batch_codec/grouped_gaps.rs @@ -0,0 +1,300 @@ +/* + * SPDX-FileCopyrightText: 2025 Tommaso Fontana + * + * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later + */ + +use super::{BitReader, BitWriter}; +use crate::traits::SortedIterator; +use crate::utils::{ArcMmapHelper, MmapHelper, Triple}; +use crate::{ + traits::{BitDeserializer, BitSerializer}, + utils::BatchCodec, +}; + +use std::sync::Arc; + +use anyhow::{Context, Result}; +use dsi_bitstream::prelude::*; +use mmap_rs::MmapFlags; +use rdst::*; + +#[derive(Clone, Debug, Default)] +/// A codec for encoding and decoding batches of triples using grouped gap compression. +/// +/// This codec encodes triples of the form `(src, dst, label)` by grouping edges with the same source node, +/// and encoding the gaps between consecutive sources and destinations using a specified code (default: gamma). +/// The outdegree (number of edges for each source) is also encoded using the specified code. +/// +/// ## Type Parameters +/// - `S`: Serializer for the labels, implementing [`BitSerializer`] for the label type. +/// - `D`: Deserializer for the labels, implementing [`BitDeserializer`] for the label type. +/// - `OUTDEGREE_CODE`: Code used for encoding outdegrees (default: gamma). +/// - `SRC_CODE`: Code used for encoding source gaps (default: gamma). +/// - `DST_CODE`: Code used for encoding destination gaps (default: gamma). +/// +/// ## Fields +/// - `serializer`: The label serializer. +/// - `deserializer`: The label deserializer. +/// +/// ## Encoding Format +/// 1. The batch length is written using delta coding. +/// 2. For each group of triples with the same source: +/// - The gap from the previous source is encoded. +/// - The outdegree (number of edges for this source) is encoded. +/// - For each destination: +/// - The gap from the previous destination is encoded. +/// - The label is serialized. +/// +/// The bit deserializer must be [`Clone`] because we need one for each +/// [`GroupedGapsIterator`], and there are possible scenarios in which the +/// deserializer might be stateful. +/// +/// ## Choosing the codes +/// +/// When transposing `enwiki-2024`, these are the top 10 codes for src gaps, outdegree, and dst gaps: +/// ```ignore +/// Outdegree stats +/// Code: ExpGolomb(3) Size: 34004796 +/// Code: ExpGolomb(2) Size: 34101784 +/// Code: ExpGolomb(4) Size: 36036394 +/// Code: Zeta(2) Size: 36231582 +/// Code: ExpGolomb(1) Size: 36369750 +/// Code: Zeta(3) Size: 36893285 +/// Code: Pi(2) Size: 37415701 +/// Code: Zeta(4) Size: 38905267 +/// Code: Golomb(20) Size: 38963840 +/// Code: Golomb(19) Size: 39118201 +/// Src stats +/// Code: Golomb(2) Size: 12929998 +/// Code: Rice(1) Size: 12929998 +/// Code: Unary Size: 13025332 +/// Code: Golomb(1) Size: 13025332 +/// Code: Rice(0) Size: 13025332 +/// Code: ExpGolomb(1) Size: 13319930 +/// Code: Golomb(4) Size: 18732384 +/// Code: Rice(2) Size: 18732384 +/// Code: Golomb(3) Size: 18736573 +/// Code: ExpGolomb(2) Size: 18746122 +/// Dst stats +/// Code: Pi(2) Size: 2063880685 +/// Code: Pi(3) Size: 2074138948 +/// Code: Zeta(3) Size: 2122730298 +/// Code: Zeta(4) Size: 2123948774 +/// Code: Zeta(5) Size: 2169131998 +/// Code: Pi(4) Size: 2176097847 +/// Code: Zeta(2) Size: 2226573622 +/// Code: Zeta(6) Size: 2237680403 +/// Code: Delta Size: 2272691460 +/// Code: Zeta(7) Size: 2305354857 +/// ``` +/// +/// The best codes are `Golomb(2)` for src gaps, `ExpGolomb(3)` for outdegree, and `Pi(2)` for dst gaps. +/// However, `Golomb` can perform poorly if the data don't follow the expected distribution, +/// so the recommended defaults are `Gamma` for src gaps, `ExpGolomb3` for outdegree, and `Delta` for dst gaps, +/// as they are universal codes. +pub struct GroupedGapsCodec< + S: BitSerializer = (), + D: BitDeserializer + Clone = (), + const OUTDEGREE_CODE: usize = { dsi_bitstream::dispatch::code_consts::EXP_GOLOMB3 }, + const SRC_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA }, + const DST_CODE: usize = { dsi_bitstream::dispatch::code_consts::DELTA }, +> { + /// Serializer for the labels + pub serializer: S, + /// Deserializer for the labels + pub deserializer: D, +} + +impl BatchCodec + for GroupedGapsCodec +where + S: BitSerializer + Send + Sync, + D: BitDeserializer + Send + Sync + Clone, + S::SerType: Send + Sync + Copy + 'static, // needed by radix sort +{ + type Label = S::SerType; + type DecodedBatch = GroupedGapsIterator; + + fn encode_batch( + &self, + path: impl AsRef, + batch: &mut [((usize, usize), Self::Label)], + ) -> Result { + let start = std::time::Instant::now(); + Triple::cast_batch_mut(batch).radix_sort_unstable(); + log::debug!("Sorted {} arcs in {:?}", batch.len(), start.elapsed()); + self.encode_sorted_batch(path, batch) + } + + fn encode_sorted_batch( + &self, + path: impl AsRef, + batch: &[((usize, usize), Self::Label)], + ) -> Result { + debug_assert!(Triple::cast_batch(batch).is_sorted(), "Batch is not sorted"); + // create a batch file where to dump + let file_path = path.as_ref(); + let file = std::io::BufWriter::with_capacity( + 1 << 16, + std::fs::File::create(file_path).with_context(|| { + format!( + "Could not create BatchIterator temporary file {}", + file_path.display() + ) + })?, + ); + // create a bitstream to write to the file + let mut stream = >::new(>::new(file)); + + // prefix the stream with the length of the batch + // we use a delta code since it'll be a big number most of the time + stream + .write_delta(batch.len() as u64) + .context("Could not write length")?; + + // dump the triples to the bitstream + let mut prev_src = 0; + let mut written_bits = 0; + let mut i = 0; + while i < batch.len() { + let ((src, _), _) = batch[i]; + // write the source gap as gamma + written_bits += ConstCode:: + .write(&mut stream, (src - prev_src) as _) + .with_context(|| format!("Could not write {src} after {prev_src}"))?; + // figure out how many edges have this source + let outdegree = batch[i..].iter().take_while(|t| t.0 .0 == src).count(); + // write the outdegree + written_bits += ConstCode:: + .write(&mut stream, outdegree as _) + .with_context(|| format!("Could not write outdegree {outdegree} for {src}"))?; + + // encode the destinations + let mut prev_dst = 0; + for _ in 0..outdegree { + let ((_, dst), label) = &batch[i]; + // write the destination gap as gamma + written_bits += ConstCode:: + .write(&mut stream, (dst - prev_dst) as _) + .with_context(|| format!("Could not write {dst} after {prev_dst}"))?; + // write the label + written_bits += self + .serializer + .serialize(label, &mut stream) + .context("Could not serialize label")?; + prev_dst = *dst; + i += 1; + } + prev_src = src; + } + // flush the stream and reset the buffer + written_bits += stream.flush().context("Could not flush stream")?; + + Ok(written_bits) + } + + fn decode_batch(&self, path: impl AsRef) -> Result { + // open the file + let mut stream = >::new(MemWordReader::new(ArcMmapHelper(Arc::new( + MmapHelper::mmap( + path.as_ref(), + MmapFlags::TRANSPARENT_HUGE_PAGES | MmapFlags::SEQUENTIAL, + ) + .with_context(|| format!("Could not mmap {}", path.as_ref().display()))?, + )))); + + // read the length of the batch (first value in the stream) + let len = stream.read_delta().context("Could not read length")? as usize; + + // create the iterator + Ok(GroupedGapsIterator { + deserializer: self.deserializer.clone(), + stream, + len, + current: 0, + src: 0, + dst_left: 0, + prev_dst: 0, + }) + } +} + +#[derive(Clone, Debug)] +/// An iterator over triples encoded with gaps, this is returned by [`GroupedGapsCodec`]. +pub struct GroupedGapsIterator< + D: BitDeserializer = (), + const OUTDEGREE_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA }, + const SRC_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA }, + const DST_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA }, +> { + /// Deserializer for the labels + deserializer: D, + /// Bitstream to read from + stream: BitReader, + /// Length of the iterator (number of triples) + len: usize, + /// Current position in the iterator + current: usize, + /// Current source node + src: usize, + /// Number of destinations left for the current source + dst_left: usize, + /// Previous destination node + prev_dst: usize, +} + +unsafe impl< + D: BitDeserializer, + const OUTDEGREE_CODE: usize, + const SRC_CODE: usize, + const DST_CODE: usize, + > SortedIterator for GroupedGapsIterator +{ +} + +impl< + D: BitDeserializer, + const OUTDEGREE_CODE: usize, + const SRC_CODE: usize, + const DST_CODE: usize, + > Iterator for GroupedGapsIterator +{ + type Item = ((usize, usize), D::DeserType); + fn next(&mut self) -> Option { + if self.current >= self.len { + return None; + } + if self.dst_left == 0 { + // read a new source + let src_gap = ConstCode::.read(&mut self.stream).ok()?; + self.src += src_gap as usize; + // read the outdegree + self.dst_left = ConstCode::.read(&mut self.stream).ok()? as usize; + self.prev_dst = 0; + } + + let dst_gap = ConstCode::.read(&mut self.stream).ok()?; + let label = self.deserializer.deserialize(&mut self.stream).ok()?; + self.prev_dst += dst_gap as usize; + self.current += 1; + self.dst_left -= 1; + Some(((self.src, self.prev_dst), label)) + } + + fn size_hint(&self) -> (usize, Option) { + (self.len(), Some(self.len())) + } +} + +impl< + D: BitDeserializer, + const OUTDEGREE_CODE: usize, + const SRC_CODE: usize, + const DST_CODE: usize, + > ExactSizeIterator for GroupedGapsIterator +{ + fn len(&self) -> usize { + self.len - self.current + } +} diff --git a/webgraph/src/utils/batch_codec/mod.rs b/webgraph/src/utils/batch_codec/mod.rs new file mode 100644 index 00000000..e66fd525 --- /dev/null +++ b/webgraph/src/utils/batch_codec/mod.rs @@ -0,0 +1,126 @@ +/* + * SPDX-FileCopyrightText: 2025 Tommaso Fontana + * + * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later + */ + +use anyhow::Result; + +use super::ArcMmapHelper; +use dsi_bitstream::prelude::*; +use rdst::*; +use std::fs::File; +use std::io::BufWriter; +use std::path::Path; + +pub mod gaps; +pub mod grouped_gaps; + +/// The recommended default batch codec for unlabelled batches. +pub type DefaultBatchCodec = grouped_gaps::GroupedGapsCodec; + +pub type BitWriter = BufBitWriter>>; +pub type BitReader = BufBitReader>>; + +/// A trait for encoding and decoding batches of sorted triples. +pub trait BatchCodec: Send + Sync { + /// The label type of the triples to encode and decode. + /// While the bounds are not really necessary, in all the practical cases + /// we need them. + type Label: Copy + Send + Sync + 'static; + //// The type returned by `decode_batch`, the iterator of which yields the + //// decoded triples in sorted order. + /// + /// The type `IntoIter` has to be `Send + Sync + Clone` because most often we want + /// to use them in [`SortPairs`](crate::utils::sort_pairs::SortPairs) and + /// then in [`ArcListGraph`](crate::graphs::arc_list_graph::ArcListGraph) + /// which require them. + type DecodedBatch: IntoIterator< + Item = ((usize, usize), Self::Label), + IntoIter: Send + Sync + Clone, + >; + + /// Given a batch of sorted triples, encodes them to disk and returns the number of bits written. + fn encode_sorted_batch( + &self, + path: impl AsRef, + batch: &[((usize, usize), Self::Label)], + ) -> Result; + + /// Given a batch of triples, encodes them to disk and returns the number of bits written. + /// The batch needs a mutable reference to allow the coded to sort-in-place if needed. + fn encode_batch( + &self, + path: impl AsRef, + batch: &mut [((usize, usize), Self::Label)], + ) -> Result; + + /// Decodes a batch of triples from disk. + /// The returned type's iterator yields the serialized triples in sorted order. + fn decode_batch(&self, path: impl AsRef) -> Result; +} + +/// Convenience alias to extract the iterator type of the decoded batch from a [`BatchCodec`]. +pub type CodecIter = <::DecodedBatch as IntoIterator>::IntoIter; + +/// An arc expressed as a pair of nodes and the associated label. +/// +/// Equality and order are defined only (lexicographically) on the pair of +/// nodes. +/// +/// Since we use this to sort a batch of `(usize, usize, L)` triples, in order to +/// safely transmute between the two types, Triple HAS TO be `repr(transparent)` +/// of the same tuple type. +/// +/// We use this to implement `RadixKey` for sorting batches of triples +/// using `rdst`. +#[derive(Clone, Copy, Debug)] +#[repr(transparent)] +pub struct Triple(((usize, usize), L)); + +impl Triple { + /// Converts a mutable batch of `((usize, usize), L)` triples into a mutable slice of `Triple`. + /// + /// This is safe because `Triple` is `repr(transparent)` of the same tuple type. + pub fn cast_batch_mut(batch: &mut [((usize, usize), L)]) -> &mut [Triple] { + unsafe { std::mem::transmute(batch) } + } + /// Converts a batch of `((usize, usize), L)` triples into a slice of `Triple`. + /// + /// This is safe because `Triple` is `repr(transparent)` of the same tuple type. + pub fn cast_batch(batch: &[((usize, usize), L)]) -> &[Triple] { + unsafe { std::mem::transmute(batch) } + } +} + +impl RadixKey for Triple { + const LEVELS: usize = 16; + + fn get_level(&self, level: usize) -> u8 { + (if level < 8 { + self.0 .0 .1 >> ((level % 8) * 8) + } else { + self.0 .0 .0 >> ((level % 8) * 8) + }) as u8 + } +} + +impl PartialEq for Triple { + fn eq(&self, other: &Self) -> bool { + self.0 .0 == other.0 .0 + } +} + +impl Eq for Triple {} + +impl PartialOrd for Triple { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for Triple { + fn cmp(&self, other: &Self) -> core::cmp::Ordering { + self.0 .0.cmp(&other.0 .0) + } +} diff --git a/webgraph/src/utils/mod.rs b/webgraph/src/utils/mod.rs index e3373a92..993e939b 100644 --- a/webgraph/src/utils/mod.rs +++ b/webgraph/src/utils/mod.rs @@ -32,6 +32,9 @@ pub fn temp_dir>(base: P) -> anyhow::Result { } } +mod batch_codec; +pub use batch_codec::*; + mod circular_buffer; pub(crate) use circular_buffer::*; diff --git a/webgraph/src/utils/par_sort_iters.rs b/webgraph/src/utils/par_sort_iters.rs index 0f7580b9..81a9021f 100644 --- a/webgraph/src/utils/par_sort_iters.rs +++ b/webgraph/src/utils/par_sort_iters.rs @@ -1,6 +1,7 @@ /* * SPDX-FileCopyrightText: 2025 Inria * SPDX-FileCopyrightText: 2025 Sebastiano Vigna + * SPDX-FileCopyrightText: 2025 Tommaso Fontana * * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later */ @@ -21,19 +22,17 @@ //! If your pairs are emitted by a single parallel iterator, consider using //! [`ParSortPairs`](crate::utils::par_sort_pairs::ParSortPairs) instead. -use std::marker::PhantomData; -use std::num::NonZeroUsize; +use core::num::NonZeroUsize; use sync_cell_slice::SyncSlice; use anyhow::{Context, Result}; -use dsi_bitstream::traits::NE; use dsi_progress_logger::{concurrent_progress_logger, ProgressLog}; use rayon::prelude::*; -use super::sort_pairs::{BatchIterator, BitReader, BitWriter, KMergeIters, Triple}; +use super::sort_pairs::KMergeIters; use super::MemoryUsage; -use crate::traits::{BitDeserializer, BitSerializer}; use crate::utils::SplitIters; +use crate::utils::{BatchCodec, CodecIter, DefaultBatchCodec}; /// Takes a sequence of iterators of (labelled)pairs as input, and turns them /// into [`SplitIters`] structure which is suitable for @@ -103,23 +102,21 @@ use crate::utils::SplitIters; /// )?; /// # Ok::<(), Box>(()) /// ``` -pub struct ParSortIters { +pub struct ParSortIters { num_nodes: usize, expected_num_pairs: Option, num_partitions: NonZeroUsize, memory_usage: MemoryUsage, - marker: PhantomData, } -impl ParSortIters<()> { - /// See [`try_sort`](ParSortIters::try_sort). - /// +impl ParSortIters { /// This is a convenience method for iterators that cannot fail. + /// See [`try_sort`](ParSortIters::try_sort). pub fn sort( &self, pairs: impl IntoIterator< - Item: IntoIterator + Send, - IntoIter: ExactSizeIterator, + Item: IntoIterator + Send + Sync, + IntoIter: ExactSizeIterator + Send + Sync, >, ) -> Result>> { self.try_sort::(pairs) @@ -130,14 +127,13 @@ impl ParSortIters<()> { pub fn try_sort>( &self, pairs: impl IntoIterator< - Item: IntoIterator + Send, - IntoIter: ExactSizeIterator, + Item: IntoIterator + Send + Sync, + IntoIter: ExactSizeIterator + Send + Sync, >, ) -> Result>> { - let split = >::try_sort_labeled::<(), (), E>( + let split = ::try_sort_labeled::( self, - &(), - (), + DefaultBatchCodec::default(), pairs .into_iter() .map(|iter| iter.into_iter().map(|pair| (pair, ()))), @@ -157,7 +153,7 @@ impl ParSortIters<()> { } } -impl ParSortIters { +impl ParSortIters { /// Creates a new [`ParSortIters`] instance. /// /// The methods [`num_partitions`](ParSortIters::num_partitions) (which sets @@ -174,7 +170,6 @@ impl ParSortIters { expected_num_pairs: None, num_partitions: NonZeroUsize::new(num_cpus::get()).context("zero CPUs")?, memory_usage: MemoryUsage::default(), - marker: PhantomData, }) } @@ -216,31 +211,17 @@ impl ParSortIters { /// See [`try_sort_labeled`](ParSortIters::try_sort_labeled). /// /// This is a convenience method for iterators that cannot fail. - pub fn sort_labeled( + pub fn sort_labeled( &self, - serializer: &S, - deserializer: D, + batch_codec: C, pairs: impl IntoIterator< - Item: IntoIterator + Send, + Item: IntoIterator + Send, IntoIter: ExactSizeIterator, >, ) -> Result< - SplitIters< - impl IntoIterator< - Item = ( - (usize, usize), - >::DeserType, - ), - IntoIter: Send + Sync, - >, - >, - > - where - L: Copy + Send + Sync, - S: Sync + BitSerializer, - D: Clone + Send + Sync + BitDeserializer, - { - self.try_sort_labeled::(serializer, deserializer, pairs) + SplitIters>, + > { + self.try_sort_labeled::(batch_codec, pairs) } /// Sorts the output of the provided sequence of iterators of (labelled) @@ -252,37 +233,23 @@ impl ParSortIters { /// The bit deserializer must be [`Clone`] because we need one for each /// [`BatchIterator`], and there are possible scenarios in which the /// deserializer might be stateful. - pub fn try_sort_labeled>( + pub fn try_sort_labeled>( &self, - serializer: &S, - deserializer: D, + batch_codec: C, pairs: impl IntoIterator< - Item: IntoIterator + Send, + Item: IntoIterator + Send, IntoIter: ExactSizeIterator, >, ) -> Result< - SplitIters< - impl IntoIterator< - Item = ( - (usize, usize), - >::DeserType, - ), - IntoIter: Send + Sync, - >, - >, - > - where - L: Copy + Send + Sync, - S: Sync + BitSerializer, - D: Clone + Send + Sync + BitDeserializer, - { + SplitIters>, + > { let unsorted_pairs = pairs; let num_partitions = self.num_partitions.into(); let num_buffers = rayon::current_num_threads() * num_partitions; let batch_size = self .memory_usage - .batch_size::>() + .batch_size::<((usize, usize), C::Label)>() .div_ceil(num_buffers); let num_nodes_per_partition = self.num_nodes.div_ceil(num_partitions); @@ -303,12 +270,11 @@ impl ParSortIters { let mut partitioned_presorted_pairs = vec![vec![]; num_blocks]; let result = partitioned_presorted_pairs.as_sync_slice(); - std::thread::scope(|s| { let presort_tmp_dir = &presort_tmp_dir; for (block_id, pair) in unsorted_pairs.enumerate() { - let deserializer = deserializer.clone(); let mut pl = pl.clone(); + let batch_codec = &batch_codec; s.spawn(move || { let mut unsorted_buffers = (0..num_partitions) .map(|_| Vec::with_capacity(batch_size)) @@ -325,8 +291,7 @@ impl ParSortIters { let buf_len = buf.len(); super::par_sort_pairs::flush_buffer( presort_tmp_dir.path(), - serializer, - deserializer.clone(), + batch_codec, block_id, partition_id, sorted_pairs, @@ -338,10 +303,7 @@ impl ParSortIters { pl.update_with_count(buf_len); } - buf.push(Triple { - pair: [src, dst], - label, - }); + buf.push(((src, dst), label)); } for (partition_id, (pairs, mut buf)) in sorted_pairs @@ -352,8 +314,7 @@ impl ParSortIters { let buf_len = buf.len(); super::par_sort_pairs::flush_buffer( presort_tmp_dir.path(), - serializer, - deserializer.clone(), + batch_codec, block_id, partition_id, pairs, @@ -379,9 +340,9 @@ impl ParSortIters { // Let's merge the {partition_id -> [iterators]} maps of each worker let partitioned_presorted_pairs = partitioned_presorted_pairs.into_par_iter().reduce( || (0..num_partitions).map(|_| Vec::new()).collect(), - |mut pair_partitions1: Vec>>, - pair_partitions2: Vec>>| - -> Vec>> { + |mut pair_partitions1: Vec>>, + pair_partitions2: Vec>>| + -> Vec>> { assert_eq!(pair_partitions1.len(), num_partitions); assert_eq!(pair_partitions2.len(), num_partitions); for (partition1, partition2) in pair_partitions1 diff --git a/webgraph/src/utils/par_sort_pairs.rs b/webgraph/src/utils/par_sort_pairs.rs index 783bf892..e660c387 100644 --- a/webgraph/src/utils/par_sort_pairs.rs +++ b/webgraph/src/utils/par_sort_pairs.rs @@ -1,5 +1,6 @@ /* * SPDX-FileCopyrightText: 2025 Inria + * SPDX-FileCopyrightText: 2025 Tommaso Fontana * * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later */ @@ -21,22 +22,21 @@ //! using [`ParSortIters`](crate::utils::par_sort_iters::ParSortIters) instead. use std::cell::RefCell; -use std::marker::PhantomData; use std::num::NonZeroUsize; use std::path::Path; use std::sync::atomic::{AtomicUsize, Ordering}; use anyhow::{ensure, Context, Result}; -use dsi_bitstream::traits::NE; use dsi_progress_logger::{concurrent_progress_logger, ProgressLog}; use rayon::prelude::*; use rayon::Yield; -use rdst::RadixSort; use thread_local::ThreadLocal; -use super::sort_pairs::{BatchIterator, BitReader, BitWriter, KMergeIters, Triple}; +use crate::utils::DefaultBatchCodec; + +use super::sort_pairs::KMergeIters; use super::MemoryUsage; -use crate::traits::{BitDeserializer, BitSerializer}; +use super::{BatchCodec, CodecIter}; use crate::utils::SplitIters; /// Takes a parallel iterator of (labelled) pairs as input, and turns them into @@ -113,15 +113,14 @@ use crate::utils::SplitIters; /// )?; /// # Ok::<(), Box>(()) /// ``` -pub struct ParSortPairs { +pub struct ParSortPairs { num_nodes: usize, expected_num_pairs: Option, num_partitions: NonZeroUsize, memory_usage: MemoryUsage, - marker: PhantomData, } -impl ParSortPairs<()> { +impl ParSortPairs { /// See [`try_sort`](ParSortPairs::try_sort). pub fn sort( &self, @@ -139,8 +138,7 @@ impl ParSortPairs<()> { ) -> Result>> { let split = self.try_sort_labeled( - &(), - (), + &DefaultBatchCodec::default(), pairs.map(|pair| -> Result<_> { let (src, dst) = pair.map_err(Into::into)?; Ok(((src, dst), ())) @@ -161,7 +159,7 @@ impl ParSortPairs<()> { } } -impl ParSortPairs { +impl ParSortPairs { /// Creates a new [`ParSortPairs`] instance. /// /// The methods [`num_partitions`](ParSortPairs::num_partitions) (which sets @@ -178,7 +176,6 @@ impl ParSortPairs { expected_num_pairs: None, num_partitions: NonZeroUsize::new(num_cpus::get()).context("zero CPUs")?, memory_usage: MemoryUsage::default(), - marker: PhantomData, }) } @@ -220,32 +217,16 @@ impl ParSortPairs { /// See [`try_sort_labeled`](ParSortPairs::try_sort_labeled). /// /// This is a convenience method for parallel iterators that cannot fail. - pub fn sort_labeled( + pub fn sort_labeled( &self, - serializer: &S, - deserializer: D, - pairs: impl ParallelIterator, + batch_codec: &C, + pairs: impl ParallelIterator, ) -> Result< SplitIters< - impl IntoIterator< - Item = ( - (usize, usize), - >::DeserType, - ), - IntoIter: Clone + Send + Sync, - >, + impl IntoIterator, >, - > - where - L: Copy + Send + Sync, - S: Sync + BitSerializer, - D: Clone + Send + Sync + BitDeserializer, - { - self.try_sort_labeled::( - serializer, - deserializer, - pairs.map(Ok), - ) + > { + self.try_sort_labeled::(batch_codec, pairs.map(Ok)) } /// Sorts the output of the provided parallel iterator, @@ -257,34 +238,22 @@ impl ParSortPairs { /// The bit deserializer must be [`Clone`] because we need one for each /// [`BatchIterator`], and there are possible scenarios in which the /// deserializer might be stateful. - pub fn try_sort_labeled>( + pub fn try_sort_labeled>( &self, - serializer: &S, - deserializer: D, - pairs: impl ParallelIterator>, + batch_codec: &C, + pairs: impl ParallelIterator>, ) -> Result< SplitIters< - impl IntoIterator< - Item = ( - (usize, usize), - >::DeserType, - ), - IntoIter: Clone + Send + Sync, - >, + impl IntoIterator, >, - > - where - L: Copy + Send + Sync, - S: Sync + BitSerializer, - D: Clone + Send + Sync + BitDeserializer, - { + > { let unsorted_pairs = pairs; let num_partitions = self.num_partitions.into(); let num_buffers = rayon::current_num_threads() * num_partitions; let batch_size = self .memory_usage - .batch_size::>() + .batch_size::<((usize, usize), C::Label)>() .div_ceil(num_buffers); let num_nodes_per_partition = self.num_nodes.div_ceil(num_partitions); @@ -300,7 +269,7 @@ impl ParSortPairs { let presort_tmp_dir = tempfile::tempdir().context("Could not create temporary directory")?; - let sorter_thread_states = ThreadLocal::>>::new(); + let sorter_thread_states = ThreadLocal::>>::new(); // iterators in partitioned_presorted_pairs[partition_id] contain all pairs (src, dst, label) // where num_nodes_per_partition*partition_id <= src < num_nodes_per_partition*(partition_id+1) @@ -365,8 +334,7 @@ impl ParSortPairs { let buf_len = buf.len(); flush_buffer( presort_tmp_dir.path(), - serializer, - deserializer.clone(), + batch_codec, *worker_id, partition_id, sorted_pairs, @@ -377,27 +345,24 @@ impl ParSortPairs { pl.update_with_count(buf_len); } - buf.push(Triple { - pair: [src, dst], - label, - }); + buf.push(((src, dst), label)); Ok(()) }, )?; // flush remaining buffers - let partitioned_presorted_pairs: Vec>> = sorter_thread_states + let partitioned_presorted_pairs: Vec>> = sorter_thread_states .into_iter() .collect::>() .into_par_iter() - .map_with(pl.clone(), |pl, thread_state: RefCell>| { + .map_with(pl.clone(), |pl, thread_state: RefCell>| { let thread_state = thread_state.into_inner(); let mut partitioned_sorted_pairs = Vec::with_capacity(num_partitions); assert_eq!(thread_state.sorted_pairs.len(), num_partitions); assert_eq!(thread_state.unsorted_buffers.len(), num_partitions); for (partition_id, (mut sorted_pairs, mut buf)) in thread_state.sorted_pairs.into_iter().zip(thread_state.unsorted_buffers.into_iter()).enumerate() { let buf_len = buf.len(); - flush_buffer(presort_tmp_dir.path(), serializer, deserializer.clone(), thread_state.worker_id, partition_id, &mut sorted_pairs, &mut buf).context("Could not flush buffer at the end")?; + flush_buffer(presort_tmp_dir.path(), batch_codec, thread_state.worker_id, partition_id, &mut sorted_pairs, &mut buf).context("Could not flush buffer at the end")?; assert!(buf.is_empty(), "flush_buffer did not empty the buffer"); pl.update_with_count(buf_len); @@ -412,7 +377,7 @@ impl ParSortPairs { // Let's merge the {partition_id -> [iterators]} maps of each worker .try_reduce( || (0..num_partitions).map(|_| Vec::new()).collect(), - |mut pair_partitions1: Vec>>, pair_partitions2: Vec>>| -> Result>>> { + |mut pair_partitions1: Vec>>, pair_partitions2: Vec>>| -> Result>>> { assert_eq!(pair_partitions1.len(), num_partitions); assert_eq!(pair_partitions2.len(), num_partitions); for (partition1, partition2) in pair_partitions1.iter_mut().zip(pair_partitions2.into_iter()) { @@ -448,27 +413,20 @@ impl ParSortPairs { } } -struct SorterThreadState> { +struct SorterThreadState { worker_id: usize, - sorted_pairs: Vec>>, - unsorted_buffers: Vec>>, + sorted_pairs: Vec>>, + unsorted_buffers: Vec>, } -pub(crate) fn flush_buffer< - L: Copy + Send + Sync, - S: BitSerializer, - D: BitDeserializer, ->( +pub(crate) fn flush_buffer( tmp_dir: &Path, - serializer: &S, - deserializer: D, + batch_codec: &C, worker_id: usize, partition_id: usize, - sorted_pairs: &mut Vec>, - buf: &mut Vec>, + sorted_pairs: &mut Vec>, + buf: &mut Vec<((usize, usize), C::Label)>, ) -> Result<()> { - buf.radix_sort_unstable(); - let path = tmp_dir.join(format!( "sorted_batch_{worker_id}_{partition_id}_{}", sorted_pairs.len() @@ -480,9 +438,15 @@ pub(crate) fn flush_buffer< "Can't create temporary file {}, it already exists", path.display() ); + + batch_codec + .encode_batch(&path, buf) + .with_context(|| format!("Could not write sorted batch to {}", path.display()))?; sorted_pairs.push( - BatchIterator::new_from_vec_sorted_labeled(&path, buf, serializer, deserializer) - .with_context(|| format!("Could not write sorted batch to {}", path.display()))?, + batch_codec + .decode_batch(&path) + .with_context(|| format!("Could not read sorted batch from {}", path.display()))? + .into_iter(), ); buf.clear(); Ok(()) diff --git a/webgraph/src/utils/sort_pairs.rs b/webgraph/src/utils/sort_pairs.rs index 8891497d..da201249 100644 --- a/webgraph/src/utils/sort_pairs.rs +++ b/webgraph/src/utils/sort_pairs.rs @@ -1,6 +1,7 @@ /* * SPDX-FileCopyrightText: 2023 Inria * SPDX-FileCopyrightText: 2023 Sebastiano Vigna + * SPDX-FileCopyrightText: 2025 Tommaso Fontana * * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later */ @@ -8,65 +9,23 @@ //! Facilities to sort externally pairs of nodes with an associated label. #![allow(clippy::non_canonical_partial_ord_impl)] -use super::{ArcMmapHelper, MmapHelper}; +use super::ArcMmapHelper; use crate::{ - traits::{BitDeserializer, BitSerializer, SortedIterator}, - utils::MemoryUsage, + traits::SortedIterator, + utils::{BatchCodec, CodecIter, DefaultBatchCodec, MemoryUsage}, }; use anyhow::{anyhow, Context}; use dary_heap::PeekMut; use dsi_bitstream::prelude::*; -use log::debug; -use mmap_rs::MmapFlags; -use rdst::*; use std::{ fs::File, io::BufWriter, path::{Path, PathBuf}, - sync::Arc, }; pub type BitWriter = BufBitWriter>>; pub type BitReader = BufBitReader>>; -/// An arc expressed as a pair of nodes and the associated label. -/// -/// Equality and order are defined only (lexicographically) on the pair of -/// nodes. -#[derive(Clone, Debug, Copy)] -pub struct Triple { - pub pair: [usize; 2], - pub label: L, -} - -impl RadixKey for Triple { - const LEVELS: usize = 16; - - fn get_level(&self, level: usize) -> u8 { - (self.pair[1 - level / 8] >> ((level % 8) * 8)) as u8 - } -} - -impl PartialEq for Triple { - fn eq(&self, other: &Self) -> bool { - self.pair == other.pair - } -} - -impl Eq for Triple {} - -impl PartialOrd for Triple { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.pair.cmp(&other.pair)) - } -} - -impl Ord for Triple { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.pair.cmp(&other.pair) - } -} - /// A struct that provides external sorting for pairs of nodes with an /// associated label. /// @@ -85,13 +44,8 @@ impl Ord for Triple { /// the limitations of your OS regarding memory-mapping (e.g., /// `/proc/sys/vm/max_map_count` under Linux). /// -/// The structure accept as type parameter a [`BitSerializer`] and a -/// [`BitDeserializer`] that are used to serialize and deserialize the labels. -/// In case they are both `()`, the structure behaves as if there is no label. -/// -/// The bit deserializer must be [`Clone`] because we need one for each -/// [`BatchIterator`], and there are possible scenarios in which the -/// deserializer might be stateful. +/// The structure accept as type parameter a [`BatchCodec`] is used to serialize +/// and deserialize the triples. /// /// You can use this structure in two ways: either create an instance with /// [`new_labeled`](SortPairs::new_labeled) and add labeled pairs using @@ -107,31 +61,23 @@ impl Ord for Triple { /// the [resulting iterator](SortPairs::iter) is labeled, and returns pairs /// labeled with `()`. Use [`Left`](crate::prelude::proj::Left) to project away /// the labels if needed. -pub struct SortPairs< - S: BitSerializer = (), - D: BitDeserializer + Clone = (), -> where - S::SerType: Send + Sync + Copy, -{ +pub struct SortPairs { /// The batch size. batch_size: usize, /// Where we are going to store the batches. tmp_dir: PathBuf, - /// A stateful serializer we will pass to batch iterators to serialize + /// A potentially stateful serializer and deserializer we will pass to batch iterators to serialize /// the labels to a bitstream. - serializer: S, - /// A stateful deserializer we will pass to batch iterators to deserialize - /// the labels from a bitstream. - deserializer: D, + batch_codec: C, /// Keeps track of how many batches we created. num_batches: usize, /// The length of the last batch, which might be smaller than [`SortPairs::batch_size`]. last_batch_len: usize, /// The batch of triples we are currently building. - batch: Vec>, + batch: Vec<((usize, usize), C::Label)>, } -impl SortPairs<(), ()> { +impl SortPairs { /// Creates a new `SortPairs` without labels. /// /// The `tmp_dir` must be empty, and in particular it must not be shared @@ -144,7 +90,7 @@ impl SortPairs<(), ()> { /// `dir.path()`) because otherwise [the directory will be deleted too /// soon](https://github.com/Stebalien/tempfile/issues/115). pub fn new>(memory_usage: MemoryUsage, tmp_dir: P) -> anyhow::Result { - Self::new_labeled(memory_usage, tmp_dir, (), ()) + Self::new_labeled(memory_usage, tmp_dir, DefaultBatchCodec::default()) } /// Adds a unlabeled pair to the graph. pub fn push(&mut self, x: usize, y: usize) -> anyhow::Result<()> { @@ -159,7 +105,7 @@ impl SortPairs<(), ()> { pub fn sort( &mut self, pairs: impl IntoIterator, - ) -> anyhow::Result, ()>> { + ) -> anyhow::Result>> { self.try_sort::(pairs.into_iter().map(Ok)) } @@ -171,7 +117,7 @@ impl SortPairs<(), ()> { pub fn try_sort>( &mut self, pairs: impl IntoIterator>, - ) -> anyhow::Result, ()>> { + ) -> anyhow::Result, ()>> { for pair in pairs { let (x, y) = pair.map_err(Into::into)?; self.push(x, y)?; @@ -180,10 +126,7 @@ impl SortPairs<(), ()> { } } -impl, D: BitDeserializer + Clone> SortPairs -where - S::SerType: Send + Sync + Copy, -{ +impl SortPairs { /// Creates a new `SortPairs` with labels. /// /// The `dir` must be empty, and in particular it must not be shared @@ -193,8 +136,7 @@ where pub fn new_labeled>( memory_usage: MemoryUsage, dir: P, - serializer: S, - deserializer: D, + batch_codec: C, ) -> anyhow::Result { let dir = dir.as_ref(); let mut dir_entries = @@ -202,12 +144,11 @@ where if dir_entries.next().is_some() { Err(anyhow!("{} is not empty", dir.display())) } else { - let batch_size = memory_usage.batch_size::<(usize, usize)>(); + let batch_size = memory_usage.batch_size::<(usize, usize, C::Label)>(); Ok(SortPairs { batch_size, - serializer, + batch_codec, tmp_dir: dir.to_owned(), - deserializer, num_batches: 0, last_batch_len: 0, batch: Vec::with_capacity(batch_size), @@ -216,11 +157,8 @@ where } /// Adds a labeled pair to the graph. - pub fn push_labeled(&mut self, x: usize, y: usize, t: S::SerType) -> anyhow::Result<()> { - self.batch.push(Triple { - pair: [x, y], - label: t, - }); + pub fn push_labeled(&mut self, x: usize, y: usize, t: C::Label) -> anyhow::Result<()> { + self.batch.push(((x, y), t)); if self.batch.len() >= self.batch_size { self.dump()?; } @@ -234,14 +172,15 @@ where return Ok(()); } - // Creates a batch file where to dump - let batch_name = self.tmp_dir.join(format!("{:06x}", self.num_batches)); - BatchIterator::new_from_vec_labeled( - batch_name, - &mut self.batch, - &self.serializer, - self.deserializer.clone(), - )?; + let batch_path = self.tmp_dir.join(format!("{:06x}", self.num_batches)); + let bit_size = self.batch_codec.encode_batch(batch_path, &mut self.batch)?; + log::info!( + "Dumped batch {} with {} arcs ({} bits, {:.2} bits / arc)", + self.num_batches, + self.batch.len(), + bit_size, + bit_size as f64 / self.batch.len() as f64 + ); self.last_batch_len = self.batch.len(); self.batch.clear(); self.num_batches += 1; @@ -249,19 +188,14 @@ where } /// Returns an iterator over the labeled pairs, lexicographically sorted. - pub fn iter(&mut self) -> anyhow::Result, D::DeserType>> { + pub fn iter(&mut self) -> anyhow::Result, C::Label>> { self.dump()?; Ok(KMergeIters::new((0..self.num_batches).map(|batch_idx| { - BatchIterator::new_labeled( - self.tmp_dir.join(format!("{batch_idx:06x}")), - if batch_idx == self.num_batches - 1 { - self.last_batch_len - } else { - self.batch_size - }, - self.deserializer.clone(), - ) - .unwrap() + let batch_path = self.tmp_dir.join(format!("{batch_idx:06x}")); + self.batch_codec + .decode_batch(batch_path) + .unwrap() + .into_iter() }))) } @@ -273,8 +207,8 @@ where /// [`iter`](SortPairs::iter). pub fn sort_labeled( &mut self, - pairs: impl IntoIterator, - ) -> anyhow::Result, D::DeserType>> { + pairs: impl IntoIterator, + ) -> anyhow::Result, C::Label>> { self.try_sort_labeled::(pairs.into_iter().map(Ok)) } @@ -286,8 +220,8 @@ where /// [`iter`](SortPairs::iter). pub fn try_sort_labeled>( &mut self, - pairs: impl IntoIterator>, - ) -> anyhow::Result, D::DeserType>> { + pairs: impl IntoIterator>, + ) -> anyhow::Result, C::Label>> { for pair in pairs { let ((x, y), label) = pair.map_err(Into::into)?; self.push_labeled(x, y, label)?; @@ -296,189 +230,6 @@ where } } -/// An iterator that can read the batch files generated by [`SortPairs`]. -pub struct BatchIterator = ()> { - stream: BitReader, - len: usize, - current: usize, - prev_src: usize, - prev_dst: usize, - deserializer: D, -} - -impl BatchIterator<()> { - /// Sorts the given unlabeled pairs in memory, dumps them in `file_path` and - /// return an iterator over them. - #[inline] - pub fn new_from_vec>( - file_path: P, - batch: &mut [(usize, usize)], - ) -> anyhow::Result { - Self::new_from_vec_labeled( - file_path, - unsafe { core::mem::transmute::<&mut [(usize, usize)], &mut [Triple<()>]>(batch) }, - &(), - (), - ) - } - /// Dump the given triples in `file_path` and return an iterator over - /// them, assuming they are already sorted. - pub fn new_from_vec_sorted>( - file_path: P, - batch: &[(usize, usize)], - ) -> anyhow::Result { - Self::new_from_vec_sorted_labeled( - file_path, - unsafe { core::mem::transmute::<&[(usize, usize)], &[Triple<()>]>(batch) }, - &(), - (), - ) - } -} - -impl> BatchIterator { - /// Sort the given labeled pairs in memory, dump them in `file_path` and - /// return an iterator over them. - #[inline] - pub fn new_from_vec_labeled>( - file_path: impl AsRef, - batch: &mut [Triple], - serializer: &S, - deserializer: D, - ) -> anyhow::Result - where - S::SerType: Send + Sync + Copy, - { - let start = std::time::Instant::now(); - batch.radix_sort_unstable(); - debug!("Sorted {} arcs in {:?}", batch.len(), start.elapsed()); - Self::new_from_vec_sorted_labeled(file_path, batch, serializer, deserializer) - } - - /// Dump the given labeled pairs in `file_path` and return an iterator - /// over them, assuming they are already sorted. - pub fn new_from_vec_sorted_labeled>( - file_path: impl AsRef, - batch: &[Triple], - serializer: &S, - deserializer: D, - ) -> anyhow::Result - where - S::SerType: Send + Sync + Copy, - { - // create a batch file where to dump - let file_path = file_path.as_ref(); - let file = std::io::BufWriter::with_capacity( - 1 << 16, - std::fs::File::create(file_path).with_context(|| { - format!( - "Could not create BatchIterator temporary file {}", - file_path.display() - ) - })?, - ); - // create a bitstream to write to the file - let mut stream = >::new(>::new(file)); - // dump the triples to the bitstream - let (mut prev_src, mut prev_dst) = (0, 0); - for Triple { - pair: [src, dst], - label, - } in batch.iter() - { - // write the source gap as gamma - stream - .write_gamma((src - prev_src) as _) - .with_context(|| format!("Could not write {src} after {prev_src}"))?; - if *src != prev_src { - // Reset prev_y - prev_dst = 0; - } - // write the destination gap as gamma - stream - .write_gamma((dst - prev_dst) as _) - .with_context(|| format!("Could not write {dst} after {prev_dst}"))?; - // write the label - serializer - .serialize(label, &mut stream) - .context("Could not serialize label")?; - (prev_src, prev_dst) = (*src, *dst); - } - // flush the stream and reset the buffer - stream.flush().context("Could not flush stream")?; - - Self::new_labeled(file_path, batch.len(), deserializer) - } - - /// Creates a new iterator over the triples previously serialized in `file_path`. - pub fn new_labeled>( - file_path: P, - len: usize, - deserializer: D, - ) -> anyhow::Result { - let stream = >::new(MemWordReader::new(ArcMmapHelper(Arc::new( - MmapHelper::mmap( - file_path.as_ref(), - MmapFlags::TRANSPARENT_HUGE_PAGES | MmapFlags::SEQUENTIAL, - ) - .with_context(|| format!("Could not mmap {}", file_path.as_ref().display()))?, - )))); - Ok(BatchIterator { - stream, - len, - current: 0, - prev_src: 0, - prev_dst: 0, - deserializer, - }) - } -} - -impl + Clone> Clone for BatchIterator { - fn clone(&self) -> Self { - BatchIterator { - stream: self.stream.clone(), - len: self.len, - current: self.current, - prev_src: self.prev_src, - prev_dst: self.prev_dst, - deserializer: self.deserializer.clone(), - } - } -} - -unsafe impl> SortedIterator for BatchIterator {} - -impl> Iterator for BatchIterator { - type Item = ((usize, usize), D::DeserType); - fn next(&mut self) -> Option { - if self.current == self.len { - return None; - } - let src = self.prev_src + self.stream.read_gamma().unwrap() as usize; - if src != self.prev_src { - // Reset prev_y - self.prev_dst = 0; - } - let dst = self.prev_dst + self.stream.read_gamma().unwrap() as usize; - let label = self.deserializer.deserialize(&mut self.stream).unwrap(); - self.prev_src = src; - self.prev_dst = dst; - self.current += 1; - Some(((src, dst), label)) - } - - fn size_hint(&self) -> (usize, Option) { - (self.len, Some(self.len)) - } -} - -impl> ExactSizeIterator for BatchIterator { - fn len(&self) -> usize { - self.len - } -} - #[derive(Clone, Debug)] /// Private struct that can be used to sort labeled pairs based only on the pair of /// nodes and ignoring the label. @@ -685,8 +436,12 @@ impl> Extend> for K #[cfg(test)] mod tests { use super::*; + use crate::{ + traits::{BitDeserializer, BitSerializer}, + utils::gaps::GapsCodec, + }; - #[derive(Clone, Debug)] + #[derive(Clone, Debug, Default)] struct MyDessert; impl BitDeserializer for MyDessert { @@ -715,8 +470,12 @@ mod tests { use tempfile::Builder; let dir = Builder::new().prefix("test_sort_pairs_").tempdir()?; - let mut sp = - SortPairs::new_labeled(MemoryUsage::BatchSize(10), dir.path(), MyDessert, MyDessert)?; + let mut sp = SortPairs::new_labeled( + MemoryUsage::BatchSize(10), + dir.path(), + GapsCodec::::default(), + )?; + let n = 25; for i in 0..n { sp.push_labeled(i, i + 1, i + 2)?; @@ -759,8 +518,11 @@ mod tests { // Test labeled sort let dir2 = Builder::new().prefix("test_sort_labeled_").tempdir()?; - let mut sp2 = - SortPairs::new_labeled(MemoryUsage::BatchSize(5), dir2.path(), MyDessert, MyDessert)?; + let mut sp2 = SortPairs::new_labeled( + MemoryUsage::BatchSize(5), + dir2.path(), + GapsCodec::::default(), + )?; let labeled_pairs = vec![ ((3, 4), 7), diff --git a/webgraph/tests/test_transpose.rs b/webgraph/tests/test_transpose.rs index 0b28b2b6..f56b8481 100644 --- a/webgraph/tests/test_transpose.rs +++ b/webgraph/tests/test_transpose.rs @@ -12,6 +12,7 @@ use webgraph::graphs::vec_graph::LabeledVecGraph; use webgraph::prelude::{transpose, transpose_labeled, transpose_split}; use webgraph::traits::labels::SequentialLabeling; use webgraph::traits::{graph, BitDeserializer, BitSerializer}; +use webgraph::utils::gaps::GapsCodec; use webgraph::utils::sort_pairs::{BitReader, BitWriter}; use webgraph::utils::MemoryUsage; @@ -88,10 +89,24 @@ fn test_transpose_labeled() -> anyhow::Result<()> { ]; let g = LabeledVecGraph::::from_arcs(arcs); - let trans = transpose_labeled(&g, MemoryUsage::BatchSize(3), BS {}, BD {})?; + let trans = transpose_labeled( + &g, + MemoryUsage::BatchSize(3), + GapsCodec::<_, _> { + serializer: BS {}, + deserializer: BD {}, + }, + )?; let g2 = LabeledVecGraph::::from_lender(trans.iter()); - let trans = transpose_labeled(&g2, MemoryUsage::BatchSize(3), BS {}, BD {})?; + let trans = transpose_labeled( + &g2, + MemoryUsage::BatchSize(3), + GapsCodec::<_, _> { + serializer: BS {}, + deserializer: BD {}, + }, + )?; let g3 = LabeledVecGraph::::from_lender(trans.iter()); let g4 = LabeledVecGraph::from_lender(g.iter()); From 01f88b4ee1af6badf9baf146e7090c1e52991ac8 Mon Sep 17 00:00:00 2001 From: Tommaso Fontana Date: Sun, 12 Oct 2025 14:32:45 +0200 Subject: [PATCH 2/4] BatchCodec accepts Endianness --- webgraph/examples/bench_sort_pairs.rs | 7 +- webgraph/src/utils/batch_codec/gaps.rs | 106 ++++++++++++++---- .../src/utils/batch_codec/grouped_gaps.rs | 105 +++++++++++++---- webgraph/src/utils/batch_codec/mod.rs | 4 +- webgraph/src/utils/sort_pairs.rs | 50 +++++---- webgraph/tests/test_transpose.rs | 38 +++---- 6 files changed, 220 insertions(+), 90 deletions(-) diff --git a/webgraph/examples/bench_sort_pairs.rs b/webgraph/examples/bench_sort_pairs.rs index 2ad3236e..b0041498 100644 --- a/webgraph/examples/bench_sort_pairs.rs +++ b/webgraph/examples/bench_sort_pairs.rs @@ -11,7 +11,7 @@ use anyhow::Result; use clap::Parser; use dsi_bitstream::traits::BitRead; use dsi_bitstream::traits::BitWrite; -use dsi_bitstream::traits::Endianness; +use dsi_bitstream::traits::{Endianness, BE}; use dsi_progress_logger::prelude::{ProgressLog, ProgressLogger}; use rand::rngs::SmallRng; use rand::RngCore; @@ -66,10 +66,7 @@ pub fn main() -> Result<()> { let mut sp = SortPairs::new_labeled( MemoryUsage::BatchSize(args.batch), dir.path(), - GapsCodec::<_, _> { - serializer: Mock(), - deserializer: Mock(), - }, + GapsCodec::::new(Mock(), Mock()), )?; let mut r = SmallRng::seed_from_u64(0); diff --git a/webgraph/src/utils/batch_codec/gaps.rs b/webgraph/src/utils/batch_codec/gaps.rs index 0a06a57d..e12eadc3 100644 --- a/webgraph/src/utils/batch_codec/gaps.rs +++ b/webgraph/src/utils/batch_codec/gaps.rs @@ -21,7 +21,7 @@ use dsi_bitstream::prelude::*; use mmap_rs::MmapFlags; use rdst::*; -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug)] /// A codec for encoding and decoding batches of triples using gap compression. /// /// This codec encodes triples of the form `(src, dst, label)` by encoding the @@ -81,26 +81,67 @@ use rdst::*; /// therefore the recommended default is `Gamma` for src gaps and `Delta` for /// dst gaps as they are universal codes. pub struct GapsCodec< - S: BitSerializer = (), - D: BitDeserializer + Clone = (), + E: Endianness = NE, + S: BitSerializer> = (), + D: BitDeserializer, DeserType = S::SerType> + Clone = (), const SRC_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA }, const DST_CODE: usize = { dsi_bitstream::dispatch::code_consts::DELTA }, -> { +> where + BitReader: BitRead + CodesRead, + BitWriter: BitWrite + CodesWrite, +{ /// Serializer for the labels pub serializer: S, /// Deserializer for the labels pub deserializer: D, + /// Marker for the endianness + pub _marker: std::marker::PhantomData, } -impl BatchCodec - for GapsCodec +impl GapsCodec where - S: BitSerializer + Send + Sync, - D: BitDeserializer + Send + Sync + Clone, + E: Endianness, + S: BitSerializer> + Send + Sync, + D: BitDeserializer, DeserType = S::SerType> + Send + Sync + Clone, + BitReader: BitRead + CodesRead, + BitWriter: BitWrite + CodesWrite, +{ + /// Creates a new `GapsCodec` with the given serializer and deserializer. + pub fn new(serializer: S, deserializer: D) -> Self { + Self { + serializer, + deserializer, + _marker: std::marker::PhantomData, + } + } +} + +impl core::default::Default + for GapsCodec +where + E: Endianness, + S: BitSerializer> + Send + Sync, + D: BitDeserializer, DeserType = S::SerType> + Send + Sync + Clone, + BitReader: BitRead + CodesRead, + BitWriter: BitWrite + CodesWrite, +{ + fn default() -> Self { + Self::new(Default::default(), Default::default()) + } +} + +impl BatchCodec + for GapsCodec +where + E: Endianness, + S: BitSerializer> + Send + Sync, + D: BitDeserializer, DeserType = S::SerType> + Send + Sync + Clone, S::SerType: Send + Sync + Copy + 'static + core::fmt::Debug, // needed by radix sort + BitReader: BitRead + CodesRead, + BitWriter: BitWrite + CodesWrite, { type Label = S::SerType; - type DecodedBatch = GapsIterator; + type DecodedBatch = GapsIterator; fn encode_batch( &self, @@ -131,7 +172,7 @@ where })?, ); // create a bitstream to write to the file - let mut stream = >::new(>::new(file)); + let mut stream = >::new(>::new(file)); // prefix the stream with the length of the batch // we use a delta code since it'll be a big number most of the time @@ -170,7 +211,7 @@ where fn decode_batch(&self, path: impl AsRef) -> Result { // open the file - let mut stream = >::new(MemWordReader::new(ArcMmapHelper(Arc::new( + let mut stream = >::new(MemWordReader::new(ArcMmapHelper(Arc::new( MmapHelper::mmap( path.as_ref(), MmapFlags::TRANSPARENT_HUGE_PAGES | MmapFlags::SEQUENTIAL, @@ -196,14 +237,18 @@ where #[derive(Clone, Debug)] /// An iterator over triples encoded with gaps, this is returned by [`GapsCodec`]. pub struct GapsIterator< - D: BitDeserializer = (), + E: Endianness = NE, + D: BitDeserializer> = (), const SRC_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA }, const DST_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA }, -> { +> where + BitReader: BitRead + CodesRead, + BitWriter: BitWrite + CodesWrite, +{ /// Deserializer for the labels deserializer: D, /// Bitstream to read from - stream: BitReader, + stream: BitReader, /// Length of the iterator (number of triples) len: usize, /// Current position in the iterator @@ -214,13 +259,27 @@ pub struct GapsIterator< prev_dst: usize, } -unsafe impl, const SRC_CODE: usize, const DST_CODE: usize> - SortedIterator for GapsIterator +unsafe impl< + E: Endianness, + D: BitDeserializer>, + const SRC_CODE: usize, + const DST_CODE: usize, + > SortedIterator for GapsIterator +where + BitReader: BitRead + CodesRead, + BitWriter: BitWrite + CodesWrite, { } -impl, const SRC_CODE: usize, const DST_CODE: usize> Iterator - for GapsIterator +impl< + E: Endianness, + D: BitDeserializer>, + const SRC_CODE: usize, + const DST_CODE: usize, + > Iterator for GapsIterator +where + BitReader: BitRead + CodesRead, + BitWriter: BitWrite + CodesWrite, { type Item = ((usize, usize), D::DeserType); @@ -245,8 +304,15 @@ impl, const SRC_CODE: usize, const DST_CODE: u } } -impl, const SRC_CODE: usize, const DST_CODE: usize> - ExactSizeIterator for GapsIterator +impl< + E: Endianness, + D: BitDeserializer>, + const SRC_CODE: usize, + const DST_CODE: usize, + > ExactSizeIterator for GapsIterator +where + BitReader: BitRead + CodesRead, + BitWriter: BitWrite + CodesWrite, { fn len(&self) -> usize { self.len - self.current diff --git a/webgraph/src/utils/batch_codec/grouped_gaps.rs b/webgraph/src/utils/batch_codec/grouped_gaps.rs index 8c70d0d4..fda103bb 100644 --- a/webgraph/src/utils/batch_codec/grouped_gaps.rs +++ b/webgraph/src/utils/batch_codec/grouped_gaps.rs @@ -19,7 +19,7 @@ use dsi_bitstream::prelude::*; use mmap_rs::MmapFlags; use rdst::*; -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug)] /// A codec for encoding and decoding batches of triples using grouped gap compression. /// /// This codec encodes triples of the form `(src, dst, label)` by grouping edges with the same source node, @@ -94,27 +94,76 @@ use rdst::*; /// so the recommended defaults are `Gamma` for src gaps, `ExpGolomb3` for outdegree, and `Delta` for dst gaps, /// as they are universal codes. pub struct GroupedGapsCodec< - S: BitSerializer = (), - D: BitDeserializer + Clone = (), + E: Endianness = BE, + S: BitSerializer> = (), + D: BitDeserializer, DeserType = S::SerType> + Clone = (), const OUTDEGREE_CODE: usize = { dsi_bitstream::dispatch::code_consts::EXP_GOLOMB3 }, const SRC_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA }, const DST_CODE: usize = { dsi_bitstream::dispatch::code_consts::DELTA }, -> { +> where + BitReader: BitRead, + BitWriter: BitWrite, +{ /// Serializer for the labels pub serializer: S, /// Deserializer for the labels pub deserializer: D, + + pub _marker: core::marker::PhantomData, } -impl BatchCodec - for GroupedGapsCodec +impl + GroupedGapsCodec where - S: BitSerializer + Send + Sync, - D: BitDeserializer + Send + Sync + Clone, + E: Endianness, + S: BitSerializer> + Send + Sync, + D: BitDeserializer, DeserType = S::SerType> + Send + Sync + Clone, + BitReader: BitRead, + BitWriter: BitWrite, +{ + /// Creates a new `GroupedGapsCodec` with the given serializer and deserializer. + pub fn new(serializer: S, deserializer: D) -> Self { + Self { + serializer, + deserializer, + _marker: core::marker::PhantomData, + } + } +} + +impl< + E: Endianness, + S: BitSerializer> + Default, + D: BitDeserializer, DeserType = S::SerType> + Clone + Default, + const OUTDEGREE_CODE: usize, + const SRC_CODE: usize, + const DST_CODE: usize, + > Default for GroupedGapsCodec +where + BitReader: BitRead, + BitWriter: BitWrite, +{ + fn default() -> Self { + Self { + serializer: S::default(), + deserializer: D::default(), + _marker: core::marker::PhantomData, + } + } +} + +impl BatchCodec + for GroupedGapsCodec +where + E: Endianness, + S: BitSerializer> + Send + Sync, + D: BitDeserializer, DeserType = S::SerType> + Send + Sync + Clone, S::SerType: Send + Sync + Copy + 'static, // needed by radix sort + BitReader: BitRead + CodesRead, + BitWriter: BitWrite + CodesWrite, { type Label = S::SerType; - type DecodedBatch = GroupedGapsIterator; + type DecodedBatch = GroupedGapsIterator; fn encode_batch( &self, @@ -145,7 +194,7 @@ where })?, ); // create a bitstream to write to the file - let mut stream = >::new(>::new(file)); + let mut stream = >::new(>::new(file)); // prefix the stream with the length of the batch // we use a delta code since it'll be a big number most of the time @@ -196,7 +245,7 @@ where fn decode_batch(&self, path: impl AsRef) -> Result { // open the file - let mut stream = >::new(MemWordReader::new(ArcMmapHelper(Arc::new( + let mut stream = >::new(MemWordReader::new(ArcMmapHelper(Arc::new( MmapHelper::mmap( path.as_ref(), MmapFlags::TRANSPARENT_HUGE_PAGES | MmapFlags::SEQUENTIAL, @@ -223,15 +272,19 @@ where #[derive(Clone, Debug)] /// An iterator over triples encoded with gaps, this is returned by [`GroupedGapsCodec`]. pub struct GroupedGapsIterator< - D: BitDeserializer = (), + E: Endianness = NE, + D: BitDeserializer> = (), const OUTDEGREE_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA }, const SRC_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA }, const DST_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA }, -> { +> where + BitReader: BitRead, + BitWriter: BitWrite, +{ /// Deserializer for the labels deserializer: D, /// Bitstream to read from - stream: BitReader, + stream: BitReader, /// Length of the iterator (number of triples) len: usize, /// Current position in the iterator @@ -245,20 +298,28 @@ pub struct GroupedGapsIterator< } unsafe impl< - D: BitDeserializer, + E: Endianness, + D: BitDeserializer>, const OUTDEGREE_CODE: usize, const SRC_CODE: usize, const DST_CODE: usize, - > SortedIterator for GroupedGapsIterator + > SortedIterator for GroupedGapsIterator +where + BitReader: BitRead + CodesRead, + BitWriter: BitWrite + CodesWrite, { } impl< - D: BitDeserializer, + E: Endianness, + D: BitDeserializer>, const OUTDEGREE_CODE: usize, const SRC_CODE: usize, const DST_CODE: usize, - > Iterator for GroupedGapsIterator + > Iterator for GroupedGapsIterator +where + BitReader: BitRead + CodesRead, + BitWriter: BitWrite + CodesWrite, { type Item = ((usize, usize), D::DeserType); fn next(&mut self) -> Option { @@ -288,11 +349,15 @@ impl< } impl< - D: BitDeserializer, + E: Endianness, + D: BitDeserializer>, const OUTDEGREE_CODE: usize, const SRC_CODE: usize, const DST_CODE: usize, - > ExactSizeIterator for GroupedGapsIterator + > ExactSizeIterator for GroupedGapsIterator +where + BitReader: BitRead + CodesRead, + BitWriter: BitWrite + CodesWrite, { fn len(&self) -> usize { self.len - self.current diff --git a/webgraph/src/utils/batch_codec/mod.rs b/webgraph/src/utils/batch_codec/mod.rs index e66fd525..5c1f0f12 100644 --- a/webgraph/src/utils/batch_codec/mod.rs +++ b/webgraph/src/utils/batch_codec/mod.rs @@ -19,8 +19,8 @@ pub mod grouped_gaps; /// The recommended default batch codec for unlabelled batches. pub type DefaultBatchCodec = grouped_gaps::GroupedGapsCodec; -pub type BitWriter = BufBitWriter>>; -pub type BitReader = BufBitReader>>; +pub type BitWriter = BufBitWriter>>; +pub type BitReader = BufBitReader>>; /// A trait for encoding and decoding batches of sorted triples. pub trait BatchCodec: Send + Sync { diff --git a/webgraph/src/utils/sort_pairs.rs b/webgraph/src/utils/sort_pairs.rs index da201249..83a9cfe5 100644 --- a/webgraph/src/utils/sort_pairs.rs +++ b/webgraph/src/utils/sort_pairs.rs @@ -9,22 +9,13 @@ //! Facilities to sort externally pairs of nodes with an associated label. #![allow(clippy::non_canonical_partial_ord_impl)] -use super::ArcMmapHelper; use crate::{ traits::SortedIterator, utils::{BatchCodec, CodecIter, DefaultBatchCodec, MemoryUsage}, }; use anyhow::{anyhow, Context}; use dary_heap::PeekMut; -use dsi_bitstream::prelude::*; -use std::{ - fs::File, - io::BufWriter, - path::{Path, PathBuf}, -}; - -pub type BitWriter = BufBitWriter>>; -pub type BitReader = BufBitReader>>; +use std::path::{Path, PathBuf}; /// A struct that provides external sorting for pairs of nodes with an /// associated label. @@ -438,29 +429,46 @@ mod tests { use super::*; use crate::{ traits::{BitDeserializer, BitSerializer}, - utils::gaps::GapsCodec, + utils::{gaps::GapsCodec, BitReader, BitWriter}, }; + use dsi_bitstream::prelude::*; + + #[derive(Clone, Debug)] + struct MyDessert { + _marker: std::marker::PhantomData, + } - #[derive(Clone, Debug, Default)] - struct MyDessert; + impl Default for MyDessert { + fn default() -> Self { + MyDessert { + _marker: std::marker::PhantomData, + } + } + } - impl BitDeserializer for MyDessert { + impl BitDeserializer> for MyDessert + where + BitReader: BitRead + CodesRead, + { type DeserType = usize; fn deserialize( &self, - bitstream: &mut BitReader, - ) -> Result>::Error> { + bitstream: &mut BitReader, + ) -> Result as BitRead>::Error> { bitstream.read_delta().map(|x| x as usize) } } - impl BitSerializer for MyDessert { + impl BitSerializer> for MyDessert + where + BitWriter: BitWrite + CodesWrite, + { type SerType = usize; fn serialize( &self, value: &Self::SerType, - bitstream: &mut BitWriter, - ) -> Result>::Error> { + bitstream: &mut BitWriter, + ) -> Result as BitWrite>::Error> { bitstream.write_delta(*value as u64) } } @@ -473,7 +481,7 @@ mod tests { let mut sp = SortPairs::new_labeled( MemoryUsage::BatchSize(10), dir.path(), - GapsCodec::::default(), + GapsCodec::, MyDessert>::default(), )?; let n = 25; @@ -521,7 +529,7 @@ mod tests { let mut sp2 = SortPairs::new_labeled( MemoryUsage::BatchSize(5), dir2.path(), - GapsCodec::::default(), + GapsCodec::, MyDessert>::default(), )?; let labeled_pairs = vec![ diff --git a/webgraph/tests/test_transpose.rs b/webgraph/tests/test_transpose.rs index f56b8481..36777448 100644 --- a/webgraph/tests/test_transpose.rs +++ b/webgraph/tests/test_transpose.rs @@ -6,15 +6,15 @@ */ use dsi_bitstream::codes::{GammaRead, GammaWrite}; -use dsi_bitstream::traits::NE; use dsi_bitstream::traits::{BitRead, BitWrite}; +use dsi_bitstream::traits::{Endianness, BE}; use webgraph::graphs::vec_graph::LabeledVecGraph; use webgraph::prelude::{transpose, transpose_labeled, transpose_split}; use webgraph::traits::labels::SequentialLabeling; use webgraph::traits::{graph, BitDeserializer, BitSerializer}; use webgraph::utils::gaps::GapsCodec; -use webgraph::utils::sort_pairs::{BitReader, BitWriter}; use webgraph::utils::MemoryUsage; +use webgraph::utils::{BitReader, BitWriter}; #[test] fn test_transpose() -> anyhow::Result<()> { @@ -37,19 +37,19 @@ fn test_transpose_labeled() -> anyhow::Result<()> { #[derive(Clone, Copy, PartialEq, Debug)] struct Payload(f64); - #[derive(Clone, Copy, PartialEq, Debug)] - struct BD {} + #[derive(Clone, Copy, PartialEq, Debug, Default)] + struct BD; - impl BitDeserializer for BD + impl BitDeserializer> for BD where - BitReader: GammaRead, + BitReader: GammaRead, { type DeserType = Payload; fn deserialize( &self, - bitstream: &mut BitReader, - ) -> Result>::Error> { + bitstream: &mut BitReader, + ) -> Result as BitRead>::Error> { let mantissa = bitstream.read_gamma()?; let exponent = bitstream.read_gamma()?; let result = f64::from_bits((exponent << 53) | mantissa); @@ -57,20 +57,20 @@ fn test_transpose_labeled() -> anyhow::Result<()> { } } - #[derive(Clone, Copy, PartialEq, Debug)] - struct BS {} + #[derive(Clone, Copy, PartialEq, Debug, Default)] + struct BS; - impl BitSerializer for BS + impl BitSerializer> for BS where - BitWriter: GammaWrite, + BitWriter: GammaWrite, { type SerType = Payload; fn serialize( &self, value: &Self::SerType, - bitstream: &mut BitWriter, - ) -> Result>::Error> { + bitstream: &mut BitWriter, + ) -> Result as BitWrite>::Error> { let value = value.0.to_bits(); let mantissa = value & ((1 << 53) - 1); let exponent = value >> 53; @@ -92,20 +92,14 @@ fn test_transpose_labeled() -> anyhow::Result<()> { let trans = transpose_labeled( &g, MemoryUsage::BatchSize(3), - GapsCodec::<_, _> { - serializer: BS {}, - deserializer: BD {}, - }, + GapsCodec::::new(BS::default(), BD::default()), )?; let g2 = LabeledVecGraph::::from_lender(trans.iter()); let trans = transpose_labeled( &g2, MemoryUsage::BatchSize(3), - GapsCodec::<_, _> { - serializer: BS {}, - deserializer: BD {}, - }, + GapsCodec::::new(BS::default(), BD::default()), )?; let g3 = LabeledVecGraph::::from_lender(trans.iter()); From c64056634167a223579f803df71138ae8ed1f6f7 Mon Sep 17 00:00:00 2001 From: Sebastiano Vigna Date: Mon, 13 Oct 2025 09:39:39 +0200 Subject: [PATCH 3/4] Docs review --- webgraph/src/utils/batch_codec/gaps.rs | 41 +---------- .../src/utils/batch_codec/grouped_gaps.rs | 71 ++++--------------- webgraph/src/utils/batch_codec/mod.rs | 53 ++++++++++---- 3 files changed, 55 insertions(+), 110 deletions(-) diff --git a/webgraph/src/utils/batch_codec/gaps.rs b/webgraph/src/utils/batch_codec/gaps.rs index e12eadc3..91f3581b 100644 --- a/webgraph/src/utils/batch_codec/gaps.rs +++ b/webgraph/src/utils/batch_codec/gaps.rs @@ -27,17 +27,15 @@ use rdst::*; /// This codec encodes triples of the form `(src, dst, label)` by encoding the /// gaps between consecutive sources and destinations using a specified code. /// -/// ## Type Parameters +/// # Type Parameters +/// /// - `S`: Serializer for the labels, implementing [`BitSerializer`] for the label type. /// - `D`: Deserializer for the labels, implementing [`BitDeserializer`] for the label type. /// - `SRC_CODE`: Code used for encoding source gaps (default: gamma). /// - `DST_CODE`: Code used for encoding destination gaps (default: gamma). /// -/// ## Fields -/// - `serializer`: The label serializer. -/// - `deserializer`: The label deserializer. +/// # Encoding Format /// -/// ## Encoding Format /// 1. The batch length is written using delta coding. /// 2. For each group of triples with the same source: /// - The gap from the previous source is encoded. @@ -47,39 +45,6 @@ use rdst::*; /// The bit deserializer must be [`Clone`] because we need one for each /// [`GapsIterator`], and there are possible scenarios in which the /// deserializer might be stateful. -/// -/// ## Choosing the codes -/// -/// These are the top 10 codes for src and dst gaps when transposing `enwiki-2024`. -/// ```ignore -/// Src codes: -/// Code: Unary Size: 179553432 -/// Code: Golomb(1) Size: 179553432 -/// Code: Rice(0) Size: 179553432 -/// Code: Gamma Size: 185374984 -/// Code: Zeta(1) Size: 185374984 -/// Code: ExpGolomb(0) Size: 185374984 -/// Code: Omega Size: 185439656 -/// Code: Delta Size: 191544794 -/// Code: Golomb(2) Size: 345986198 -/// Code: Rice(1) Size: 345986198 -/// Dst codes: -/// Code: Pi(2) Size: 2063880685 -/// Code: Pi(3) Size: 2074138948 -/// Code: Zeta(3) Size: 2122730298 -/// Code: Zeta(4) Size: 2123948774 -/// Code: Zeta(5) Size: 2169131998 -/// Code: Pi(4) Size: 2176097847 -/// Code: Zeta(2) Size: 2226573622 -/// Code: Zeta(6) Size: 2237680403 -/// Code: Delta Size: 2272691460 -/// Code: Zeta(7) Size: 2305354857 -/// ``` -/// -/// So the best combination is `Unary` for src gaps and `Pi(2)` for dst gaps. -/// But, `Unary` can behave poorly if the distribution of your data changes, -/// therefore the recommended default is `Gamma` for src gaps and `Delta` for -/// dst gaps as they are universal codes. pub struct GapsCodec< E: Endianness = NE, S: BitSerializer> = (), diff --git a/webgraph/src/utils/batch_codec/grouped_gaps.rs b/webgraph/src/utils/batch_codec/grouped_gaps.rs index fda103bb..fa37f38c 100644 --- a/webgraph/src/utils/batch_codec/grouped_gaps.rs +++ b/webgraph/src/utils/batch_codec/grouped_gaps.rs @@ -22,22 +22,21 @@ use rdst::*; #[derive(Clone, Debug)] /// A codec for encoding and decoding batches of triples using grouped gap compression. /// -/// This codec encodes triples of the form `(src, dst, label)` by grouping edges with the same source node, -/// and encoding the gaps between consecutive sources and destinations using a specified code (default: gamma). -/// The outdegree (number of edges for each source) is also encoded using the specified code. +/// This codec encodes triples of the form `(src, dst, label)` by grouping edges +/// with the same source node, and encoding the gaps between consecutive sources +/// and destinations using a specified code (default: gamma). The outdegree +/// (number of edges for each source) is also encoded using the specified code. +/// +/// # Type Parameters /// -/// ## Type Parameters /// - `S`: Serializer for the labels, implementing [`BitSerializer`] for the label type. /// - `D`: Deserializer for the labels, implementing [`BitDeserializer`] for the label type. -/// - `OUTDEGREE_CODE`: Code used for encoding outdegrees (default: gamma). -/// - `SRC_CODE`: Code used for encoding source gaps (default: gamma). -/// - `DST_CODE`: Code used for encoding destination gaps (default: gamma). +/// - `OUTDEGREE_CODE`: Code used for encoding outdegrees (default: [ɣ](dsi_bitstream::codes::gamma)). +/// - `SRC_CODE`: Code used for encoding source gaps (default: [ɣ](dsi_bitstream::codes::gamma)). +/// - `DST_CODE`: Code used for encoding destination gaps (default: [ɣ](dsi_bitstream::codes::gamma)). /// -/// ## Fields -/// - `serializer`: The label serializer. -/// - `deserializer`: The label deserializer. +/// # Encoding Format /// -/// ## Encoding Format /// 1. The batch length is written using delta coding. /// 2. For each group of triples with the same source: /// - The gap from the previous source is encoded. @@ -49,52 +48,8 @@ use rdst::*; /// The bit deserializer must be [`Clone`] because we need one for each /// [`GroupedGapsIterator`], and there are possible scenarios in which the /// deserializer might be stateful. -/// -/// ## Choosing the codes -/// -/// When transposing `enwiki-2024`, these are the top 10 codes for src gaps, outdegree, and dst gaps: -/// ```ignore -/// Outdegree stats -/// Code: ExpGolomb(3) Size: 34004796 -/// Code: ExpGolomb(2) Size: 34101784 -/// Code: ExpGolomb(4) Size: 36036394 -/// Code: Zeta(2) Size: 36231582 -/// Code: ExpGolomb(1) Size: 36369750 -/// Code: Zeta(3) Size: 36893285 -/// Code: Pi(2) Size: 37415701 -/// Code: Zeta(4) Size: 38905267 -/// Code: Golomb(20) Size: 38963840 -/// Code: Golomb(19) Size: 39118201 -/// Src stats -/// Code: Golomb(2) Size: 12929998 -/// Code: Rice(1) Size: 12929998 -/// Code: Unary Size: 13025332 -/// Code: Golomb(1) Size: 13025332 -/// Code: Rice(0) Size: 13025332 -/// Code: ExpGolomb(1) Size: 13319930 -/// Code: Golomb(4) Size: 18732384 -/// Code: Rice(2) Size: 18732384 -/// Code: Golomb(3) Size: 18736573 -/// Code: ExpGolomb(2) Size: 18746122 -/// Dst stats -/// Code: Pi(2) Size: 2063880685 -/// Code: Pi(3) Size: 2074138948 -/// Code: Zeta(3) Size: 2122730298 -/// Code: Zeta(4) Size: 2123948774 -/// Code: Zeta(5) Size: 2169131998 -/// Code: Pi(4) Size: 2176097847 -/// Code: Zeta(2) Size: 2226573622 -/// Code: Zeta(6) Size: 2237680403 -/// Code: Delta Size: 2272691460 -/// Code: Zeta(7) Size: 2305354857 -/// ``` -/// -/// The best codes are `Golomb(2)` for src gaps, `ExpGolomb(3)` for outdegree, and `Pi(2)` for dst gaps. -/// However, `Golomb` can perform poorly if the data don't follow the expected distribution, -/// so the recommended defaults are `Gamma` for src gaps, `ExpGolomb3` for outdegree, and `Delta` for dst gaps, -/// as they are universal codes. pub struct GroupedGapsCodec< - E: Endianness = BE, + E: Endianness = NE, S: BitSerializer> = (), D: BitDeserializer, DeserType = S::SerType> + Clone = (), const OUTDEGREE_CODE: usize = { dsi_bitstream::dispatch::code_consts::EXP_GOLOMB3 }, @@ -104,9 +59,9 @@ pub struct GroupedGapsCodec< BitReader: BitRead, BitWriter: BitWrite, { - /// Serializer for the labels + /// Serializer for the labels. pub serializer: S, - /// Deserializer for the labels + /// Deserializer for the labels. pub deserializer: D, pub _marker: core::marker::PhantomData, diff --git a/webgraph/src/utils/batch_codec/mod.rs b/webgraph/src/utils/batch_codec/mod.rs index 5c1f0f12..ac6ecddd 100644 --- a/webgraph/src/utils/batch_codec/mod.rs +++ b/webgraph/src/utils/batch_codec/mod.rs @@ -4,6 +4,19 @@ * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later */ +//! Traits and implementations to encode and decode batches of sorted triples +//! to/from disk. +//! +//! The traits and implementations in this module are used to customize the +//! encoding of batches of sorted triples to/from disk. They are used by +//! [`SortPairs`](crate::utils::sort_pairs::SortPairs) and other utilities built +//! on that (e.g., +//! [`ParSortPairs`](crate::utils::par_sort_pairs::ParSortPairs)). +//! +//! They usually do not need to be accessed or modified by the end users, albeit +//! in some specific cases where performance or on-disk occupation is critical +//! they can be customized. + use anyhow::Result; use super::ArcMmapHelper; @@ -40,15 +53,19 @@ pub trait BatchCodec: Send + Sync { IntoIter: Send + Sync + Clone, >; - /// Given a batch of sorted triples, encodes them to disk and returns the number of bits written. + /// Given a batch of sorted triples, encodes them to disk and returns the + /// number of bits written. + /// + /// Note that the input batch must be already sorted. Use + /// [`encode_batch`](Self::encode_batch) otherwise. fn encode_sorted_batch( &self, path: impl AsRef, batch: &[((usize, usize), Self::Label)], ) -> Result; - /// Given a batch of triples, encodes them to disk and returns the number of bits written. - /// The batch needs a mutable reference to allow the coded to sort-in-place if needed. + /// Given a batch of triples, sort them, encodes them to disk, and returns + /// the number of bits written. fn encode_batch( &self, path: impl AsRef, @@ -56,11 +73,13 @@ pub trait BatchCodec: Send + Sync { ) -> Result; /// Decodes a batch of triples from disk. + /// /// The returned type's iterator yields the serialized triples in sorted order. fn decode_batch(&self, path: impl AsRef) -> Result; } -/// Convenience alias to extract the iterator type of the decoded batch from a [`BatchCodec`]. +/// Convenience alias to extract the iterator type of the decoded batch from a +/// [`BatchCodec`]. pub type CodecIter = <::DecodedBatch as IntoIterator>::IntoIter; /// An arc expressed as a pair of nodes and the associated label. @@ -68,27 +87,33 @@ pub type CodecIter = <::DecodedBatch as IntoIterator>::IntoI /// Equality and order are defined only (lexicographically) on the pair of /// nodes. /// -/// Since we use this to sort a batch of `(usize, usize, L)` triples, in order to -/// safely transmute between the two types, Triple HAS TO be `repr(transparent)` -/// of the same tuple type. +/// Since we use this to sort a batch of `(usize, usize, L)` triples, in order +/// to safely transmute between the two types, Triple has to be +/// `repr(transparent)` of the same tuple type. /// /// We use this to implement `RadixKey` for sorting batches of triples -/// using `rdst`. +/// using the [`rdst`](https://crates.io/crates/rdst) crate. #[derive(Clone, Copy, Debug)] #[repr(transparent)] pub struct Triple(((usize, usize), L)); impl Triple { - /// Converts a mutable batch of `((usize, usize), L)` triples into a mutable slice of `Triple`. + /// slice of `Triple`. /// - /// This is safe because `Triple` is `repr(transparent)` of the same tuple type. - pub fn cast_batch_mut(batch: &mut [((usize, usize), L)]) -> &mut [Triple] { + /// The conversion is safe because `Triple` is `repr(transparent)` of the + /// same tuple type. + pub fn cast_batch(batch: &[((usize, usize), L)]) -> &[Triple] { + // SAFETY: `Triple` is `repr(transparent)` of the same tuple type. unsafe { std::mem::transmute(batch) } } - /// Converts a batch of `((usize, usize), L)` triples into a slice of `Triple`. + + /// Converts a mutable reference to a slice of `((usize, usize), L)` triples + /// into a mutable reference to a slice of `Triple`. /// - /// This is safe because `Triple` is `repr(transparent)` of the same tuple type. - pub fn cast_batch(batch: &[((usize, usize), L)]) -> &[Triple] { + /// The conversion is safe because `Triple` is `repr(transparent)` of the + /// same tuple type. + pub fn cast_batch_mut(batch: &mut [((usize, usize), L)]) -> &mut [Triple] { + // SAFETY: `Triple` is `repr(transparent)` of the same tuple type. unsafe { std::mem::transmute(batch) } } } From 9e3a1f73014f3b5ebc9d8264a048afad9af5dcd5 Mon Sep 17 00:00:00 2001 From: Tommaso Fontana Date: Mon, 10 Nov 2025 18:33:15 +0100 Subject: [PATCH 4/4] BatchCodec print stats and encoding time --- webgraph/src/utils/batch_codec/gaps.rs | 58 ++++++++++++++--- .../src/utils/batch_codec/grouped_gaps.rs | 62 +++++++++++++++---- webgraph/src/utils/batch_codec/mod.rs | 9 ++- webgraph/src/utils/mod.rs | 16 +++++ webgraph/src/utils/sort_pairs.rs | 9 ++- 5 files changed, 128 insertions(+), 26 deletions(-) diff --git a/webgraph/src/utils/batch_codec/gaps.rs b/webgraph/src/utils/batch_codec/gaps.rs index 91f3581b..3f7831d0 100644 --- a/webgraph/src/utils/batch_codec/gaps.rs +++ b/webgraph/src/utils/batch_codec/gaps.rs @@ -11,7 +11,7 @@ use crate::traits::SortedIterator; use crate::utils::{ArcMmapHelper, MmapHelper, Triple}; use crate::{ traits::{BitDeserializer, BitSerializer}, - utils::BatchCodec, + utils::{humanize, BatchCodec}, }; use std::sync::Arc; @@ -95,6 +95,37 @@ where } } +#[derive(Debug, Clone, Copy)] +/// Statistics about the encoding performed by [`GapsCodec`]. +pub struct GapsStats { + /// Total number of triples encoded + pub total_triples: usize, + /// Number of bits used for source gaps + pub src_bits: usize, + //// Number of bits used for destination gaps + pub dst_bits: usize, + /// Number of bits used for labels + pub labels_bits: usize, +} + +impl core::fmt::Display for GapsStats { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let total_bits = self.src_bits + self.dst_bits + self.labels_bits; + write!( + f, + "src: {}B ({:.3} bits / arc), dst: {}B ({:.3} bits / arc), labels: {}B ({:.3} bits / arc), total: {}B ({:.3} bits / arc)", + humanize(self.src_bits as f64 / 8.0), + self.src_bits as f64 / self.total_triples as f64, + humanize(self.dst_bits as f64 / 8.0), + self.dst_bits as f64 / self.total_triples as f64, + humanize(self.labels_bits as f64 / 8.0), + self.labels_bits as f64 / self.total_triples as f64, + humanize(total_bits as f64 / 8.0), + total_bits as f64 / self.total_triples as f64, + ) + } +} + impl BatchCodec for GapsCodec where @@ -107,12 +138,13 @@ where { type Label = S::SerType; type DecodedBatch = GapsIterator; + type EncodedBatchStats = GapsStats; fn encode_batch( &self, path: impl AsRef, batch: &mut [((usize, usize), Self::Label)], - ) -> Result { + ) -> Result<(usize, Self::EncodedBatchStats)> { let start = std::time::Instant::now(); Triple::cast_batch_mut(batch).radix_sort_unstable(); log::debug!("Sorted {} arcs in {:?}", batch.len(), start.elapsed()); @@ -123,7 +155,7 @@ where &self, path: impl AsRef, batch: &[((usize, usize), Self::Label)], - ) -> Result { + ) -> Result<(usize, Self::EncodedBatchStats)> { debug_assert!(Triple::cast_batch(batch).is_sorted()); // create a batch file where to dump let file_path = path.as_ref(); @@ -145,12 +177,17 @@ where .write_delta(batch.len() as u64) .context("Could not write length")?; - // dump the triples to the bitstream + let mut stats = GapsStats { + total_triples: batch.len(), + src_bits: 0, + dst_bits: 0, + labels_bits: 0, + }; + // dump the triples to the bitstrea let (mut prev_src, mut prev_dst) = (0, 0); - let mut written_bits = 0; for ((src, dst), label) in batch.iter() { // write the source gap as gamma - written_bits += ConstCode:: + stats.src_bits += ConstCode:: .write(&mut stream, (src - prev_src) as u64) .with_context(|| format!("Could not write {src} after {prev_src}"))?; if *src != prev_src { @@ -158,20 +195,21 @@ where prev_dst = 0; } // write the destination gap as gamma - written_bits += ConstCode:: + stats.dst_bits += ConstCode:: .write(&mut stream, (dst - prev_dst) as u64) .with_context(|| format!("Could not write {dst} after {prev_dst}"))?; // write the label - written_bits += self + stats.labels_bits += self .serializer .serialize(label, &mut stream) .context("Could not serialize label")?; (prev_src, prev_dst) = (*src, *dst); } // flush the stream and reset the buffer - written_bits += stream.flush().context("Could not flush stream")?; + stream.flush().context("Could not flush stream")?; - Ok(written_bits) + let total_bits = stats.src_bits + stats.dst_bits + stats.labels_bits; + Ok((total_bits, stats)) } fn decode_batch(&self, path: impl AsRef) -> Result { diff --git a/webgraph/src/utils/batch_codec/grouped_gaps.rs b/webgraph/src/utils/batch_codec/grouped_gaps.rs index fa37f38c..430c7b8f 100644 --- a/webgraph/src/utils/batch_codec/grouped_gaps.rs +++ b/webgraph/src/utils/batch_codec/grouped_gaps.rs @@ -9,7 +9,7 @@ use crate::traits::SortedIterator; use crate::utils::{ArcMmapHelper, MmapHelper, Triple}; use crate::{ traits::{BitDeserializer, BitSerializer}, - utils::BatchCodec, + utils::{humanize, BatchCodec}, }; use std::sync::Arc; @@ -52,7 +52,7 @@ pub struct GroupedGapsCodec< E: Endianness = NE, S: BitSerializer> = (), D: BitDeserializer, DeserType = S::SerType> + Clone = (), - const OUTDEGREE_CODE: usize = { dsi_bitstream::dispatch::code_consts::EXP_GOLOMB3 }, + const OUTDEGREE_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA }, const SRC_CODE: usize = { dsi_bitstream::dispatch::code_consts::GAMMA }, const DST_CODE: usize = { dsi_bitstream::dispatch::code_consts::DELTA }, > where @@ -107,6 +107,38 @@ where } } +#[derive(Debug, Clone, Copy)] +/// Statistics about the encoding performed by [`GapsCodec`]. +pub struct GroupedGapsStats { + /// Total number of triples encoded + pub total_triples: usize, + /// Number of bits used for outdegrees + pub outdegree_bits: usize, + /// Number of bits used for source gaps + pub src_bits: usize, + //// Number of bits used for destination gaps + pub dst_bits: usize, + /// Number of bits used for labels + pub labels_bits: usize, +} + +impl core::fmt::Display for GroupedGapsStats { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!( + f, + "outdegree: {}B ({:.3} bits / arc), src: {}B ({:.3} bits / arc), dst: {}B ({:.3} bits / arc), labels: {}B ({:.3} bits / arc)", + humanize(self.outdegree_bits as f64 / 8.0), + self.outdegree_bits as f64 / self.total_triples as f64, + humanize(self.src_bits as f64 / 8.0), + self.src_bits as f64 / self.total_triples as f64, + humanize(self.dst_bits as f64 / 8.0), + self.dst_bits as f64 / self.total_triples as f64, + humanize(self.labels_bits as f64 / 8.0), + self.labels_bits as f64 / self.total_triples as f64, + ) + } +} + impl BatchCodec for GroupedGapsCodec where @@ -119,12 +151,13 @@ where { type Label = S::SerType; type DecodedBatch = GroupedGapsIterator; + type EncodedBatchStats = GroupedGapsStats; fn encode_batch( &self, path: impl AsRef, batch: &mut [((usize, usize), Self::Label)], - ) -> Result { + ) -> Result<(usize, Self::EncodedBatchStats)> { let start = std::time::Instant::now(); Triple::cast_batch_mut(batch).radix_sort_unstable(); log::debug!("Sorted {} arcs in {:?}", batch.len(), start.elapsed()); @@ -135,7 +168,7 @@ where &self, path: impl AsRef, batch: &[((usize, usize), Self::Label)], - ) -> Result { + ) -> Result<(usize, Self::EncodedBatchStats)> { debug_assert!(Triple::cast_batch(batch).is_sorted(), "Batch is not sorted"); // create a batch file where to dump let file_path = path.as_ref(); @@ -157,20 +190,26 @@ where .write_delta(batch.len() as u64) .context("Could not write length")?; + let mut stats = GroupedGapsStats { + total_triples: batch.len(), + outdegree_bits: 0, + src_bits: 0, + dst_bits: 0, + labels_bits: 0, + }; // dump the triples to the bitstream let mut prev_src = 0; - let mut written_bits = 0; let mut i = 0; while i < batch.len() { let ((src, _), _) = batch[i]; // write the source gap as gamma - written_bits += ConstCode:: + stats.src_bits += ConstCode:: .write(&mut stream, (src - prev_src) as _) .with_context(|| format!("Could not write {src} after {prev_src}"))?; // figure out how many edges have this source let outdegree = batch[i..].iter().take_while(|t| t.0 .0 == src).count(); // write the outdegree - written_bits += ConstCode:: + stats.outdegree_bits += ConstCode:: .write(&mut stream, outdegree as _) .with_context(|| format!("Could not write outdegree {outdegree} for {src}"))?; @@ -179,11 +218,11 @@ where for _ in 0..outdegree { let ((_, dst), label) = &batch[i]; // write the destination gap as gamma - written_bits += ConstCode:: + stats.dst_bits += ConstCode:: .write(&mut stream, (dst - prev_dst) as _) .with_context(|| format!("Could not write {dst} after {prev_dst}"))?; // write the label - written_bits += self + stats.labels_bits += self .serializer .serialize(label, &mut stream) .context("Could not serialize label")?; @@ -193,9 +232,10 @@ where prev_src = src; } // flush the stream and reset the buffer - written_bits += stream.flush().context("Could not flush stream")?; + stream.flush().context("Could not flush stream")?; - Ok(written_bits) + let total_bits = stats.outdegree_bits + stats.src_bits + stats.dst_bits + stats.labels_bits; + Ok((total_bits, stats)) } fn decode_batch(&self, path: impl AsRef) -> Result { diff --git a/webgraph/src/utils/batch_codec/mod.rs b/webgraph/src/utils/batch_codec/mod.rs index ac6ecddd..e65870b2 100644 --- a/webgraph/src/utils/batch_codec/mod.rs +++ b/webgraph/src/utils/batch_codec/mod.rs @@ -20,6 +20,7 @@ use anyhow::Result; use super::ArcMmapHelper; +use core::fmt::Display; use dsi_bitstream::prelude::*; use rdst::*; use std::fs::File; @@ -53,6 +54,10 @@ pub trait BatchCodec: Send + Sync { IntoIter: Send + Sync + Clone, >; + /// A type representing statistics about the encoded batch. + /// This type has to implement `Display` so that we can log it. + type EncodedBatchStats: Display; + /// Given a batch of sorted triples, encodes them to disk and returns the /// number of bits written. /// @@ -62,7 +67,7 @@ pub trait BatchCodec: Send + Sync { &self, path: impl AsRef, batch: &[((usize, usize), Self::Label)], - ) -> Result; + ) -> Result<(usize, Self::EncodedBatchStats)>; /// Given a batch of triples, sort them, encodes them to disk, and returns /// the number of bits written. @@ -70,7 +75,7 @@ pub trait BatchCodec: Send + Sync { &self, path: impl AsRef, batch: &mut [((usize, usize), Self::Label)], - ) -> Result; + ) -> Result<(usize, Self::EncodedBatchStats)>; /// Decodes a batch of triples from disk. /// diff --git a/webgraph/src/utils/mod.rs b/webgraph/src/utils/mod.rs index 993e939b..e761978b 100644 --- a/webgraph/src/utils/mod.rs +++ b/webgraph/src/utils/mod.rs @@ -218,6 +218,22 @@ impl MemoryUsage { } } +/// Writes a human-readable representation of a large number using SI prefixes units. +pub fn humanize(value: f64) -> String { + const UNITS: &[&str] = &["", "K", "M", "G", "T", "P", "E"]; + let mut v = value; + let mut unit: usize = 0; + while v >= 1000.0 && unit + 1 < UNITS.len() { + v /= 1000.0; + unit += 1; + } + if unit == 0 { + format!("{:.0}{}", v, UNITS[unit]) + } else { + format!("{:.3}{}", v, UNITS[unit]) + } +} + /// A structure holding partition iterators and their boundaries. /// /// This type holds a list of iterators and a list of boundaries, one more diff --git a/webgraph/src/utils/sort_pairs.rs b/webgraph/src/utils/sort_pairs.rs index 83a9cfe5..8f957b9f 100644 --- a/webgraph/src/utils/sort_pairs.rs +++ b/webgraph/src/utils/sort_pairs.rs @@ -164,13 +164,16 @@ impl SortPairs { } let batch_path = self.tmp_dir.join(format!("{:06x}", self.num_batches)); - let bit_size = self.batch_codec.encode_batch(batch_path, &mut self.batch)?; + let start = std::time::Instant::now(); + let (bit_size, stats) = self.batch_codec.encode_batch(batch_path, &mut self.batch)?; log::info!( - "Dumped batch {} with {} arcs ({} bits, {:.2} bits / arc)", + "Dumped batch {} with {} arcs ({} bits, {:.2} bits / arc) in {:.3} seconds, stats: {}", self.num_batches, self.batch.len(), bit_size, - bit_size as f64 / self.batch.len() as f64 + bit_size as f64 / self.batch.len() as f64, + start.elapsed().as_secs_f64(), + stats ); self.last_batch_len = self.batch.len(); self.batch.clear();