-
-
Notifications
You must be signed in to change notification settings - Fork 54
/
Copy pathFluentPostgresDatabase.swift
142 lines (122 loc) · 5.73 KB
/
FluentPostgresDatabase.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import FluentKit
import FluentSQL
import Logging
import PostgresKit
import PostgresNIO
import SQLKit
/// Database wrapper that the extensions in this file conform to `Database`,
/// `TransactionControlDatabase`, `SQLDatabase` and `PostgresDatabase`.
///
/// `E` and `D` are the JSON encoder/decoder types carried by the encoding and decoding contexts.
struct _FluentPostgresDatabase<E, D> where E: PostgresJSONEncoder, D: PostgresJSONDecoder {
    /// Underlying SQL database that all SQL execution is forwarded to. It is also expected to be a
    /// `PostgresDatabase`; the `PostgresDatabase` conformance traps if it is not.
    let database: any SQLDatabase

    /// Context value carried along unchanged when a per-connection database is derived from this one.
    let context: DatabaseContext

    /// Encoding context handed to `sql(encodingContext:decodingContext:queryLogLevel:)` when a
    /// connection is wrapped.
    let encodingContext: PostgresEncodingContext<E>

    /// Decoding context handed to `sql(encodingContext:decodingContext:queryLogLevel:)` when a
    /// connection is wrapped.
    let decodingContext: PostgresDecodingContext<D>

    /// When `true`, `transaction(_:)` runs its closure directly on this database instead of
    /// opening a new transaction.
    let inTransaction: Bool
}
extension _FluentPostgresDatabase: Database {
    /// Converts a Fluent query to SQL and executes it, forwarding every returned row to `onOutput`.
    ///
    /// - Parameters:
    ///   - query: The Fluent query to run.
    ///   - onOutput: Called once per returned row with the row wrapped as a `DatabaseOutput`.
    /// - Returns: The future produced by executing the generated SQL expression.
    func execute(
        query: DatabaseQuery,
        onOutput: @escaping @Sendable (any DatabaseOutput) -> ()
    ) -> EventLoopFuture<Void> {
        var expression = SQLQueryConverter(delegate: PostgresConverterDelegate()).convert(query)

        // For `.create` query actions, we want to return the generated IDs, unless the `customIDKey` is the
        // empty string, which we use as a very hacky signal for "we don't implement this for composite IDs yet".
        if case .create = query.action, query.customIDKey != .some(.string("")) {
            expression = SQLKit.SQLList([expression, SQLReturning(.init((query.customIDKey ?? .id).description))], separator: SQLRaw(" "))
        }

        return self.execute(sql: expression, { onOutput($0.databaseOutput()) })
    }

    /// Converts a Fluent schema operation to SQL and executes it.
    ///
    /// Schema statements are not expected to yield rows; any row that does arrive is only logged.
    ///
    /// - Parameter schema: The schema operation to run.
    /// - Returns: The future produced by executing the generated SQL expression.
    func execute(schema: DatabaseSchema) -> EventLoopFuture<Void> {
        let expression = SQLSchemaConverter(delegate: PostgresConverterDelegate()).convert(schema)

        return self.execute(sql: expression,
            // N.B.: Don't fatalError() here; what're users supposed to do about it?
            { self.logger.debug("Unexpected row returned from schema query: \($0)") }
        )
    }

    /// Creates, updates or deletes a database enum according to `e.action`.
    ///
    /// - `.create`: builds a single create-enum statement containing every case in `e.createCases`.
    /// - `.update`: runs one alter-enum "add value" statement per case in `e.createCases`. Cases listed
    ///   in `e.deleteCases` are not removed; a debug message is logged instead.
    /// - `.delete`: drops the enum.
    ///
    /// - Parameter e: The enum operation to run.
    /// - Returns: A future that completes when all generated statements have completed.
    func execute(enum e: DatabaseEnum) -> EventLoopFuture<Void> {
        switch e.action {
        case .create:
            return e.createCases.reduce(self.create(enum: e.name)) { $0.value($1) }.run()
        case .update:
            if !e.deleteCases.isEmpty {
                self.logger.debug("PostgreSQL does not support deleting enum cases.")
            }
            guard !e.createCases.isEmpty else {
                return self.eventLoop.makeSucceededFuture(())
            }
            return self.eventLoop.flatten(e.createCases.map { create in
                self.alter(enum: e.name).add(value: create).run()
            })
        case .delete:
            return self.drop(enum: e.name).run()
        }
    }

    /// Runs `closure` inside a transaction.
    ///
    /// If this database is already marked as being in a transaction, the closure is simply run on
    /// `self`. Otherwise a connection is obtained via `withConnection(_:)`, `BEGIN` is issued, the
    /// closure is run on that connection, and the transaction is finished with `COMMIT` on success
    /// or `ROLLBACK` on failure (including a failure of the `COMMIT` itself).
    ///
    /// - Parameter closure: The work to perform; receives the database to run its queries on.
    /// - Returns: A future carrying the closure's result, or failing with the error that aborted
    ///   the transaction.
    func transaction<T>(_ closure: @escaping @Sendable (any Database) -> EventLoopFuture<T>) -> EventLoopFuture<T> {
        guard !self.inTransaction else {
            return closure(self)
        }
        return self.withConnection { conn in
            guard let sqlConn = conn as? any SQLDatabase else {
                fatalError("""
                    Connection yielded by a Fluent+Postgres database is not also an SQLDatabase.
                    This is a bug in Fluent; please report it at https://github.com/vapor/fluent-postgres-driver/issues
                    """)
            }
            return sqlConn.raw("BEGIN").run().flatMap {
                closure(conn).flatMap { result in
                    sqlConn.raw("COMMIT").run().and(value: result).map { $1 }
                }.flatMapError { error in
                    // Always report the error that caused the rollback. If the ROLLBACK itself fails,
                    // log that secondary failure instead of letting it replace the original error.
                    sqlConn.raw("ROLLBACK").run()
                        .recover { rollbackError in
                            self.logger.warning("Failed to roll back transaction: \(String(reflecting: rollbackError))")
                        }
                        .flatMapThrowing { throw error }
                }
            }
        }
    }

