Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Apr 14, 2023
1 parent b5fb9ec commit 6ccfe18
Show file tree
Hide file tree
Showing 10 changed files with 372 additions and 77 deletions.
2 changes: 1 addition & 1 deletion crates/re_arrow_store/benches/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ fn build_table(n: usize, packed: bool) -> DataTable {
// Do a serialization roundtrip to pack everything in contiguous memory.
if packed {
let (schema, columns) = table.serialize().unwrap();
table = DataTable::deserialize(TableId::ZERO, &schema, &columns).unwrap();
table = DataTable::deserialize(TableId::ZERO, &schema, columns, None).unwrap();
}

table
Expand Down
1 change: 1 addition & 0 deletions crates/re_arrow_store/src/arrow_util.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use ahash::HashSet;
use arrow2::{
array::{
growable::make_growable, Array, FixedSizeListArray, ListArray, StructArray, UnionArray,
Expand Down
100 changes: 57 additions & 43 deletions crates/re_data_store/examples/memory_usage.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};

use re_data_store::log_db::collect_datatypes;
use re_log_types::{
datagen::{build_frame_nr, build_some_point2d},
DataCell, LogMsg, TimeInt, TimePoint, Timeline,
};

thread_local! {
static LIVE_BYTES_IN_THREAD: AtomicUsize = AtomicUsize::new(0);
}
Expand Down Expand Up @@ -48,18 +54,13 @@ fn live_bytes() -> usize {

// ----------------------------------------------------------------------------

use re_log_types::{entity_path, DataRow, RecordingId, RowId};
use re_log_types::{entity_path, DataRow, DataTable, RecordingId, RowId, TableId};

fn main() {
log_messages();
}

fn log_messages() {
use re_log_types::{
datagen::{build_frame_nr, build_some_point2d},
LogMsg, TimeInt, TimePoint, Timeline,
};

// Note: we use Box in this function so that we also count the "static"
// part of all the data, i.e. its `std::mem::size_of`.

Expand Down Expand Up @@ -89,6 +90,7 @@ fn log_messages() {
bytes_used
}

const NUM_ROWS: usize = 100_000;
const NUM_POINTS: usize = 1_000;

let recording_id = RecordingId::random();
Expand All @@ -104,55 +106,67 @@ fn log_messages() {
drop(entity_path);
}

{
fn arrow_payload(recording_id: RecordingId, num_rows: usize, num_points: usize, packed: bool) {
println!("--- {num_rows} rows each containing {num_points} points (packed={packed}) ---");
let used_bytes_start = live_bytes();
let table = Box::new(
DataRow::from_cells1(
RowId::random(),
entity_path!("points"),
[build_frame_nr(0.into())],
1,
build_some_point2d(1),
)
.into_table(),
);
let table = Box::new(create_table(num_rows, num_points, packed));
let table_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(
recording_id,
table.to_arrow_msg().unwrap(),
));
let log_msg_bytes = live_bytes() - used_bytes_start;
println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM");
println!(
"Arrow payload containing {num_points}x Pos2 uses {} bytes in RAM",
re_format::format_bytes(table_bytes as _)
);
let encoded = encode_log_msg(&log_msg);
println!(
"Arrow LogMsg containing a Pos2 uses {}-{log_msg_bytes} bytes in RAM, and {} bytes encoded",
size_decoded(&encoded), encoded.len()
"Arrow LogMsg containing {num_points}x Pos2 uses {}-{} bytes in RAM, and {} bytes encoded",
re_format::format_bytes(size_decoded(&encoded) as _),
re_format::format_bytes(log_msg_bytes as _),
re_format::format_bytes(encoded.len() as _),
);
println!();
}

let num_rows = [1, NUM_ROWS];
let num_points = [1, NUM_POINTS];
let packed = [false, true];

for (num_rows, num_points, packed) in num_rows
.into_iter()
.flat_map(|num_row| std::iter::repeat(num_row).zip(num_points))
.flat_map(|num_row| std::iter::repeat(num_row).zip(packed))
.map(|((a, b), c)| (a, b, c))
{
let used_bytes_start = live_bytes();
let table = Box::new(
DataRow::from_cells1(
RowId::random(),
entity_path!("points"),
[build_frame_nr(0.into())],
NUM_POINTS as _,
build_some_point2d(NUM_POINTS),
)
.into_table(),
);
let table_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(
recording_id,
table.to_arrow_msg().unwrap(),
));
let log_msg_bytes = live_bytes() - used_bytes_start;
println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM");
let encoded = encode_log_msg(&log_msg);
println!(
"Arrow LogMsg containing {NUM_POINTS}x Pos2 uses {}-{log_msg_bytes} bytes in RAM, and {} bytes encoded",
size_decoded(&encoded), encoded.len()
);
arrow_payload(recording_id, num_rows, num_points, packed);
}
}

fn create_table(num_rows: usize, num_points: usize, packed: bool) -> DataTable {
let rows = (0..num_rows).map(|i| {
DataRow::from_cells1(
RowId::random(),
entity_path!("points"),
[build_frame_nr((i as i64).into())],
num_points as _,
build_some_point2d(num_points),
)
});
let mut table = DataTable::from_rows(TableId::random(), rows);

// Do a serialization roundtrip to pack everything in contiguous memory.
if packed {
let (schema, columns) = table.serialize().unwrap();

let mut datatypes = Default::default();
for column in columns.arrays() {
collect_datatypes(&mut datatypes, &**column);
}

table = DataTable::deserialize(TableId::ZERO, &schema, columns, Some(&datatypes)).unwrap();
}

table
}
Loading

0 comments on commit 6ccfe18

Please sign in to comment.