Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor re_sdk::Session #1528

Merged
merged 37 commits into from
Mar 9, 2023
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
0c86719
Refactor: clean up Session slightly
emilk Mar 7, 2023
d426029
Simplify the API of RerunArgs
emilk Mar 7, 2023
7e9202f
CI: run `cargo check -p rerun --no-default-features --features sdk`
emilk Mar 7, 2023
326f069
Fix errors and warnings when using only the `sdk` feature of rerun
emilk Mar 7, 2023
1092f99
Make LogSink: Sync
emilk Mar 7, 2023
25623a3
Refactor: clean up the contents of Session
emilk Mar 7, 2023
5a9f46a
Move the tokio runtime out of Session
emilk Mar 7, 2023
7f20c0e
Clean up Session a bit
emilk Mar 7, 2023
c5919dc
Refactor sink names and modules
emilk Mar 7, 2023
1a07f7d
Clone Session as PythonSession
emilk Mar 7, 2023
67e616a
justfile: make sure our just-scripts use `set -euo pipefail`
emilk Mar 7, 2023
ad02d24
Add just rs-lint
emilk Mar 7, 2023
2ef7528
lint.py: white-list "./web_viewer/re_viewer_debug.js"
emilk Mar 7, 2023
0c73ba8
Run `typos` in `just lint`
emilk Mar 7, 2023
eb3d51b
Simplify `Session`, and add `SessionBuilder`
emilk Mar 7, 2023
4dcb345
Use --deny-warnings instead of setting RUSTFLAGS
emilk Mar 7, 2023
5a5e0fa
rust.yml: replace `-D` with more explicit `--deny`
emilk Mar 7, 2023
725d9d5
Make ahash a workspace dependency
emilk Mar 7, 2023
e4d30c8
Remove typos from just py-lint again since it runs on CI
emilk Mar 7, 2023
3a9f7d2
Make Session: Clone
emilk Mar 7, 2023
ec7b2f6
Make sure `Session` is `Send` and `Sync`
emilk Mar 7, 2023
9d8e3ac
Make `tracing` and `tracing-subscriber` workspace dependencies
emilk Mar 7, 2023
70bc204
Less `mut Session`
emilk Mar 7, 2023
de9c490
Remove lint of `dbg!` (clippy checks it now)
emilk Mar 7, 2023
41e1614
MsgSender::send can be used with both Session and LogSink
emilk Mar 7, 2023
ec2de06
bug fix
emilk Mar 7, 2023
f51b839
Cleanup
emilk Mar 7, 2023
c1c59d4
Add Session::sink to access the underlying sink
emilk Mar 8, 2023
16e6704
Document the built-in log level names
emilk Mar 8, 2023
b6a9d7a
Simplify TCP client by removing the `set_addr` method
emilk Mar 8, 2023
129f7b7
Update TcpClient documentation
emilk Mar 8, 2023
5545dc1
Use thiserror to report LogMsg encoding errors
emilk Mar 8, 2023
4ac05fd
Replace some more `anyhow` with `thiserror`
emilk Mar 8, 2023
1408470
fix copy-paste bug
emilk Mar 9, 2023
c069752
fix copy-paste bug
emilk Mar 9, 2023
718d0b3
Sleep longer
emilk Mar 9, 2023
8dfff29
Simplify code with SessionBuilder::buffered
emilk Mar 9, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ env:
# web_sys_unstable_apis is required to enable the web_sys clipboard API which egui_web uses
# https://rustwasm.github.io/wasm-bindgen/api/web_sys/struct.Clipboard.html
# https://rustwasm.github.io/docs/wasm-bindgen/web-sys/unstable-apis.html
RUSTFLAGS: --cfg=web_sys_unstable_apis -D warnings
RUSTFLAGS: --cfg=web_sys_unstable_apis --deny warnings

# See https://github.com/ericseppanen/cargo-cranky/issues/8
RUSTDOCFLAGS: -D warnings -D rustdoc::missing_crate_level_docs
RUSTDOCFLAGS: --deny warnings --deny rustdoc::missing_crate_level_docs

permissions:
# deployments permission to deploy GitHub pages website
Expand Down Expand Up @@ -122,14 +122,21 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: cranky
args: --all-targets --all-features -- -D warnings
args: --all-targets --all-features -- --deny warnings

- name: Check no default features
uses: actions-rs/cargo@v1
with:
command: check
args: --locked --no-default-features --features __ci --lib

# Check a few important permutations of the feature flags for our `rerun` library:
- name: Check rerun with --features sdk
uses: actions-rs/cargo@v1
with:
command: check
args: --locked --no-default-features --features sdk

