From 86e1d80117e9c8283b10927a8c3580b262735ec1 Mon Sep 17 00:00:00 2001 From: Shaun Hubbard Date: Wed, 25 Apr 2018 14:57:38 -0400 Subject: [PATCH 1/7] added block comment warning that listener is responsible for managing connection --- .../Connection/PostgreSQLConnection+NotifyAndListen.swift | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Sources/PostgreSQL/Connection/PostgreSQLConnection+NotifyAndListen.swift b/Sources/PostgreSQL/Connection/PostgreSQLConnection+NotifyAndListen.swift index c1c650d7..036a20bb 100644 --- a/Sources/PostgreSQL/Connection/PostgreSQLConnection+NotifyAndListen.swift +++ b/Sources/PostgreSQL/Connection/PostgreSQLConnection+NotifyAndListen.swift @@ -1,6 +1,8 @@ import Async extension PostgreSQLConnection { + /// Note: after calling `listen'` on a connection, it can no longer handle other database operations. Do not try to send other SQL commands through this connection afterwards. + /// IAlso, notifications will only be sent for as long as this connection remains open; you are responsible for opening a new connection to listen on when this one closes. public func listen( _ channelName: String, handler: @escaping (String) throws -> () From 443581195689a3efe1a1982c273960fd8b25003b Mon Sep 17 00:00:00 2001 From: Shaun Hubbard Date: Wed, 25 Apr 2018 16:08:49 -0400 Subject: [PATCH 2/7] @MrMage had an excellent insight into the base decoder, and the length of words, wow I can't belive I missed this --- .../Message/PostgreSQLNotificationResponse.swift | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Sources/PostgreSQL/Message/PostgreSQLNotificationResponse.swift b/Sources/PostgreSQL/Message/PostgreSQLNotificationResponse.swift index c34c6a6a..ec6ced2e 100644 --- a/Sources/PostgreSQL/Message/PostgreSQLNotificationResponse.swift +++ b/Sources/PostgreSQL/Message/PostgreSQLNotificationResponse.swift @@ -2,13 +2,13 @@ import Foundation struct PostgreSQLNotificationResponse: Decodable { /// The message coming from PSQL + let channel: String let message: String init(from decoder: Decoder) throws { let container = try decoder.singleValueContainer() - _ = try container.decode(Int32.self) // message length - _ = try container.decode(Int32.self) // process id of message - let channelId = try container.decode(String.self) - let message = try? container.decode(String.self) - self.message = message ?? channelId + _ = try container.decode(Int32.self) + channel = try container.decode(String.self) + message = try container.decode(String.self) + NSLog("Found self \(channel) \(message)") } } From 54e9e0419bf4d38492ce0a301c23b4c16d3d42f8 Mon Sep 17 00:00:00 2001 From: Shaun Hubbard Date: Wed, 25 Apr 2018 16:09:34 -0400 Subject: [PATCH 3/7] changed channel names so they wouldnt hide as 32 bit words --- .../PostgreSQLConnectionTests.swift | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/Tests/PostgreSQLTests/PostgreSQLConnectionTests.swift b/Tests/PostgreSQLTests/PostgreSQLConnectionTests.swift index e8cac7ec..e6487d01 100644 --- a/Tests/PostgreSQLTests/PostgreSQLConnectionTests.swift +++ b/Tests/PostgreSQLTests/PostgreSQLConnectionTests.swift @@ -33,7 +33,7 @@ class PostgreSQLConnectionTests: XCTestCase { for row in rows { try XCTAssert( row.firstValue(forColumn: "typlen")?.decode(Int.self) == 1 || - row.firstValue(forColumn: "typlen")?.decode(Int.self) == 2 + row.firstValue(forColumn: "typlen")?.decode(Int.self) == 2 ) } } @@ -261,7 +261,7 @@ class PostgreSQLConnectionTests: XCTestCase { XCTAssertEqual(createResult.count, 0) let insertResult = try client.query("insert into foo values ($1, $2);", [ Int32(1), Hello(message: "hello, world") - ]).wait() + ]).wait() XCTAssertEqual(insertResult.count, 0) let parameterizedResult = try client.query("select * from foo").wait() @@ -282,7 +282,7 @@ class PostgreSQLConnectionTests: XCTestCase { let insertResult = try client.query("insert into nulltest (i, d) VALUES ($1, $2)", [ PostgreSQLData(type: .int2, format: .binary, data: Data([0x00, 0x01])), PostgreSQLData(type: .timestamp, format: .binary, data: nil), - ]).wait() + ]).wait() XCTAssertEqual(insertResult.count, 0) let parameterizedResult = try client.query("select * from nulltest").wait() XCTAssertEqual(parameterizedResult.count, 1) @@ -348,7 +348,7 @@ class PostgreSQLConnectionTests: XCTestCase { let completionHandlerExpectation2 = expectation(description: "final completion handler called") let notifyConn = try PostgreSQLConnection.makeTest() let listenConn = try PostgreSQLConnection.makeTest() - let channelName = "Foo" + let channelName = "Fooze" let messageText = "Bar" let finalMessageText = "Baz" @@ -358,17 +358,16 @@ class PostgreSQLConnectionTests: XCTestCase { } else if text == finalMessageText { completionHandlerExpectation2.fulfill() } - }.catch({ err in XCTFail("error \(err)") }) + }.catch({ err in XCTFail("error \(err)") }) try notifyConn.notify(channelName, message: messageText).wait() try notifyConn.notify(channelName, message: finalMessageText).wait() + waitForExpectations(timeout: defaultTimeout) notifyConn.close() listenConn.close() - waitForExpectations(timeout: defaultTimeout) } - func testURLParsing() throws { let databaseURL = "postgres://username:password@hostname.com:5432/database" let config = try PostgreSQLDatabaseConfig(url: databaseURL) @@ -390,7 +389,7 @@ class PostgreSQLConnectionTests: XCTestCase { ("testGH24", testGH24), ("testNotifyAndListen", testNotifyAndListen), ("testURLParsing", testURLParsing), - ] + ] } extension PostgreSQLConnection { @@ -406,7 +405,7 @@ extension PostgreSQLConnection { let group = MultiThreadedEventLoopGroup(numThreads: 1) let client = try PostgreSQLConnection.connect(hostname: hostname, on: group) { error in XCTFail("\(error)") - }.wait() + }.wait() _ = try client.authenticate(username: "vapor_username", database: "vapor_database", password: nil).wait() return client } From 2b20d6081b56863df178e6528b2e11bf79835c92 Mon Sep 17 00:00:00 2001 From: Shaun Hubbard Date: Wed, 25 Apr 2018 17:06:01 -0400 Subject: [PATCH 4/7] added the ability to listen to multiple channels --- ...PostgreSQLConnection+NotifyAndListen.swift | 15 ++++- .../Connection/PostgreSQLConnection.swift | 24 ++++++-- .../PostgreSQLNotificationResponse.swift | 1 - .../PostgreSQLConnectionTests.swift | 59 +++++++++++++++++++ 4 files changed, 91 insertions(+), 8 deletions(-) diff --git a/Sources/PostgreSQL/Connection/PostgreSQLConnection+NotifyAndListen.swift b/Sources/PostgreSQL/Connection/PostgreSQLConnection+NotifyAndListen.swift index 036a20bb..06ab84f0 100644 --- a/Sources/PostgreSQL/Connection/PostgreSQLConnection+NotifyAndListen.swift +++ b/Sources/PostgreSQL/Connection/PostgreSQLConnection+NotifyAndListen.swift @@ -1,5 +1,6 @@ import Async + extension PostgreSQLConnection { /// Note: after calling `listen'` on a connection, it can no longer handle other database operations. Do not try to send other SQL commands through this connection afterwards. /// IAlso, notifications will only be sent for as long as this connection remains open; you are responsible for opening a new connection to listen on when this one closes. @@ -7,15 +8,19 @@ extension PostgreSQLConnection { _ channelName: String, handler: @escaping (String) throws -> () ) throws -> Future { - beforeClose = { conn in + closeHandlers.append({ conn in let query = PostgreSQLQuery(query: "UNLISTEN \"\(channelName)\";") return conn.send([.query(query)], onResponse: { _ in }) + }) + + notificationHandlers[channelName] = { message in + try handler(message) } let query = PostgreSQLQuery(query: "LISTEN \"\(channelName)\";") return queue.enqueue([.query(query)], onInput: { message in switch message { case let .notificationResponse(notification): - try handler(notification.message) + try self.notificationHandlers[notification.channel]?(notification.message) default: break } @@ -28,4 +33,10 @@ extension PostgreSQLConnection { let query = PostgreSQLQuery(query: "NOTIFY \"\(channelName)\", '\(message)';") return send([.query(query)]).map(to: Void.self, { _ in }) } + + public func unlisten(_ channelName: String, unlistenHandler: (() -> Void)? = nil) throws -> Future { + notificationHandlers.removeValue(forKey: channelName) + let query = PostgreSQLQuery(query: "UNLISTEN \"\(channelName)\";") + return send([.query(query)], onResponse: { _ in unlistenHandler?() }) + } } diff --git a/Sources/PostgreSQL/Connection/PostgreSQLConnection.swift b/Sources/PostgreSQL/Connection/PostgreSQLConnection.swift index f70b7f2d..6bb3a27e 100644 --- a/Sources/PostgreSQL/Connection/PostgreSQLConnection.swift +++ b/Sources/PostgreSQL/Connection/PostgreSQLConnection.swift @@ -44,6 +44,15 @@ public final class PostgreSQLConnection: DatabaseConnection, BasicWorker { /// The current query running, if one exists. private var pipeline: Future + /// Block type to be called on close of connection + internal typealias CloseHandler = ((PostgreSQLConnection) -> Future) + /// Called on close of the connection + internal var closeHandlers = [CloseHandler]() + /// Handler type for Notifications + internal typealias NotificationHandler = (String) throws -> Void + /// Handlers to be stored by channel name + internal var notificationHandlers: [String: NotificationHandler] = [:] + /// Creates a new Redis client on the provided data source and sink. init(queue: QueueHandler, channel: Channel) { self.queue = queue @@ -184,19 +193,24 @@ public final class PostgreSQLConnection: DatabaseConnection, BasicWorker { } } - internal var beforeClose: ((PostgreSQLConnection) -> Future)? /// Closes this client. public func close() { - if let beforeClose = beforeClose { - _ = beforeClose(self).then { _ in - self.channel.close(mode: CloseMode.all) + _ = executeCloseHandlersThenClose() + } + + + private func executeCloseHandlersThenClose() -> Future { + if let beforeClose = closeHandlers.popLast() { + return beforeClose(self).then { _ in + self.executeCloseHandlersThenClose() } } else { - channel.close(promise: nil) + return channel.close(mode: .all) } } + /// Called when this class deinitializes. deinit { close() diff --git a/Sources/PostgreSQL/Message/PostgreSQLNotificationResponse.swift b/Sources/PostgreSQL/Message/PostgreSQLNotificationResponse.swift index ec6ced2e..cc454c27 100644 --- a/Sources/PostgreSQL/Message/PostgreSQLNotificationResponse.swift +++ b/Sources/PostgreSQL/Message/PostgreSQLNotificationResponse.swift @@ -9,6 +9,5 @@ struct PostgreSQLNotificationResponse: Decodable { _ = try container.decode(Int32.self) channel = try container.decode(String.self) message = try container.decode(String.self) - NSLog("Found self \(channel) \(message)") } } diff --git a/Tests/PostgreSQLTests/PostgreSQLConnectionTests.swift b/Tests/PostgreSQLTests/PostgreSQLConnectionTests.swift index e6487d01..6d9912b3 100644 --- a/Tests/PostgreSQLTests/PostgreSQLConnectionTests.swift +++ b/Tests/PostgreSQLTests/PostgreSQLConnectionTests.swift @@ -368,6 +368,63 @@ class PostgreSQLConnectionTests: XCTestCase { listenConn.close() } + func testNotifyAndListenOnMultipleChannels() throws { + let completionHandlerExpectation1 = expectation(description: "first completion handler called") + let completionHandlerExpectation2 = expectation(description: "final completion handler called") + let notifyConn = try PostgreSQLConnection.makeTest() + let listenConn = try PostgreSQLConnection.makeTest() + let channelName = "Fooze" + let channelName2 = "Foozalz" + let messageText = "Bar" + let finalMessageText = "Baz" + + try listenConn.listen(channelName) { text in + if text == messageText { + completionHandlerExpectation1.fulfill() + } + + }.catch({ err in XCTFail("error \(err)") }) + + try listenConn.listen(channelName2) { text in + if text == finalMessageText { + completionHandlerExpectation2.fulfill() + } + }.catch({ err in XCTFail("error \(err)") }) + + try notifyConn.notify(channelName, message: messageText).wait() + try notifyConn.notify(channelName2, message: finalMessageText).wait() + + waitForExpectations(timeout: defaultTimeout) + notifyConn.close() + listenConn.close() + } + + func testUnlisten() throws { + let completionHandlerExpectation = expectation(description: "notify completion handler called") + completionHandlerExpectation.expectedFulfillmentCount = 2 + completionHandlerExpectation.assertForOverFulfill = true + + let notifyConn = try PostgreSQLConnection.makeTest() + let listenConn = try PostgreSQLConnection.makeTest() + let channelName = "Foozers" + let messageText = "Bar" + + try listenConn.listen(channelName) { text in + if text == messageText { + completionHandlerExpectation.fulfill() + } + }.catch({ err in XCTFail("error \(err)") }) + + try notifyConn.notify(channelName, message: messageText).wait() + try notifyConn.unlisten(channelName, unlistenHandler: { + completionHandlerExpectation.fulfill() + }).wait() + waitForExpectations(timeout: defaultTimeout) + + notifyConn.close() + listenConn.close() + } + func testURLParsing() throws { let databaseURL = "postgres://username:password@hostname.com:5432/database" let config = try PostgreSQLDatabaseConfig(url: databaseURL) @@ -388,6 +445,8 @@ class PostgreSQLConnectionTests: XCTestCase { ("testNull", testNull), ("testGH24", testGH24), ("testNotifyAndListen", testNotifyAndListen), + ("testNotifyAndListenOnMultipleChannels", testNotifyAndListenOnMultipleChannels), + ("testUnlisten", testUnlisten), ("testURLParsing", testURLParsing), ] } From fb828505d2a25f674ed70ddb598de7a15bfb6ea4 Mon Sep 17 00:00:00 2001 From: Shaun Hubbard Date: Wed, 25 Apr 2018 17:09:47 -0400 Subject: [PATCH 5/7] got rid of terrible xcode formatting --- .../PostgreSQLConnectionTests.swift | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/Tests/PostgreSQLTests/PostgreSQLConnectionTests.swift b/Tests/PostgreSQLTests/PostgreSQLConnectionTests.swift index 6d9912b3..826c0bfe 100644 --- a/Tests/PostgreSQLTests/PostgreSQLConnectionTests.swift +++ b/Tests/PostgreSQLTests/PostgreSQLConnectionTests.swift @@ -33,7 +33,7 @@ class PostgreSQLConnectionTests: XCTestCase { for row in rows { try XCTAssert( row.firstValue(forColumn: "typlen")?.decode(Int.self) == 1 || - row.firstValue(forColumn: "typlen")?.decode(Int.self) == 2 + row.firstValue(forColumn: "typlen")?.decode(Int.self) == 2 ) } } @@ -261,7 +261,7 @@ class PostgreSQLConnectionTests: XCTestCase { XCTAssertEqual(createResult.count, 0) let insertResult = try client.query("insert into foo values ($1, $2);", [ Int32(1), Hello(message: "hello, world") - ]).wait() + ]).wait() XCTAssertEqual(insertResult.count, 0) let parameterizedResult = try client.query("select * from foo").wait() @@ -282,7 +282,7 @@ class PostgreSQLConnectionTests: XCTestCase { let insertResult = try client.query("insert into nulltest (i, d) VALUES ($1, $2)", [ PostgreSQLData(type: .int2, format: .binary, data: Data([0x00, 0x01])), PostgreSQLData(type: .timestamp, format: .binary, data: nil), - ]).wait() + ]).wait() XCTAssertEqual(insertResult.count, 0) let parameterizedResult = try client.query("select * from nulltest").wait() XCTAssertEqual(parameterizedResult.count, 1) @@ -358,7 +358,7 @@ class PostgreSQLConnectionTests: XCTestCase { } else if text == finalMessageText { completionHandlerExpectation2.fulfill() } - }.catch({ err in XCTFail("error \(err)") }) + }.catch({ err in XCTFail("error \(err)") }) try notifyConn.notify(channelName, message: messageText).wait() try notifyConn.notify(channelName, message: finalMessageText).wait() @@ -382,14 +382,13 @@ class PostgreSQLConnectionTests: XCTestCase { if text == messageText { completionHandlerExpectation1.fulfill() } - - }.catch({ err in XCTFail("error \(err)") }) + }.catch({ err in XCTFail("error \(err)") }) try listenConn.listen(channelName2) { text in if text == finalMessageText { completionHandlerExpectation2.fulfill() } - }.catch({ err in XCTFail("error \(err)") }) + }.catch({ err in XCTFail("error \(err)") }) try notifyConn.notify(channelName, message: messageText).wait() try notifyConn.notify(channelName2, message: finalMessageText).wait() @@ -448,7 +447,7 @@ class PostgreSQLConnectionTests: XCTestCase { ("testNotifyAndListenOnMultipleChannels", testNotifyAndListenOnMultipleChannels), ("testUnlisten", testUnlisten), ("testURLParsing", testURLParsing), - ] + ] } extension PostgreSQLConnection { From 4a4b7acfa1ad83236c25504ce278dcf067297450 Mon Sep 17 00:00:00 2001 From: Shaun Hubbard Date: Wed, 25 Apr 2018 17:10:40 -0400 Subject: [PATCH 6/7] missed a spot --- Tests/PostgreSQLTests/PostgreSQLConnectionTests.swift | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/Tests/PostgreSQLTests/PostgreSQLConnectionTests.swift b/Tests/PostgreSQLTests/PostgreSQLConnectionTests.swift index 826c0bfe..f9c9e020 100644 --- a/Tests/PostgreSQLTests/PostgreSQLConnectionTests.swift +++ b/Tests/PostgreSQLTests/PostgreSQLConnectionTests.swift @@ -455,15 +455,14 @@ extension PostgreSQLConnection { static func makeTest() throws -> PostgreSQLConnection { let hostname: String #if Xcode - //hostname = (try? Process.execute("docker-machine", "ip")) ?? "192.168.99.100" - hostname = "localhost" + hostname = (try? Process.execute("docker-machine", "ip")) ?? "192.168.99.100" #else hostname = "localhost" #endif let group = MultiThreadedEventLoopGroup(numThreads: 1) let client = try PostgreSQLConnection.connect(hostname: hostname, on: group) { error in XCTFail("\(error)") - }.wait() + }.wait() _ = try client.authenticate(username: "vapor_username", database: "vapor_database", password: nil).wait() return client } From fe5774d3927cfdbd634720715c911dd395a7829a Mon Sep 17 00:00:00 2001 From: Shaun Hubbard Date: Wed, 25 Apr 2018 17:33:10 -0400 Subject: [PATCH 7/7] linux diffs in the suite --- Tests/PostgreSQLTests/PostgreSQLConnectionTests.swift | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/Tests/PostgreSQLTests/PostgreSQLConnectionTests.swift b/Tests/PostgreSQLTests/PostgreSQLConnectionTests.swift index f9c9e020..b51c5d51 100644 --- a/Tests/PostgreSQLTests/PostgreSQLConnectionTests.swift +++ b/Tests/PostgreSQLTests/PostgreSQLConnectionTests.swift @@ -399,9 +399,9 @@ class PostgreSQLConnectionTests: XCTestCase { } func testUnlisten() throws { - let completionHandlerExpectation = expectation(description: "notify completion handler called") - completionHandlerExpectation.expectedFulfillmentCount = 2 - completionHandlerExpectation.assertForOverFulfill = true + let unlistenHandlerExpectation = expectation(description: "unlisten completion handler called") + + let listenHandlerExpectation = expectation(description: "listen completion handler called") let notifyConn = try PostgreSQLConnection.makeTest() let listenConn = try PostgreSQLConnection.makeTest() @@ -410,16 +410,15 @@ class PostgreSQLConnectionTests: XCTestCase { try listenConn.listen(channelName) { text in if text == messageText { - completionHandlerExpectation.fulfill() + listenHandlerExpectation.fulfill() } }.catch({ err in XCTFail("error \(err)") }) try notifyConn.notify(channelName, message: messageText).wait() try notifyConn.unlisten(channelName, unlistenHandler: { - completionHandlerExpectation.fulfill() + unlistenHandlerExpectation.fulfill() }).wait() waitForExpectations(timeout: defaultTimeout) - notifyConn.close() listenConn.close() }