Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix fetch_optional for sqlite #852

Merged
merged 1 commit into from
Nov 25, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 45 additions & 7 deletions sqlx-core/src/sqlite/connection/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use crate::sqlite::{
use either::Either;
use futures_core::future::BoxFuture;
use futures_core::stream::BoxStream;
use futures_util::TryStreamExt;
use libsqlite3_sys::sqlite3_last_insert_rowid;
use std::borrow::Cow;
use std::sync::Arc;
Expand Down Expand Up @@ -144,21 +143,60 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection {

fn fetch_optional<'e, 'q: 'e, E: 'q>(
self,
query: E,
mut query: E,
) -> BoxFuture<'e, Result<Option<SqliteRow>, Error>>
where
'c: 'e,
E: Execute<'q, Self::Database>,
{
let mut s = self.fetch_many(query);
let sql = query.sql();
let mut logger = QueryLogger::new(sql, self.log_settings.clone());
let arguments = query.take_arguments();
let persistent = query.persistent() && arguments.is_some();

Box::pin(async move {
while let Some(v) = s.try_next().await? {
if let Either::Right(r) = v {
return Ok(Some(r));
let SqliteConnection {
handle: ref mut conn,
ref mut statements,
ref mut statement,
ref mut worker,
..
} = self;

// prepare statement object (or checkout from cache)
let virtual_stmt = prepare(statements, statement, sql, persistent)?;

// keep track of how many arguments we have bound
let mut num_arguments = 0;

while let Some((stmt, columns, column_names, last_row_values)) =
virtual_stmt.prepare(conn)?
{
// bind values to the statement
num_arguments += bind(stmt, &arguments, num_arguments)?;

// save the rows from the _current_ position on the statement
// and send them to the still-live row object
SqliteRow::inflate_if_needed(stmt, &*columns, last_row_values.take());

// invoke [sqlite3_step] on the dedicated worker thread
// this will move us forward one row or finish the statement
match worker.step(*stmt).await? {
Either::Left(_) => (),

Either::Right(()) => {
let (row, weak_values_ref) =
SqliteRow::current(*stmt, columns, column_names);

*last_row_values = Some(weak_values_ref);

logger.increment_rows();

virtual_stmt.reset();
return Ok(Some(row));
}
}
}

Ok(None)
})
}
Expand Down