Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eventloop + thread fixes #2037

Merged
merged 3 commits into from Aug 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion Sources/Development/routes.swift
Expand Up @@ -61,7 +61,7 @@ public func routes(_ r: Routes, _ c: Container) throws {
}
}

let ip = req.channel.remoteAddress?.description ?? "<no ip>"
let ip = req.remoteAddress?.description ?? "<no ip>"
ws.send("Hello 👋 \(ip)")
}

Expand Down
25 changes: 17 additions & 8 deletions Sources/Vapor/Application.swift
Expand Up @@ -7,15 +7,28 @@ public final class Application {

public var userInfo: [AnyHashable: Any]

public let lock: NSLock
public let sync: Lock

private let configure: (inout Services) throws -> ()

private let threadPool: NIOThreadPool

private var didShutdown: Bool

public var running: Running? {
get {
self.sync.lock()
defer { self.sync.unlock() }
return self._running
}
set {
self.sync.lock()
defer { self.sync.unlock() }
self._running = newValue
}
}

public var running: Running?
private var _running: Running?

public var logger: Logger

Expand All @@ -24,11 +37,7 @@ public final class Application {
public struct Running {
public var stop: () -> Void
public init(stop: @escaping () -> Void) {
self.stop = {
DispatchQueue.global().async {
stop()
}
}
self.stop = stop
}
}

Expand All @@ -41,7 +50,7 @@ public final class Application {
self.userInfo = [:]
self.didShutdown = false
self.configure = configure
self.lock = NSLock()
self.sync = Lock()
self.threadPool = .init(numberOfThreads: 1)
self.threadPool.start()
self.logger = .init(label: "codes.vapor.application")
Expand Down
21 changes: 10 additions & 11 deletions Sources/Vapor/Request/Request.swift
Expand Up @@ -111,14 +111,10 @@ public final class Request: CustomStringConvertible {
desc.append(self.body.description)
return desc.joined(separator: "\n")
}

public let remoteAddress: SocketAddress?

// public var upgrader: HTTPClientProtocolUpgrader?

public let channel: Channel

public var eventLoop: EventLoop {
return self.channel.eventLoop
}
public let eventLoop: EventLoop

public var parameters: Parameters

Expand All @@ -130,15 +126,16 @@ public final class Request: CustomStringConvertible {
version: HTTPVersion = .init(major: 1, minor: 1),
headers: HTTPHeaders = .init(),
collectedBody: ByteBuffer? = nil,
on channel: Channel
remoteAddress: SocketAddress? = nil,
on eventLoop: EventLoop
) {
self.init(
method: method,
url: url,
version: version,
headersNoUpdate: headers,
collectedBody: collectedBody,
on: channel
on: eventLoop
)
if let body = collectedBody {
self.headers.updateContentLength(body.readableBytes)
Expand All @@ -151,7 +148,8 @@ public final class Request: CustomStringConvertible {
version: HTTPVersion = .init(major: 1, minor: 1),
headersNoUpdate headers: HTTPHeaders = .init(),
collectedBody: ByteBuffer? = nil,
on channel: Channel
remoteAddress: SocketAddress? = nil,
on eventLoop: EventLoop
) {
self.method = method
self.url = url
Expand All @@ -162,7 +160,8 @@ public final class Request: CustomStringConvertible {
} else {
self.bodyStorage = .none
}
self.channel = channel
self.remoteAddress = remoteAddress
self.eventLoop = eventLoop
self.parameters = .init()
self.userInfo = [:]
self.isKeepAlive = true
Expand Down
8 changes: 7 additions & 1 deletion Sources/Vapor/Server/HTTPServer.swift
Expand Up @@ -153,11 +153,17 @@ public final class HTTPServer: Server {
configuration: self.configuration,
on: self.application.eventLoopGroup
)
self.connection = try connection.wait()

try self.application.sync.do {
self.connection = try connection.wait()
}
self.didStart = true
}

public func shutdown() {
self.application.sync.lock()
defer { self.application.sync.unlock() }

guard let connection = self.connection else {
fatalError("Called shutdown before start")
}
Expand Down
3 changes: 2 additions & 1 deletion Sources/Vapor/Server/HTTPServerRequestDecoder.swift
Expand Up @@ -47,7 +47,8 @@ final class HTTPServerRequestDecoder: ChannelDuplexHandler, RemovableChannelHand
url: .init(string: head.uri),
version: head.version,
headersNoUpdate: head.headers,
on: context.channel
remoteAddress: context.channel.remoteAddress,
on: context.channel.eventLoop
)
switch head.version.major {
case 2:
Expand Down
4 changes: 2 additions & 2 deletions Sources/Vapor/Services/Services+Default.swift
Expand Up @@ -54,8 +54,8 @@ extension Services {
}
s.register(MemorySessions.Storage.self) { c in
let app = try c.make(Application.self)
app.lock.lock()
defer { app.lock.unlock() }
app.sync.lock()
defer { app.sync.unlock() }
let key = "memory-sessions-storage"
if let existing = app.userInfo[key] as? MemorySessions.Storage {
return existing
Expand Down
20 changes: 20 additions & 0 deletions Sources/Vapor/Utilities/Lock.swift
@@ -0,0 +1,20 @@
public struct Lock {
private let nslock: NSLock
init() {
self.nslock = .init()
}

public func lock() {
self.nslock.lock()
}

public func unlock() {
self.nslock.unlock()
}

public func `do`(_ closure: () throws -> Void) rethrows {
self.lock()
defer { self.unlock() }
try closure()
}
}