Skip to content

Commit

Permalink
Introduce async cursor
Browse files Browse the repository at this point in the history
  • Loading branch information
pacman82 committed Aug 23, 2022
1 parent 7fb8474 commit 2598f0a
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 37 deletions.
5 changes: 5 additions & 0 deletions Changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

## 0.47.0

* `TextRowSet::from_max_str_lens` now takes `IntoIterator<Item=usize>` instead of `Iterator<Item=usize>`.
* trait method `RowSetBuffer::bind_to_cursor` now takes a `StatementRef` instead of a `Cursor`. This change is unlikely to affect user code, as so far I know all downstream crates use the provided `RowSetBuffer` implementations and do not implement their own versions.

## 0.46.0

* Minimal support for asynchronous code in the `handle` module.
Expand Down
17 changes: 8 additions & 9 deletions odbc-api/src/buffers/columnar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
handles::{CDataMut, Statement, StatementRef},
parameter::WithDataType,
result_set_metadata::utf8_display_sizes,
Cursor, Error, ResultSetMetadata, RowSetBuffer,
Error, ResultSetMetadata, RowSetBuffer,
};

use super::{Indicator, TextColumn};
Expand All @@ -35,8 +35,9 @@ impl<C: ColumnBuffer> ColumnarBuffer<C> {
/// valid rows is considered to be zero).
///
/// You do not want to call this constructor directly unless you want to provide your own buffer
/// implentation. Most users of this crate may want to use the constructors on
/// [`crate::buffers::ColumnarAnyBuffer`] or [`crate::buffers::TextRowSet`] instead.
/// implentation. Most users of this crate may want to use the constructors like
/// [`crate::buffers::ColumnarAnyBuffer::from_description`] or
/// [`crate::buffers::TextRowSet::from_max_str_lens`] instead.
pub fn new(columns: Vec<(u16, C)>) -> Self {
// Assert capacity
let capacity = columns
Expand Down Expand Up @@ -110,12 +111,9 @@ where
self.num_rows.as_mut()
}

unsafe fn bind_to_cursor(&mut self, cursor: &mut impl Cursor) -> Result<(), Error> {
unsafe fn bind_to_cursor(&mut self, mut cursor: StatementRef<'_>) -> Result<(), Error> {
for (col_number, column) in &mut self.columns {
cursor
.as_stmt_ref()
.bind_col(*col_number, column)
.into_result(&cursor.as_stmt_ref())?;
cursor.bind_col(*col_number, column).into_result(&cursor)?;
}
Ok(())
}
Expand Down Expand Up @@ -331,9 +329,10 @@ impl TextRowSet {
/// `max_str_lengths` of respective size.
pub fn from_max_str_lens(
row_capacity: usize,
max_str_lengths: impl Iterator<Item = usize>,
max_str_lengths: impl IntoIterator<Item = usize>,
) -> Result<Self, Error> {
let buffers = max_str_lengths
.into_iter()
.enumerate()
.map(|(index, max_str_len)| {
Ok((
Expand Down
60 changes: 57 additions & 3 deletions odbc-api/src/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ impl<'s> CursorRow<'s> {
/// by either a prepared query or direct execution. Usually utilized through the [`crate::Cursor`]
/// trait.
pub struct CursorImpl<Stmt: AsStatementRef> {
/// A statement handle in cursor mode.
statement: Stmt,
}

Expand Down Expand Up @@ -257,7 +258,7 @@ where
})?;
stmt.set_num_rows_fetched(Some(row_set_buffer.mut_num_fetch_rows()))
.into_result(&stmt)?;
row_set_buffer.bind_to_cursor(&mut self)?;
row_set_buffer.bind_to_cursor(stmt)?;
}
Ok(RowSetCursor::new(row_set_buffer, self))
}
Expand Down Expand Up @@ -286,6 +287,59 @@ where
}
}

/// The asynchronous sibiling of [`CursorImpl`]. Use this to fetch results in asynchronous code.
///
/// Like [`CursorImpl`] this is an ODBC statement handle in cursor state. However unlike its
/// synchronous sibling this statement handle is in asynchronous polling mode.
pub struct AsyncCursor<Stmt: AsStatementRef> {
/// A statement handle in cursor state with asynchronous mode enabled.
statement: Stmt,
}

impl<S> AsyncCursor<S>
where
S: AsStatementRef,
{
/// Users of this library are encouraged not to call this constructor directly. This method is
/// pubilc so users with an understanding of the raw ODBC C-API have a way to create an
/// asynchronous cursor, after they left the safety rails of the Rust type System, in order to
/// implement a use case not covered yet, by the safe abstractions within this crate.
///
/// # Safety
///
/// `statement` must be in Cursor state, for the invariants of this type to hold. Preferable
/// `statement` should also have asynchrous mode enabled, otherwise constructing a synchronous
/// [`CursorImpl`] is more suitable.
pub unsafe fn new(statement: S) -> Self {
Self { statement }
}
}

impl<S> AsStatementRef for AsyncCursor<S>
where
S: AsStatementRef,
{
fn as_stmt_ref(&mut self) -> StatementRef<'_> {
self.statement.as_stmt_ref()
}
}

impl<S> Drop for AsyncCursor<S>
where
S: AsStatementRef,
{
fn drop(&mut self) {
let mut stmt = self.statement.as_stmt_ref();
if let Err(e) = stmt.close_cursor().into_result(&stmt) {
// Avoid panicking, if we already have a panic. We don't want to mask the original
// error.
if !panicking() {
panic!("Unexpected error closing cursor: {:?}", e)
}
}
}
}

/// A Row set buffer binds row, or column wise buffers to a cursor in order to fill them with row
/// sets with each call to fetch.
///
Expand Down Expand Up @@ -317,7 +371,7 @@ pub unsafe trait RowSetBuffer {
///
/// It's the implementations responsibility to ensure that all bound buffers are valid until
/// unbound or the statement handle is deleted.
unsafe fn bind_to_cursor(&mut self, cursor: &mut impl Cursor) -> Result<(), Error>;
unsafe fn bind_to_cursor(&mut self, cursor: StatementRef<'_>) -> Result<(), Error>;
}

unsafe impl<T: RowSetBuffer> RowSetBuffer for &mut T {
Expand All @@ -333,7 +387,7 @@ unsafe impl<T: RowSetBuffer> RowSetBuffer for &mut T {
(*self).mut_num_fetch_rows()
}

unsafe fn bind_to_cursor(&mut self, cursor: &mut impl Cursor) -> Result<(), Error> {
unsafe fn bind_to_cursor(&mut self, cursor: StatementRef<'_>) -> Result<(), Error> {
(*self).bind_to_cursor(cursor)
}
}
Expand Down
2 changes: 1 addition & 1 deletion odbc-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub mod parameter;
pub use self::{
columnar_bulk_inserter::{BoundInputSlice, ColumnarBulkInserter},
connection::{escape_attribute_value, Connection},
cursor::{Cursor, CursorImpl, CursorRow, RowSetBuffer, RowSetCursor},
cursor::{AsyncCursor, Cursor, CursorImpl, CursorRow, RowSetBuffer, RowSetCursor},
driver_complete_option::DriverCompleteOption,
environment::{DataSourceInfo, DriverInfo, Environment},
error::{Error, TooLargeBufferSize},
Expand Down
12 changes: 6 additions & 6 deletions odbc-api/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::iter::repeat;
use lazy_static::lazy_static;
use odbc_api::{
buffers::{self, TextColumn},
handles::{CDataMut, Statement},
handles::{CDataMut, Statement, StatementRef},
Connection, Cursor, Environment, Error, RowSetBuffer, U16Str,
};

Expand Down Expand Up @@ -239,11 +239,11 @@ where
self.num_rows_fetched.as_mut()
}

unsafe fn bind_to_cursor(&mut self, cursor: &mut impl Cursor) -> Result<(), odbc_api::Error> {
cursor
.as_stmt_ref()
.bind_col(1, &mut self.column)
.into_result(&cursor.as_stmt_ref())?;
unsafe fn bind_to_cursor(
&mut self,
mut cursor: StatementRef<'_>,
) -> Result<(), odbc_api::Error> {
cursor.bind_col(1, &mut self.column).into_result(&cursor)?;
Ok(())
}
}
42 changes: 24 additions & 18 deletions odbc-api/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ use common::{cursor_to_string, Profile, SingleColumnRowSetBuffer, ENV};

use odbc_api::{
buffers::{
BufferDescription, BufferKind, ColumnarAnyBuffer, Indicator, Item, TextColumn, TextRowSet, CharColumn,
BufferDescription, BufferKind, ColumnarAnyBuffer, Indicator, Item, TextColumn, TextRowSet,
},
handles::{OutputStringBuffer, SqlResult, SqlText, Statement},
handles::{AsStatementRef, OutputStringBuffer, SqlResult, SqlText, Statement},
parameter::InputParameter,
parameter::{
Blob, BlobRead, BlobSlice, VarBinaryArray, VarCharArray, VarCharSlice, WithDataType,
},
sys, Bit, ColumnDescription, Cursor, DataType, Error, InOut, IntoParameter, Nullability,
Nullable, Out, ResultSetMetadata, U16Str, U16String
sys, AsyncCursor, Bit, ColumnDescription, Cursor, DataType, Error, InOut, IntoParameter,
Nullability, Nullable, Out, ResultSetMetadata, RowSetBuffer, U16Str, U16String,
};
use std::{
ffi::CString,
Expand Down Expand Up @@ -3343,12 +3343,15 @@ async fn async_bulk_fetch(profile: &Profile) {
let prepared = conn.prepare(&table.sql_insert()).unwrap();
let mut inserter = prepared.into_text_inserter(1000, [50]).unwrap();
for index in 0..1000 {
inserter.append([Some(index.to_string().as_bytes())].iter().copied()).unwrap();
inserter
.append([Some(index.to_string().as_bytes())].iter().copied())
.unwrap();
}
inserter.execute().unwrap();
let sleep = || tokio::time::sleep(Duration::from_millis(50));

// When
let mut sum_rows_fetched = 0;
let statement = conn.preallocate().unwrap();
let mut statement = statement.into_statement();
let query = table.sql_all_ordered_by_id();
Expand All @@ -3363,27 +3366,30 @@ async fn async_bulk_fetch(profile: &Profile) {
}
result.into_result(&statement).unwrap();
// Fetching results in ten batches
let mut cursor = AsyncCursor::new(statement);
let mut num_rows_fetched = 0;
let mut sum_rows_fetched = 0;
statement.set_row_bind_type(0).unwrap();
statement.set_row_array_size(100).unwrap();
statement.set_num_rows_fetched(Some(&mut num_rows_fetched));
let mut buffer = CharColumn::new(100, 50);
statement.bind_col(1, &mut buffer);
cursor.as_stmt_ref().set_row_bind_type(0).unwrap();
cursor.as_stmt_ref().set_row_array_size(100).unwrap();
cursor
.as_stmt_ref()
.set_num_rows_fetched(Some(&mut num_rows_fetched));
let mut buffer = TextRowSet::from_max_str_lens(100, [50usize]).unwrap();
buffer.bind_to_cursor(cursor.as_stmt_ref()).unwrap();
let mut has_batches = true; // `false` as soon as end is reached
result = statement.fetch();
result = cursor.as_stmt_ref().fetch();
while has_batches {
sum_rows_fetched += num_rows_fetched;
while result == SqlResult::StillExecuting {
sleep().await;
result = statement.fetch();
result = cursor.as_stmt_ref().fetch();
}
result = statement.fetch();
has_batches = result.into_result(&statement).unwrap();
result = cursor.as_stmt_ref().fetch();
has_batches = result.into_result(&cursor.as_stmt_ref()).unwrap();
}
assert_eq!(1000, sum_rows_fetched)
}


// Then
assert_eq!(1000, sum_rows_fetched)
}

/// This test is inspired by a bug caused from a fetch statement generating a lot of diagnostic
Expand Down Expand Up @@ -3420,7 +3426,7 @@ fn many_diagnostic_messages(profile: &Profile) {
statement.execute().unwrap();

let query_sql = format!("SELECT a FROM {}", table_name);
let buffer = TextRowSet::from_max_str_lens(batch_size, iter::once(1)).unwrap();
let buffer = TextRowSet::from_max_str_lens(batch_size, [1]).unwrap();
let cursor = conn.execute(&query_sql, ()).unwrap().unwrap();
let mut row_set_cursor = cursor.bind_buffer(buffer).unwrap();

Expand Down

0 comments on commit 2598f0a

Please sign in to comment.