diff --git a/Tests/MQTTNIOTests/MQTTNIOTests.swift b/Tests/MQTTNIOTests/MQTTNIOTests.swift index 0f6d8a41..dea7975b 100644 --- a/Tests/MQTTNIOTests/MQTTNIOTests.swift +++ b/Tests/MQTTNIOTests/MQTTNIOTests.swift @@ -451,7 +451,6 @@ final class MQTTNIOTests: XCTestCase { XCTFail("\(error)") } } - _ = try client2.connect(cleanSession: true).wait() _ = try client2.connect(cleanSession: false).wait() _ = try client2.subscribe(to: [.init(topicFilter: "testPersistentAtLeastOnce", qos: .atLeastOnce)]).wait() try client.publish(to: "testPersistentAtLeastOnce", payload: payload, qos: .atLeastOnce).wait() @@ -461,11 +460,49 @@ final class MQTTNIOTests: XCTestCase { // client2 should receive this publish as we have reconnected with clean session set to false try client.publish(to: "testPersistentAtLeastOnce", payload: payload, qos: .atLeastOnce).wait() Thread.sleep(forTimeInterval: 1) + + wait(for: [expectation], timeout: 5.0) + + try client.disconnect().wait() + try client2.disconnect().wait() + } + + func testNonPersistentSession() throws { + let expectation = XCTestExpectation(description: "testPersistentSession") + expectation.expectedFulfillmentCount = 1 + expectation.assertForOverFulfill = true + + let payloadString = #"{"from":1000000,"to":1234567,"type":1,"content":"I am a beginner in swift and I am studying hard!!测试\n\n test, message","timestamp":1607243024,"nonce":"pAx2EsUuXrVuiIU3GGOGHNbUjzRRdT5b","sign":"ff902e31a6a5f5343d70a3a93ac9f946adf1caccab539c6f3a6"}"# + let payload = ByteBufferAllocator().buffer(string: payloadString) + + let client = self.createClient(identifier: "testPersistentSession_publisher") + defer { XCTAssertNoThrow(try client.syncShutdownGracefully()) } + _ = try client.connect().wait() + let client2 = self.createClient(identifier: "testPersistentSession_subscriber") + defer { XCTAssertNoThrow(try client2.syncShutdownGracefully()) } + + client2.addPublishListener(named: "test") { result in + switch result { + case .success(let publish): + var buffer = publish.payload + let string = buffer.readString(length: buffer.readableBytes) + XCTAssertEqual(string, payloadString) + expectation.fulfill() + + case .failure(let error): + XCTFail("\(error)") + } + } + _ = try client2.connect(cleanSession: false).wait() + _ = try client2.subscribe(to: [.init(topicFilter: "testPersistentAtLeastOnce", qos: .atLeastOnce)]).wait() + try client.publish(to: "testPersistentAtLeastOnce", payload: payload, qos: .atLeastOnce).wait() + Thread.sleep(forTimeInterval: 1) + // disconnect and reconnect with clean session try client2.disconnect().wait() - // should not receive previous publish on connect as this is a cleanSession _ = try client2.connect(cleanSession: true).wait() // client2 should not receive this publish as we have reconnected with clean session set to true try client.publish(to: "testPersistentAtLeastOnce", payload: payload, qos: .atLeastOnce).wait() + Thread.sleep(forTimeInterval: 1) wait(for: [expectation], timeout: 5.0)