Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ let package = Package(
.product(name: "Metrics", package: "swift-metrics"),
.product(name: "NIO", package: "swift-nio"),
.product(name: "NIOCore", package: "swift-nio"),
.product(name: "NIOConcurrencyHelpers", package: "swift-nio"),
.product(name: "NIOPosix", package: "swift-nio"),
.product(name: "NIOTLS", package: "swift-nio"),
.product(name: "NIOSSL", package: "swift-nio-ssl"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ extension PostgresConnection: PostgresDatabase {
let lookupTable = PostgresRow.LookupTable(rowDescription: .init(fields: fields), resultFormat: [.binary])
return rows.all().map { allrows in
let r = allrows.map { psqlRow -> PostgresRow in
let columns = psqlRow.data.columns.map {
let columns = psqlRow.data.map {
PostgresMessage.DataRow.Column(value: $0)
}
return PostgresRow(dataRow: .init(columns: columns), lookupTable: lookupTable)
Expand Down Expand Up @@ -112,7 +112,7 @@ extension PSQLRowStream {

func iterateRowsWithoutBackpressureOption(lookupTable: PostgresRow.LookupTable, onRow: @escaping (PostgresRow) throws -> ()) -> EventLoopFuture<Void> {
self.onRow { psqlRow in
let columns = psqlRow.data.columns.map {
let columns = psqlRow.data.map {
PostgresMessage.DataRow.Column(value: $0)
}

Expand Down
2 changes: 1 addition & 1 deletion Sources/PostgresNIO/Data/PostgresData+Array.swift
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ extension PostgresData {
return nil
}
assert(b == 0, "Array b field did not equal zero")
guard let type = value.readInteger(as: PostgresDataType.self) else {
guard let type = value.readRawRepresentableInteger(as: PostgresDataType.self) else {
return nil
}
guard isNotEmpty == 1 else {
Expand Down
2 changes: 1 addition & 1 deletion Sources/PostgresNIO/Message/PostgresMessage+Error.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ extension PostgresMessage {
/// Parses an instance of this message type from a byte buffer.
public static func parse(from buffer: inout ByteBuffer) throws -> Error {
var fields: [Field: String] = [:]
while let field = buffer.readInteger(as: Field.self) {
while let field = buffer.readRawRepresentableInteger(as: Field.self) {
guard let string = buffer.readNullTerminatedString() else {
throw PostgresError.protocol("Could not read error response string.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ extension PostgresMessage {
/// Parses an instance of this message type from a byte buffer.
public static func parse(from buffer: inout ByteBuffer) throws -> ParameterDescription {
guard let dataTypes = try buffer.read(array: PostgresDataType.self, { buffer in
guard let dataType = buffer.readInteger(as: PostgresDataType.self) else {
guard let dataType = buffer.readRawRepresentableInteger(as: PostgresDataType.self) else {
throw PostgresError.protocol("Could not parse data type integer in parameter description message.")
}
return dataType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ extension PostgresMessage {
guard let columnAttributeNumber = buffer.readInteger(as: Int16.self) else {
throw PostgresError.protocol("Could not read row description field column attribute number")
}
guard let dataType = buffer.readInteger(as: PostgresDataType.self) else {
guard let dataType = buffer.readRawRepresentableInteger(as: PostgresDataType.self) else {
throw PostgresError.protocol("Could not read row description field data type")
}
guard let dataTypeSize = buffer.readInteger(as: Int16.self) else {
Expand All @@ -29,7 +29,7 @@ extension PostgresMessage {
guard let dataTypeModifier = buffer.readInteger(as: Int32.self) else {
throw PostgresError.protocol("Could not read row description field data type modifier")
}
guard let formatCode = buffer.readInteger(as: PostgresFormatCode.self) else {
guard let formatCode = buffer.readRawRepresentableInteger(as: PostgresFormatCode.self) else {
throw PostgresError.protocol("Could not read row description field format code")
}
return .init(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,18 +87,18 @@ struct ConnectionStateMachine {
case sendParseDescribeBindExecuteSync(query: String, binds: [PSQLEncodable])
case sendBindExecuteSync(statementName: String, binds: [PSQLEncodable])
case failQuery(ExtendedQueryContext, with: PSQLError, cleanupContext: CleanUpContext?)
case succeedQuery(ExtendedQueryContext, columns: [PSQLBackendMessage.RowDescription.Column])
case succeedQuery(ExtendedQueryContext, columns: [RowDescription.Column])
case succeedQueryNoRowsComming(ExtendedQueryContext, commandTag: String)

// --- streaming actions
// actions if query has requested next row but we are waiting for backend
case forwardRows(CircularBuffer<PSQLBackendMessage.DataRow>)
case forwardStreamComplete(CircularBuffer<PSQLBackendMessage.DataRow>, commandTag: String)
case forwardRows([DataRow])
case forwardStreamComplete([DataRow], commandTag: String)
case forwardStreamError(PSQLError, read: Bool, cleanupContext: CleanUpContext?)

// Prepare statement actions
case sendParseDescribeSync(name: String, query: String)
case succeedPreparedStatementCreation(PrepareStatementContext, with: PSQLBackendMessage.RowDescription?)
case succeedPreparedStatementCreation(PrepareStatementContext, with: RowDescription?)
case failPreparedStatementCreation(PrepareStatementContext, with: PSQLError, cleanupContext: CleanUpContext?)

// Close actions
Expand Down Expand Up @@ -713,7 +713,7 @@ struct ConnectionStateMachine {
}
}

mutating func rowDescriptionReceived(_ description: PSQLBackendMessage.RowDescription) -> ConnectionAction {
mutating func rowDescriptionReceived(_ description: RowDescription) -> ConnectionAction {
switch self.state {
case .extendedQuery(var queryState, let connectionContext) where !queryState.isComplete:
return self.avoidingStateMachineCoW { machine -> ConnectionAction in
Expand Down Expand Up @@ -791,7 +791,7 @@ struct ConnectionStateMachine {
}
}

mutating func dataRowReceived(_ dataRow: PSQLBackendMessage.DataRow) -> ConnectionAction {
mutating func dataRowReceived(_ dataRow: DataRow) -> ConnectionAction {
guard case .extendedQuery(var queryState, let connectionContext) = self.state, !queryState.isComplete else {
return self.closeConnectionAndCleanup(.unexpectedBackendMessage(.dataRow(dataRow)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ struct ExtendedQueryStateMachine {

case parseCompleteReceived(ExtendedQueryContext)
case parameterDescriptionReceived(ExtendedQueryContext)
case rowDescriptionReceived(ExtendedQueryContext, [PSQLBackendMessage.RowDescription.Column])
case rowDescriptionReceived(ExtendedQueryContext, [RowDescription.Column])
case noDataMessageReceived(ExtendedQueryContext)

/// A state that is used if a noData message was received before. If a row description was received `bufferingRows` is
/// used after receiving a `bindComplete` message
case bindCompleteReceived(ExtendedQueryContext)
case streaming([PSQLBackendMessage.RowDescription.Column], RowStreamStateMachine)
case streaming([RowDescription.Column], RowStreamStateMachine)

case commandComplete(commandTag: String)
case error(PSQLError)
Expand All @@ -28,13 +28,13 @@ struct ExtendedQueryStateMachine {

// --- general actions
case failQuery(ExtendedQueryContext, with: PSQLError)
case succeedQuery(ExtendedQueryContext, columns: [PSQLBackendMessage.RowDescription.Column])
case succeedQuery(ExtendedQueryContext, columns: [RowDescription.Column])
case succeedQueryNoRowsComming(ExtendedQueryContext, commandTag: String)

// --- streaming actions
// actions if query has requested next row but we are waiting for backend
case forwardRows(CircularBuffer<PSQLBackendMessage.DataRow>)
case forwardStreamComplete(CircularBuffer<PSQLBackendMessage.DataRow>, commandTag: String)
case forwardRows([DataRow])
case forwardStreamComplete([DataRow], commandTag: String)
case forwardStreamError(PSQLError, read: Bool)

case read
Expand Down Expand Up @@ -105,7 +105,7 @@ struct ExtendedQueryStateMachine {
}
}

mutating func rowDescriptionReceived(_ rowDescription: PSQLBackendMessage.RowDescription) -> Action {
mutating func rowDescriptionReceived(_ rowDescription: RowDescription) -> Action {
guard case .parameterDescriptionReceived(let queryContext) = self.state else {
return self.setAndFireError(.unexpectedBackendMessage(.rowDescription(rowDescription)))
}
Expand All @@ -119,7 +119,7 @@ struct ExtendedQueryStateMachine {

// In Postgres extended queries we always request the response rows to be returned in
// `.binary` format.
let columns = rowDescription.columns.map { column -> PSQLBackendMessage.RowDescription.Column in
let columns = rowDescription.columns.map { column -> RowDescription.Column in
var column = column
column.format = .binary
return column
Expand Down Expand Up @@ -155,12 +155,12 @@ struct ExtendedQueryStateMachine {
}
}

mutating func dataRowReceived(_ dataRow: PSQLBackendMessage.DataRow) -> Action {
mutating func dataRowReceived(_ dataRow: DataRow) -> Action {
switch self.state {
case .streaming(let columns, var demandStateMachine):
// When receiving a data row, we must ensure that the data row column count
// matches the previously received row description column count.
guard dataRow.columns.count == columns.count else {
guard dataRow.columnCount == columns.count else {
return self.setAndFireError(.unexpectedBackendMessage(.dataRow(dataRow)))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ struct PrepareStatementStateMachine {

enum Action {
case sendParseDescribeSync(name: String, query: String)
case succeedPreparedStatementCreation(PrepareStatementContext, with: PSQLBackendMessage.RowDescription?)
case succeedPreparedStatementCreation(PrepareStatementContext, with: RowDescription?)
case failPreparedStatementCreation(PrepareStatementContext, with: PSQLError)

case read
Expand Down Expand Up @@ -72,7 +72,7 @@ struct PrepareStatementStateMachine {
return .succeedPreparedStatementCreation(queryContext, with: nil)
}

mutating func rowDescriptionReceived(_ rowDescription: PSQLBackendMessage.RowDescription) -> Action {
mutating func rowDescriptionReceived(_ rowDescription: RowDescription) -> Action {
guard case .parameterDescriptionReceived(let queryContext) = self.state else {
return self.setAndFireError(.unexpectedBackendMessage(.rowDescription(rowDescription)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,28 @@ struct RowStreamStateMachine {

private enum State {
/// The state machines expects further writes to `channelRead`. The writes are appended to the buffer.
case waitingForRows(CircularBuffer<PSQLBackendMessage.DataRow>)
case waitingForRows([DataRow])
/// The state machines expects a call to `demandMoreResponseBodyParts` or `read`. The buffer is
/// empty. It is preserved for performance reasons.
case waitingForReadOrDemand(CircularBuffer<PSQLBackendMessage.DataRow>)
case waitingForReadOrDemand([DataRow])
/// The state machines expects a call to `read`. The buffer is empty. It is preserved for performance reasons.
case waitingForRead(CircularBuffer<PSQLBackendMessage.DataRow>)
case waitingForRead([DataRow])
/// The state machines expects a call to `demandMoreResponseBodyParts`. The buffer is empty. It is
/// preserved for performance reasons.
case waitingForDemand(CircularBuffer<PSQLBackendMessage.DataRow>)
case waitingForDemand([DataRow])

case modifying
}

private var state: State

init() {
self.state = .waitingForRows(CircularBuffer(initialCapacity: 32))
var buffer = [DataRow]()
buffer.reserveCapacity(32)
self.state = .waitingForRows(buffer)
}

mutating func receivedRow(_ newRow: PSQLBackendMessage.DataRow) {
mutating func receivedRow(_ newRow: DataRow) {
switch self.state {
case .waitingForRows(var buffer):
self.state = .modifying
Expand Down Expand Up @@ -66,7 +68,7 @@ struct RowStreamStateMachine {
}
}

mutating func channelReadComplete() -> CircularBuffer<PSQLBackendMessage.DataRow>? {
mutating func channelReadComplete() -> [DataRow]? {
switch self.state {
case .waitingForRows(let buffer):
if buffer.isEmpty {
Expand Down Expand Up @@ -139,7 +141,7 @@ struct RowStreamStateMachine {
}
}

mutating func end() -> CircularBuffer<PSQLBackendMessage.DataRow> {
mutating func end() -> [DataRow] {
switch self.state {
case .waitingForRows(let buffer):
return buffer
Expand Down
60 changes: 30 additions & 30 deletions Sources/PostgresNIO/New/Data/Array+PSQLCodable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,81 +2,81 @@ import NIOCore
import struct Foundation.UUID

/// A type, of which arrays can be encoded into and decoded from a postgres binary format
protocol PSQLArrayElement: PSQLCodable {
public protocol PSQLArrayElement: PSQLCodable {
static var psqlArrayType: PSQLDataType { get }
static var psqlArrayElementType: PSQLDataType { get }
}

extension Bool: PSQLArrayElement {
static var psqlArrayType: PSQLDataType { .boolArray }
static var psqlArrayElementType: PSQLDataType { .bool }
public static var psqlArrayType: PSQLDataType { .boolArray }
public static var psqlArrayElementType: PSQLDataType { .bool }
}

extension ByteBuffer: PSQLArrayElement {
static var psqlArrayType: PSQLDataType { .byteaArray }
static var psqlArrayElementType: PSQLDataType { .bytea }
public static var psqlArrayType: PSQLDataType { .byteaArray }
public static var psqlArrayElementType: PSQLDataType { .bytea }
}

extension UInt8: PSQLArrayElement {
static var psqlArrayType: PSQLDataType { .charArray }
static var psqlArrayElementType: PSQLDataType { .char }
public static var psqlArrayType: PSQLDataType { .charArray }
public static var psqlArrayElementType: PSQLDataType { .char }
}

extension Int16: PSQLArrayElement {
static var psqlArrayType: PSQLDataType { .int2Array }
static var psqlArrayElementType: PSQLDataType { .int2 }
public static var psqlArrayType: PSQLDataType { .int2Array }
public static var psqlArrayElementType: PSQLDataType { .int2 }
}

extension Int32: PSQLArrayElement {
static var psqlArrayType: PSQLDataType { .int4Array }
static var psqlArrayElementType: PSQLDataType { .int4 }
public static var psqlArrayType: PSQLDataType { .int4Array }
public static var psqlArrayElementType: PSQLDataType { .int4 }
}

extension Int64: PSQLArrayElement {
static var psqlArrayType: PSQLDataType { .int8Array }
static var psqlArrayElementType: PSQLDataType { .int8 }
public static var psqlArrayType: PSQLDataType { .int8Array }
public static var psqlArrayElementType: PSQLDataType { .int8 }
}

extension Int: PSQLArrayElement {
#if (arch(i386) || arch(arm))
static var psqlArrayType: PSQLDataType { .int4Array }
static var psqlArrayElementType: PSQLDataType { .int4 }
public static var psqlArrayType: PSQLDataType { .int4Array }
public static var psqlArrayElementType: PSQLDataType { .int4 }
#else
static var psqlArrayType: PSQLDataType { .int8Array }
static var psqlArrayElementType: PSQLDataType { .int8 }
public static var psqlArrayType: PSQLDataType { .int8Array }
public static var psqlArrayElementType: PSQLDataType { .int8 }
#endif
}

extension Float: PSQLArrayElement {
static var psqlArrayType: PSQLDataType { .float4Array }
static var psqlArrayElementType: PSQLDataType { .float4 }
public static var psqlArrayType: PSQLDataType { .float4Array }
public static var psqlArrayElementType: PSQLDataType { .float4 }
}

extension Double: PSQLArrayElement {
static var psqlArrayType: PSQLDataType { .float8Array }
static var psqlArrayElementType: PSQLDataType { .float8 }
public static var psqlArrayType: PSQLDataType { .float8Array }
public static var psqlArrayElementType: PSQLDataType { .float8 }
}

extension String: PSQLArrayElement {
static var psqlArrayType: PSQLDataType { .textArray }
static var psqlArrayElementType: PSQLDataType { .text }
public static var psqlArrayType: PSQLDataType { .textArray }
public static var psqlArrayElementType: PSQLDataType { .text }
}

extension UUID: PSQLArrayElement {
static var psqlArrayType: PSQLDataType { .uuidArray }
static var psqlArrayElementType: PSQLDataType { .uuid }
public static var psqlArrayType: PSQLDataType { .uuidArray }
public static var psqlArrayElementType: PSQLDataType { .uuid }
}

extension Array: PSQLEncodable where Element: PSQLArrayElement {
var psqlType: PSQLDataType {
public var psqlType: PSQLDataType {
Element.psqlArrayType
}

var psqlFormat: PSQLFormat {
public var psqlFormat: PSQLFormat {
.binary
}

func encode(into buffer: inout ByteBuffer, context: PSQLEncodingContext) throws {
public func encode(into buffer: inout ByteBuffer, context: PSQLEncodingContext) throws {
// 0 if empty, 1 if not
buffer.writeInteger(self.isEmpty ? 0 : 1, as: UInt32.self)
// b
Expand All @@ -102,7 +102,7 @@ extension Array: PSQLEncodable where Element: PSQLArrayElement {

extension Array: PSQLDecodable where Element: PSQLArrayElement {

static func decode(from buffer: inout ByteBuffer, type: PSQLDataType, format: PSQLFormat, context: PSQLDecodingContext) throws -> Array<Element> {
public static func decode(from buffer: inout ByteBuffer, type: PSQLDataType, format: PSQLFormat, context: PSQLDecodingContext) throws -> Array<Element> {
guard case .binary = format else {
// currently we only support decoding arrays in binary format.
throw PSQLCastingError.failure(targetType: Self.self, type: type, postgresData: buffer, context: context)
Expand All @@ -116,7 +116,7 @@ extension Array: PSQLDecodable where Element: PSQLArrayElement {
throw PSQLCastingError.failure(targetType: Self.self, type: type, postgresData: buffer, context: context)
}

guard let elementType = buffer.readInteger(as: PSQLDataType.self) else {
guard let elementType = buffer.readRawRepresentableInteger(as: PSQLDataType.self) else {
throw PSQLCastingError.failure(targetType: Self.self, type: type, postgresData: buffer, context: context)
}

Expand Down
Loading