-
-
Notifications
You must be signed in to change notification settings - Fork 26
/
Copy pathMySQLConnection.swift
108 lines (91 loc) · 3.53 KB
/
MySQLConnection.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import NIOCore
import Logging
import NIOSSL
import NIOPosix
/// A client connection to a MySQL server, backed by a single NIO `Channel`.
///
/// Create instances with `connect(to:username:database:password:tlsConfiguration:serverHostname:logger:on:)`.
/// A connection must be closed via `close()` before it is released; `deinit`
/// asserts that the underlying channel is no longer active.
public final class MySQLConnection: MySQLDatabase {
    /// Connects to `socketAddress`, installs the MySQL packet codec and the
    /// connection handler on the new channel, and yields a `MySQLConnection`
    /// once the handshake promise has succeeded.
    ///
    /// - Parameters:
    ///   - socketAddress: Address the client bootstrap connects to.
    ///   - username: User name handed to the initial handshake state.
    ///   - database: Database name handed to the initial handshake state.
    ///   - password: Optional password handed to the initial handshake state.
    ///   - tlsConfiguration: Optional TLS configuration handed to the initial
    ///     handshake state. Defaults to `.makeClientConfiguration()`.
    ///   - serverHostname: Optional host name handed to the initial handshake
    ///     state (presumably used for TLS verification — confirm in the
    ///     connection handler).
    ///   - logger: Logger used for the connection and its pipeline handlers.
    ///   - eventLoop: Event loop the channel is bootstrapped on.
    /// - Returns: A future that succeeds with the connection, or fails when
    ///   connecting, configuring the pipeline, or the handshake promise fails.
    public static func connect(
        to socketAddress: SocketAddress,
        username: String,
        database: String,
        password: String? = nil,
        tlsConfiguration: TLSConfiguration? = .makeClientConfiguration(),
        serverHostname: String? = nil,
        logger: Logger = .init(label: "codes.vapor.mysql"),
        on eventLoop: any EventLoop
    ) -> EventLoopFuture<MySQLConnection> {
        let bootstrap = ClientBootstrap(group: eventLoop)
            .channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)

        logger.debug("Opening new connection to \(socketAddress)")
        return bootstrap.connect(to: socketAddress).flatMap { channel in
            let sequence = MySQLPacketSequence()

            // `done` is handed to the handshake state; any failure of it tears
            // the channel down so a half-initialised connection never lingers.
            let done = channel.eventLoop.makePromise(of: Void.self)
            done.futureResult.whenFailure { _ in
                channel.close(mode: .all, promise: nil)
            }

            let handlersAdded = channel.pipeline.addHandlers([
                ByteToMessageHandler(MySQLPacketDecoder(
                    sequence: sequence,
                    logger: logger
                )),
                MessageToByteHandler(MySQLPacketEncoder(
                    sequence: sequence,
                    logger: logger
                )),
                MySQLConnectionHandler(logger: logger, state: .handshake(.init(
                    username: username,
                    database: database,
                    password: password,
                    tlsConfiguration: tlsConfiguration,
                    serverHostname: serverHostname,
                    done: done
                )), sequence: sequence),
                ErrorHandler()
            ], position: .last)

            // If the pipeline could not be configured, nobody would ever
            // complete `done`: the promise would leak and the freshly opened
            // channel would stay open. Failing `done` here triggers the close
            // hook above. Completing an already-completed promise is ignored.
            handlersAdded.cascadeFailure(to: done)

            return handlersAdded.flatMap {
                done.futureResult.map { MySQLConnection(channel: channel, logger: logger) }
            }
        }
    }

    /// The channel this connection reads from and writes to.
    public let channel: any Channel

    /// The event loop of the underlying channel.
    public var eventLoop: any EventLoop {
        self.channel.eventLoop
    }

    /// Logger supplied when the connection was created.
    public let logger: Logger

    /// `true` when the underlying channel is not active.
    public var isClosed: Bool {
        !self.channel.isActive
    }

    /// Wraps an already configured channel; only `connect` is expected to call this.
    internal init(channel: any Channel, logger: Logger) {
        self.channel = channel
        self.logger = logger
    }

    /// Closes the underlying channel.
    ///
    /// - Returns: An already succeeded future when the channel is not active,
    ///   otherwise the future of closing the channel in both directions.
    public func close() -> EventLoopFuture<Void> {
        guard self.channel.isActive else {
            return self.channel.eventLoop.makeSucceededFuture(())
        }
        return self.channel.close(mode: .all)
    }

    /// Writes `command` to the channel and returns a future tied to the
    /// command's promise.
    ///
    /// - Parameters:
    ///   - command: Command wrapped in a `MySQLCommandContext` and written to
    ///     the channel.
    ///   - logger: Currently not used by this method.
    /// - Returns: A future that fails with `MySQLError.closed` when the channel
    ///   is not active, fails with the write error when the write fails, and
    ///   otherwise mirrors the command promise.
    public func send(_ command: any MySQLCommand, logger: Logger) -> EventLoopFuture<Void> {
        guard self.channel.isActive else {
            return self.channel.eventLoop.makeFailedFuture(MySQLError.closed)
        }
        let promise = self.eventLoop.makePromise(of: Void.self)
        let commandContext = MySQLCommandContext(
            handler: command,
            promise: promise
        )
        let written = self.channel.write(commandContext)

        // The channel can go away between the `isActive` check and the write.
        // In that case the command context may never reach a handler, so the
        // promise has to be failed here or it would be leaked.
        written.cascadeFailure(to: promise)

        return written.flatMap { promise.futureResult }
    }

    /// Invokes `closure` with this connection and returns its future unchanged.
    public func withConnection<T>(_ closure: @escaping (MySQLConnection) -> EventLoopFuture<T>) -> EventLoopFuture<T> {
        closure(self)
    }

    deinit {
        // Debug-only guard against dropping a connection that is still open.
        assert(!self.channel.isActive, "MySQLConnection not closed before deinit.")
    }
}
/// Inbound channel handler that `MySQLConnection.connect` appends as the last
/// entry of the pipeline. It reports any error event that reaches it.
///
/// Reporting goes through `assertionFailure`, which only stops execution in
/// builds where assertions are enabled.
final class ErrorHandler: ChannelInboundHandler {
    /// This handler defines no read path, so its inbound type is `Never`.
    typealias InboundIn = Never

    /// Reports `error` as an assertion failure; the error is not forwarded.
    func errorCaught(context: ChannelHandlerContext, error: any Error) {
        assertionFailure("uncaught error: \(error)")
    }
}