From 33f1d74b3e9da8ccfd9d92b4bf8c1c3b0b6a54e7 Mon Sep 17 00:00:00 2001 From: ritchie Date: Mon, 11 Mar 2024 09:30:15 +0100 Subject: [PATCH] feat: expose timings in verbose state of OOC sort --- crates/polars-pipe/src/executors/sinks/mod.rs | 21 +++++++++++-------- .../src/executors/sinks/sort/ooc.rs | 13 ++++++++++-- .../src/executors/sinks/sort/sink.rs | 14 +++++++++++++ .../src/executors/sinks/sort/source.rs | 19 +++++++++++++++-- 4 files changed, 54 insertions(+), 13 deletions(-) diff --git a/crates/polars-pipe/src/executors/sinks/mod.rs b/crates/polars-pipe/src/executors/sinks/mod.rs index d783e5728fed..86d46eb3082c 100644 --- a/crates/polars-pipe/src/executors/sinks/mod.rs +++ b/crates/polars-pipe/src/executors/sinks/mod.rs @@ -9,6 +9,8 @@ mod slice; mod sort; mod utils; +use std::sync::OnceLock; + pub(crate) use joins::*; pub(crate) use ordered::*; #[cfg(any( @@ -26,15 +28,16 @@ pub(crate) use sort::*; // Overallocation seems a lot more expensive than resizing so we start reasonable small. const HASHMAP_INIT_SIZE: usize = 64; -pub(crate) static POLARS_TEMP_DIR: &str = "POLARS_TEMP_DIR"; - -pub(crate) fn get_base_temp_dir() -> String { - let base_dir = std::env::var(POLARS_TEMP_DIR) - .unwrap_or_else(|_| std::env::temp_dir().to_string_lossy().into_owned()); +pub(crate) static POLARS_TEMP_DIR: OnceLock = OnceLock::new(); - if polars_core::config::verbose() { - eprintln!("Temporary directory path in use: {}", base_dir); - } +pub(crate) fn get_base_temp_dir() -> &'static str { + POLARS_TEMP_DIR.get_or_init(|| { + let tmp = std::env::var("POLARS_TEMP_DIR") + .unwrap_or_else(|_| std::env::temp_dir().to_string_lossy().into_owned()); - base_dir + if polars_core::config::verbose() { + eprintln!("Temporary directory path in use: {}", &tmp); + } + tmp + }) } diff --git a/crates/polars-pipe/src/executors/sinks/sort/ooc.rs b/crates/polars-pipe/src/executors/sinks/sort/ooc.rs index a90a1827bb07..7f7210b50b87 100644 --- a/crates/polars-pipe/src/executors/sinks/sort/ooc.rs +++ b/crates/polars-pipe/src/executors/sinks/sort/ooc.rs @@ -1,5 +1,6 @@ use std::path::Path; use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; +use std::time::Instant; use crossbeam_queue::SegQueue; use polars_core::prelude::*; @@ -104,6 +105,7 @@ impl PartitionSpiller { } } +#[allow(clippy::too_many_arguments)] pub(super) fn sort_ooc( io_thread: IOThread, // these partitions are the samples @@ -114,7 +116,9 @@ pub(super) fn sort_ooc( slice: Option<(i64, usize)>, verbose: bool, memtrack: MemTracker, + ooc_start: Instant, ) -> PolarsResult { + let now = Instant::now(); let samples = samples.to_physical_repr().into_owned(); // Try to use available memory. At least 32MB per spill. let spill_size = std::cmp::max(memtrack.get_available() / (samples.len() * 2), 1 << 25) as u64; @@ -154,9 +158,12 @@ pub(super) fn sort_ooc( PolarsResult::Ok(()) }) })?; + if verbose { + eprintln!("partitioning sort took: {:?}", now.elapsed()); + } partitions_spiller.spill_all(&io_thread); if verbose { - eprintln!("finished partitioning sort files"); + eprintln!("spilling all partitioned files took: {:?}", now.elapsed()); } let files = std::fs::read_dir(dir)? @@ -176,7 +183,9 @@ pub(super) fn sort_ooc( }) .collect::>>()?; - let source = SortSource::new(files, idx, descending, slice, verbose, io_thread, memtrack); + let source = SortSource::new( + files, idx, descending, slice, verbose, io_thread, memtrack, ooc_start, + ); Ok(FinalizedSink::Source(Box::new(source))) } diff --git a/crates/polars-pipe/src/executors/sinks/sort/sink.rs b/crates/polars-pipe/src/executors/sinks/sort/sink.rs index ac0109eed18e..1f0962b1031d 100644 --- a/crates/polars-pipe/src/executors/sinks/sort/sink.rs +++ b/crates/polars-pipe/src/executors/sinks/sort/sink.rs @@ -1,5 +1,6 @@ use std::any::Any; use std::sync::{Arc, RwLock}; +use std::time::Instant; use polars_core::config::verbose; use polars_core::error::PolarsResult; @@ -35,6 +36,8 @@ pub struct SortSink { current_chunk_rows: usize, // total bytes of tables in current chunks current_chunks_size: usize, + // Start time of OOC phase. + ooc_start: Option, } impl SortSink { @@ -54,6 +57,7 @@ impl SortSink { dist_sample: vec![], current_chunk_rows: 0, current_chunks_size: 0, + ooc_start: None, }; if ooc { eprintln!("OOC sort forced"); @@ -66,6 +70,7 @@ impl SortSink { if verbose() { eprintln!("OOC sort started"); } + self.ooc_start = Some(Instant::now()); self.ooc = true; // start IO thread @@ -161,6 +166,7 @@ impl Sink for SortSink { dist_sample: vec![], current_chunk_rows: 0, current_chunks_size: 0, + ooc_start: self.ooc_start, }) } @@ -179,7 +185,14 @@ impl Sink for SortSink { maintain_order: self.sort_args.maintain_order, }); + let instant = self.ooc_start.unwrap(); + if context.verbose { + eprintln!("finished sinking into OOC sort in {:?}", instant.elapsed()); + } block_thread_until_io_thread_done(&io_thread); + if context.verbose { + eprintln!("full file dump of OOC sort took {:?}", instant.elapsed()); + } sort_ooc( io_thread, @@ -189,6 +202,7 @@ impl Sink for SortSink { self.sort_args.slice, context.verbose, self.mem_track.clone(), + instant, ) } else { let chunks = std::mem::take(&mut self.chunks); diff --git a/crates/polars-pipe/src/executors/sinks/sort/source.rs b/crates/polars-pipe/src/executors/sinks/sort/source.rs index 79c0bc436567..09b4ad50d1c5 100644 --- a/crates/polars-pipe/src/executors/sinks/sort/source.rs +++ b/crates/polars-pipe/src/executors/sinks/sort/source.rs @@ -1,4 +1,5 @@ use std::path::PathBuf; +use std::time::Instant; use polars_core::prelude::*; use polars_core::utils::{accumulate_dataframes_vertical_unchecked, split_df}; @@ -22,9 +23,14 @@ pub struct SortSource { finished: bool, io_thread: IOThread, memtrack: MemTracker, + // Start of the Source phase + source_start: Instant, + // Start of the OOC sort operation. + ooc_start: Instant, } impl SortSource { + #[allow(clippy::too_many_arguments)] pub(super) fn new( mut files: Vec<(u32, PathBuf)>, sort_idx: usize, @@ -33,6 +39,7 @@ impl SortSource { verbose: bool, io_thread: IOThread, memtrack: MemTracker, + ooc_start: Instant, ) -> Self { if verbose { eprintln!("started sort source phase"); @@ -53,6 +60,8 @@ impl SortSource { finished: false, io_thread, memtrack, + source_start: Instant::now(), + ooc_start, } } fn finish_batch(&mut self, dfs: Vec) -> Vec { @@ -70,14 +79,20 @@ impl SortSource { } impl Source for SortSource { - fn get_batches(&mut self, _context: &PExecutionContext) -> PolarsResult { + fn get_batches(&mut self, context: &PExecutionContext) -> PolarsResult { // early return if self.finished { return Ok(SourceResult::Finished); } match self.files.next() { - None => Ok(SourceResult::Finished), + None => { + if context.verbose { + eprintln!("sort source phase took: {:?}", self.source_start.elapsed()); + eprintln!("full ooc sort took: {:?}", self.ooc_start.elapsed()); + } + Ok(SourceResult::Finished) + }, Some((_, mut path)) => { let limit = self.memtrack.get_available() / 3;