Skip to content

Commit

Permalink
complete coverage for concurrent_block_cursor
Browse files Browse the repository at this point in the history
  • Loading branch information
pacman82 committed Jan 21, 2024
1 parent e147164 commit b2587ea
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 35 deletions.
83 changes: 49 additions & 34 deletions odbc-api/src/concurrent_block_cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ where
/// * `block_cursor`: Taking a BlockCursor instead of a Cursor allows for better resource
stealing if constructing starting from a sequential Cursor, as we do not need to unbind
/// and bind the cursor.
pub fn new(
block_cursor: BlockCursor<C, ColumnarAnyBuffer>,
) -> Result<Self, Error> {
pub fn new(block_cursor: BlockCursor<C, ColumnarAnyBuffer>) -> Result<Self, Error> {
let (send_buffer, receive_buffer) = sync_channel(1);
let (send_batch, receive_batch) = sync_channel(1);

Expand Down Expand Up @@ -69,7 +67,6 @@ where
// Wait for the application thread to give us a buffer to fill.
match receive_buffer.recv() {
Err(_) => {
todo!();
// Application thread dropped sender and does not want more buffers to be
// filled. Let's stop this thread and return the cursor
break Ok(cursor);
Expand All @@ -90,11 +87,11 @@ where
}

pub fn into_cursor(self) -> Result<C, Error> {
// Fetch thread should never be blocked for a long time in receiving buffers. Yet it could
// wait for a long time on the application logic to receive an arrow buffer using next. We
// drop the receiver here explicitly in order to be always able to join the fetch thread,
// even if the iterator has not been consumed to completion.
drop(self.receive_batch);
// Dropping the send buffer is necessary to avoid deadlocks, in case there would not be any
// buffer in the channel waiting for the fetch thread. Since we consume the cursor here, it
// is also impossible for the application to send another buffer.
drop(self.send_buffer);
if let Some(cursor) = self.cursor {
Ok(cursor)
} else {
Expand All @@ -104,32 +101,15 @@ where
}

impl<C> ConcurrentBlockCursor<C> {
/// Fetches values from the ODBC datasource into buffer. Values are streamed batch by batch in
/// order to avoid reallocation of the buffers used for tranistion.
///
/// # Parameters
///
/// * `buffer`: A columnar any buffer which can bind to the cursor wrapped by this instance.
/// After the method call the reference will not point to the same instance which had been
/// passed into the function call, but to the one which was bound to the cursor in order to
/// fetch the last batch. The buffer passed into this method, is then used to fetch the next
/// batch. As such this method is ideal to implement concurrent fetching using two buffers.
/// One which is written to, and one that is read, which flip their roles between batches.
/// Also called double buffering.
///
/// # Return
///
/// * `true`: Fetched a batch from the data source. The contents of that batch are now in
/// `buffer`.
/// * `false`: No batch could be fetched. The result set is consumed completly.
pub fn fetch_into(&mut self, buffer: &mut ColumnarAnyBuffer) -> Result<bool, Error> {
/// Receive the current batch and take ownership of its buffer. `None` if the cursor is already
/// consumed, or had an error previously. This method blocks until a new batch is available. In
/// order for new batches to become available, new buffers must be sent to the thread so it can
/// fill them. So calling fetch repeatedly without calling [`Self::fill`] in between may
/// deadlock.
pub fn fetch(&mut self) -> Result<Option<ColumnarAnyBuffer>, Error> {
match self.receive_batch.recv() {
// We successfully fetched a batch from the database.
Ok(mut batch) => {
swap(buffer, &mut batch);
let _ = self.send_buffer.send(batch);
Ok(true)
}
Ok(batch) => Ok(Some(batch)),
// Fetch thread stopped sending batches. Either because we consumed the result set
// completely or we hit an error.
Err(_receive_error) => {
Expand All @@ -138,14 +118,49 @@ impl<C> ConcurrentBlockCursor<C> {
// will raise it.
self.cursor = Some(join_handle.join().unwrap()?);
// We ran out of batches in the result set. End the stream.
Ok(false)
Ok(None)
} else {
// This only happen if this method is called after it returned either `false` or
// `Err` once. Let us treat this scenario like a result set which is consumed
// completly.
Ok(false)
Ok(None)
}
}
}
}

/// Hand a buffer over to the fetch thread so it can be filled with the next batch. Filled
/// buffers are retrieved later using either [`Self::fetch`] or [`Self::fetch_into`].
pub fn fill(&mut self, buffer: ColumnarAnyBuffer) {
    // A send error only means the fetch thread has already terminated. In that case the
    // buffer is simply dropped, which is fine, so the result is deliberately discarded.
    self.send_buffer.send(buffer).ok();
}

/// Fetches values from the ODBC datasource into `buffer`. Values are streamed batch by batch
/// in order to avoid reallocation of the buffers used for transition. This call blocks until a
/// new batch is ready. This method combines both [`Self::fetch`] and [`Self::fill`].
///
/// # Parameters
///
/// * `buffer`: A columnar any buffer which can bind to the cursor wrapped by this instance.
///   After the method call the reference will not point to the same instance which had been
///   passed into the function call, but to the one which was bound to the cursor in order to
///   fetch the last batch. The buffer passed into this method is then used to fetch the next
///   batch. As such this method is ideal to implement concurrent fetching using two buffers.
///   One which is written to, and one that is read, which flip their roles between batches.
///   Also called double buffering.
///
/// # Return
///
/// * `true`: Fetched a batch from the data source. The contents of that batch are now in
///   `buffer`.
/// * `false`: No batch could be fetched. The result set is consumed completely.
pub fn fetch_into(&mut self, buffer: &mut ColumnarAnyBuffer) -> Result<bool, Error> {
    match self.fetch()? {
        Some(mut filled) => {
            // Give the freshly filled buffer to the caller and hand the buffer previously
            // owned by the caller back to the fetch thread so it can fill the next batch.
            swap(buffer, &mut filled);
            self.fill(filled);
            Ok(true)
        }
        None => Ok(false),
    }
}
}
69 changes: 68 additions & 1 deletion odbc-api/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4089,7 +4089,7 @@ fn fetch_decimals_to_int(profile: &Profile) {
#[test_case(MARIADB; "Maria DB")]
#[test_case(SQLITE_3; "SQLite 3")]
#[test_case(POSTGRES; "PostgreSQL")]
fn concurrent_bulk_fetch_into_result(profile: &Profile) {
fn concurrent_bulk_fetch_double_buffered(profile: &Profile) {
// Given
let table_name = table_name!();
let (conn, table) = profile.given(&table_name, &["INT"]).unwrap();
Expand Down Expand Up @@ -4120,6 +4120,73 @@ fn concurrent_bulk_fetch_into_result(profile: &Profile) {
assert!(!has_another_batch);
}

/// Bulk fetch in a dedicated system thread. Usually so the application can process the last batch
/// while the next one is fetched.
#[test_case(MSSQL; "Microsoft SQL Server")]
#[test_case(MARIADB; "Maria DB")]
#[test_case(SQLITE_3; "SQLite 3")]
#[test_case(POSTGRES; "PostgreSQL")]
fn concurrent_bulk_fetch_single_buffer(profile: &Profile) {
    // Given a table with two rows
    let table_name = table_name!();
    let (conn, table) = profile.given(&table_name, &["INT"]).unwrap();
    conn.execute(&format!("INSERT INTO {table_name} (a) VALUES (1), (2)"), ())
        .unwrap();

    // When cycling a single buffer between the application and the fetch thread
    let fetch_buffer = ColumnarAnyBuffer::from_descs(1, [BufferDesc::I32 { nullable: false }]);
    let cursor = conn
        .into_cursor(&table.sql_all_ordered_by_id(), ())
        .unwrap()
        .unwrap();
    let block_cursor = cursor.bind_buffer(fetch_buffer).unwrap();
    let mut concurrent_cursor = ConcurrentBlockCursor::new(block_cursor).unwrap();

    // Then the first batch contains the first row, ...
    let first = concurrent_cursor.fetch().unwrap().unwrap();
    assert_eq!(1, first.num_rows());
    assert_eq!(1i32, first.column(0).as_slice().unwrap()[0]);
    concurrent_cursor.fill(first);

    // ... the second batch contains the second row, ...
    let second = concurrent_cursor.fetch().unwrap().unwrap();
    assert_eq!(1, second.num_rows());
    assert_eq!(2i32, second.column(0).as_slice().unwrap()[0]);
    concurrent_cursor.fill(second);

    // ... and afterwards the result set is consumed.
    assert!(concurrent_cursor.fetch().unwrap().is_none());
}

/// Catch edge cases where we stop the thread while there are still batches to consume.
#[test_case(MSSQL; "Microsoft SQL Server")]
#[test_case(MARIADB; "Maria DB")]
#[test_case(SQLITE_3; "SQLite 3")]
#[test_case(POSTGRES; "PostgreSQL")]
fn concurrent_bulk_fetch_fetch_one_batch(profile: &Profile) {
    // Given a table with two rows
    let table_name = table_name!();
    let (conn, table) = profile.given(&table_name, &["INT"]).unwrap();
    conn.execute(&format!("INSERT INTO {table_name} (a) VALUES (1), (2)"), ())
        .unwrap();

    // When fetching only one batch and then taking back the cursor
    let fetch_buffer = ColumnarAnyBuffer::from_descs(1, [BufferDesc::I32 { nullable: false }]);
    let block_cursor = conn
        .into_cursor(&table.sql_all_ordered_by_id(), ())
        .unwrap()
        .unwrap()
        .bind_buffer(fetch_buffer)
        .unwrap();
    let mut concurrent_cursor = ConcurrentBlockCursor::new(block_cursor).unwrap();
    let _ = concurrent_cursor.fetch().unwrap().unwrap();
    // Instead of sending a buffer and fetching the next batch, we interrupt the fetch thread
    // while it does not own a buffer.
    let plain_cursor = concurrent_cursor.into_cursor().unwrap();

    // Then we can deterministically fetch the second batch in the main thread again. Since the
    // fetch thread has only ever seen one buffer, it could have fetched at most one batch.
    let actual = cursor_to_string(plain_cursor);
    assert_eq!("2", actual);
}

/// Bulk fetch in a dedicated system thread. Usually so the application can process the last batch
/// while the next one is fetched.
#[test_case(MSSQL; "Microsoft SQL Server")]
Expand Down

0 comments on commit b2587ea

Please sign in to comment.