Skip to content

Commit

Permalink
minimal viable doc for Concurrent Fetching
Browse files Browse the repository at this point in the history
  • Loading branch information
pacman82 committed Jan 21, 2024
1 parent 192ba53 commit abdf0bf
Showing 1 changed file with 39 additions and 0 deletions.
39 changes: 39 additions & 0 deletions odbc-api/src/concurrent_block_cursor.rs
Expand Up @@ -6,6 +6,44 @@ use std::{

use crate::{buffers::ColumnarAnyBuffer, BlockCursor, Cursor, Error};

/// A wrapper around block cursors which fetches data in a dedicated system thread. Intended to
/// fetch data batch by batch while the application processes the batch last fetched. Works best
/// with a double buffer strategy using two fetch buffers.
///
/// # Example
///
/// ```no_run
/// use odbc_api::{
/// Environment, buffers::{ColumnarAnyBuffer, BufferDesc}, Cursor, ConcurrentBlockCursor
/// };
/// use std::sync::OnceLock;
///
/// // We want to use the ODBC environment from another system thread without scope => Therefore it
/// // needs to be static.
/// static ENV: OnceLock<Environment> = OnceLock::new();
/// let env = Environment::new()?;
///
/// let conn = ENV.get_or_init(|| env).connect_with_connection_string(
/// "Driver={ODBC Driver 17 for SQL Server};Server=localhost;UID=SA;PWD=My@Test@Password1;",
/// Default::default())?;
///
/// // We must use into_cursor to create a statement handle with static lifetime, which also owns
/// // the connection. This way we can send it to another thread safely.
/// let cursor = conn.into_cursor("SELECT * FROM very_big_table", ())?.unwrap();
///
/// // Batch size and buffer description. Here we assume there is only one integer column
/// let buffer_a = ColumnarAnyBuffer::from_descs(1000, [BufferDesc::I32 { nullable: false }]);
/// let mut buffer_b = ColumnarAnyBuffer::from_descs(1000, [BufferDesc::I32 { nullable: false }]);
/// // And now we have a sendable block cursor with static lifetime
/// let block_cursor = cursor.bind_buffer(buffer_a)?;
///
/// let mut cbc = ConcurrentBlockCursor::from_block_cursor(block_cursor)?;
/// while cbc.fetch_into(&mut buffer_b)? {
/// // Proccess batch in buffer b asynchronously to fetching it
/// }
///
/// # Ok::<_, odbc_api::Error>(())
/// ```
pub struct ConcurrentBlockCursor<C> {
/// In order to avoid reallocating buffers over and over again, we use this channel to send the
/// buffers back to the fetch thread after we copied their contents into arrow arrays.
Expand Down Expand Up @@ -88,6 +126,7 @@ where
})
}

/// Join fetch thread and yield the cursor back.
pub fn into_cursor(self) -> Result<C, Error> {
drop(self.receive_batch);
// Dropping the send buffer is necessary to avoid deadlocks, in case there would not be any
Expand Down

0 comments on commit abdf0bf

Please sign in to comment.