- name: Test doc-tests
uses: actions-rs/cargo@v1
with:
Expand Down
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ re_web_viewer_server = { path = "crates/re_web_viewer_server", version = "0.3.0"
re_ws_comms = { path = "crates/re_ws_comms", version = "0.3.0" }
rerun = { path = "crates/rerun", version = "0.3.0" }

ahash = "0.8"
anyhow = "1.0"
arrow2 = "0.16"
arrow2_convert = "0.4.2"
Expand Down Expand Up @@ -77,6 +78,8 @@ puffin = "0.14"
thiserror = "1.0"
time = { version = "0.3", features = ["wasm-bindgen"] }
tokio = "1.24"
tracing = "0.1"
tracing-subscriber = "0.3"
wgpu = { version = "0.15", default-features = false }
wgpu-core = { version = "0.15", default-features = false }

Expand Down
2 changes: 1 addition & 1 deletion crates/re_analytics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ web-sys = { version = "0.3.58", features = ["Storage"] }


[dev-dependencies]
tracing-subscriber = "0.3"
tracing-subscriber.workspace = true


[build-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion crates/re_arrow_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ re_log_types.workspace = true
re_log.workspace = true

# External dependencies:
ahash = "0.8"
ahash.workspace = true
anyhow.workspace = true
arrow2 = { workspace = true, features = [
"compute_concatenate",
Expand Down
2 changes: 1 addition & 1 deletion crates/re_data_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ re_log.workspace = true
re_smart_channel.workspace = true
re_string_interner.workspace = true

ahash = "0.8"
ahash.workspace = true
anyhow = "1.0"
document-features = "0.2"
itertools = "0.10"
Expand Down
4 changes: 2 additions & 2 deletions crates/re_log/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ all-features = true

[dependencies]
log-once = "0.4"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing.workspace = true
tracing-subscriber = { workspace = true, features = ["env-filter"] }

# web dependencies:
[target.'cfg(target_arch = "wasm32")'.dependencies]
Expand Down
2 changes: 1 addition & 1 deletion crates/re_log_types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ re_string_interner.workspace = true
re_tuid.workspace = true

# External
ahash = "0.8"
ahash.workspace = true
array-init = "2.1.0"
arrow2 = { workspace = true, features = ["io_ipc", "io_print"] }
arrow2_convert.workspace = true
Expand Down
8 changes: 8 additions & 0 deletions crates/re_log_types/src/component_types/text_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ use crate::msg_bundle::Component;
#[derive(Clone, Debug, ArrowField, ArrowSerialize, ArrowDeserialize, PartialEq, Eq)]
pub struct TextEntry {
pub body: String,

// Recommended to be one of:
// * `"CRITICAL"`
// * `"ERROR"`
// * `"WARN"`
// * `"INFO"`
// * `"DEBUG"`
// * `"TRACE"`
pub level: Option<String>,
}

Expand Down
37 changes: 25 additions & 12 deletions crates/re_log_types/src/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,62 +8,75 @@ use crate::LogMsg;
#[cfg(feature = "save")]
#[cfg(not(target_arch = "wasm32"))]
mod encoder {
use anyhow::Context as _;
use std::io::Write as _;

use crate::LogMsg;

/// On failure to encode or serialize a [`LogMsg`].
#[derive(thiserror::Error, Debug)]
pub enum EncodeError {
#[error("Failed to write: {0}")]
Write(std::io::Error),

#[error("Zstd error: {0}")]
Zstd(std::io::Error),

#[error("MsgPack error: {0}")]
MsgPack(#[from] rmp_serde::encode::Error),
}

/// Encode a stream of [`LogMsg`] into an `.rrd` file.
pub struct Encoder<W: std::io::Write> {
zstd_encoder: zstd::stream::Encoder<'static, W>,
buffer: Vec<u8>,
}

impl<W: std::io::Write> Encoder<W> {
pub fn new(mut write: W) -> anyhow::Result<Self> {
pub fn new(mut write: W) -> Result<Self, EncodeError> {
let rerun_version = re_build_info::CrateVersion::parse(env!("CARGO_PKG_VERSION"));

write.write_all(b"RRF0").context("header")?;
write.write_all(b"RRF0").map_err(EncodeError::Write)?;
write
.write_all(&rerun_version.to_bytes())
.context("header")?;
.map_err(EncodeError::Write)?;

let level = 3;
let zstd_encoder = zstd::stream::Encoder::new(write, level).context("zstd start")?;
let zstd_encoder =
zstd::stream::Encoder::new(write, level).map_err(EncodeError::Zstd)?;

Ok(Self {
zstd_encoder,
buffer: vec![],
})
}

pub fn append(&mut self, message: &LogMsg) -> anyhow::Result<()> {
pub fn append(&mut self, message: &LogMsg) -> Result<(), EncodeError> {
let Self {
zstd_encoder,
buffer,
} = self;

buffer.clear();
rmp_serde::encode::write_named(buffer, message).context("MessagePack encoding")?;
rmp_serde::encode::write_named(buffer, message)?;

zstd_encoder
.write_all(&(buffer.len() as u64).to_le_bytes())
.context("zstd write")?;
zstd_encoder.write_all(buffer).context("zstd write")?;
.map_err(EncodeError::Zstd)?;
zstd_encoder.write_all(buffer).map_err(EncodeError::Zstd)?;

Ok(())
}

pub fn finish(self) -> anyhow::Result<()> {
self.zstd_encoder.finish().context("zstd finish")?;
pub fn finish(self) -> Result<(), EncodeError> {
self.zstd_encoder.finish().map_err(EncodeError::Zstd)?;
Ok(())
}
}

pub fn encode<'a>(
messages: impl Iterator<Item = &'a LogMsg>,
write: impl std::io::Write,
) -> anyhow::Result<()> {
) -> Result<(), EncodeError> {
let mut encoder = Encoder::new(write)?;
for message in messages {
encoder.append(message)?;
Expand Down
2 changes: 1 addition & 1 deletion crates/re_memory/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ all-features = true
re_format.workspace = true
re_log.workspace = true

ahash = "0.8"
ahash.workspace = true
backtrace = { version = "0.3" }
emath.workspace = true
instant = { version = "0.1", features = ["wasm-bindgen"] }
Expand Down
4 changes: 2 additions & 2 deletions crates/re_renderer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ serde = ["dep:serde"]
re_error.workspace = true
re_log.workspace = true

ahash = "0.8"
ahash.workspace = true
anyhow.workspace = true
bitflags = "1.3"
bytemuck = { version = "1.12", features = ["derive"] }
Expand Down Expand Up @@ -92,7 +92,7 @@ instant = { version = "0.1", features = ["wasm-bindgen"] }
log = "0.4"
pollster = "0.3"
rand = "0.8"
tracing = "0.1"
tracing.workspace = true
winit = "0.28.1"
zip = { version = "0.6", default-features = false, features = ["deflate"] }

Expand Down
16 changes: 3 additions & 13 deletions crates/re_sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,16 @@ demo = []
glam = ["re_log_types/glam"]

## Add the `global_session` method.
global_session = ["dep:once_cell", "dep:parking_lot"]
global_session = ["dep:once_cell"]

## Integration with the [`image`](https://crates.io/crates/image/) crate.
image = ["re_log_types/image"]

# Add a tokio runtime to the `Session` type.
tokio_runtime = ["dep:tokio"]


[dependencies]
re_build_info.workspace = true
re_error.workspace = true
re_log_types.workspace = true
re_log_types = { workspace = true, features = ["save"] }
re_log.workspace = true
re_memory.workspace = true
re_sdk_comms = { workspace = true, features = ["client"] }
Expand All @@ -50,18 +47,11 @@ crossbeam = "0.8"
document-features = "0.2"
lazy_static.workspace = true
nohash-hasher = "0.2"
parking_lot.workspace = true
thiserror.workspace = true

# Optional dependencies:
once_cell = { version = "1.12", optional = true }
parking_lot = { version = "0.12", optional = true }

# Native dependencies:
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio = { workspace = true, optional = true, features = [
"macros",
"rt-multi-thread",
] }


[dev-dependencies]
Expand Down
80 changes: 80 additions & 0 deletions crates/re_sdk/src/file_sink.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use std::{path::PathBuf, sync::mpsc::Sender};

use parking_lot::Mutex;

use re_log_types::LogMsg;

/// Errors that can occur when creating a [`FileSink`].
#[derive(thiserror::Error, Debug)]
pub enum FileSinkError {
/// Error creating the file.
#[error("Failed to create file {0}: {1}")]
CreateFile(PathBuf, std::io::Error),

/// Error spawning the file writer thread.
#[error("Failed to spawn thread: {0}")]
SpawnThread(std::io::Error),

/// Error encoding a log message.
#[error("Failed to encode LogMsg: {0}")]
LogMsgEncode(#[from] re_log_types::encoding::EncodeError),
}

/// Stream log messages to an `.rrd` file.
pub struct FileSink {
// None = quit
tx: Mutex<Sender<Option<LogMsg>>>,
join_handle: Option<std::thread::JoinHandle<()>>,
}

impl Drop for FileSink {
fn drop(&mut self) {
self.tx.lock().send(None).ok();
if let Some(join_handle) = self.join_handle.take() {
join_handle.join().ok();
}
}
}

impl FileSink {
/// Start writing log messages to a file at the given path.
pub fn new(path: impl Into<std::path::PathBuf>) -> Result<Self, FileSinkError> {
let (tx, rx) = std::sync::mpsc::channel();

let path = path.into();

re_log::debug!("Saving file to {path:?}…");

let file = std::fs::File::create(&path)
.map_err(|err| FileSinkError::CreateFile(path.clone(), err))?;
let mut encoder = re_log_types::encoding::Encoder::new(file)?;

let join_handle = std::thread::Builder::new()
.name("file_writer".into())
.spawn(move || {
while let Ok(Some(log_msg)) = rx.recv() {
if let Err(err) = encoder.append(&log_msg) {
re_log::error!("Failed to save log stream to {path:?}: {err}");
return;
}
}
if let Err(err) = encoder.finish() {
re_log::error!("Failed to save log stream to {path:?}: {err}");
} else {
re_log::debug!("Log stream saved to {path:?}");
}
})
.map_err(FileSinkError::SpawnThread)?;

Ok(Self {
tx: tx.into(),
join_handle: Some(join_handle),
})
}
}

impl crate::sink::LogSink for FileSink {
fn send(&self, msg: LogMsg) {
self.tx.lock().send(Some(msg)).ok();
}
}
Loading