Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async/await prepared statement API #390

Merged
merged 13 commits into from
Aug 18, 2023

Conversation

mariosangiorgio
Copy link
Contributor

Implementation of async/await API for prepared statements (See also #385).

This PR adds a new PreparedStatement protocol to represent prepared statements and an execute function on PostgresConnection to prepare and execute statements.

To implement the features the PR also introduces a PreparedStatementStateMachine that keeps track of the state of a prepared statement at the connection level. This ensures that, for each connection, each statement is prepared once at time of first use and then subsequent uses are going to skip the preparation step and just execute it.

Example usage

First define the struct to represent the prepared statement:

 struct ExamplePreparedStatement: PreparedStatement {
     static var sql = "SELECT pid, datname FROM pg_stat_activity WHERE state = $1"
     typealias Row = (Int, String)
     var state: String
     
    func makeBindings() -> PostgresBindings {
        var bindings = PostgresBindings()
        bindings.append(.init(string: self.state))
        return bindings
    }

    func decodeRow(_ row: PostgresNIO.PostgresRow) throws -> Row {
        try row.decode(Row.self)
    }
}

then, assuming you already have a PostgresConnection you can execute it:

let preparedStatement = ExamplePreparedStatement(state: "active")
let results = try await connection.execute(preparedStatement, logger: logger)
for (pid, database) in results {
    print("PID: \(pid), database: \(database)")
}

public func execute<P: PreparedStatement>(
_ preparedStatement: P,
logger: Logger
) async throws -> AsyncThrowingMapSequence<PostgresRowSequence, P.Row>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This return type isn't exactly nice. Any advice on how I could improve it?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some AsyncSequence where Element == P.Row?

context: context
)
}
promise.futureResult.whenFailure { error in
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this I want to propagate failures happening during preparation to all the pending queries we have.
I am not 100% convinced by the way it looks (especially by the need to do error as! PSQLError) but I cannot find any better alternative.

Do you have any advice?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First I think we should use whenComplete here instead of whenSuccess and whenFailure. Reason for this is, we can save an allocation...

Second, we must check if the error we get is actually a PSQLError, if it is not, we need to wrap error with a PSQLError.

logger: statement.logger,
promise: statement.promise
)))
self.run(action, with: context)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here (and in a couple other places) I need to execute all the pending statement executions.
To do that I enqueue and run many actions at the same time. Is this okay?

Copy link
Collaborator

@fabianfett fabianfett left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First review pass

public func execute<P: PreparedStatement>(
_ preparedStatement: P,
logger: Logger
) async throws -> AsyncThrowingMapSequence<PostgresRowSequence, P.Row>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some AsyncSequence where Element == P.Row?

@@ -448,6 +448,27 @@ extension PostgresConnection {
self.channel.write(task, promise: nil)
}
}

