Skip to content

Commit

Permalink
feat: expose timings in verbose state of OOC sort (#14979)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Mar 11, 2024
1 parent 9ee7c9e commit 78dc628
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 13 deletions.
21 changes: 12 additions & 9 deletions crates/polars-pipe/src/executors/sinks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ mod slice;
mod sort;
mod utils;

use std::sync::OnceLock;

pub(crate) use joins::*;
pub(crate) use ordered::*;
#[cfg(any(
Expand All @@ -26,15 +28,16 @@ pub(crate) use sort::*;
// Overallocation seems a lot more expensive than resizing so we start reasonable small.
const HASHMAP_INIT_SIZE: usize = 64;

pub(crate) static POLARS_TEMP_DIR: &str = "POLARS_TEMP_DIR";

pub(crate) fn get_base_temp_dir() -> String {
let base_dir = std::env::var(POLARS_TEMP_DIR)
.unwrap_or_else(|_| std::env::temp_dir().to_string_lossy().into_owned());
pub(crate) static POLARS_TEMP_DIR: OnceLock<String> = OnceLock::new();

if polars_core::config::verbose() {
eprintln!("Temporary directory path in use: {}", base_dir);
}
pub(crate) fn get_base_temp_dir() -> &'static str {
POLARS_TEMP_DIR.get_or_init(|| {
let tmp = std::env::var("POLARS_TEMP_DIR")
.unwrap_or_else(|_| std::env::temp_dir().to_string_lossy().into_owned());

base_dir
if polars_core::config::verbose() {
eprintln!("Temporary directory path in use: {}", &tmp);
}
tmp
})
}
13 changes: 11 additions & 2 deletions crates/polars-pipe/src/executors/sinks/sort/ooc.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::path::Path;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::time::Instant;

use crossbeam_queue::SegQueue;
use polars_core::prelude::*;
Expand Down Expand Up @@ -104,6 +105,7 @@ impl PartitionSpiller {
}
}

#[allow(clippy::too_many_arguments)]
pub(super) fn sort_ooc(
io_thread: IOThread,
// these partitions are the samples
Expand All @@ -114,7 +116,9 @@ pub(super) fn sort_ooc(
slice: Option<(i64, usize)>,
verbose: bool,
memtrack: MemTracker,
ooc_start: Instant,
) -> PolarsResult<FinalizedSink> {
let now = Instant::now();
let samples = samples.to_physical_repr().into_owned();
// Try to use available memory. At least 32MB per spill.
let spill_size = std::cmp::max(
Expand Down Expand Up @@ -157,9 +161,12 @@ pub(super) fn sort_ooc(
PolarsResult::Ok(())
})
})?;
if verbose {
eprintln!("partitioning sort took: {:?}", now.elapsed());
}
partitions_spiller.spill_all(&io_thread);
if verbose {
eprintln!("finished partitioning sort files");
eprintln!("spilling all partitioned files took: {:?}", now.elapsed());
}

let files = std::fs::read_dir(dir)?
Expand All @@ -179,7 +186,9 @@ pub(super) fn sort_ooc(
})
.collect::<std::io::Result<Vec<_>>>()?;

let source = SortSource::new(files, idx, descending, slice, verbose, io_thread, memtrack);
let source = SortSource::new(
files, idx, descending, slice, verbose, io_thread, memtrack, ooc_start,
);
Ok(FinalizedSink::Source(Box::new(source)))
}

Expand Down
14 changes: 14 additions & 0 deletions crates/polars-pipe/src/executors/sinks/sort/sink.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::any::Any;
use std::sync::{Arc, RwLock};
use std::time::Instant;

use polars_core::config::verbose;
use polars_core::error::PolarsResult;
Expand Down Expand Up @@ -35,6 +36,8 @@ pub struct SortSink {
current_chunk_rows: usize,
// total bytes of tables in current chunks
current_chunks_size: usize,
// Start time of OOC phase.
ooc_start: Option<Instant>,
}

impl SortSink {
Expand All @@ -54,6 +57,7 @@ impl SortSink {
dist_sample: vec![],
current_chunk_rows: 0,
current_chunks_size: 0,
ooc_start: None,
};
if ooc {
eprintln!("OOC sort forced");
Expand All @@ -66,6 +70,7 @@ impl SortSink {
if verbose() {
eprintln!("OOC sort started");
}
self.ooc_start = Some(Instant::now());
self.ooc = true;

// start IO thread
Expand Down Expand Up @@ -161,6 +166,7 @@ impl Sink for SortSink {
dist_sample: vec![],
current_chunk_rows: 0,
current_chunks_size: 0,
ooc_start: self.ooc_start,
})
}

Expand All @@ -179,7 +185,14 @@ impl Sink for SortSink {
maintain_order: self.sort_args.maintain_order,
});

let instant = self.ooc_start.unwrap();
if context.verbose {
eprintln!("finished sinking into OOC sort in {:?}", instant.elapsed());
}
block_thread_until_io_thread_done(&io_thread);
if context.verbose {
eprintln!("full file dump of OOC sort took {:?}", instant.elapsed());
}

sort_ooc(
io_thread,
Expand All @@ -189,6 +202,7 @@ impl Sink for SortSink {
self.sort_args.slice,
context.verbose,
self.mem_track.clone(),
instant,
)
} else {
let chunks = std::mem::take(&mut self.chunks);
Expand Down
19 changes: 17 additions & 2 deletions crates/polars-pipe/src/executors/sinks/sort/source.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::path::PathBuf;
use std::time::Instant;

use polars_core::prelude::*;
use polars_core::utils::{accumulate_dataframes_vertical_unchecked, split_df};
Expand All @@ -22,9 +23,14 @@ pub struct SortSource {
finished: bool,
io_thread: IOThread,
memtrack: MemTracker,
// Start of the Source phase
source_start: Instant,
// Start of the OOC sort operation.
ooc_start: Instant,
}

impl SortSource {
#[allow(clippy::too_many_arguments)]
pub(super) fn new(
mut files: Vec<(u32, PathBuf)>,
sort_idx: usize,
Expand All @@ -33,6 +39,7 @@ impl SortSource {
verbose: bool,
io_thread: IOThread,
memtrack: MemTracker,
ooc_start: Instant,
) -> Self {
if verbose {
eprintln!("started sort source phase");
Expand All @@ -53,6 +60,8 @@ impl SortSource {
finished: false,
io_thread,
memtrack,
source_start: Instant::now(),
ooc_start,
}
}
fn finish_batch(&mut self, dfs: Vec<DataFrame>) -> Vec<DataChunk> {
Expand All @@ -70,14 +79,20 @@ impl SortSource {
}

impl Source for SortSource {
fn get_batches(&mut self, _context: &PExecutionContext) -> PolarsResult<SourceResult> {
fn get_batches(&mut self, context: &PExecutionContext) -> PolarsResult<SourceResult> {
// early return
if self.finished {
return Ok(SourceResult::Finished);
}

match self.files.next() {
None => Ok(SourceResult::Finished),
None => {
if context.verbose {
eprintln!("sort source phase took: {:?}", self.source_start.elapsed());
eprintln!("full ooc sort took: {:?}", self.ooc_start.elapsed());
}
Ok(SourceResult::Finished)
},
Some((_, mut path)) => {
let limit = self.memtrack.get_available() / 3;

Expand Down

0 comments on commit 78dc628

Please sign in to comment.