Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always flush when we remove a sink #1830

Merged
merged 4 commits into from
Apr 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 26 additions & 16 deletions rerun_py/src/python_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,15 @@ fn rerun_bindings(py: Python<'_>, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(get_recording_id, m)?)?;
m.add_function(wrap_pyfunction!(set_recording_id, m)?)?;

m.add_function(wrap_pyfunction!(init, m)?)?;
m.add_function(wrap_pyfunction!(connect, m)?)?;
m.add_function(wrap_pyfunction!(serve, m)?)?;
m.add_function(wrap_pyfunction!(shutdown, m)?)?;
m.add_function(wrap_pyfunction!(is_enabled, m)?)?;
m.add_function(wrap_pyfunction!(set_enabled, m)?)?;
m.add_function(wrap_pyfunction!(disconnect, m)?)?;
m.add_function(wrap_pyfunction!(flush, m)?)?;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new API here. The rest are sorting this block.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorting ❤️

m.add_function(wrap_pyfunction!(init, m)?)?;
m.add_function(wrap_pyfunction!(is_enabled, m)?)?;
m.add_function(wrap_pyfunction!(save, m)?)?;
m.add_function(wrap_pyfunction!(serve, m)?)?;
m.add_function(wrap_pyfunction!(set_enabled, m)?)?;
m.add_function(wrap_pyfunction!(shutdown, m)?)?;

m.add_function(wrap_pyfunction!(set_time_sequence, m)?)?;
m.add_function(wrap_pyfunction!(set_time_seconds, m)?)?;
Expand Down Expand Up @@ -329,15 +330,10 @@ fn serve(open_browser: bool) -> PyResult<()> {

#[pyfunction]
fn shutdown(py: Python<'_>) {
// Release the GIL in case any flushing behavior needs to
// cleanup a python object.
py.allow_threads(|| {
re_log::debug!("Shutting down the Rerun SDK");
let mut session = python_session();
session.drop_msgs_if_disconnected();
session.flush();
session.disconnect();
});
re_log::debug!("Shutting down the Rerun SDK");
// Disconnect the current sink which ensures that
// it flushes and cleans up.
disconnect(py);
}

/// Is logging enabled in the global session?
Expand All @@ -355,13 +351,27 @@ fn set_enabled(enabled: bool) {
python_session().set_enabled(enabled);
}

/// Block until outstanding data has been flushed to the sink
#[pyfunction]
fn flush(py: Python<'_>) {
// Release the GIL in case any flushing behavior needs to
// cleanup a python object.
py.allow_threads(|| {
python_session().flush();
});
}

/// Disconnect from remote server (if any).
///
/// Subsequent log messages will be buffered and either sent on the next call to `connect`,
/// or shown with `show`.
#[pyfunction]
fn disconnect() {
python_session().disconnect();
fn disconnect(py: Python<'_>) {
// Release the GIL in case any flushing behavior needs to
// cleanup a python object.
py.allow_threads(|| {
python_session().disconnect();
});
}

#[pyfunction]
Expand Down
21 changes: 15 additions & 6 deletions rerun_py/src/python_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,23 @@ impl PythonSession {
/// If the previous sink is [`rerun::sink::BufferedSink`] (the default),
/// it will be drained and sent to the new sink.
pub fn set_sink(&mut self, sink: Box<dyn LogSink>) {
// Capture the backlog (should only apply if this was a `BufferedSink`)
let backlog = self.sink.drain_backlog();

// Before changing the sink, we set drop_if_disconnected and
// flush. This ensures that any messages that are currently
// buffered will be sent.
self.sink.drop_msgs_if_disconnected();
self.sink.flush();
self.sink = sink;
self.sink.send_all(backlog);

if backlog.is_empty() {
// If we had no backlog, we need to send the `BeginRecording` message to the new sink.
self.has_sent_begin_recording_msg = false;
} else {
// Otherwise the backlog should have had the `BeginRecording` message in it already.
self.sink.send_all(backlog);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

}

/// Send log data to a remote viewer/server.
Expand Down Expand Up @@ -193,11 +207,6 @@ impl PythonSession {
self.sink.flush();
}

/// If the tcp session is disconnected, allow it to quit early and drop unsent messages
pub fn drop_msgs_if_disconnected(&mut self) {
self.sink.drop_msgs_if_disconnected();
}

/// Send a single [`DataRow`].
pub fn send_row(&mut self, row: DataRow) -> PyResult<()> {
let msg = row
Expand Down