/// Execute a prepared statement, taking care of the preparation when necessary
public func execute<P: PreparedStatement>(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets rename P to Statement

public func execute<P: PreparedStatement>(
_ preparedStatement: P,
logger: Logger
) async throws -> AsyncThrowingMapSequence<PostgresRowSequence, P.Row>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also let's overload this method with a method where Statement.Row == () where we only return the command tag?

Comment on lines 242 to 245
switch self.preparedStatementState.lookup(
name: preparedStatement.name,
context: preparedStatement
) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we assign a variable here and then switch over the variable?

Comment on lines 10 to 17
enum Action {
case prepareStatement
case waitForAlreadyInFlightPreparation
case executePendingStatements([PreparedStatementContext], RowDescription?)
case returnError([PreparedStatementContext], PSQLError)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

returning the same Action for all mutations is what I did in the past, but what we generally frown upon by now. Can we get specific Actions for each mutation?

self.preparedStatements[name] = .preparing(statements)
return .waitForAlreadyInFlightPreparation
case .prepared(let rowDescription):
return .executePendingStatements([context], rowDescription)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we shouldn't allocate here :P

///
/// Structs conforming to this protocol can then be used with `PostgresConnection.execute(_ preparedStatement:, logger:)`,
/// which will take care of preparing the statement on the server side and executing it.
public protocol PreparedStatement {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should require that this is Sendable :)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fabianfett
Copy link
Collaborator

fabianfett commented Aug 9, 2023

Fixes #385

@fabianfett fabianfett added this to the 2.0.0 milestone Aug 9, 2023
@codecov-commenter
Copy link

codecov-commenter commented Aug 10, 2023

Codecov Report

Merging #390 (da87ce0) into main (5217ba7) will increase coverage by 1.44%.
The diff coverage is 83.33%.

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #390      +/-   ##
==========================================
+ Coverage   47.53%   48.98%   +1.44%     
==========================================
  Files         109      110       +1     
  Lines        8764     8954     +190     
==========================================
+ Hits         4166     4386     +220     
+ Misses       4598     4568      -30     
Files Changed Coverage Δ
Sources/PostgresNIO/New/PostgresQuery.swift 81.29% <0.00%> (+2.77%) ⬆️
...es/PostgresNIO/Connection/PostgresConnection.swift 42.35% <52.08%> (+1.23%) ⬆️
... State Machine/PreparedStatementStateMachine.swift 91.83% <91.83%> (ø)
...urces/PostgresNIO/New/PostgresChannelHandler.swift 84.45% <98.82%> (+6.23%) ⬆️
Sources/PostgresNIO/New/PSQLTask.swift 78.12% <100.00%> (+16.58%) ⬆️

... and 6 files with indirect coverage changes

Copy link
Collaborator

@fabianfett fabianfett left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks better and better. For unit test examples have a look at PostgresConnectionTests.testSimpleListen

Sources/PostgresNIO/Connection/PostgresConnection.swift Outdated Show resolved Hide resolved
Sources/PostgresNIO/Connection/PostgresConnection.swift Outdated Show resolved Hide resolved
Sources/PostgresNIO/Connection/PostgresConnection.swift Outdated Show resolved Hide resolved
case prepareStatement
case waitForAlreadyInFlightPreparation
case executeStatement(RowDescription?)
case executePendingStatements([PreparedStatementContext], RowDescription?)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can drop this one, as it isn't used, can't we?

Comment on lines 42 to 45
enum PreparationCompleteAction {
case executePendingStatements([PreparedStatementContext], RowDescription?)
}

mutating func preparationComplete(
name: String,
rowDescription: RowDescription?
) -> PreparationCompleteAction {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would this work better as a simple struct?

Comment on lines 36 to 37
self.listenState = ListenStateMachine()
self.preparedStatementState = PreparedStatementStateMachine()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should just init those two directly in the declaration...

context: context
)
}
promise.futureResult.whenFailure { error in
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First I think we should use whenComplete here instead of whenSuccess and whenFailure. Reason for this is, we can save an allocation...

Second, we must check if the error we get is actually a PSQLError, if it is not, we need to wrap error with a PSQLError.

Comment on lines 240 to 259
let action = self.preparedStatementState.lookup(
name: preparedStatement.name,
context: preparedStatement
)
switch action {
case .prepareStatement:
let promise = self.eventLoop.makePromise(of: RowDescription?.self)
promise.futureResult.whenSuccess { rowDescription in
self.prepareStatementComplete(
name: preparedStatement.name,
rowDescription: rowDescription,
context: context
)
}
promise.futureResult.whenFailure { error in
self.prepareStatementFailed(
name: preparedStatement.name,
error: error as! PSQLError,
context: context
)
}
psqlTask = .extendedQuery(.init(
name: preparedStatement.name,
query: preparedStatement.sql,
logger: preparedStatement.logger,
promise: promise
))
case .waitForAlreadyInFlightPreparation:
// The state machine already keeps track of this
// and will execute the statement as soon as it's prepared
return
case .executeStatement(let rowDescription):
psqlTask = .extendedQuery(.init(
executeStatement: .init(
name: preparedStatement.name,
binds: preparedStatement.bindings,
rowDescription: rowDescription),
logger: preparedStatement.logger,
promise: preparedStatement.promise
))
case .executePendingStatements(let pendingStatements, let rowDescription):
for statement in pendingStatements {
let action = self.state.enqueue(task: .extendedQuery(.init(
executeStatement: .init(
name: statement.name,
binds: statement.bindings,
rowDescription: rowDescription),
logger: statement.logger,
promise: statement.promise
)))
self.run(action, with: context)
}
return
case .returnError(let pendingStatements, let error):
for statement in pendingStatements {
statement.promise.fail(error)
}
return
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we move this into its own method? we should likely do the same with all the other longer ones above.

Comment on lines 739 to 741
guard case .executePendingStatements(let statements, let rowDescription) = action else {
preconditionFailure("Expected to have pending statements to execute")
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should reconsider the return type from the statemachine here :). No enum needed.

Comment on lines 766 to 768
guard case .returnError(let statements, let error) = action else {
preconditionFailure("Expected to have pending statements to execute")
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see above.

@fabianfett fabianfett added the semver-minor Adds new public API. label Aug 11, 2023
///
/// func makeBindings() -> PostgresBindings {
/// var bindings = PostgresBindings()
/// bindings.append(.init(string: self.state))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// bindings.append(.init(string: self.state))
/// bindings.append(self.state)

does this work as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I get back an error: Cannot convert value of type 'String' to expected argument type 'PostgresData'

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, we shouldn't create PostgresData. The problem here is that we better names for those methods, so that users can use them without Interpolation as well.

struct PostgresBindings {
    @inlinable
    public mutating func appendInterpolation<Value: PostgresEncodable>(_ value: Value) throws {
        try self.binds.append(value, context: .default)
        self.sql.append(contentsOf: "$\(self.binds.count)")
    }

    @inlinable
    public mutating func appendInterpolation<Value: PostgresEncodable>(_ value: Optional<Value>) throws {
        switch value {
        case .none:
            self.binds.appendNull()
        case .some(let value):
            try self.binds.append(value, context: .default)
        }

        self.sql.append(contentsOf: "$\(self.binds.count)")
    }
}

name: String,
rowDescription: RowDescription?
) -> PreparationCompleteAction {
guard case .preparing(let statements) = self.preparedStatements[name] else {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we switch over all cases here?

}

mutating func errorHappened(name: String, error: PSQLError) -> ErrorHappenedAction {
guard case .preparing(let statements) = self.preparedStatements[name] else {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we switch over all cases here?

///
/// func makeBindings() -> PostgresBindings {
/// var bindings = PostgresBindings()
/// bindings.append(.init(string: self.state))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, we shouldn't create PostgresData. The problem here is that we better names for those methods, so that users can use them without Interpolation as well.

struct PostgresBindings {
    @inlinable
    public mutating func appendInterpolation<Value: PostgresEncodable>(_ value: Value) throws {
        try self.binds.append(value, context: .default)
        self.sql.append(contentsOf: "$\(self.binds.count)")
    }

    @inlinable
    public mutating func appendInterpolation<Value: PostgresEncodable>(_ value: Optional<Value>) throws {
        switch value {
        case .none:
            self.binds.appendNull()
        case .some(let value):
            try self.binds.append(value, context: .default)
        }

        self.sql.append(contentsOf: "$\(self.binds.count)")
    }
}

@mariosangiorgio mariosangiorgio marked this pull request as ready for review August 16, 2023 15:24
Copy link
Collaborator

@fabianfett fabianfett left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks so good. A few nits.

Sources/PostgresNIO/Connection/PostgresConnection.swift Outdated Show resolved Hide resolved
Sources/PostgresNIO/Connection/PostgresConnection.swift Outdated Show resolved Hide resolved
Sources/PostgresNIO/New/PostgresChannelHandler.swift Outdated Show resolved Hide resolved
Sources/PostgresNIO/New/PostgresChannelHandler.swift Outdated Show resolved Hide resolved
Sources/PostgresNIO/New/PostgresChannelHandler.swift Outdated Show resolved Hide resolved
Sources/PostgresNIO/New/PostgresChannelHandler.swift Outdated Show resolved Hide resolved
Sources/PostgresNIO/New/PostgresChannelHandler.swift Outdated Show resolved Hide resolved
Sources/PostgresNIO/New/PostgresChannelHandler.swift Outdated Show resolved Hide resolved
Sources/PostgresNIO/New/PreparedStatement.swift Outdated Show resolved Hide resolved
}
}

func testPreparedStatement() async throws {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know that these tests are somewhat painful to write but we should embrace them, as we can produce hard to reproduce scenarios here.

For this reason, please lets try to create those scenarios in extra tests here:

  • Sending the a prepared query twice, before the preparation step has succeeded. Is the order of the send statements still correct? Is the query just prepared once?
  • Sending the a prepared, after the the query of the same type has already succeeded. Is the query executed right away?
  • Failing a prepared statement in preparation are other executes failed in the same way right away?

Happy to help.

@fabianfett fabianfett requested a review from gwynne August 18, 2023 08:23
Copy link
Collaborator

@fabianfett fabianfett left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm extremely happy with this. Let's wait on the go from @gwynne and then let's merge it! Thanks @mariosangiorgio!

Copy link
Collaborator

@fabianfett fabianfett left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Super nits

Tests/PostgresNIOTests/New/PostgresConnectionTests.swift Outdated Show resolved Hide resolved
Tests/PostgresNIOTests/New/PostgresConnectionTests.swift Outdated Show resolved Hide resolved
Tests/PostgresNIOTests/New/PostgresConnectionTests.swift Outdated Show resolved Hide resolved
Tests/PostgresNIOTests/New/PostgresConnectionTests.swift Outdated Show resolved Hide resolved
Tests/PostgresNIOTests/New/PostgresConnectionTests.swift Outdated Show resolved Hide resolved
Co-authored-by: Fabian Fett <fabianfett@apple.com>
@fabianfett fabianfett merged commit d5c5258 into vapor:main Aug 18, 2023
13 checks passed
@mariosangiorgio mariosangiorgio deleted the async-prepared-statement branch August 18, 2023 10:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
semver-minor Adds new public API.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants