Skip to content

Commit

Permalink
SDK DataLoaders 7: stateful file logging (#5379)
Browse files Browse the repository at this point in the history
This makes the `log_file` APIs behave more like the standard `log` APIs;
i.e. they inherit the state of their associated `RecordingStream`
(app_id, rec_id, timepoint, etc...).

Also inherit the application ID while we're at it.

Makes the API much nicer to use _and_ much more consistent with the
rest.

Checks:
- [x] external loader ran manually (`python loader | rerun`)
- [x] external loader via rerun (`rerun xxx.py`)
- [x] log_file with external loader (`log_file xxx.py`)
- [x] external loader ran manually (`loader | rerun`)
- [x] external loader via rerun (`rerun xxx.rs`)
- [x] log_file with external loader (`log_file xxx.rs`)

---

Part of series of PR to expose configurable `DataLoader`s to our SDKs:
- #5327 
- #5328 
- #5330
- #5337
- #5351
- #5355
- #5379
- #5361
- #5388

---------

Co-authored-by: Andreas Reich <andreas@rerun.io>
  • Loading branch information
teh-cmc and Wumpf committed Mar 4, 2024
1 parent a037b41 commit 7b314d8
Show file tree
Hide file tree
Showing 8 changed files with 112 additions and 91 deletions.
29 changes: 26 additions & 3 deletions crates/re_data_source/src/data_loader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use re_log_types::{ArrowMsg, DataRow, EntityPath, LogMsg, TimePoint};
/// The loader is free to ignore some or all of these.
///
/// External [`DataLoader`]s will be passed the following CLI parameters:
/// * `--application-id <application_id>`
/// * `--opened-application-id <opened_application_id>` (if set)
/// * `--recording-id <store_id>`
/// * `--opened-recording-id <opened_store_id>` (if set)
/// * `--entity-path-prefix <entity_path_prefix>` (if set)
Expand All @@ -19,13 +21,21 @@ use re_log_types::{ArrowMsg, DataRow, EntityPath, LogMsg, TimePoint};
/// * `--sequence <timeline1>=<seq1> <timeline2>=<seq2> ...` (if `timepoint` contains sequence data)
#[derive(Debug, Clone)]
pub struct DataLoaderSettings {
/// The recommended [`re_log_types::StoreId`] to log the data to, based on the surrounding context.
pub store_id: re_log_types::StoreId,
/// The recommended [`re_log_types::ApplicationId`] to log the data to, based on the surrounding context.
pub application_id: Option<re_log_types::ApplicationId>,

/// The [`re_log_types::StoreId`] that is currently opened in the viewer, if any.
/// The [`re_log_types::ApplicationId`] that is currently opened in the viewer, if any.
//
// TODO(#5350): actually support this
pub opened_application_id: Option<re_log_types::ApplicationId>,

/// The recommended [`re_log_types::StoreId`] to log the data to, based on the surrounding context.
///
/// Log data to this recording if you want it to appear in a new recording shared by all
/// data-loaders for the current loading session.
pub store_id: re_log_types::StoreId,

/// The [`re_log_types::StoreId`] that is currently opened in the viewer, if any.
//
// TODO(#5350): actually support this
pub opened_store_id: Option<re_log_types::StoreId>,
Expand All @@ -41,6 +51,8 @@ impl DataLoaderSettings {
#[inline]
pub fn recommended(store_id: impl Into<re_log_types::StoreId>) -> Self {
Self {
application_id: Default::default(),
opened_application_id: Default::default(),
store_id: store_id.into(),
opened_store_id: Default::default(),
entity_path_prefix: Default::default(),
Expand All @@ -51,6 +63,8 @@ impl DataLoaderSettings {
/// Generates CLI flags from these settings, for external data loaders.
pub fn to_cli_args(&self) -> Vec<String> {
let Self {
application_id,
opened_application_id,
store_id,
opened_store_id,
entity_path_prefix,
Expand All @@ -59,8 +73,17 @@ impl DataLoaderSettings {

let mut args = Vec::new();

if let Some(application_id) = application_id {
args.extend(["--application-id".to_owned(), format!("{application_id}")]);
}
args.extend(["--recording-id".to_owned(), format!("{store_id}")]);

if let Some(opened_application_id) = opened_application_id {
args.extend([
"--opened-application-id".to_owned(),
format!("{opened_application_id}"),
]);
}
if let Some(opened_store_id) = opened_store_id {
args.extend([
"--opened-recording-id".to_owned(),
Expand Down
51 changes: 44 additions & 7 deletions crates/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1095,10 +1095,11 @@ impl RecordingStream {
#[cfg(feature = "data_loaders")]
pub fn log_file_from_path(
&self,
settings: &re_data_source::DataLoaderSettings,
filepath: impl AsRef<std::path::Path>,
entity_path_prefix: Option<EntityPath>,
timeless: bool,
) -> RecordingStreamResult<()> {
self.log_file(settings, filepath, None)
self.log_file(filepath, None, entity_path_prefix, timeless)
}

/// Logs the given `contents` using all [`re_data_source::DataLoader`]s available.
Expand All @@ -1112,20 +1113,27 @@ impl RecordingStream {
#[cfg(feature = "data_loaders")]
pub fn log_file_from_contents(
&self,
settings: &re_data_source::DataLoaderSettings,
filepath: impl AsRef<std::path::Path>,
contents: std::borrow::Cow<'_, [u8]>,
entity_path_prefix: Option<EntityPath>,
timeless: bool,
) -> RecordingStreamResult<()> {
self.log_file(settings, filepath, Some(contents))
self.log_file(filepath, Some(contents), entity_path_prefix, timeless)
}

#[cfg(feature = "data_loaders")]
fn log_file(
&self,
settings: &re_data_source::DataLoaderSettings,
filepath: impl AsRef<std::path::Path>,
contents: Option<std::borrow::Cow<'_, [u8]>>,
entity_path_prefix: Option<EntityPath>,
timeless: bool,
) -> RecordingStreamResult<()> {
let Some(store_info) = self.store_info().clone() else {
re_log::warn!("Ignored call to log_file() because RecordingStream has not been properly initialized");
return Ok(());
};

let filepath = filepath.as_ref();
let has_contents = contents.is_some();

Expand All @@ -1134,16 +1142,45 @@ impl RecordingStream {
re_smart_channel::SmartChannelSource::File(filepath.into()),
);

let settings = crate::DataLoaderSettings {
application_id: Some(store_info.application_id.clone()),
opened_application_id: None,
store_id: store_info.store_id,
opened_store_id: None,
entity_path_prefix,
timepoint: (!timeless).then(|| {
self.with(|inner| {
// Get the current time on all timelines, for the current recording, on the current
// thread…
let mut now = self.now();

// …and then also inject the current recording tick into it.
let tick = inner
.tick
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
now.insert(Timeline::log_tick(), tick.into());

now
})
.unwrap_or_default()
}), // timepoint: self.time,
};

if let Some(contents) = contents {
re_data_source::load_from_file_contents(
settings,
&settings,
re_log_types::FileSource::Sdk,
filepath,
contents,
&tx,
)?;
} else {
re_data_source::load_from_path(settings, re_log_types::FileSource::Sdk, filepath, &tx)?;
re_data_source::load_from_path(
&settings,
re_log_types::FileSource::Sdk,
filepath,
&tx,
)?;
}
drop(tx);

Expand Down
28 changes: 4 additions & 24 deletions crates/rerun_c/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -734,20 +734,10 @@ fn rr_log_file_from_path_impl(
) -> Result<(), CError> {
let stream = recording_stream(stream)?;

let recording_id = stream
.store_info()
.ok_or_else(|| {
CError::new(
CErrorCode::RecordingStreamRuntimeFailure,
&format!("Couldn't load file {filepath:?}: no recording available"),
)
})?
.store_id;
let settings = re_sdk::DataLoaderSettings::recommended(recording_id);

let filepath = filepath.as_str("filepath")?;
stream
.log_file_from_path(&settings, filepath)
// TODO(cmc): expose settings
.log_file_from_path(filepath, None, true)
.map_err(|err| {
CError::new(
CErrorCode::RecordingStreamRuntimeFailure,
Expand Down Expand Up @@ -779,22 +769,12 @@ fn rr_log_file_from_contents_impl(
) -> Result<(), CError> {
let stream = recording_stream(stream)?;

let recording_id = stream
.store_info()
.ok_or_else(|| {
CError::new(
CErrorCode::RecordingStreamRuntimeFailure,
&format!("Couldn't load file {filepath:?}: no recording available"),
)
})?
.store_id;
let settings = re_sdk::DataLoaderSettings::recommended(recording_id);

let filepath = filepath.as_str("filepath")?;
let contents = contents.as_bytes("contents")?;

stream
.log_file_from_contents(&settings, filepath, std::borrow::Cow::Borrowed(contents))
// TODO(cmc): expose settings
.log_file_from_contents(filepath, std::borrow::Cow::Borrowed(contents), None, true)
.map_err(|err| {
CError::new(
CErrorCode::RecordingStreamRuntimeFailure,
Expand Down
8 changes: 6 additions & 2 deletions examples/python/external_data_loader/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"""
)
parser.add_argument("filepath", type=str)
parser.add_argument("--application-id", type=str, help="optional recommended ID for the application")
parser.add_argument("--recording-id", type=str, help="optional recommended ID for the recording")
parser.add_argument("--entity-path-prefix", type=str, help="optional prefix for all entity paths")
parser.add_argument(
Expand Down Expand Up @@ -59,7 +60,10 @@ def main() -> None:
if not is_file or not is_python_file:
exit(rr.EXTERNAL_DATA_LOADER_INCOMPATIBLE_EXIT_CODE)

rr.init("rerun_example_external_data_loader", recording_id=args.recording_id)
app_id = "rerun_example_external_data_loader"
if args.application_id is not None:
app_id = args.application_id
rr.init(app_id, recording_id=args.recording_id)
# The most important part of this: log to standard output so the Rerun Viewer can ingest it!
rr.stdout()

Expand All @@ -85,7 +89,7 @@ def set_time_from_args() -> None:
timeline_name, time = parts
rr.set_time_seconds(timeline_name, float(time))

for time_str in args.time:
for time_str in args.sequence:
parts = time_str.split("=")
if len(parts) != 2:
continue
Expand Down
10 changes: 9 additions & 1 deletion examples/rust/external_data_loader/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ struct Args {
#[argh(positional)]
filepath: std::path::PathBuf,

/// optional recommended ID for the application
#[argh(option)]
application_id: Option<String>,

/// optional recommended ID for the recording
#[argh(option)]
recording_id: Option<String>,
Expand Down Expand Up @@ -74,7 +78,11 @@ fn main() -> anyhow::Result<()> {
let text = format!("## Some Rust code\n```rust\n{body}\n```\n");

let rec = {
let mut rec = rerun::RecordingStreamBuilder::new("rerun_example_external_data_loader");
let mut rec = rerun::RecordingStreamBuilder::new(
args.application_id
.as_deref()
.unwrap_or("rerun_example_external_data_loader"),
);
if let Some(recording_id) = args.recording_id.as_ref() {
rec = rec.recording_id(recording_id);
};
Expand Down
7 changes: 5 additions & 2 deletions examples/rust/log_file/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,23 @@ fn run(rec: &rerun::RecordingStream, args: &Args) -> anyhow::Result<()> {
let mut settings = rerun::DataLoaderSettings::recommended(rec.store_info().unwrap().store_id);
settings.entity_path_prefix = Some("log_file_example".into());

let prefix = Some("log_file_example".into());

for filepath in &args.filepaths {
let filepath = filepath.as_path();

if !args.from_contents {
// Either log the file using its path…
rec.log_file_from_path(&settings, filepath)?;
rec.log_file_from_path(filepath, prefix.clone(), true /* timeless */)?;
} else {
// …or using its contents if you already have them loaded for some reason.
if filepath.is_file() {
let contents = std::fs::read(filepath)?;
rec.log_file_from_contents(
&settings,
filepath,
std::borrow::Cow::Borrowed(&contents),
prefix.clone(),
true, /* timeless */
)?;
}
}
Expand Down
16 changes: 3 additions & 13 deletions rerun_py/rerun_sdk/rerun/_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,9 +288,8 @@ def log_components(
def log_file_from_path(
file_path: str | Path,
*,
recording_id: str | None = None,
entity_path_prefix: str | None = None,
timeless: bool | None = None,
timeless: bool = False,
recording: RecordingStream | None = None,
) -> None:
r"""
Expand All @@ -308,14 +307,11 @@ def log_file_from_path(
file_path:
Path to the file to be logged.
recording_id:
The recommended `RecordingId` to log the data to.
entity_path_prefix:
What should the logged entity paths be prefixed with?
timeless:
Should the logged data be timeless?
Should the logged data be timeless? (default: False)
recording:
Specifies the [`rerun.RecordingStream`][] to use. If left unspecified,
Expand All @@ -326,7 +322,6 @@ def log_file_from_path(

bindings.log_file_from_path(
Path(file_path),
recording_id=recording_id,
entity_path_prefix=entity_path_prefix,
timeless=timeless,
recording=recording,
Expand All @@ -339,7 +334,6 @@ def log_file_from_contents(
file_path: str | Path,
file_contents: bytes,
*,
recording_id: str | None = None,
entity_path_prefix: str | None = None,
timeless: bool | None = None,
recording: RecordingStream | None = None,
Expand All @@ -362,14 +356,11 @@ def log_file_from_contents(
file_contents:
Contents to be logged.
recording_id:
The recommended `RecordingId` to log the data to.
entity_path_prefix:
What should the logged entity paths be prefixed with?
timeless:
Should the logged data be timeless?
Should the logged data be timeless? (default: False)
recording:
Specifies the [`rerun.RecordingStream`][] to use. If left unspecified,
Expand All @@ -381,7 +372,6 @@ def log_file_from_contents(
bindings.log_file_from_contents(
Path(file_path),
file_contents,
recording_id=recording_id,
entity_path_prefix=entity_path_prefix,
timeless=timeless,
recording=recording,
Expand Down

0 comments on commit 7b314d8

Please sign in to comment.