Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ version: 2
jobs:
build:
docker:
- image: rust:1.40.0
- image: rust:1.41.0
environment:
RUSTFLAGS: -D warnings
- image: sfackler/rust-postgres-test:6
Expand Down
2 changes: 1 addition & 1 deletion postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ fallible-iterator = "0.2"
futures = "0.3"
tokio-postgres = { version = "0.5.3", path = "../tokio-postgres" }

tokio = { version = "0.2", features = ["rt-core"] }
tokio = { version = "0.2", features = ["rt-core", "time"] }
log = "0.4"

[dev-dependencies]
Expand Down
22 changes: 13 additions & 9 deletions postgres/src/binary_copy.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
//! Utilities for working with the PostgreSQL binary copy format.

use crate::connection::ConnectionRef;
use crate::types::{ToSql, Type};
use crate::{CopyInWriter, CopyOutReader, Error, Rt};
use crate::{CopyInWriter, CopyOutReader, Error};
use fallible_iterator::FallibleIterator;
use futures::StreamExt;
use std::pin::Pin;
Expand All @@ -13,7 +14,7 @@ use tokio_postgres::binary_copy::{self, BinaryCopyOutStream};
///
/// The copy *must* be explicitly completed via the `finish` method. If it is not, the copy will be aborted.
pub struct BinaryCopyInWriter<'a> {
runtime: Rt<'a>,
connection: ConnectionRef<'a>,
sink: Pin<Box<binary_copy::BinaryCopyInWriter>>,
}

Expand All @@ -26,7 +27,7 @@ impl<'a> BinaryCopyInWriter<'a> {
.expect("writer has already been written to");

BinaryCopyInWriter {
runtime: writer.runtime,
connection: writer.connection,
sink: Box::pin(binary_copy::BinaryCopyInWriter::new(stream, types)),
}
}
Expand All @@ -37,7 +38,7 @@ impl<'a> BinaryCopyInWriter<'a> {
///
/// Panics if the number of values provided does not match the number expected.
pub fn write(&mut self, values: &[&(dyn ToSql + Sync)]) -> Result<(), Error> {
self.runtime.block_on(self.sink.as_mut().write(values))
self.connection.block_on(self.sink.as_mut().write(values))
}

/// A maximally-flexible version of `write`.
Expand All @@ -50,20 +51,21 @@ impl<'a> BinaryCopyInWriter<'a> {
I: IntoIterator<Item = &'b dyn ToSql>,
I::IntoIter: ExactSizeIterator,
{
self.runtime.block_on(self.sink.as_mut().write_raw(values))
self.connection
.block_on(self.sink.as_mut().write_raw(values))
}

/// Completes the copy, returning the number of rows added.
///
/// This method *must* be used to complete the copy process. If it is not, the copy will be aborted.
pub fn finish(mut self) -> Result<u64, Error> {
self.runtime.block_on(self.sink.as_mut().finish())
self.connection.block_on(self.sink.as_mut().finish())
}
}

/// An iterator of rows deserialized from the PostgreSQL binary copy format.
pub struct BinaryCopyOutIter<'a> {
runtime: Rt<'a>,
connection: ConnectionRef<'a>,
stream: Pin<Box<BinaryCopyOutStream>>,
}

Expand All @@ -76,7 +78,7 @@ impl<'a> BinaryCopyOutIter<'a> {
.expect("reader has already been read from");

BinaryCopyOutIter {
runtime: reader.runtime,
connection: reader.connection,
stream: Box::pin(BinaryCopyOutStream::new(stream, types)),
}
}
Expand All @@ -87,6 +89,8 @@ impl FallibleIterator for BinaryCopyOutIter<'_> {
type Error = Error;

