Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose timings in verbose state of OOC sort #14979

Merged
merged 1 commit into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions crates/polars-pipe/src/executors/sinks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ mod slice;
mod sort;
mod utils;

use std::sync::OnceLock;

pub(crate) use joins::*;
pub(crate) use ordered::*;
#[cfg(any(
Expand All @@ -26,15 +28,16 @@ pub(crate) use sort::*;
// Overallocation seems a lot more expensive than resizing, so we start reasonably small.
const HASHMAP_INIT_SIZE: usize = 64;

pub(crate) static POLARS_TEMP_DIR: &str = "POLARS_TEMP_DIR";

pub(crate) fn get_base_temp_dir() -> String {
let base_dir = std::env::var(POLARS_TEMP_DIR)
.unwrap_or_else(|_| std::env::temp_dir().to_string_lossy().into_owned());
pub(crate) static POLARS_TEMP_DIR: OnceLock<String> = OnceLock::new();

if polars_core::config::verbose() {
eprintln!("Temporary directory path in use: {}", base_dir);
}
pub(crate) fn get_base_temp_dir() -> &'static str {
POLARS_TEMP_DIR.get_or_init(|| {
let tmp = std::env::var("POLARS_TEMP_DIR")
.unwrap_or_else(|_| std::env::temp_dir().to_string_lossy().into_owned());

base_dir
if polars_core::config::verbose() {
eprintln!("Temporary directory path in use: {}", &tmp);
}
tmp
})
}
13 changes: 11 additions & 2 deletions crates/polars-pipe/src/executors/sinks/sort/ooc.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::path::Path;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::time::Instant;

use crossbeam_queue::SegQueue;
use polars_core::prelude::*;
Expand Down Expand Up @@ -104,6 +105,7 @@ impl PartitionSpiller {
}
}

#[allow(clippy::too_many_arguments)]
pub(super) fn sort_ooc(
io_thread: IOThread,
// these partitions are the samples
Expand All @@ -114,7 +116,9 @@ pub(super) fn sort_ooc(
slice: Option<(i64, usize)>,
verbose: bool,
memtrack: MemTracker,
ooc_start: Instant,
) -> PolarsResult<FinalizedSink> {
let now = Instant::now();
let samples = samples.to_physical_repr().into_owned();
// Try to use available memory. At least 32MB per spill.
let spill_size = std::cmp::max(memtrack.get_available() / (samples.len() * 2), 1 << 25) as u64;
Expand Down Expand Up @@ -154,9 +158,12 @@ pub(super) fn sort_ooc(
PolarsResult::Ok(())
})
})?;
if verbose {
eprintln!("partitioning sort took: {:?}", now.elapsed());
}
partitions_spiller.spill_all(&io_thread);
if verbose {
eprintln!("finished partitioning sort files");
eprintln!("spilling all partitioned files took: {:?}", now.elapsed());
}

let files = std::fs::read_dir(dir)?
Expand All @@ -176,7 +183,9 @@ pub(super) fn sort_ooc(
})
.collect::<std::io::Result<Vec<_>>>()?;

let source = SortSource::new(files, idx, descending, slice, verbose, io_thread, memtrack);
let source = SortSource::new(
files, idx, descending, slice, verbose, io_thread, memtrack, ooc_start,
);
Ok(FinalizedSink::Source(Box::new(source)))
}

Expand Down
14 changes: 14 additions & 0 deletions crates/polars-pipe/src/executors/sinks/sort/sink.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::any::Any;
use std::sync::{Arc, RwLock};
use std::time::Instant;

use polars_core::config::verbose;
use polars_core::error::PolarsResult;
Expand Down Expand Up @@ -35,6 +36,8 @@ pub struct SortSink {
current_chunk_rows: usize,
// total bytes of tables in current chunks
current_chunks_size: usize,
// Start time of OOC phase.
ooc_start: Option<Instant>,
}

impl SortSink {
Expand All @@ -54,6 +57,7 @@ impl SortSink {
dist_sample: vec![],
current_chunk_rows: 0,
current_chunks_size: 0,
ooc_start: None,
};
if ooc {
eprintln!("OOC sort forced");
Expand All @@ -66,6 +70,7 @@ impl SortSink {
if verbose() {
eprintln!("OOC sort started");
}
self.ooc_start = Some(Instant::now());
self.ooc = true;

// start IO thread
Expand Down Expand Up @@ -161,6 +166,7 @@ impl Sink for SortSink {
dist_sample: vec![],
current_chunk_rows: 0,
current_chunks_size: 0,
ooc_start: self.ooc_start,
})
}

Expand All @@ -179,7 +185,14 @@ impl Sink for SortSink {
maintain_order: self.sort_args.maintain_order,
});

let instant = self.ooc_start.unwrap();
if context.verbose {
eprintln!("finished sinking into OOC sort in {:?}", instant.elapsed());
}
block_thread_until_io_thread_done(&io_thread);
if context.verbose {
eprintln!("full file dump of OOC sort took {:?}", instant.elapsed());
}

sort_ooc(
io_thread,
Expand All @@ -189,6 +202,7 @@ impl Sink for SortSink {
self.sort_args.slice,
context.verbose,
self.mem_track.clone(),
instant,
)
} else {
let chunks = std::mem::take(&mut self.chunks);
Expand Down
19 changes: 17 additions & 2 deletions crates/polars-pipe/src/executors/sinks/sort/source.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::path::PathBuf;
use std::time::Instant;

use polars_core::prelude::*;
use polars_core::utils::{accumulate_dataframes_vertical_unchecked, split_df};
Expand All @@ -22,9 +23,14 @@ pub struct SortSource {
finished: bool,
io_thread: IOThread,
memtrack: MemTracker,
// Start of the Source phase
source_start: Instant,
// Start of the OOC sort operation.
ooc_start: Instant,
}

impl SortSource {
#[allow(clippy::too_many_arguments)]
pub(super) fn new(
mut files: Vec<(u32, PathBuf)>,
sort_idx: usize,
Expand All @@ -33,6 +39,7 @@ impl SortSource {
verbose: bool,
io_thread: IOThread,
memtrack: MemTracker,
ooc_start: Instant,
) -> Self {
if verbose {
eprintln!("started sort source phase");
Expand All @@ -53,6 +60,8 @@ impl SortSource {
finished: false,
io_thread,
memtrack,
source_start: Instant::now(),
ooc_start,
}
}
fn finish_batch(&mut self, dfs: Vec<DataFrame>) -> Vec<DataChunk> {
Expand All @@ -70,14 +79,20 @@ impl SortSource {
}

impl Source for SortSource {
fn get_batches(&mut self, _context: &PExecutionContext) -> PolarsResult<SourceResult> {
fn get_batches(&mut self, context: &PExecutionContext) -> PolarsResult<SourceResult> {
// early return
if self.finished {
return Ok(SourceResult::Finished);
}

match self.files.next() {
None => Ok(SourceResult::Finished),
None => {
if context.verbose {
eprintln!("sort source phase took: {:?}", self.source_start.elapsed());
eprintln!("full ooc sort took: {:?}", self.ooc_start.elapsed());
}
Ok(SourceResult::Finished)
},
Some((_, mut path)) => {
let limit = self.memtrack.get_available() / 3;

Expand Down
Loading