-
-
Notifications
You must be signed in to change notification settings - Fork 71
/
Copy pathPostgreSQLConnection.swift
142 lines (114 loc) · 4.93 KB
/
PostgreSQLConnection.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
/// A PostgreSQL frontend client.
///
/// Owns a single `Channel` plus the `QueueHandler` used to exchange
/// `PostgreSQLMessage`s with the server. Only one `send(...)` may be in
/// flight at a time; use `operation(_:)` to serialize work on the connection.
public final class PostgreSQLConnection: DatabaseConnection, BasicWorker, DatabaseQueryable, SQLConnection {
    /// See `DatabaseConnection`.
    public typealias Database = PostgreSQLDatabase

    /// See `BasicWorker`.
    public var eventLoop: EventLoop {
        return channel.eventLoop
    }

    /// If non-nil, will log queries.
    public var logger: DatabaseLogger?

    /// See `DatabaseConnection`.
    ///
    /// Set to `true` once the channel's `closeFuture` completes.
    public var isClosed: Bool

    /// See `Extendable`.
    public var extend: Extend

    /// Handles enqueued PostgreSQL commands and responses.
    internal let queue: QueueHandler<PostgreSQLMessage, PostgreSQLMessage>

    /// The channel
    internal let channel: Channel

    /// Previously fetched table name cache
    internal var tableNameCache: TableNameCache?

    /// Promise for the in-flight `send(...)` call, or `nil` when no send is outstanding.
    private var currentSend: Promise<Void>?

    /// Tail of the pipeline built by `operation(_:)`: completes (never fails)
    /// when the most recently submitted operation has finished.
    private var pipeline: Future<Void>

    /// Creates a new PostgreSQL client on the provided data source and sink.
    ///
    /// - Parameters:
    ///   - queue: Handler used to enqueue outgoing messages and receive responses.
    ///   - channel: The channel backing this connection; its event loop becomes `eventLoop`.
    init(queue: QueueHandler<PostgreSQLMessage, PostgreSQLMessage>, channel: Channel) {
        self.queue = queue
        self.channel = channel
        self.isClosed = false
        self.extend = [:]
        self.pipeline = channel.eventLoop.newSucceededFuture(result: ())

        // Capture `self` weakly. The close future keeps this callback until the
        // channel closes; a strong capture would make the channel keep the
        // connection alive, so `deinit` (which is what closes the channel for
        // callers that simply drop the connection) could never run.
        channel.closeFuture.always { [weak self] in
            guard let connection = self else { return }
            connection.isClosed = true
            // Fail any send that is still waiting on the server.
            connection.currentSend?.fail(error: closeError)
        }
    }

    /// See `SQLConnection`.
    ///
    /// Decodes `row` into `D`. When `table` is supplied, only columns belonging
    /// to that table are considered, which requires `tableNameCache` to be set.
    /// A table name missing from the cache is decoded with table OID `0`.
    ///
    /// - Throws: `PostgreSQLError` (identifier `"tableNameCache"`) if `table` is
    ///   non-nil but no table name cache is available, or any error thrown by
    ///   `PostgreSQLRowDecoder`.
    public func decode<D>(_ type: D.Type, from row: [PostgreSQLColumn : PostgreSQLData], table: GenericSQLTableIdentifier<PostgreSQLIdentifier>?) throws -> D where D : Decodable {
        if let table = table {
            guard let cache = tableNameCache else {
                throw PostgreSQLError(identifier: "tableNameCache", reason: "Cannot decode row from specific table without table name cache.")
            }
            return try PostgreSQLRowDecoder().decode(D.self, from: row, tableOID: cache.tableOID(name: table.identifier.string) ?? 0)
        } else {
            return try PostgreSQLRowDecoder().decode(D.self, from: row)
        }
    }

    /// Sends `PostgreSQLMessage` to the server.
    ///
    /// Collects every response forwarded by `send(_:onResponse:)` and returns
    /// them, in arrival order, once that call's future succeeds.
    func send(_ message: [PostgreSQLMessage]) -> Future<[PostgreSQLMessage]> {
        var responses: [PostgreSQLMessage] = []
        return send(message) { response in
            responses.append(response)
        }.map(to: [PostgreSQLMessage].self) {
            return responses
        }
    }

    /// Sends `PostgreSQLMessage` to the server.
    ///
    /// Responses are consumed until `.readyForQuery` arrives. `.error` responses
    /// are remembered and thrown at that point; `.notice` responses are only
    /// logged in debug builds; everything else is passed to `onResponse`.
    ///
    /// - Parameters:
    ///   - messages: Messages to enqueue on the connection's queue.
    ///   - onResponse: Called for each response that is not ready-for-query,
    ///     error, or notice. Errors it throws propagate out of the queue callback.
    /// - Returns: A future that completes when the exchange finishes, or fails
    ///   with `closeError` if the connection is (or becomes) closed.
    func send(_ messages: [PostgreSQLMessage], onResponse: @escaping (PostgreSQLMessage) throws -> ()) -> Future<Void> {
        // if currentSend is not nil, previous send has not completed
        assert(currentSend == nil, "Attempting to call `send(...)` again before previous invocation has completed.")

        // ensure the connection is not closed
        guard !isClosed else {
            return eventLoop.newFailedFuture(error: closeError)
        }

        // create a new promise and store it
        let promise = eventLoop.newPromise(Void.self)
        currentSend = promise

        // cascade this enqueue to the newly created promise
        var error: Error?
        queue.enqueue(messages) { message in
            switch message {
            case .readyForQuery:
                // surface the most recent server error once the exchange is over
                if let e = error { throw e }
                return true
            case .error(let e): error = PostgreSQLError.errorResponse(e)
            case .notice(let n): debugOnly { WARNING(n.description) }
            default: try onResponse(message)
            }
            return false // request until ready for query
        }.cascade(promise: promise)

        // when the promise completes, remove the reference to it
        promise.futureResult.always { self.currentSend = nil }

        // return the promise's future result (same as `queue.enqueue`)
        return promise.futureResult
    }

    /// Submits an async task to be pipelined.
    ///
    /// `work` starts only after every previously submitted operation has
    /// finished, whether it succeeded or failed.
    ///
    /// - Parameter work: Closure producing the future to run next.
    /// - Returns: The future produced by `work`, including its error if it fails.
    internal func operation(_ work: @escaping () -> Future<Void>) -> Future<Void> {
        // perform this work when the current pipeline future is completed
        let new = pipeline.then(work)

        // append this work to the pipeline, discarding errors as the pipeline
        // does not care about them
        pipeline = new.catchMap { _ in
            return ()
        }

        // return the newly enqueued work's future result
        return new
    }

    /// Closes this client.
    ///
    /// The result of closing the channel is discarded.
    public func close() {
        _ = executeCloseHandlersThenClose()
    }

    /// Executes close handlers before closing.
    ///
    /// Currently this only closes the channel in both directions (`.all`).
    private func executeCloseHandlersThenClose() -> Future<Void> {
        return channel.close(mode: .all)
    }

    /// Called when this class deinitializes.
    ///
    /// Closes the channel so a dropped connection does not leave it open.
    deinit {
        close()
    }
}
// MARK: Private

/// Shared error used to fail sends on a connection that is closed.
private let closeError: PostgreSQLError = PostgreSQLError(identifier: "closed", reason: "Connection is closed.")