fn next(&mut self) -> Result<Option<BinaryCopyOutRow>, Error> {
self.runtime.block_on(self.stream.next()).transpose()
let stream = &mut self.stream;
self.connection
.block_on(async { stream.next().await.transpose() })
}
}
85 changes: 33 additions & 52 deletions postgres/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,45 +1,21 @@
use crate::connection::Connection;
use crate::{
CancelToken, Config, CopyInWriter, CopyOutReader, RowIter, Statement, ToStatement, Transaction,
TransactionBuilder,
CancelToken, Config, CopyInWriter, CopyOutReader, Notifications, RowIter, Statement,
ToStatement, Transaction, TransactionBuilder,
};
use std::ops::{Deref, DerefMut};
use tokio::runtime::Runtime;
use tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
use tokio_postgres::types::{ToSql, Type};
use tokio_postgres::{Error, Row, SimpleQueryMessage, Socket};

pub(crate) struct Rt<'a>(pub &'a mut Runtime);

// no-op impl to extend the borrow until drop
impl Drop for Rt<'_> {
fn drop(&mut self) {}
}

impl Deref for Rt<'_> {
type Target = Runtime;

#[inline]
fn deref(&self) -> &Runtime {
self.0
}
}

impl DerefMut for Rt<'_> {
#[inline]
fn deref_mut(&mut self) -> &mut Runtime {
self.0
}
}

/// A synchronous PostgreSQL client.
pub struct Client {
runtime: Runtime,
connection: Connection,
client: tokio_postgres::Client,
}