    /// Runs `closure` with a database bound to a single underlying Postgres connection.
    ///
    /// The database handed to the closure is created with `inTransaction: true`, so calling
    /// `transaction(_:)` on it runs the nested closure directly without issuing another `BEGIN`.
    ///
    /// - Parameter closure: The work to perform on the connection-bound database.
    /// - Returns: The future returned by `closure`.
    func withConnection<T>(_ closure: @escaping @Sendable (any Database) -> EventLoopFuture<T>) -> EventLoopFuture<T> {
        self.withConnection { (underlying: any PostgresDatabase) in
            closure(_FluentPostgresDatabase(
                database: underlying.sql(encodingContext: self.encodingContext, decodingContext: self.decodingContext, queryLogLevel: self.database.queryLogLevel),
                context: self.context,
                encodingContext: self.encodingContext,
                decodingContext: self.decodingContext,
                inTransaction: true
            ))
        }
    }
}
extension _FluentPostgresDatabase: TransactionControlDatabase {
    /// Issues a raw `BEGIN` statement on this database.
    func beginTransaction() -> EventLoopFuture<Void> {
        let begin = self.raw("BEGIN")
        return begin.run()
    }

    /// Issues a raw `COMMIT` statement on this database.
    func commitTransaction() -> EventLoopFuture<Void> {
        let commit = self.raw("COMMIT")
        return commit.run()
    }

    /// Issues a raw `ROLLBACK` statement on this database.
    func rollbackTransaction() -> EventLoopFuture<Void> {
        let rollback = self.raw("ROLLBACK")
        return rollback.run()
    }
}
extension _FluentPostgresDatabase: SQLDatabase {
    /// Version reported by the wrapped database, if it reports one.
    var version: (any SQLDatabaseReportedVersion)? {
        return self.database.version
    }

    /// SQL dialect of the wrapped database.
    var dialect: any SQLDialect {
        return self.database.dialect
    }

    /// Query log level of the wrapped database.
    var queryLogLevel: Logger.Level? {
        return self.database.queryLogLevel
    }

    /// Forwards an SQL expression to the wrapped database (future-based variant).
    ///
    /// - Parameters:
    ///   - expression: The SQL expression to execute.
    ///   - rowHandler: Passed through to the wrapped database's `execute(sql:_:)`.
    /// - Returns: The future returned by the wrapped database.
    func execute(sql expression: any SQLExpression, _ rowHandler: @escaping @Sendable (any SQLRow) -> ()) -> EventLoopFuture<Void> {
        let wrapped = self.database
        return wrapped.execute(sql: expression, rowHandler)
    }

    /// Forwards an SQL expression to the wrapped database (`async` variant).
    ///
    /// - Parameters:
    ///   - expression: The SQL expression to execute.
    ///   - rowHandler: Passed through to the wrapped database's `execute(sql:_:)`.
    /// - Throws: Whatever the wrapped database throws.
    func execute(sql expression: any SQLExpression, _ rowHandler: @escaping @Sendable (any SQLRow) -> ()) async throws {
        let wrapped = self.database
        try await wrapped.execute(sql: expression, rowHandler)
    }

    /// Forwards a session request to the wrapped database.
    ///
    /// - Parameter closure: Passed through to the wrapped database's `withSession(_:)`.
    /// - Returns: The value produced by `closure`.
    /// - Throws: Whatever the wrapped database or `closure` throws.
    func withSession<R>(_ closure: @escaping @Sendable (any SQLDatabase) async throws -> R) async throws -> R {
        return try await self.database.withSession(closure)
    }
}
extension _FluentPostgresDatabase: PostgresDatabase {
    /// Sends a Postgres request on a connection obtained from `withConnection(_:)`.
    ///
    /// - Parameters:
    ///   - request: The request to send.
    ///   - logger: Logger handed to the connection's `send(_:logger:)`.
    /// - Returns: The future returned by the connection.
    func send(_ request: any PostgresRequest, logger: Logger) -> EventLoopFuture<Void> {
        return self.withConnection { (connection: PostgresConnection) in
            connection.send(request, logger: logger)
        }
    }

    /// Runs `closure` with a `PostgresConnection` supplied by the wrapped database.
    ///
    /// Traps if the wrapped database is not a `PostgresDatabase`.
    ///
    /// - Parameter closure: The work to perform on the connection.
    /// - Returns: The future returned by the wrapped database's `withConnection(_:)`.
    func withConnection<T>(_ closure: @escaping (PostgresConnection) -> EventLoopFuture<T>) -> EventLoopFuture<T> {
        if let postgres = self.database as? any PostgresDatabase {
            return postgres.withConnection(closure)
        }
        fatalError("""
            Connection yielded by a Fluent+Postgres database is not also a PostgresDatabase.
            This is a bug in Fluent; please report it at https://github.com/vapor/fluent-postgres-driver/issues
            """)
    }
}