Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
import Async


extension PostgreSQLConnection {
/// Note: after calling `listen'` on a connection, it can no longer handle other database operations. Do not try to send other SQL commands through this connection afterwards.
/// IAlso, notifications will only be sent for as long as this connection remains open; you are responsible for opening a new connection to listen on when this one closes.
public func listen(
_ channelName: String,
handler: @escaping (String) throws -> ()
) throws -> Future<Void> {
beforeClose = { conn in
closeHandlers.append({ conn in
let query = PostgreSQLQuery(query: "UNLISTEN \"\(channelName)\";")
return conn.send([.query(query)], onResponse: { _ in })
})

notificationHandlers[channelName] = { message in
try handler(message)
}
let query = PostgreSQLQuery(query: "LISTEN \"\(channelName)\";")
return queue.enqueue([.query(query)], onInput: { message in
switch message {
case let .notificationResponse(notification):
try handler(notification.message)
try self.notificationHandlers[notification.channel]?(notification.message)
default:
break
}
Expand All @@ -26,4 +33,10 @@ extension PostgreSQLConnection {
let query = PostgreSQLQuery(query: "NOTIFY \"\(channelName)\", '\(message)';")
return send([.query(query)]).map(to: Void.self, { _ in })
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Postgres‘ notify command allows omitting the payload, so how about allowingNil messages in notify?

public func unlisten(_ channelName: String, unlistenHandler: (() -> Void)? = nil) throws -> Future<Void> {
notificationHandlers.removeValue(forKey: channelName)
let query = PostgreSQLQuery(query: "UNLISTEN \"\(channelName)\";")
return send([.query(query)], onResponse: { _ in unlistenHandler?() })
}
}
24 changes: 19 additions & 5 deletions Sources/PostgreSQL/Connection/PostgreSQLConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ public final class PostgreSQLConnection: DatabaseConnection, BasicWorker {
/// The current query running, if one exists.
private var pipeline: Future<Void>

/// Block type to be called on close of connection
internal typealias CloseHandler = ((PostgreSQLConnection) -> Future<Void>)
/// Called on close of the connection
internal var closeHandlers = [CloseHandler]()
/// Handler type for Notifications
internal typealias NotificationHandler = (String) throws -> Void
/// Handlers to be stored by channel name
internal var notificationHandlers: [String: NotificationHandler] = [:]

/// Creates a new Redis client on the provided data source and sink.
init(queue: QueueHandler<PostgreSQLMessage, PostgreSQLMessage>, channel: Channel) {
self.queue = queue
Expand Down Expand Up @@ -184,19 +193,24 @@ public final class PostgreSQLConnection: DatabaseConnection, BasicWorker {
}
}

internal var beforeClose: ((PostgreSQLConnection) -> Future<Void>)?

/// Closes this client.
public func close() {
if let beforeClose = beforeClose {
_ = beforeClose(self).then { _ in
self.channel.close(mode: CloseMode.all)
_ = executeCloseHandlersThenClose()
}


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not: extra newline and two spaces before „func“

private func executeCloseHandlersThenClose() -> Future<Void> {
if let beforeClose = closeHandlers.popLast() {
return beforeClose(self).then { _ in
self.executeCloseHandlersThenClose()
}
} else {
channel.close(promise: nil)
return channel.close(mode: .all)
}
}


/// Called when this class deinitializes.
deinit {
close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ import Foundation

struct PostgreSQLNotificationResponse: Decodable {
/// The message coming from PSQL
let channel: String
let message: String
init(from decoder: Decoder) throws {
let container = try decoder.singleValueContainer()
_ = try container.decode(Int32.self) // message length
_ = try container.decode(Int32.self) // process id of message
let channelId = try container.decode(String.self)
let message = try? container.decode(String.self)
self.message = message ?? channelId
_ = try container.decode(Int32.self)
channel = try container.decode(String.self)
message = try container.decode(String.self)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Postgres allows for now message to be provided, so I think message should be an optional and this an optional decode? Or will the message simply be empty in that case? (At least worth testing.)

}
}
61 changes: 58 additions & 3 deletions Tests/PostgreSQLTests/PostgreSQLConnectionTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ class PostgreSQLConnectionTests: XCTestCase {
let completionHandlerExpectation2 = expectation(description: "final completion handler called")
let notifyConn = try PostgreSQLConnection.makeTest()
let listenConn = try PostgreSQLConnection.makeTest()
let channelName = "Foo"
let channelName = "Fooze"
let messageText = "Bar"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: also test with a message text longer than 3 characters, and test with an empty message? (See above.)

let finalMessageText = "Baz"

Expand All @@ -363,11 +363,65 @@ class PostgreSQLConnectionTests: XCTestCase {
try notifyConn.notify(channelName, message: messageText).wait()
try notifyConn.notify(channelName, message: finalMessageText).wait()

waitForExpectations(timeout: defaultTimeout)
notifyConn.close()
listenConn.close()
}

func testNotifyAndListenOnMultipleChannels() throws {
let completionHandlerExpectation1 = expectation(description: "first completion handler called")
let completionHandlerExpectation2 = expectation(description: "final completion handler called")
let notifyConn = try PostgreSQLConnection.makeTest()
let listenConn = try PostgreSQLConnection.makeTest()
let channelName = "Fooze"
let channelName2 = "Foozalz"
let messageText = "Bar"
let finalMessageText = "Baz"

try listenConn.listen(channelName) { text in
if text == messageText {
completionHandlerExpectation1.fulfill()
}
}.catch({ err in XCTFail("error \(err)") })

try listenConn.listen(channelName2) { text in
if text == finalMessageText {
completionHandlerExpectation2.fulfill()
}
}.catch({ err in XCTFail("error \(err)") })

try notifyConn.notify(channelName, message: messageText).wait()
try notifyConn.notify(channelName2, message: finalMessageText).wait()

waitForExpectations(timeout: defaultTimeout)
notifyConn.close()
listenConn.close()
}

func testUnlisten() throws {
let unlistenHandlerExpectation = expectation(description: "unlisten completion handler called")

let listenHandlerExpectation = expectation(description: "listen completion handler called")

let notifyConn = try PostgreSQLConnection.makeTest()
let listenConn = try PostgreSQLConnection.makeTest()
let channelName = "Foozers"
let messageText = "Bar"

try listenConn.listen(channelName) { text in
if text == messageText {
listenHandlerExpectation.fulfill()
}
}.catch({ err in XCTFail("error \(err)") })

try notifyConn.notify(channelName, message: messageText).wait()
try notifyConn.unlisten(channelName, unlistenHandler: {
unlistenHandlerExpectation.fulfill()
}).wait()
waitForExpectations(timeout: defaultTimeout)
notifyConn.close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would also test that no further notifications are sent after unlisten has been called.

listenConn.close()
}

func testURLParsing() throws {
let databaseURL = "postgres://username:password@hostname.com:5432/database"
Expand All @@ -389,6 +443,8 @@ class PostgreSQLConnectionTests: XCTestCase {
("testNull", testNull),
("testGH24", testGH24),
("testNotifyAndListen", testNotifyAndListen),
("testNotifyAndListenOnMultipleChannels", testNotifyAndListenOnMultipleChannels),
("testUnlisten", testUnlisten),
("testURLParsing", testURLParsing),
]
}
Expand All @@ -398,8 +454,7 @@ extension PostgreSQLConnection {
static func makeTest() throws -> PostgreSQLConnection {
let hostname: String
#if Xcode
//hostname = (try? Process.execute("docker-machine", "ip")) ?? "192.168.99.100"
hostname = "localhost"
hostname = (try? Process.execute("docker-machine", "ip")) ?? "192.168.99.100"
#else
hostname = "localhost"
#endif
Expand Down