impl Client {
pub(crate) fn new(runtime: Runtime, client: tokio_postgres::Client) -> Client {
Client { runtime, client }
pub(crate) fn new(connection: Connection, client: tokio_postgres::Client) -> Client {
Client { connection, client }
}

/// A convenience function which parses a configuration string into a `Config` and then connects to the database.
Expand All @@ -62,10 +38,6 @@ impl Client {
Config::new()
}

fn rt(&mut self) -> Rt<'_> {
Rt(&mut self.runtime)
}

/// Executes a statement, returning the number of rows modified.
///
/// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
Expand Down Expand Up @@ -104,7 +76,7 @@ impl Client {
where
T: ?Sized + ToStatement,
{
self.runtime.block_on(self.client.execute(query, params))
self.connection.block_on(self.client.execute(query, params))
}

/// Executes a statement, returning the resulting rows.
Expand Down Expand Up @@ -140,7 +112,7 @@ impl Client {
where
T: ?Sized + ToStatement,
{
self.runtime.block_on(self.client.query(query, params))
self.connection.block_on(self.client.query(query, params))
}

/// Executes a statement which returns a single row, returning it.
Expand Down Expand Up @@ -177,7 +149,8 @@ impl Client {
where
T: ?Sized + ToStatement,
{
self.runtime.block_on(self.client.query_one(query, params))
self.connection
.block_on(self.client.query_one(query, params))
}

/// Executes a statement which returns zero or one rows, returning it.
Expand Down Expand Up @@ -223,7 +196,8 @@ impl Client {
where
T: ?Sized + ToStatement,
{
self.runtime.block_on(self.client.query_opt(query, params))
self.connection
.block_on(self.client.query_opt(query, params))
}

/// A maximally-flexible version of `query`.
Expand Down Expand Up @@ -289,9 +263,9 @@ impl Client {
I::IntoIter: ExactSizeIterator,
{
let stream = self
.runtime
.connection
.block_on(self.client.query_raw(query, params))?;
Ok(RowIter::new(self.rt(), stream))
Ok(RowIter::new(self.connection.as_ref(), stream))
}

/// Creates a new prepared statement.
Expand All @@ -318,7 +292,7 @@ impl Client {
/// # }
/// ```
pub fn prepare(&mut self, query: &str) -> Result<Statement, Error> {
self.runtime.block_on(self.client.prepare(query))
self.connection.block_on(self.client.prepare(query))
}

/// Like `prepare`, but allows the types of query parameters to be explicitly specified.
Expand Down Expand Up @@ -349,7 +323,7 @@ impl Client {
/// # }
/// ```
pub fn prepare_typed(&mut self, query: &str, types: &[Type]) -> Result<Statement, Error> {
self.runtime
self.connection
.block_on(self.client.prepare_typed(query, types))
}

Expand Down Expand Up @@ -380,8 +354,8 @@ impl Client {
where
T: ?Sized + ToStatement,
{
let sink = self.runtime.block_on(self.client.copy_in(query))?;
Ok(CopyInWriter::new(self.rt(), sink))
let sink = self.connection.block_on(self.client.copy_in(query))?;
Ok(CopyInWriter::new(self.connection.as_ref(), sink))
}

/// Executes a `COPY TO STDOUT` statement, returning a reader of the resulting data.
Expand All @@ -408,8 +382,8 @@ impl Client {
where
T: ?Sized + ToStatement,
{
let stream = self.runtime.block_on(self.client.copy_out(query))?;
Ok(CopyOutReader::new(self.rt(), stream))
let stream = self.connection.block_on(self.client.copy_out(query))?;
Ok(CopyOutReader::new(self.connection.as_ref(), stream))
}

/// Executes a sequence of SQL statements using the simple query protocol.
Expand All @@ -428,7 +402,7 @@ impl Client {
/// functionality to safely imbed that data in the request. Do not form statements via string concatenation and pass
/// them to this method!
pub fn simple_query(&mut self, query: &str) -> Result<Vec<SimpleQueryMessage>, Error> {
self.runtime.block_on(self.client.simple_query(query))
self.connection.block_on(self.client.simple_query(query))
}

/// Executes a sequence of SQL statements using the simple query protocol.
Expand All @@ -442,7 +416,7 @@ impl Client {
/// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
/// them to this method!
pub fn batch_execute(&mut self, query: &str) -> Result<(), Error> {
self.runtime.block_on(self.client.batch_execute(query))
self.connection.block_on(self.client.batch_execute(query))
}

/// Begins a new database transaction.
Expand All @@ -466,8 +440,8 @@ impl Client {
/// # }
/// ```
pub fn transaction(&mut self) -> Result<Transaction<'_>, Error> {
let transaction = self.runtime.block_on(self.client.transaction())?;
Ok(Transaction::new(&mut self.runtime, transaction))
let transaction = self.connection.block_on(self.client.transaction())?;
Ok(Transaction::new(self.connection.as_ref(), transaction))
}

/// Returns a builder for a transaction with custom settings.
Expand All @@ -494,7 +468,14 @@ impl Client {
/// # }
/// ```
pub fn build_transaction(&mut self) -> TransactionBuilder<'_> {
TransactionBuilder::new(&mut self.runtime, self.client.build_transaction())
TransactionBuilder::new(self.connection.as_ref(), self.client.build_transaction())
}

/// Returns a structure providing access to asynchronous notifications.
///
/// Use the `LISTEN` command to register this connection for notifications.
pub fn notifications(&mut self) -> Notifications<'_> {
Notifications::new(self.connection.as_ref())
}

/// Constructs a cancellation token that can later be used to request
Expand All @@ -516,7 +497,7 @@ impl Client {
/// thread::spawn(move || {
/// // Abort the query after 5s.
/// thread::sleep(Duration::from_secs(5));
/// cancel_token.cancel_query(NoTls);
/// let _ = cancel_token.cancel_query(NoTls);
/// });
///
/// match client.simple_query("SELECT long_running_query()") {
Expand Down
14 changes: 3 additions & 11 deletions postgres/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
//!
//! Requires the `runtime` Cargo feature (enabled by default).

use crate::connection::Connection;
use crate::Client;
use futures::FutureExt;
use log::error;
use std::fmt;
use std::path::Path;
use std::str::FromStr;
Expand Down Expand Up @@ -324,15 +323,8 @@ impl Config {

let (client, connection) = runtime.block_on(self.config.connect(tls))?;

// FIXME don't spawn this so error reporting is less weird.
let connection = connection.map(|r| {
if let Err(e) = r {
error!("postgres connection error: {}", e)
}
});
runtime.spawn(connection);

Ok(Client::new(runtime, client))
let connection = Connection::new(runtime, connection);
Ok(Client::new(connection, client))
}
}

Expand Down
Loading