Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Async Storage shutdown #3196

Merged
merged 10 commits into from
May 24, 2024
1 change: 1 addition & 0 deletions Sources/Development/routes.swift
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ public func routes(_ app: Application) throws {
return [cred1]
}

@Sendable
func opaqueRouteTester(_ req: Request) async throws -> some AsyncResponseEncodable {
"Hello World"
}
Expand Down
25 changes: 12 additions & 13 deletions Sources/Vapor/Application.swift
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ public final class Application: Sendable {
_ eventLoopGroupProvider: EventLoopGroupProvider = .singleton
) {
self.init(environment, eventLoopGroupProvider, async: false)
self.asyncCommands.use(self.servers.command, as: "serve", isDefault: true)
DotEnvFile.load(for: environment, on: .shared(self.eventLoopGroup), fileio: self.fileio, logger: self.logger)
}

Expand Down Expand Up @@ -155,12 +156,12 @@ public final class Application: Sendable {
self.servers.use(.http)
self.clients.initialize()
self.clients.use(.http)
self.asyncCommands.use(self.servers.command, as: "serve", isDefault: true)
self.asyncCommands.use(RoutesCommand(), as: "routes")
}

public static func make(_ environment: Environment = .development, _ eventLoopGroupProvider: EventLoopGroupProvider = .singleton) async throws -> Application {
let app = Application(environment, eventLoopGroupProvider, async: true)
await app.asyncCommands.use(app.servers.asyncCommand, as: "serve", isDefault: true)
await DotEnvFile.load(for: app.environment, fileio: app.fileio, logger: app.logger)
return app
}
Expand Down Expand Up @@ -271,8 +272,11 @@ public final class Application: Sendable {

self.logger.trace("Shutting down providers")
self.lifecycle.handlers.reversed().forEach { $0.shutdown(self) }

triggerShutdown()
self.lifecycle.handlers = []

self.logger.trace("Clearing Application storage")
self.storage.shutdown()
self.storage.clear()

switch self.eventLoopGroupProvider {
case .shared:
Expand All @@ -298,8 +302,11 @@ public final class Application: Sendable {
for handler in self.lifecycle.handlers.reversed() {
await handler.shutdownAsync(self)
}

triggerShutdown()
self.lifecycle.handlers = []

self.logger.trace("Clearing Application storage")
await self.storage.asyncShutdown()
self.storage.clear()

switch self.eventLoopGroupProvider {
case .shared:
Expand All @@ -316,14 +323,6 @@ public final class Application: Sendable {
self._didShutdown.withLockedValue { $0 = true }
self.logger.trace("Application shutdown complete")
}

private func triggerShutdown() {
self.lifecycle.handlers = []

self.logger.trace("Clearing Application storage")
self.storage.shutdown()
self.storage.clear()
}

deinit {
self.logger.trace("Application deinitialized, goodbye!")
Expand Down
2 changes: 1 addition & 1 deletion Sources/Vapor/Client/ClientResponse.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import NIOCore
import NIOHTTP1
import Foundation

public struct ClientResponse {
public struct ClientResponse: Sendable {
public var status: HTTPStatus
public var headers: HTTPHeaders
public var body: ByteBuffer?
Expand Down
11 changes: 11 additions & 0 deletions Sources/Vapor/Commands/ServeCommand.swift
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public final class ServeCommand: AsyncCommand, Sendable {
self.box.withLockedValue { $0 = box }
}

@available(*, noasync, message: "Use the async asyncShutdown() method instead.")
func shutdown() {
var box = self.box.withLockedValue { $0 }
box.didShutdown = true
Expand All @@ -115,6 +116,16 @@ public final class ServeCommand: AsyncCommand, Sendable {
self.box.withLockedValue { $0 = box }
}

func asyncShutdown() async {
var box = self.box.withLockedValue { $0 }
box.didShutdown = true
box.running?.stop()
await box.server?.shutdown()
box.signalSources.forEach { $0.cancel() } // clear refs
box.signalSources = []
self.box.withLockedValue { $0 = box }
}

deinit {
assert(self.box.withLockedValue({ $0.didShutdown }), "ServeCommand did not shutdown before deinit")
}
Expand Down
4 changes: 2 additions & 2 deletions Sources/Vapor/HTTP/Client/Application+HTTP+Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ extension Application.HTTP {
configuration: self.configuration,
backgroundActivityLogger: self.application.logger
)
self.application.storage.set(Key.self, to: new) {
try $0.syncShutdown()
self.application.storage.setFirstTime(Key.self, to: new, onShutdown: { try $0.syncShutdown() }) {
try await $0.shutdown()
}
return new
}
Expand Down
15 changes: 15 additions & 0 deletions Sources/Vapor/Server/Application+Servers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ extension Application {
self.storage.makeServer.withLockedValue { $0 = .init(factory: makeServer) }
}

@available(*, noasync, renamed: "asyncCommand", message: "Use the async property instead.")
public var command: ServeCommand {
if let existing = self.application.storage.get(CommandKey.self) {
return existing
Expand All @@ -62,6 +63,20 @@ extension Application {
return new
}
}

public var asyncCommand: ServeCommand {
get async {
if let existing = self.application.storage.get(CommandKey.self) {
return existing
} else {
let new = ServeCommand()
await self.application.storage.setWithAsyncShutdown(CommandKey.self, to: new) {
await $0.asyncShutdown()
}
return new
}
}
}

let application: Application

Expand Down
59 changes: 59 additions & 0 deletions Sources/Vapor/Utilities/Storage.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,25 @@ public struct Storage: Sendable {
struct Value<T: Sendable>: AnyStorageValue {
var value: T
var onShutdown: (@Sendable (T) throws -> ())?
var onAsyncShutdown: (@Sendable (T) async throws -> ())?
func shutdown(logger: Logger) {
do {
try self.onShutdown?(self.value)
} catch {
logger.warning("Could not shutdown \(T.self): \(error)")
}
}
func asyncShutdown(logger: Logger) async {
do {
if let onAsyncShutdown {
try await onAsyncShutdown(self.value)
} else {
try self.onShutdown?(self.value)
}
} catch {
logger.warning("Could not shutdown \(T.self): \(error)")
}
}
}

/// The logger provided to shutdown closures.
Expand Down Expand Up @@ -79,6 +91,7 @@ public struct Storage: Sendable {
/// Set or remove a value for a given key, optionally providing a shutdown closure for the value.
///
/// If a key that has a shutdown closure is removed by this method, the closure **is** invoked.
@available(*, noasync, message: "Use the async setWithAsyncShutdown() method instead.", renamed: "setWithAsyncShutdown")
public mutating func set<Key>(
_ key: Key.Type,
to value: Key.Value?,
Expand All @@ -94,20 +107,66 @@ public struct Storage: Sendable {
existing.shutdown(logger: self.logger)
}
}

/// Set or remove a value for a given key, optionally providing an async shutdown closure for the value.
///
/// If a key that has a shutdown closure is removed by this method, the closure **is** invoked.
public mutating func setWithAsyncShutdown<Key>(
_ key: Key.Type,
to value: Key.Value?,
onShutdown: (@Sendable (Key.Value) async throws -> ())? = nil
) async
where Key: StorageKey
{
let key = ObjectIdentifier(Key.self)
if let value = value {
self.storage[key] = Value(value: value, onShutdown: nil, onAsyncShutdown: onShutdown)
} else if let existing = self.storage[key] {
self.storage[key] = nil
await existing.asyncShutdown(logger: self.logger)
}
gwynne marked this conversation as resolved.
Show resolved Hide resolved
}

// Provides a way to set an async shutdown with an async call to avoid breaking the API
// This must not be called when a value alraedy exists in storage
mutating func setFirstTime<Key>(
_ key: Key.Type,
to value: Key.Value?,
onShutdown: (@Sendable (Key.Value) throws -> ())? = nil,
onAsyncShutdown: (@Sendable (Key.Value) async throws -> ())? = nil
)
where Key: StorageKey
{
let key = ObjectIdentifier(Key.self)
precondition(self.storage[key] == nil, "You must not call this when a value already exists in storage")
if let value {
self.storage[key] = Value(value: value, onShutdown: onShutdown, onAsyncShutdown: onAsyncShutdown)
}
}

/// For every key in the container having a shutdown closure, invoke the closure. Designed to
/// be invoked during an explicit app shutdown process or in a reference type's `deinit`.
@available(*, noasync, message: "Use the async asyncShutdown() method instead.")
public func shutdown() {
self.storage.values.forEach {
$0.shutdown(logger: self.logger)
}
}

/// For every key in the container having a shutdown closure, invoke the closure. Designed to
/// be invoked during an explicit app shutdown process or in a reference type's `deinit`.
public func asyncShutdown() async {
for value in self.storage.values {
await value.asyncShutdown(logger: self.logger)
}
}
}

/// ``Storage`` uses this protocol internally to generically invoke shutdown closures for arbitrarily-
/// typed key values.
protocol AnyStorageValue: Sendable {
func shutdown(logger: Logger)
func asyncShutdown(logger: Logger) async
}

/// A key used to store values in a ``Storage`` must conform to this protocol.
Expand Down