Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SDK DataLoaders 7: stateful file logging #5379

Merged
merged 7 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 26 additions & 3 deletions crates/re_data_source/src/data_loader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use re_log_types::{ArrowMsg, DataRow, EntityPath, LogMsg, TimePoint};
/// The loader is free to ignore some or all of these.
///
/// External [`DataLoader`]s will be passed the following CLI parameters:
/// * `--application-id <application_id>`
/// * `--opened-application-id <opened_application_id>` (if set)
/// * `--recording-id <store_id>`
/// * `--opened-recording-id <opened_store_id>` (if set)
/// * `--entity-path-prefix <entity_path_prefix>` (if set)
Expand All @@ -19,13 +21,21 @@ use re_log_types::{ArrowMsg, DataRow, EntityPath, LogMsg, TimePoint};
/// * `--sequence <timeline1>=<seq1> <timeline2>=<seq2> ...` (if `timepoint` contains sequence data)
#[derive(Debug, Clone)]
pub struct DataLoaderSettings {
/// The recommended [`re_log_types::StoreId`] to log the data to, based on the surrounding context.
pub store_id: re_log_types::StoreId,
/// The recommended [`re_log_types::ApplicationId`] to log the data to, based on the surrounding context.
pub application_id: Option<re_log_types::ApplicationId>,

/// The [`re_log_types::StoreId`] that is currently opened in the viewer, if any.
/// The [`re_log_types::ApplicationId`] that is currently opened in the viewer, if any.
//
// TODO(#5350): actually support this
pub opened_application_id: Option<re_log_types::ApplicationId>,

/// The recommended [`re_log_types::StoreId`] to log the data to, based on the surrounding context.
///
/// Log data to this recording if you want it to appear in a new recording shared by all
/// data-loaders for the current loading session.
pub store_id: re_log_types::StoreId,

/// The [`re_log_types::StoreId`] that is currently opened in the viewer, if any.
//
// TODO(#5350): actually support this
pub opened_store_id: Option<re_log_types::StoreId>,
Expand All @@ -41,6 +51,8 @@ impl DataLoaderSettings {
#[inline]
pub fn recommended(store_id: impl Into<re_log_types::StoreId>) -> Self {
Self {
application_id: Default::default(),
opened_application_id: Default::default(),
store_id: store_id.into(),
opened_store_id: Default::default(),
entity_path_prefix: Default::default(),
Expand All @@ -51,6 +63,8 @@ impl DataLoaderSettings {
/// Generates CLI flags from these settings, for external data loaders.
pub fn to_cli_args(&self) -> Vec<String> {
let Self {
application_id,
opened_application_id,
store_id,
opened_store_id,
entity_path_prefix,
Expand All @@ -59,8 +73,17 @@ impl DataLoaderSettings {

let mut args = Vec::new();

if let Some(application_id) = application_id {
args.extend(["--application-id".to_owned(), format!("{application_id}")]);
}
args.extend(["--recording-id".to_owned(), format!("{store_id}")]);

if let Some(opened_application_id) = opened_application_id {
args.extend([
"--opened-application-id".to_owned(),
format!("{opened_application_id}"),
]);
}
if let Some(opened_store_id) = opened_store_id {
args.extend([
"--opened-recording-id".to_owned(),
Expand Down
51 changes: 44 additions & 7 deletions crates/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1095,10 +1095,11 @@ impl RecordingStream {
#[cfg(feature = "data_loaders")]
pub fn log_file_from_path(
&self,
settings: &re_data_source::DataLoaderSettings,
filepath: impl AsRef<std::path::Path>,
entity_path_prefix: Option<EntityPath>,
Comment on lines 1098 to +1099
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not first put the entity path like with all other logging functions 🤔

edit: right, because that messes up languages with default arguments

timeless: bool,
) -> RecordingStreamResult<()> {
self.log_file(settings, filepath, None)
self.log_file(filepath, None, entity_path_prefix, timeless)
}

/// Logs the given `contents` using all [`re_data_source::DataLoader`]s available.
Expand All @@ -1112,20 +1113,27 @@ impl RecordingStream {
#[cfg(feature = "data_loaders")]
pub fn log_file_from_contents(
&self,
settings: &re_data_source::DataLoaderSettings,
filepath: impl AsRef<std::path::Path>,
contents: std::borrow::Cow<'_, [u8]>,
entity_path_prefix: Option<EntityPath>,
timeless: bool,
) -> RecordingStreamResult<()> {
self.log_file(settings, filepath, Some(contents))
self.log_file(filepath, Some(contents), entity_path_prefix, timeless)
}

#[cfg(feature = "data_loaders")]
fn log_file(
&self,
settings: &re_data_source::DataLoaderSettings,
filepath: impl AsRef<std::path::Path>,
contents: Option<std::borrow::Cow<'_, [u8]>>,
entity_path_prefix: Option<EntityPath>,
timeless: bool,
) -> RecordingStreamResult<()> {
let Some(store_info) = self.store_info().clone() else {
re_log::warn!("Ignored call to log_file() because RecordingStream has not been properly initialized");
return Ok(());
};

let filepath = filepath.as_ref();
let has_contents = contents.is_some();

Expand All @@ -1134,16 +1142,45 @@ impl RecordingStream {
re_smart_channel::SmartChannelSource::File(filepath.into()),
);

let settings = crate::DataLoaderSettings {
application_id: Some(store_info.application_id.clone()),
opened_application_id: None,
store_id: store_info.store_id,
opened_store_id: None,
entity_path_prefix,
timepoint: (!timeless).then(|| {
self.with(|inner| {
// Get the current time on all timelines, for the current recording, on the current
// thread…
let mut now = self.now();

// …and then also inject the current recording tick into it.
let tick = inner
.tick
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
now.insert(Timeline::log_tick(), tick.into());

now
})
.unwrap_or_default()
}), // timepoint: self.time,
};

if let Some(contents) = contents {
re_data_source::load_from_file_contents(
settings,
&settings,
re_log_types::FileSource::Sdk,
filepath,
contents,
&tx,
)?;
} else {
re_data_source::load_from_path(settings, re_log_types::FileSource::Sdk, filepath, &tx)?;
re_data_source::load_from_path(
&settings,
re_log_types::FileSource::Sdk,
filepath,
&tx,
)?;
}
drop(tx);

Expand Down
28 changes: 4 additions & 24 deletions crates/rerun_c/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -734,20 +734,10 @@ fn rr_log_file_from_path_impl(
) -> Result<(), CError> {
let stream = recording_stream(stream)?;

let recording_id = stream
.store_info()
.ok_or_else(|| {
CError::new(
CErrorCode::RecordingStreamRuntimeFailure,
&format!("Couldn't load file {filepath:?}: no recording available"),
)
})?
.store_id;
let settings = re_sdk::DataLoaderSettings::recommended(recording_id);

let filepath = filepath.as_str("filepath")?;
stream
.log_file_from_path(&settings, filepath)
// TODO(cmc): expose settings
.log_file_from_path(filepath, None, true)
.map_err(|err| {
CError::new(
CErrorCode::RecordingStreamRuntimeFailure,
Expand Down Expand Up @@ -779,22 +769,12 @@ fn rr_log_file_from_contents_impl(
) -> Result<(), CError> {
let stream = recording_stream(stream)?;

let recording_id = stream
.store_info()
.ok_or_else(|| {
CError::new(
CErrorCode::RecordingStreamRuntimeFailure,
&format!("Couldn't load file {filepath:?}: no recording available"),
)
})?
.store_id;
let settings = re_sdk::DataLoaderSettings::recommended(recording_id);

let filepath = filepath.as_str("filepath")?;
let contents = contents.as_bytes("contents")?;

stream
.log_file_from_contents(&settings, filepath, std::borrow::Cow::Borrowed(contents))
// TODO(cmc): expose settings
.log_file_from_contents(filepath, std::borrow::Cow::Borrowed(contents), None, true)
.map_err(|err| {
CError::new(
CErrorCode::RecordingStreamRuntimeFailure,
Expand Down
8 changes: 6 additions & 2 deletions examples/python/external_data_loader/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"""
)
parser.add_argument("filepath", type=str)
parser.add_argument("--application-id", type=str, help="optional recommended ID for the application")
parser.add_argument("--recording-id", type=str, help="optional recommended ID for the recording")
parser.add_argument("--entity-path-prefix", type=str, help="optional prefix for all entity paths")
parser.add_argument(
Expand Down Expand Up @@ -59,7 +60,10 @@ def main() -> None:
if not is_file or not is_python_file:
exit(rr.EXTERNAL_DATA_LOADER_INCOMPATIBLE_EXIT_CODE)

rr.init("rerun_example_external_data_loader", recording_id=args.recording_id)
app_id = "rerun_example_external_data_loader"
if args.application_id is not None:
app_id = args.application_id
rr.init(app_id, recording_id=args.recording_id)
# The most important part of this: log to standard output so the Rerun Viewer can ingest it!
rr.stdout()

Expand All @@ -85,7 +89,7 @@ def set_time_from_args() -> None:
timeline_name, time = parts
rr.set_time_seconds(timeline_name, float(time))

for time_str in args.time:
for time_str in args.sequence:
parts = time_str.split("=")
if len(parts) != 2:
continue
Expand Down
10 changes: 9 additions & 1 deletion examples/rust/external_data_loader/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ struct Args {
#[argh(positional)]
filepath: std::path::PathBuf,

/// optional recommended ID for the application
#[argh(option)]
application_id: Option<String>,

/// optional recommended ID for the recording
#[argh(option)]
recording_id: Option<String>,
Expand Down Expand Up @@ -74,7 +78,11 @@ fn main() -> anyhow::Result<()> {
let text = format!("## Some Rust code\n```rust\n{body}\n```\n");

let rec = {
let mut rec = rerun::RecordingStreamBuilder::new("rerun_example_external_data_loader");
let mut rec = rerun::RecordingStreamBuilder::new(
args.application_id
.as_deref()
.unwrap_or("rerun_example_external_data_loader"),
);
if let Some(recording_id) = args.recording_id.as_ref() {
rec = rec.recording_id(recording_id);
};
Expand Down
7 changes: 5 additions & 2 deletions examples/rust/log_file/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,23 @@ fn run(rec: &rerun::RecordingStream, args: &Args) -> anyhow::Result<()> {
let mut settings = rerun::DataLoaderSettings::recommended(rec.store_info().unwrap().store_id);
settings.entity_path_prefix = Some("log_file_example".into());

let prefix = Some("log_file_example".into());

for filepath in &args.filepaths {
let filepath = filepath.as_path();

if !args.from_contents {
// Either log the file using its path…
rec.log_file_from_path(&settings, filepath)?;
rec.log_file_from_path(filepath, prefix.clone(), true /* timeless */)?;
} else {
// …or using its contents if you already have them loaded for some reason.
if filepath.is_file() {
let contents = std::fs::read(filepath)?;
rec.log_file_from_contents(
&settings,
filepath,
std::borrow::Cow::Borrowed(&contents),
prefix.clone(),
true, /* timeless */
)?;
}
}
Expand Down
16 changes: 3 additions & 13 deletions rerun_py/rerun_sdk/rerun/_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,9 +288,8 @@ def log_components(
def log_file_from_path(
file_path: str | Path,
*,
recording_id: str | None = None,
entity_path_prefix: str | None = None,
timeless: bool | None = None,
timeless: bool = False,
recording: RecordingStream | None = None,
) -> None:
r"""
Expand All @@ -308,14 +307,11 @@ def log_file_from_path(
file_path:
Path to the file to be logged.

recording_id:
The recommended `RecordingId` to log the data to.

entity_path_prefix:
What should the logged entity paths be prefixed with?

timeless:
Should the logged data be timeless?
Should the logged data be timeless? (default: False)

recording:
Specifies the [`rerun.RecordingStream`][] to use. If left unspecified,
Expand All @@ -326,7 +322,6 @@ def log_file_from_path(

bindings.log_file_from_path(
Path(file_path),
recording_id=recording_id,
entity_path_prefix=entity_path_prefix,
timeless=timeless,
recording=recording,
Expand All @@ -339,7 +334,6 @@ def log_file_from_contents(
file_path: str | Path,
file_contents: bytes,
*,
recording_id: str | None = None,
entity_path_prefix: str | None = None,
timeless: bool | None = None,
recording: RecordingStream | None = None,
Expand All @@ -362,14 +356,11 @@ def log_file_from_contents(
file_contents:
Contents to be logged.

recording_id:
The recommended `RecordingId` to log the data to.

entity_path_prefix:
What should the logged entity paths be prefixed with?

timeless:
Should the logged data be timeless?
Should the logged data be timeless? (default: False)

recording:
Specifies the [`rerun.RecordingStream`][] to use. If left unspecified,
Expand All @@ -381,7 +372,6 @@ def log_file_from_contents(
bindings.log_file_from_contents(
Path(file_path),
file_contents,
recording_id=recording_id,
entity_path_prefix=entity_path_prefix,
timeless=timeless,
recording=recording,
Expand Down