Skip to content

Commit

Permalink
- Close database connections
Browse files Browse the repository at this point in the history
- Use withPooledConnection for database conn so that it is closed automatically.
- Stop discarding futures!
  • Loading branch information
tjshae committed Mar 6, 2019
1 parent e9e7763 commit f0fff15
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 54 deletions.
20 changes: 10 additions & 10 deletions Package.resolved
Expand Up @@ -15,8 +15,8 @@
"repositoryURL": "https://github.com/vapor/core.git",
"state": {
"branch": null,
"revision": "1794ff138bd669175a2528d27695028d7cb30471",
"version": "3.5.0"
"revision": "439d6dcd6c520451ae30d39b2ca9f2aba96c22f4",
"version": "3.7.0"
}
},
{
Expand All @@ -42,8 +42,8 @@
"repositoryURL": "https://github.com/vapor/fluent.git",
"state": {
"branch": null,
"revision": "00b81a9362549facb8e2ac93b17d2a78599fce3b",
"version": "3.1.2"
"revision": "3776a0f623e08b413e878f282a70e8952651c91f",
"version": "3.1.3"
}
},
{
Expand Down Expand Up @@ -87,8 +87,8 @@
"repositoryURL": "https://github.com/vapor/postgresql.git",
"state": {
"branch": null,
"revision": "d34cf720a43ea2ef8dbf2ba7d9f18d39bb1eb795",
"version": "1.2.2"
"revision": "830abbc80de4fad428987bea1bf3d43013f17cc1",
"version": "1.4.0"
}
},
{
Expand Down Expand Up @@ -123,8 +123,8 @@
"repositoryURL": "https://github.com/apple/swift-nio.git",
"state": {
"branch": null,
"revision": "98434c1f1d687ff5a24d2cabfbd19b5c7d2d7a2f",
"version": "1.13.0"
"revision": "87dbd0216c47ea2e7ddb1b545271b716e03b943e",
"version": "1.13.1"
}
},
{
Expand Down Expand Up @@ -186,8 +186,8 @@
"repositoryURL": "https://github.com/vapor/vapor.git",
"state": {
"branch": null,
"revision": "6c7284681c3432fee29c268babf954be6816cbfb",
"version": "3.2.2"
"revision": "c86ada59b31c69f08a6abd4f776537cba48d5df6",
"version": "3.3.0"
}
},
{
Expand Down
34 changes: 18 additions & 16 deletions Package.swift
Expand Up @@ -3,22 +3,24 @@
import PackageDescription

let package = Package(
name: "JobsPostgreSQLDriver",
products: [
.library(
name: "JobsPostgreSQLDriver",
targets: ["JobsPostgreSQLDriver"]),
name: "JobsPostgreSQLDriver",
products: [
.library(
name: "JobsPostgreSQLDriver",
targets: ["JobsPostgreSQLDriver"]),
],
dependencies: [
.package(url: "https://github.com/vapor-community/jobs.git", from: "0.2.0"),
.package(url: "https://github.com/vapor/fluent-postgresql.git", from: "1.0.0")
],
targets: [
.target(
name: "JobsPostgreSQLDriver",
dependencies: ["Jobs", "FluentPostgreSQL"]),
.testTarget(
name: "JobsPostgreSQLDriverTests",
dependencies: ["JobsPostgreSQLDriver"]),
dependencies: [
.package(url: "https://github.com/vapor/vapor.git", from: "3.0.0"),
.package(url: "https://github.com/vapor-community/jobs.git", from: "0.2.0"),
.package(url: "https://github.com/vapor/fluent-postgresql.git", from: "1.0.0")
],
targets: [
.target(
name: "JobsPostgreSQLDriver",
dependencies: ["Vapor", "Jobs", "FluentPostgreSQL"]),
.testTarget(
name: "JobsPostgreSQLDriverTests",
dependencies: ["JobsPostgreSQLDriver"]),
]
)

10 changes: 2 additions & 8 deletions README.md
Expand Up @@ -24,27 +24,21 @@ CREATE TABLE job (
CREATE INDEX job_key_idx ON job(key);
```

Initialize the Jobs service and set the default database for the `JobModel` model in your `confirgure.swift`:
Initialize the Jobs service and set the default database for the `JobModel` model in your `configure.swift`:

```swift
import Jobs
import JobsPostgreSQLDriver

/// Register the Jobs service, with PostgreSQL persistence layer
services.register(JobsPersistenceLayer.self) { container -> JobsPostgreSQLDriver in
return JobsPostgreSQLDriver(database: postgresql, eventLoop: container.next())
return JobsPostgreSQLDriver(databaseIdentifier: .psql, container: container)
}
try jobs(&services)

/// Set the default database on the JobModel
JobModel.defaultDatabase = .psql
```

Where `postgresql` is the instance of your `PostgreSQLDatabase`. ie:

```swift
let postgresql = PostgreSQLDatabase(config: PostgreSQLDatabaseConfig(url: ConfigVars.DATABASE_URL)!)
```


Follow the instructions in the `Jobs` package for more setup and configuration information.
49 changes: 29 additions & 20 deletions Sources/JobsPostgreSQLDriver/JobsPostgreSQLDriver.swift
Expand Up @@ -16,27 +16,31 @@ import NIO
public struct JobsPostgreSQLDriver {

/// The `PostgreSQLDatabase` to run commands on
let database: PostgreSQLDatabase
let databaseIdentifier: DatabaseIdentifier<PostgreSQLDatabase>

/// The `EventLoop` to run jobs on
public let eventLoop: EventLoop
/// The `Container` to run jobs on
public let container: Container

/// Creates a new `JobsPostgreSQLDriver` instance
///
/// - Parameters:
/// - database: The `PostgreSQLDatabase` to run commands on
/// - eventLoop: The `EventLoop` to run jobs on
public init(database: PostgreSQLDatabase, eventLoop: EventLoop) {
self.database = database
self.eventLoop = eventLoop
/// - databaseIdentifier: The `DatabaseIdentifier<PostgreSQLDatabase>` to run commands on
/// - container: The `Container` to run jobs on
public init(databaseIdentifier: DatabaseIdentifier<PostgreSQLDatabase>, container: Container) {
self.databaseIdentifier = databaseIdentifier
self.container = container
}
}

extension JobsPostgreSQLDriver: JobsPersistenceLayer {
public var eventLoop: EventLoop {
return container.next()
}

public func get(key: String) -> EventLoopFuture<JobStorage?> {
// Establish a database connection
return database.newConnection(on: eventLoop).flatMap { conn in

return container.withPooledConnection(to: databaseIdentifier) { conn in
// We ned to use SKIP LOCKED in order to handle multiple threads all getting the next job
// Not sure how to make use of SKIP LOCKED in the QueryBuilder, saw raw SQL it is ...
let sql = PostgreSQLQuery(stringLiteral: """
Expand All @@ -53,7 +57,7 @@ extension JobsPostgreSQLDriver: JobsPersistenceLayer {
)
RETURNING *
""")

// Retrieve the next Job
return conn.query(sql).map(to: JobStorage?.self) { rows in
if let job = rows.first,
Expand All @@ -69,21 +73,28 @@ extension JobsPostgreSQLDriver: JobsPersistenceLayer {

public func set(key: String, jobStorage: JobStorage) -> EventLoopFuture<Void> {
// Establish a database connection
return database.newConnection(on: eventLoop).map { conn in
return container.withPooledConnection(to: databaseIdentifier) { conn in
// Encode and save the Job
let data = try JSONEncoder().encode(jobStorage)
let _ = JobModel(key: key, jobId: jobStorage.id, data: data).save(on: conn)
return JobModel(key: key, jobId: jobStorage.id, data: data).save(on: conn).map { jobModel in
return
}
}
}

public func completed(key: String, jobStorage: JobStorage) -> EventLoopFuture<Void> {
// Establish a database connection
return database.newConnection(on: eventLoop).flatMap { conn in
return container.withPooledConnection(to: databaseIdentifier) { conn in
// Update the state
return JobModel.query(on: conn).filter(\.jobId == jobStorage.id).first().map { jobModel in
jobModel?.state = JobState.completed.rawValue
jobModel?.updatedAt = Date()
let _ = jobModel?.save(on: conn)
return JobModel.query(on: conn).filter(\.jobId == jobStorage.id).first().flatMap { jobModel in
if let jobModel = jobModel {
jobModel.state = JobState.completed.rawValue
jobModel.updatedAt = Date()
return jobModel.save(on: conn).map(to: Void.self) { jobModel in
return
}
}
return conn.future()
}
}
}
Expand Down Expand Up @@ -162,5 +173,3 @@ extension JobModel: Content { }

/// Allows `JobModel` to be used as a dynamic parameter in route definitions.
extension JobModel: Parameter { }


0 comments on commit f0fff15

Please sign in to comment.