From 2555968a040aeb0d6437cdc1b1b89e1e881069e7 Mon Sep 17 00:00:00 2001 From: George Barnett Date: Mon, 19 Dec 2022 08:40:58 +0000 Subject: [PATCH] Don't ack pings twice (#1534) Motivation: gRPC Swift is emitting two acks per ping. NIOHTTP2 is emitting one and the keepalive handler is emitting the other. Modifications: - Don't emit ping acks from the keep alive handler; just let the H2 handler do it. Result: - No unnecessary ping acks are emitted. - Resolves #1520 --- Sources/GRPC/GRPCIdleHandler.swift | 4 + Sources/GRPC/GRPCKeepaliveHandlers.swift | 12 +- Tests/GRPCTests/GRPCPingHandlerTests.swift | 124 ++++++++++++++++----- 3 files changed, 106 insertions(+), 34 deletions(-) diff --git a/Sources/GRPC/GRPCIdleHandler.swift b/Sources/GRPC/GRPCIdleHandler.swift index b2b87e7af..11c82a075 100644 --- a/Sources/GRPC/GRPCIdleHandler.swift +++ b/Sources/GRPC/GRPCIdleHandler.swift @@ -184,6 +184,10 @@ internal final class GRPCIdleHandler: ChannelInboundHandler { case .none: () + case .ack: + // NIO's HTTP2 handler acks for us so this is a no-op. + () + case .cancelScheduledTimeout: self.scheduledClose?.cancel() self.scheduledClose = nil diff --git a/Sources/GRPC/GRPCKeepaliveHandlers.swift b/Sources/GRPC/GRPCKeepaliveHandlers.swift index d22d182ab..3fa66ed75 100644 --- a/Sources/GRPC/GRPCKeepaliveHandlers.swift +++ b/Sources/GRPC/GRPCKeepaliveHandlers.swift @@ -90,6 +90,7 @@ struct PingHandler { enum Action { case none + case ack case schedulePing(delay: TimeAmount, timeout: TimeAmount) case cancelScheduledTimeout case reply(HTTP2Frame.FramePayload) @@ -170,14 +171,14 @@ struct PingHandler { // This is a valid ping, reset our strike count and reply with a pong. self.pingStrikes = 0 self.lastReceivedPingDate = self.now() - return .reply(self.generatePingFrame(data: pingData, ack: true)) + return .ack } } else { // We don't support ping strikes. We'll just reply with a pong. // // Note: we don't need to update `pingStrikes` or `lastReceivedPingDate` as we don't // support ping strikes. - return .reply(self.generatePingFrame(data: pingData, ack: true)) + return .ack } } @@ -185,20 +186,19 @@ struct PingHandler { if self.shouldBlockPing { return .none } else { - return .reply(self.generatePingFrame(data: self.pingData, ack: false)) + return .reply(self.generatePingFrame(data: self.pingData)) } } private mutating func generatePingFrame( - data: HTTP2PingData, - ack: Bool + data: HTTP2PingData ) -> HTTP2Frame.FramePayload { if self.activeStreams == 0 { self.sentPingsWithoutData += 1 } self.lastSentPingDate = self.now() - return HTTP2Frame.FramePayload.ping(data, ack: ack) + return HTTP2Frame.FramePayload.ping(data, ack: false) } /// Returns true if, on receipt of a ping, the ping should be regarded as a ping strike. diff --git a/Tests/GRPCTests/GRPCPingHandlerTests.swift b/Tests/GRPCTests/GRPCPingHandlerTests.swift index a959962d7..6d86ef0b2 100644 --- a/Tests/GRPCTests/GRPCPingHandlerTests.swift +++ b/Tests/GRPCTests/GRPCPingHandlerTests.swift @@ -15,6 +15,7 @@ */ @testable import GRPC import NIOCore +import NIOEmbedded import NIOHTTP2 import XCTest @@ -249,24 +250,15 @@ class GRPCPingHandlerTests: GRPCTestCase { pingData: HTTP2PingData(withInteger: 1), ack: false ) - XCTAssertEqual( - response, - .reply(HTTP2Frame.FramePayload.ping(HTTP2PingData(withInteger: 1), ack: true)) - ) + XCTAssertEqual(response, .ack) // Received another ping, response should be a pong (ping strikes not in effect) response = self.pingHandler.read(pingData: HTTP2PingData(withInteger: 1), ack: false) - XCTAssertEqual( - response, - .reply(HTTP2Frame.FramePayload.ping(HTTP2PingData(withInteger: 1), ack: true)) - ) + XCTAssertEqual(response, .ack) // Received another ping, response should be a pong (ping strikes not in effect) response = self.pingHandler.read(pingData: HTTP2PingData(withInteger: 1), ack: false) - XCTAssertEqual( - response, - .reply(HTTP2Frame.FramePayload.ping(HTTP2PingData(withInteger: 1), ack: true)) - ) + XCTAssertEqual(response, .ack) } func testPingWithoutDataResultsInPongForClient() { @@ -274,10 +266,7 @@ class GRPCPingHandlerTests: GRPCTestCase { self.setupPingHandler(permitWithoutCalls: false) let action = self.pingHandler.read(pingData: HTTP2PingData(withInteger: 1), ack: false) - XCTAssertEqual( - action, - .reply(HTTP2Frame.FramePayload.ping(HTTP2PingData(withInteger: 1), ack: true)) - ) + XCTAssertEqual(action, .ack) } func testPingWithoutDataResultsInPongForServer() { @@ -291,10 +280,7 @@ class GRPCPingHandlerTests: GRPCTestCase { ) let action = self.pingHandler.read(pingData: HTTP2PingData(withInteger: 1), ack: false) - XCTAssertEqual( - action, - .reply(HTTP2Frame.FramePayload.ping(HTTP2PingData(withInteger: 1), ack: true)) - ) + XCTAssertEqual(action, .ack) } func testPingStrikesOnServer() { @@ -312,10 +298,7 @@ class GRPCPingHandlerTests: GRPCTestCase { pingData: HTTP2PingData(withInteger: 1), ack: false ) - XCTAssertEqual( - response, - .reply(HTTP2Frame.FramePayload.ping(HTTP2PingData(withInteger: 1), ack: true)) - ) + XCTAssertEqual(response, .ack) // Received another ping, which is invalid (ping strike), response should be no action response = self.pingHandler.read(pingData: HTTP2PingData(withInteger: 1), ack: false) @@ -326,10 +309,7 @@ class GRPCPingHandlerTests: GRPCTestCase { // Received another ping, which is valid now, response should be a pong response = self.pingHandler.read(pingData: HTTP2PingData(withInteger: 1), ack: false) - XCTAssertEqual( - response, - .reply(HTTP2Frame.FramePayload.ping(HTTP2PingData(withInteger: 1), ack: true)) - ) + XCTAssertEqual(response, .ack) // Received another ping, which is invalid (ping strike), response should be no action response = self.pingHandler.read(pingData: HTTP2PingData(withInteger: 1), ack: false) @@ -381,6 +361,8 @@ extension PingHandler.Action: Equatable { switch (lhs, rhs) { case (.none, .none): return true + case (.ack, .ack): + return true case (let .schedulePing(lhsDelay, lhsTimeout), let .schedulePing(rhsDelay, rhsTimeout)): return lhsDelay == rhsDelay && lhsTimeout == rhsTimeout case (.cancelScheduledTimeout, .cancelScheduledTimeout): @@ -401,3 +383,89 @@ extension PingHandler.Action: Equatable { } } } + +extension GRPCPingHandlerTests { + func testSingleAckIsEmittedOnPing() throws { + let client = EmbeddedChannel() + let _ = try client.configureHTTP2Pipeline(mode: .client) { _ in + fatalError("Unexpected inbound stream") + }.wait() + + let server = EmbeddedChannel() + let serverMux = try server.configureHTTP2Pipeline(mode: .server) { _ in + fatalError("Unexpected inbound stream") + }.wait() + + let idleHandler = GRPCIdleHandler( + idleTimeout: .minutes(5), + keepalive: .init(), + logger: self.serverLogger + ) + try server.pipeline.syncOperations.addHandler(idleHandler, position: .before(serverMux)) + try server.connect(to: .init(unixDomainSocketPath: "/ignored")).wait() + try client.connect(to: .init(unixDomainSocketPath: "/ignored")).wait() + + func interact(client: EmbeddedChannel, server: EmbeddedChannel) throws { + var didRead = true + while didRead { + didRead = false + + if let data = try client.readOutbound(as: ByteBuffer.self) { + didRead = true + try server.writeInbound(data) + } + + if let data = try server.readOutbound(as: ByteBuffer.self) { + didRead = true + try client.writeInbound(data) + } + } + } + + try interact(client: client, server: server) + + // Settings. + let f1 = try XCTUnwrap(client.readInbound(as: HTTP2Frame.self)) + f1.payload.assertSettings(ack: false) + + // Settings ack. + let f2 = try XCTUnwrap(client.readInbound(as: HTTP2Frame.self)) + f2.payload.assertSettings(ack: true) + + // Send a ping. + let ping = HTTP2Frame(streamID: .rootStream, payload: .ping(.init(withInteger: 42), ack: false)) + try client.writeOutbound(ping) + try interact(client: client, server: server) + + // Ping ack. + let f3 = try XCTUnwrap(client.readInbound(as: HTTP2Frame.self)) + f3.payload.assertPing(ack: true) + + XCTAssertNil(try client.readInbound(as: HTTP2Frame.self)) + } +} + +extension HTTP2Frame.FramePayload { + func assertSettings(ack: Bool, file: StaticString = #file, line: UInt = #line) { + switch self { + case let .settings(settings): + switch settings { + case .ack: + XCTAssertTrue(ack, file: file, line: line) + case .settings: + XCTAssertFalse(ack, file: file, line: line) + } + default: + XCTFail("Expected .settings got \(self)", file: file, line: line) + } + } + + func assertPing(ack: Bool, file: StaticString = #file, line: UInt = #line) { + switch self { + case let .ping(_, ack: pingAck): + XCTAssertEqual(pingAck, ack, file: file, line: line) + default: + XCTFail("Expected .ping got \(self)", file: file, line: line) + } + } +}