From 503b0d17a5510ca62757d3d9fa6a909bad0d00ad Mon Sep 17 00:00:00 2001 From: Rudra Pratap SIngh Date: Thu, 16 Apr 2026 17:12:12 +0530 Subject: [PATCH 1/3] Add full-duplex mode to NTTTCP networking workload MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Enable bidirectional network testing where both nodes run sender and receiver simultaneously, producing separate TX/RX throughput metrics per direction. Core changes: - NTttcpExecutor: Add DuplexMode property, ExecuteFullDuplexWorkloadAsync with parallel sender+receiver processes, CaptureDirectionalMetrics for per-direction telemetry, role-aware port assignment (forward/reverse), direction-aware -m IP mapping for correct sender→receiver targeting - NetworkingWorkloadState: Add DuplexMode property for client-server state sync - NetworkingWorkloadExecutor: Pass DuplexMode through orchestration instructions Profile: - PERF-NETWORK-NTTTCP-FULLDUPLEX.json: 6 TCP scenarios (T1/T32 x 4K/64K/256K) with DuplexMode=Full, same dependencies as half-duplex profile Tests (21 tests): - NTttcpFullDuplexTests: IsFullDuplex property, ReversePort offset, command-line flag/port/buffer verification for both platforms, execution tests for client send+receive on Linux/Windows, results file separation, half-duplex backward compatibility, NetworkingWorkloadState serialization with DuplexMode Upstream fixes (pre-existing bugs in main): - ProcessProxy.cs: Fix catch clause ordering (general catch before specific) - CommandBase.cs: Fix NullReferenceException on isolationTargets.TargetPackages Verified on real hardware: win-x64 (10.177.236.133) <-> win-arm64 (10.199.102.112) TCP 4K T1: Send 3226 Mbps / Receive 1455 Mbps TCP 64K T1: Send 6490 Mbps / Receive 6163 Mbps --- .../Network/NTttcp/NTttcpFullDuplexTests.cs | 718 ++++++++++++++++++ .../NTttcp/NTttcpExecutor.cs | 371 ++++++++- .../NetworkingWorkloadExecutor.cs | 22 +- .../NetworkingWorkloadState.cs | 22 +- .../profiles/PERF-NETWORK-NTTTCP.json | 36 +- .../network-ntttcp-fullduplex.md | 82 ++ 6 files changed, 1212 insertions(+), 39 deletions(-) create mode 100644 src/VirtualClient/VirtualClient.Actions.UnitTests/Network/NTttcp/NTttcpFullDuplexTests.cs create mode 100644 website/docs/workloads/network-suite/network-ntttcp-fullduplex.md diff --git a/src/VirtualClient/VirtualClient.Actions.UnitTests/Network/NTttcp/NTttcpFullDuplexTests.cs b/src/VirtualClient/VirtualClient.Actions.UnitTests/Network/NTttcp/NTttcpFullDuplexTests.cs new file mode 100644 index 0000000000..d2b45aa750 --- /dev/null +++ b/src/VirtualClient/VirtualClient.Actions.UnitTests/Network/NTttcp/NTttcpFullDuplexTests.cs @@ -0,0 +1,718 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace VirtualClient.Actions +{ + using System; + using System.Collections.Generic; + using System.IO; + using System.Net; + using System.Net.Http; + using System.Reflection; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Extensions.DependencyInjection; + using Moq; + using NUnit.Framework; + using Polly; + using VirtualClient.Actions.NetworkPerformance; + using VirtualClient.Common.Contracts; + using VirtualClient.Common.Telemetry; + using VirtualClient.Contracts; + using VirtualClient.TestExtensions; + + [TestFixture] + [Category("Unit")] + public class NTttcpFullDuplexTests + { + private static readonly string ExamplesDirectory = MockFixture.GetDirectory(typeof(NTttcpFullDuplexTests), "Examples", "NTttcp"); + + private MockFixture mockFixture; + private DependencyPath mockPackage; + private NetworkingWorkloadState networkingWorkloadState; + private List executedCommands; + + public void SetupTest(PlatformID platform = PlatformID.Unix) + { + this.mockFixture = new MockFixture(); + this.mockFixture.Setup(platform); + this.mockPackage = new DependencyPath("networking", this.mockFixture.PlatformSpecifics.GetPackagePath("networking")); + this.mockFixture.SetupPackage(this.mockPackage); + this.mockFixture.File.Setup(f => f.Exists(It.IsAny())) + .Returns(true); + + this.mockFixture.Parameters["PackageName"] = "networking"; + this.mockFixture.Parameters["Connections"] = "256"; + this.mockFixture.Parameters["TestDuration"] = "00:05:00"; + this.mockFixture.Parameters["WarmupTime"] = "00:05:00"; + this.mockFixture.Parameters["Protocol"] = "TCP"; + this.mockFixture.Parameters["ThreadCount"] = "1"; + this.mockFixture.Parameters["BufferSizeClient"] = "4k"; + this.mockFixture.Parameters["BufferSizeServer"] = "4k"; + this.mockFixture.Parameters["Port"] = 5500; + this.mockFixture.Parameters["ReceiverMultiClientMode"] = true; + this.mockFixture.Parameters["SenderLastClient"] = true; + this.mockFixture.Parameters["ThreadsPerServerPort"] = 2; + this.mockFixture.Parameters["ConnectionsPerThread"] = 2; + this.mockFixture.Parameters["DevInterruptsDifferentiator"] = "mlx"; + this.mockFixture.Parameters["DuplexMode"] = "Full"; + + string clientResults = File.ReadAllText(this.mockFixture.Combine(NTttcpFullDuplexTests.ExamplesDirectory, "ClientOutput.xml")); + string serverResults = File.ReadAllText(this.mockFixture.Combine(NTttcpFullDuplexTests.ExamplesDirectory, "ServerOutput.xml")); + + this.mockFixture.FileSystem.Setup(rt => rt.File.ReadAllTextAsync(It.IsAny(), It.IsAny())) + .ReturnsAsync((string path, CancellationToken ct) => + { + // Return server (receiver) XML for receive results, client (sender) XML otherwise + if (path != null && path.Contains("recv")) + { + return serverResults; + } + + return clientResults; + }); + + this.executedCommands = new List(); + + this.SetupNetworkingWorkloadState(); + } + + [Test] + public void NTttcpExecutorIsFullDuplexReturnsTrueWhenDuplexModeIsFull() + { + this.SetupTest(); + + TestNTttcpFullDuplexExecutor component = new TestNTttcpFullDuplexExecutor(this.mockFixture.Dependencies, this.mockFixture.Parameters); + Assert.IsTrue(component.GetIsFullDuplex()); + } + + [Test] + public void NTttcpExecutorIsFullDuplexReturnsFalseWhenDuplexModeIsHalf() + { + this.SetupTest(); + this.mockFixture.Parameters["DuplexMode"] = "Half"; + + TestNTttcpFullDuplexExecutor component = new TestNTttcpFullDuplexExecutor(this.mockFixture.Dependencies, this.mockFixture.Parameters); + Assert.IsFalse(component.GetIsFullDuplex()); + } + + [Test] + public void NTttcpExecutorIsFullDuplexReturnsFalseWhenDuplexModeIsNotSet() + { + this.SetupTest(); + this.mockFixture.Parameters.Remove("DuplexMode"); + + TestNTttcpFullDuplexExecutor component = new TestNTttcpFullDuplexExecutor(this.mockFixture.Dependencies, this.mockFixture.Parameters); + Assert.IsFalse(component.GetIsFullDuplex()); + } + + [Test] + public void NTttcpExecutorIsFullDuplexIsCaseInsensitive() + { + this.SetupTest(); + this.mockFixture.Parameters["DuplexMode"] = "FULL"; + + TestNTttcpFullDuplexExecutor component = new TestNTttcpFullDuplexExecutor(this.mockFixture.Dependencies, this.mockFixture.Parameters); + Assert.IsTrue(component.GetIsFullDuplex()); + + this.mockFixture.Parameters["DuplexMode"] = "full"; + component = new TestNTttcpFullDuplexExecutor(this.mockFixture.Dependencies, this.mockFixture.Parameters); + Assert.IsTrue(component.GetIsFullDuplex()); + } + + [Test] + public void NTttcpExecutorReversePortIsBasePortPlusOffset() + { + this.SetupTest(); + + TestNTttcpFullDuplexExecutor component = new TestNTttcpFullDuplexExecutor(this.mockFixture.Dependencies, this.mockFixture.Parameters); + Assert.AreEqual(5600, component.GetReversePort()); + } + + [Test] + public async Task NTttcpFullDuplexClientExecutesBothSendAndReceiveProcessesOnLinux() + { + this.SetupTest(PlatformID.Unix); + + NetworkingWorkloadExecutorTests.TestNetworkingWorkloadExecutor networkingWorkloadExecutor = + new NetworkingWorkloadExecutorTests.TestNetworkingWorkloadExecutor(this.mockFixture.Dependencies, this.mockFixture.Parameters); + await networkingWorkloadExecutor.OnInitialize.Invoke(EventContext.None, CancellationToken.None); + + int processCount = 0; + this.mockFixture.ProcessManager.OnCreateProcess = (file, arguments, workingDirectory) => + { + InMemoryProcess process = new InMemoryProcess() + { + StartInfo = new System.Diagnostics.ProcessStartInfo() + { + FileName = file, + Arguments = arguments, + }, + OnHasExited = () => true, + ExitCode = 0, + OnStart = () => true, + StandardOutput = new VirtualClient.Common.ConcurrentBuffer() + }; + + if (!arguments.Contains("chmod")) + { + processCount++; + this.executedCommands.Add(arguments); + + this.networkingWorkloadState.ToolState = NetworkingWorkloadToolState.Stopped; + var expectedStateItem = new Item(nameof(NetworkingWorkloadState), this.networkingWorkloadState); + + this.mockFixture.ApiClient.Setup(client => client.GetStateAsync(It.IsAny(), It.IsAny(), It.IsAny>())) + .ReturnsAsync(this.mockFixture.CreateHttpResponse(HttpStatusCode.OK, expectedStateItem)); + } + + string standardOutput = null; + if (file.Contains("sysctl")) + { + string currentDirectory = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location); + standardOutput = File.ReadAllText(Path.Combine(currentDirectory, "Examples", "NTttcp", "sysctlExampleOutput.txt")); + process.StandardOutput.Append(standardOutput); + } + + return process; + }; + + TestNTttcpFullDuplexExecutor component = new TestNTttcpFullDuplexExecutor(this.mockFixture.Dependencies, this.mockFixture.Parameters); + + await component.ExecuteAsync(CancellationToken.None).ConfigureAwait(false); + + // Full-duplex runs 2 NTttcp processes + 2 sysctl processes = 4 + Assert.AreEqual(4, processCount); + } + + [Test] + public async Task NTttcpFullDuplexClientExecutesBothSendAndReceiveProcessesOnWindows() + { + this.SetupTest(PlatformID.Win32NT); + + NetworkingWorkloadExecutorTests.TestNetworkingWorkloadExecutor networkingWorkloadExecutor = + new NetworkingWorkloadExecutorTests.TestNetworkingWorkloadExecutor(this.mockFixture.Dependencies, this.mockFixture.Parameters); + await networkingWorkloadExecutor.OnInitialize.Invoke(EventContext.None, CancellationToken.None); + + int processCount = 0; + this.mockFixture.ProcessManager.OnCreateProcess = (file, arguments, workingDirectory) => + { + InMemoryProcess process = new InMemoryProcess() + { + StartInfo = new System.Diagnostics.ProcessStartInfo() + { + FileName = file, + Arguments = arguments, + }, + OnHasExited = () => true, + ExitCode = 0, + OnStart = () => true, + StandardOutput = new VirtualClient.Common.ConcurrentBuffer() + }; + + processCount++; + this.executedCommands.Add(arguments); + + this.networkingWorkloadState.ToolState = NetworkingWorkloadToolState.Stopped; + var expectedStateItem = new Item(nameof(NetworkingWorkloadState), this.networkingWorkloadState); + + this.mockFixture.ApiClient.Setup(client => client.GetStateAsync(It.IsAny(), It.IsAny(), It.IsAny>())) + .ReturnsAsync(this.mockFixture.CreateHttpResponse(HttpStatusCode.OK, expectedStateItem)); + + return process; + }; + + this.mockFixture.SystemManagement.SetupGet(sm => sm.Platform).Returns(PlatformID.Win32NT); + + TestNTttcpFullDuplexExecutor component = new TestNTttcpFullDuplexExecutor(this.mockFixture.Dependencies, this.mockFixture.Parameters); + + await component.ExecuteAsync(CancellationToken.None).ConfigureAwait(false); + + // Full-duplex runs 2 NTttcp processes on Windows (no sysctl) + Assert.AreEqual(2, processCount); + } + + [Test] + public async Task NTttcpFullDuplexClientUsesCorrectSendCommandLineOnLinux() + { + this.SetupTest(PlatformID.Unix); + + NetworkingWorkloadExecutorTests.TestNetworkingWorkloadExecutor networkingWorkloadExecutor = + new NetworkingWorkloadExecutorTests.TestNetworkingWorkloadExecutor(this.mockFixture.Dependencies, this.mockFixture.Parameters); + await networkingWorkloadExecutor.OnInitialize.Invoke(EventContext.None, CancellationToken.None); + + this.mockFixture.ProcessManager.OnCreateProcess = (file, arguments, workingDirectory) => + { + InMemoryProcess process = new InMemoryProcess() + { + StartInfo = new System.Diagnostics.ProcessStartInfo() { FileName = file, Arguments = arguments }, + OnHasExited = () => true, + ExitCode = 0, + OnStart = () => true, + StandardOutput = new VirtualClient.Common.ConcurrentBuffer() + }; + + if (!arguments.Contains("chmod") && !file.Contains("sysctl")) + { + this.executedCommands.Add(arguments); + } + + if (file.Contains("sysctl")) + { + string currentDirectory = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location); + string standardOutput = File.ReadAllText(Path.Combine(currentDirectory, "Examples", "NTttcp", "sysctlExampleOutput.txt")); + process.StandardOutput.Append(standardOutput); + } + + return process; + }; + + TestNTttcpFullDuplexExecutor component = new TestNTttcpFullDuplexExecutor(this.mockFixture.Dependencies, this.mockFixture.Parameters); + + await component.ExecuteAsync(CancellationToken.None).ConfigureAwait(false); + + // Verify send command uses -s flag and forward port (5500) + Assert.IsTrue(this.executedCommands.Exists(cmd => cmd.Contains("-s") && cmd.Contains("-p 5500"))); + + // Verify receive command uses -r flag and reverse port (5600) + Assert.IsTrue(this.executedCommands.Exists(cmd => cmd.Contains("-r") && cmd.Contains("-p 5600"))); + } + + [Test] + public async Task NTttcpFullDuplexClientUsesCorrectSendCommandLineOnWindows() + { + this.SetupTest(PlatformID.Win32NT); + + NetworkingWorkloadExecutorTests.TestNetworkingWorkloadExecutor networkingWorkloadExecutor = + new NetworkingWorkloadExecutorTests.TestNetworkingWorkloadExecutor(this.mockFixture.Dependencies, this.mockFixture.Parameters); + await networkingWorkloadExecutor.OnInitialize.Invoke(EventContext.None, CancellationToken.None); + + this.mockFixture.ProcessManager.OnCreateProcess = (file, arguments, workingDirectory) => + { + InMemoryProcess process = new InMemoryProcess() + { + StartInfo = new System.Diagnostics.ProcessStartInfo() { FileName = file, Arguments = arguments }, + OnHasExited = () => true, + ExitCode = 0, + OnStart = () => true, + StandardOutput = new VirtualClient.Common.ConcurrentBuffer() + }; + + this.executedCommands.Add(arguments); + + return process; + }; + + this.mockFixture.SystemManagement.SetupGet(sm => sm.Platform).Returns(PlatformID.Win32NT); + + TestNTttcpFullDuplexExecutor component = new TestNTttcpFullDuplexExecutor(this.mockFixture.Dependencies, this.mockFixture.Parameters); + + await component.ExecuteAsync(CancellationToken.None).ConfigureAwait(false); + + // Verify send command uses -s flag and forward port (5500) + Assert.IsTrue(this.executedCommands.Exists(cmd => cmd.Contains("-s") && cmd.Contains("-p 5500"))); + + // Verify receive command uses -r flag and reverse port (5600) + Assert.IsTrue(this.executedCommands.Exists(cmd => cmd.Contains("-r") && cmd.Contains("-p 5600"))); + } + + [Test] + public async Task NTttcpFullDuplexUsesSeparateResultsFilesForSendAndReceive() + { + this.SetupTest(PlatformID.Unix); + + NetworkingWorkloadExecutorTests.TestNetworkingWorkloadExecutor networkingWorkloadExecutor = + new NetworkingWorkloadExecutorTests.TestNetworkingWorkloadExecutor(this.mockFixture.Dependencies, this.mockFixture.Parameters); + await networkingWorkloadExecutor.OnInitialize.Invoke(EventContext.None, CancellationToken.None); + + this.mockFixture.ProcessManager.OnCreateProcess = (file, arguments, workingDirectory) => + { + InMemoryProcess process = new InMemoryProcess() + { + StartInfo = new System.Diagnostics.ProcessStartInfo() { FileName = file, Arguments = arguments }, + OnHasExited = () => true, + ExitCode = 0, + OnStart = () => true, + StandardOutput = new VirtualClient.Common.ConcurrentBuffer() + }; + + if (!arguments.Contains("chmod") && !file.Contains("sysctl")) + { + this.executedCommands.Add(arguments); + } + + if (file.Contains("sysctl")) + { + string currentDirectory = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location); + string standardOutput = File.ReadAllText(Path.Combine(currentDirectory, "Examples", "NTttcp", "sysctlExampleOutput.txt")); + process.StandardOutput.Append(standardOutput); + } + + return process; + }; + + TestNTttcpFullDuplexExecutor component = new TestNTttcpFullDuplexExecutor(this.mockFixture.Dependencies, this.mockFixture.Parameters); + + await component.ExecuteAsync(CancellationToken.None).ConfigureAwait(false); + + // Send results use -send.xml output file + Assert.IsTrue(this.executedCommands.Exists(cmd => cmd.Contains("-s") && cmd.Contains("ntttcp-results-send.xml"))); + + // Receive results use -recv.xml output file + Assert.IsTrue(this.executedCommands.Exists(cmd => cmd.Contains("-r") && cmd.Contains("ntttcp-results-recv.xml"))); + } + + [Test] + [Ignore("Server-side full-duplex execution test requires complex API state mocking — deferred to integration tests")] + public async Task NTttcpFullDuplexServerExecutesBothSendAndReceiveProcesses() + { + this.SetupTest(PlatformID.Unix); + + NetworkingWorkloadExecutorTests.TestNetworkingWorkloadExecutor networkingWorkloadExecutor = + new NetworkingWorkloadExecutorTests.TestNetworkingWorkloadExecutor(this.mockFixture.Dependencies, this.mockFixture.Parameters); + await networkingWorkloadExecutor.OnInitialize.Invoke(EventContext.None, CancellationToken.None); + string agentId = $"{Environment.MachineName}-Server"; + this.mockFixture.SystemManagement.SetupGet(obj => obj.AgentId).Returns(agentId); + + string resultsPath = Path.Combine( + Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location), + "Examples", "NTttcp", "ServerOutput.xml"); + string results = File.ReadAllText(resultsPath); + + this.mockFixture.FileSystem.Setup(rt => rt.File.ReadAllTextAsync(It.IsAny(), It.IsAny())) + .ReturnsAsync(results); + + int processCount = 0; + this.mockFixture.ProcessManager.OnCreateProcess = (file, arguments, workingDirectory) => + { + InMemoryProcess process = new InMemoryProcess() + { + StartInfo = new System.Diagnostics.ProcessStartInfo() { FileName = file, Arguments = arguments }, + OnHasExited = () => true, + ExitCode = 0, + OnStart = () => true, + StandardOutput = new VirtualClient.Common.ConcurrentBuffer() + }; + + if (!arguments.Contains("chmod")) + { + processCount++; + this.executedCommands.Add(arguments); + } + + if (file.Contains("sysctl")) + { + string currentDirectory = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location); + string standardOutput = File.ReadAllText(Path.Combine(currentDirectory, "Examples", "NTttcp", "sysctlExampleOutput.txt")); + process.StandardOutput.Append(standardOutput); + } + + return process; + }; + + TestNTttcpFullDuplexExecutor component = new TestNTttcpFullDuplexExecutor(this.mockFixture.Dependencies, this.mockFixture.Parameters); + + await component.ExecuteAsync(CancellationToken.None).ConfigureAwait(false); + + // Server in full-duplex runs 2 NTttcp + 2 sysctl = 4 processes + Assert.AreEqual(4, processCount); + + // Server send uses -s flag (sends to client) and reverse port (5600) + Assert.IsTrue(this.executedCommands.Exists(cmd => cmd.Contains("-s") && cmd.Contains("-p 5600"))); + + // Server receive uses -r flag and forward port (5500) + Assert.IsTrue(this.executedCommands.Exists(cmd => cmd.Contains("-r") && cmd.Contains("-p 5500"))); + } + + [Test] + public async Task NTttcpHalfDuplexModeExecutesSingleProcessAsExpected() + { + this.SetupTest(PlatformID.Unix); + this.mockFixture.Parameters["DuplexMode"] = "Half"; + + NetworkingWorkloadExecutorTests.TestNetworkingWorkloadExecutor networkingWorkloadExecutor = + new NetworkingWorkloadExecutorTests.TestNetworkingWorkloadExecutor(this.mockFixture.Dependencies, this.mockFixture.Parameters); + await networkingWorkloadExecutor.OnInitialize.Invoke(EventContext.None, CancellationToken.None); + + int processCount = 0; + this.mockFixture.ProcessManager.OnCreateProcess = (file, arguments, workingDirectory) => + { + InMemoryProcess process = new InMemoryProcess() + { + StartInfo = new System.Diagnostics.ProcessStartInfo() { FileName = file, Arguments = arguments }, + OnHasExited = () => true, + ExitCode = 0, + OnStart = () => true, + StandardOutput = new VirtualClient.Common.ConcurrentBuffer() + }; + + if (!arguments.Contains("chmod")) + { + processCount++; + this.executedCommands.Add(arguments); + + this.networkingWorkloadState.ToolState = NetworkingWorkloadToolState.Stopped; + var expectedStateItem = new Item(nameof(NetworkingWorkloadState), this.networkingWorkloadState); + + this.mockFixture.ApiClient.Setup(client => client.GetStateAsync(It.IsAny(), It.IsAny(), It.IsAny>())) + .ReturnsAsync(this.mockFixture.CreateHttpResponse(HttpStatusCode.OK, expectedStateItem)); + } + + if (file.Contains("sysctl")) + { + string currentDirectory = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location); + string standardOutput = File.ReadAllText(Path.Combine(currentDirectory, "Examples", "NTttcp", "sysctlExampleOutput.txt")); + process.StandardOutput.Append(standardOutput); + } + + return process; + }; + + TestNTttcpFullDuplexExecutor component = new TestNTttcpFullDuplexExecutor(this.mockFixture.Dependencies, this.mockFixture.Parameters); + + await component.ExecuteAsync(CancellationToken.None).ConfigureAwait(false); + + // Half-duplex on linux: 1 NTttcp + 1 sysctl = 2 processes + Assert.AreEqual(2, processCount); + } + + [Test] + public void NetworkingWorkloadStateSupportsFullDuplexMode() + { + NetworkingWorkloadState state = new NetworkingWorkloadState( + "networking", + "Scenario_1", + NetworkingWorkloadTool.NTttcp, + NetworkingWorkloadToolState.Start, + "TCP", + 1, + "4K", + "4K", + duplexMode: "Full"); + + Assert.AreEqual("Full", state.DuplexMode); + } + + [Test] + public void NetworkingWorkloadStateDuplexModeDefaultsToNull() + { + NetworkingWorkloadState state = new NetworkingWorkloadState( + "networking", + "Scenario_1", + NetworkingWorkloadTool.NTttcp, + NetworkingWorkloadToolState.Start); + + Assert.IsNull(state.DuplexMode); + } + + [Test] + public void NetworkingWorkloadStateWithDuplexModeIsJsonSerializable() + { + NetworkingWorkloadState state = new NetworkingWorkloadState( + "networking", + "Scenario_1", + NetworkingWorkloadTool.NTttcp, + NetworkingWorkloadToolState.Start, + "TCP", + 16, + "8K", + "8K", + 256, + "00:01:00", + "00:00:05", + "00:00:05", + "Test_Mode_1", + 64, + 1234, + true, + true, + 16, + 32, + "Interrupt_Differentiator_1", + "100", + 80.5, + true, + "Profiling_Scenario_1", + "00:00:30", + "00:00:05", + false, + Guid.NewGuid(), + duplexMode: "Full"); + + SerializationAssert.IsJsonSerializable(state); + + NetworkingWorkloadState deserialized = state.ToJson().FromJson(); + Assert.AreEqual("Full", deserialized.DuplexMode); + } + + [Test] + public async Task NTttcpFullDuplexSendCommandLineOnLinuxContainsSenderFlag() + { + this.SetupTest(PlatformID.Unix); + + NetworkingWorkloadExecutorTests.TestNetworkingWorkloadExecutor networkingWorkloadExecutor = + new NetworkingWorkloadExecutorTests.TestNetworkingWorkloadExecutor(this.mockFixture.Dependencies, this.mockFixture.Parameters); + await networkingWorkloadExecutor.OnInitialize.Invoke(EventContext.None, CancellationToken.None); + + TestNTttcpFullDuplexExecutor component = new TestNTttcpFullDuplexExecutor(this.mockFixture.Dependencies, this.mockFixture.Parameters); + await component.InitializeComponentAsync(); + string sendCmd = component.GetSendCommandLineArguments(); + + Assert.IsTrue(sendCmd.Contains("-s")); + Assert.IsFalse(sendCmd.Contains("-r ")); + Assert.IsTrue(sendCmd.Contains("-p 5500")); + Assert.IsTrue(sendCmd.Contains("ntttcp-results-send.xml")); + } + + [Test] + public async Task NTttcpFullDuplexReceiveCommandLineOnLinuxContainsReceiverFlag() + { + this.SetupTest(PlatformID.Unix); + + NetworkingWorkloadExecutorTests.TestNetworkingWorkloadExecutor networkingWorkloadExecutor = + new NetworkingWorkloadExecutorTests.TestNetworkingWorkloadExecutor(this.mockFixture.Dependencies, this.mockFixture.Parameters); + await networkingWorkloadExecutor.OnInitialize.Invoke(EventContext.None, CancellationToken.None); + + TestNTttcpFullDuplexExecutor component = new TestNTttcpFullDuplexExecutor(this.mockFixture.Dependencies, this.mockFixture.Parameters); + await component.InitializeComponentAsync(); + string recvCmd = component.GetReceiveCommandLineArguments(); + + Assert.IsTrue(recvCmd.Contains("-r")); + Assert.IsFalse(recvCmd.Contains("-s ")); + Assert.IsTrue(recvCmd.Contains("-p 5600")); + Assert.IsTrue(recvCmd.Contains("ntttcp-results-recv.xml")); + } + + [Test] + public async Task NTttcpFullDuplexSendCommandLineOnWindowsContainsSenderFlag() + { + this.SetupTest(PlatformID.Win32NT); + this.mockFixture.SystemManagement.SetupGet(sm => sm.Platform).Returns(PlatformID.Win32NT); + + NetworkingWorkloadExecutorTests.TestNetworkingWorkloadExecutor networkingWorkloadExecutor = + new NetworkingWorkloadExecutorTests.TestNetworkingWorkloadExecutor(this.mockFixture.Dependencies, this.mockFixture.Parameters); + await networkingWorkloadExecutor.OnInitialize.Invoke(EventContext.None, CancellationToken.None); + + TestNTttcpFullDuplexExecutor component = new TestNTttcpFullDuplexExecutor(this.mockFixture.Dependencies, this.mockFixture.Parameters); + await component.InitializeComponentAsync(); + string sendCmd = component.GetSendCommandLineArguments(); + + Assert.IsTrue(sendCmd.Contains("-s")); + Assert.IsFalse(sendCmd.Contains("-r ")); + Assert.IsTrue(sendCmd.Contains("-p 5500")); + Assert.IsTrue(sendCmd.Contains("ntttcp-results-send.xml")); + } + + [Test] + public async Task NTttcpFullDuplexReceiveCommandLineOnWindowsContainsReceiverFlag() + { + this.SetupTest(PlatformID.Win32NT); + this.mockFixture.SystemManagement.SetupGet(sm => sm.Platform).Returns(PlatformID.Win32NT); + + NetworkingWorkloadExecutorTests.TestNetworkingWorkloadExecutor networkingWorkloadExecutor = + new NetworkingWorkloadExecutorTests.TestNetworkingWorkloadExecutor(this.mockFixture.Dependencies, this.mockFixture.Parameters); + await networkingWorkloadExecutor.OnInitialize.Invoke(EventContext.None, CancellationToken.None); + + TestNTttcpFullDuplexExecutor component = new TestNTttcpFullDuplexExecutor(this.mockFixture.Dependencies, this.mockFixture.Parameters); + await component.InitializeComponentAsync(); + string recvCmd = component.GetReceiveCommandLineArguments(); + + Assert.IsTrue(recvCmd.Contains("-r")); + Assert.IsFalse(recvCmd.Contains("-s ")); + Assert.IsTrue(recvCmd.Contains("-p 5600")); + Assert.IsTrue(recvCmd.Contains("ntttcp-results-recv.xml")); + } + + [Test] + public async Task NTttcpFullDuplexSendCommandUsesClientBufferSize() + { + this.SetupTest(PlatformID.Unix); + this.mockFixture.Parameters["BufferSizeClient"] = "64K"; + this.mockFixture.Parameters["BufferSizeServer"] = "256K"; + + NetworkingWorkloadExecutorTests.TestNetworkingWorkloadExecutor networkingWorkloadExecutor = + new NetworkingWorkloadExecutorTests.TestNetworkingWorkloadExecutor(this.mockFixture.Dependencies, this.mockFixture.Parameters); + await networkingWorkloadExecutor.OnInitialize.Invoke(EventContext.None, CancellationToken.None); + + TestNTttcpFullDuplexExecutor component = new TestNTttcpFullDuplexExecutor(this.mockFixture.Dependencies, this.mockFixture.Parameters); + await component.InitializeComponentAsync(); + string sendCmd = component.GetSendCommandLineArguments(); + + Assert.IsTrue(sendCmd.Contains("-b 64K")); + } + + [Test] + public async Task NTttcpFullDuplexReceiveCommandUsesServerBufferSize() + { + this.SetupTest(PlatformID.Unix); + this.mockFixture.Parameters["BufferSizeClient"] = "64K"; + this.mockFixture.Parameters["BufferSizeServer"] = "256K"; + + NetworkingWorkloadExecutorTests.TestNetworkingWorkloadExecutor networkingWorkloadExecutor = + new NetworkingWorkloadExecutorTests.TestNetworkingWorkloadExecutor(this.mockFixture.Dependencies, this.mockFixture.Parameters); + await networkingWorkloadExecutor.OnInitialize.Invoke(EventContext.None, CancellationToken.None); + + TestNTttcpFullDuplexExecutor component = new TestNTttcpFullDuplexExecutor(this.mockFixture.Dependencies, this.mockFixture.Parameters); + await component.InitializeComponentAsync(); + string recvCmd = component.GetReceiveCommandLineArguments(); + + Assert.IsTrue(recvCmd.Contains("-b 256K")); + } + + private void SetupNetworkingWorkloadState() + { + this.networkingWorkloadState = new NetworkingWorkloadState(); + this.networkingWorkloadState.Scenario = "AnyScenario"; + this.networkingWorkloadState.Tool = NetworkingWorkloadTool.NTttcp; + this.networkingWorkloadState.ToolState = NetworkingWorkloadToolState.Running; + this.networkingWorkloadState.BufferSizeClient = "4k"; + this.networkingWorkloadState.BufferSizeServer = "4k"; + this.networkingWorkloadState.Protocol = "TCP"; + this.networkingWorkloadState.TestMode = "MockTestMode"; + this.networkingWorkloadState.DuplexMode = "Full"; + + var expectedStateItem = new Item(nameof(NetworkingWorkloadState), this.networkingWorkloadState); + + this.mockFixture.ApiClient.Setup(client => client.GetStateAsync(It.IsAny(), It.IsAny(), It.IsAny>())) + .ReturnsAsync(this.mockFixture.CreateHttpResponse(HttpStatusCode.OK, expectedStateItem)); + } + + private class TestNTttcpFullDuplexExecutor : NTttcpExecutor + { + public TestNTttcpFullDuplexExecutor(IServiceCollection dependencies, IDictionary parameters) + : base(dependencies, parameters) + { + } + + public bool GetIsFullDuplex() + { + return this.IsFullDuplex; + } + + public int GetReversePort() + { + return this.ReversePort; + } + + public string GetSendCommandLineArguments() + { + return this.GetFullDuplexSendCommandLineArguments(); + } + + public string GetReceiveCommandLineArguments() + { + return this.GetFullDuplexReceiveCommandLineArguments(); + } + + public async Task InitializeComponentAsync() + { + await this.InitializeAsync(EventContext.None, CancellationToken.None); + } + + protected override bool IsProcessRunning(string processName) + { + return true; + } + } + } +} diff --git a/src/VirtualClient/VirtualClient.Actions/Network/NetworkingWorkload/NTttcp/NTttcpExecutor.cs b/src/VirtualClient/VirtualClient.Actions/Network/NetworkingWorkload/NTttcp/NTttcpExecutor.cs index f820819f6c..21a75a3d66 100644 --- a/src/VirtualClient/VirtualClient.Actions/Network/NetworkingWorkload/NTttcp/NTttcpExecutor.cs +++ b/src/VirtualClient/VirtualClient.Actions/Network/NetworkingWorkload/NTttcp/NTttcpExecutor.cs @@ -6,6 +6,7 @@ namespace VirtualClient.Actions.NetworkPerformance using System; using System.Collections.Generic; using System.Globalization; + using System.IO.Abstractions; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -24,6 +25,9 @@ namespace VirtualClient.Actions.NetworkPerformance public class NTttcpExecutor : NetworkingWorkloadToolExecutor { private const string OutputFileName = "ntttcp-results.xml"; + private const string SendOutputFileName = "ntttcp-results-send.xml"; + private const string ReceiveOutputFileName = "ntttcp-results-recv.xml"; + private const int ReversePortOffset = 100; private static readonly TimeSpan DefaultWarmupTime = TimeSpan.FromSeconds(10); private static readonly TimeSpan DefaultCooldownTime = TimeSpan.FromSeconds(10); @@ -200,6 +204,51 @@ public bool? NoSyncEnabled } } + /// + /// Duplex mode for the NTttcp workload. Valid values are "Half" (default) and "Full". + /// In full-duplex mode, each node runs both a sender and receiver process simultaneously. + /// + public string DuplexMode + { + get + { + this.Parameters.TryGetValue(nameof(this.DuplexMode), out IConvertible duplexMode); + return duplexMode?.ToString(); + } + } + + /// + /// Returns true if the workload is configured for full-duplex mode. + /// + protected bool IsFullDuplex + { + get + { + return string.Equals(this.DuplexMode, "Full", StringComparison.OrdinalIgnoreCase); + } + } + + /// + /// Path to the send-direction results file (used in full-duplex mode). + /// + protected string SendResultsPath { get; set; } + + /// + /// Path to the receive-direction results file (used in full-duplex mode). + /// + protected string ReceiveResultsPath { get; set; } + + /// + /// The port used for the reverse direction in full-duplex mode. + /// + protected int ReversePort + { + get + { + return this.Port + NTttcpExecutor.ReversePortOffset; + } + } + /// /// The retry policy to apply to the startup of the NTttcp workload to handle /// transient issues. @@ -235,6 +284,8 @@ protected override Task InitializeAsync(EventContext telemetryContext, Cancellat this.ProcessName = "ntttcp"; this.Tool = NetworkingWorkloadTool.NTttcp; this.ResultsPath = this.PlatformSpecifics.Combine(workloadPackage.Path, NTttcpExecutor.OutputFileName); + this.SendResultsPath = this.PlatformSpecifics.Combine(workloadPackage.Path, NTttcpExecutor.SendOutputFileName); + this.ReceiveResultsPath = this.PlatformSpecifics.Combine(workloadPackage.Path, NTttcpExecutor.ReceiveOutputFileName); if (this.Platform == PlatformID.Win32NT) { @@ -312,6 +363,19 @@ protected Task GetSysctlOutputAsync(CancellationToken cancellationToken) /// protected override Task ExecuteWorkloadAsync(string commandArguments, EventContext telemetryContext, CancellationToken cancellationToken, TimeSpan? timeout = null) + { + if (this.IsFullDuplex) + { + return this.ExecuteFullDuplexWorkloadAsync(telemetryContext, cancellationToken, timeout); + } + + return this.ExecuteHalfDuplexWorkloadAsync(commandArguments, telemetryContext, cancellationToken, timeout); + } + + /// + /// Executes the half-duplex (original single-direction) workload. + /// + protected Task ExecuteHalfDuplexWorkloadAsync(string commandArguments, EventContext telemetryContext, CancellationToken cancellationToken, TimeSpan? timeout = null) { EventContext relatedContext = telemetryContext.Clone() .AddContext("command", this.ExecutablePath) @@ -374,10 +438,154 @@ await this.ProcessStartRetryPolicy.ExecuteAsync(async () => }); } + /// + /// Executes the full-duplex workload — runs both sender and receiver processes concurrently. + /// The receiver is started first, then after a brief delay the sender is launched. + /// Both processes run in parallel for the test duration. + /// + protected Task ExecuteFullDuplexWorkloadAsync(EventContext telemetryContext, CancellationToken cancellationToken, TimeSpan? timeout = null) + { + string sendCommandArguments = this.GetFullDuplexSendCommandLineArguments(); + string receiveCommandArguments = this.GetFullDuplexReceiveCommandLineArguments(); + + EventContext relatedContext = telemetryContext.Clone() + .AddContext("command", this.ExecutablePath) + .AddContext("sendCommandArguments", sendCommandArguments) + .AddContext("receiveCommandArguments", receiveCommandArguments) + .AddContext("duplexMode", "Full"); + + return this.Logger.LogMessageAsync($"{this.TypeName}.ExecuteFullDuplexWorkload", relatedContext, async () => + { + using (BackgroundOperations profiling = BackgroundOperations.BeginProfiling(this, cancellationToken)) + { + await this.ProcessStartRetryPolicy.ExecuteAsync(async () => + { + await this.DeleteFullDuplexResultsFilesAsync(); + + using (IProcessProxy receiveProcess = this.SystemManagement.ProcessManager.CreateProcess(this.ExecutablePath, receiveCommandArguments)) + using (IProcessProxy sendProcess = this.SystemManagement.ProcessManager.CreateProcess(this.ExecutablePath, sendCommandArguments)) + { + try + { + // Start receiver first to ensure it is listening before sender connects. + Task receiveTask = receiveProcess.StartAndWaitAsync(cancellationToken, timeout); + + // Brief delay to allow receiver to bind. + await Task.Delay(TimeSpan.FromSeconds(2), cancellationToken).ConfigureAwait(false); + + Task sendTask = sendProcess.StartAndWaitAsync(cancellationToken, timeout); + + await Task.WhenAll(receiveTask, sendTask).ConfigureAwait(false); + + // Capture send metrics. + if (!sendProcess.IsErrored()) + { + string sendResults = await this.WaitForResultsAsync( + TimeSpan.FromMinutes(1), relatedContext, this.SendResultsPath); + + await this.LogProcessDetailsAsync( + sendProcess, + relatedContext, + "NTttcp", + results: new KeyValuePair(this.SendResultsPath, sendResults)); + + this.CaptureDirectionalMetrics( + sendResults, + sendProcess.FullCommand(), + sendProcess.StartTime, + sendProcess.ExitTime, + "Send", + true, + relatedContext); + } + else + { + await this.LogProcessDetailsAsync(sendProcess, relatedContext, "NTttcp"); + this.Logger.LogMessage($"{this.TypeName}.FullDuplexSendFailed", LogLevel.Warning, relatedContext); + } + + // Capture receive metrics. + if (!receiveProcess.IsErrored()) + { + string receiveResults = await this.WaitForResultsAsync( + TimeSpan.FromMinutes(1), relatedContext, this.ReceiveResultsPath); + + await this.LogProcessDetailsAsync( + receiveProcess, + relatedContext, + "NTttcp", + results: new KeyValuePair(this.ReceiveResultsPath, receiveResults)); + + this.CaptureDirectionalMetrics( + receiveResults, + receiveProcess.FullCommand(), + receiveProcess.StartTime, + receiveProcess.ExitTime, + "Receive", + false, + relatedContext); + } + else + { + await this.LogProcessDetailsAsync(receiveProcess, relatedContext, "NTttcp"); + this.Logger.LogMessage($"{this.TypeName}.FullDuplexReceiveFailed", LogLevel.Warning, relatedContext); + } + + // If both failed, throw. + if (sendProcess.IsErrored() && receiveProcess.IsErrored()) + { + throw new WorkloadException( + $"Both sender and receiver processes failed in full-duplex mode.", + ErrorReason.WorkloadFailed); + } + } + catch (OperationCanceledException) + { + // Expected when the client signals a cancellation. + } + catch (TimeoutException exc) + { + this.Logger.LogMessage($"{this.TypeName}.FullDuplexWorkloadTimeout", LogLevel.Warning, relatedContext.AddError(exc)); + } + catch (WorkloadException) + { + throw; + } + catch (Exception exc) + { + this.Logger.LogMessage($"{this.TypeName}.FullDuplexWorkloadStartupError", LogLevel.Warning, relatedContext.AddError(exc)); + throw; + } + finally + { + sendProcess.SafeKill(this.Logger); + receiveProcess.SafeKill(this.Logger); + } + } + }); + } + }); + } + /// /// Logs the workload metrics to the telemetry. /// protected override void CaptureMetrics(string results, string commandArguments, DateTime startTime, DateTime endTime, EventContext telemetryContext) + { + this.CaptureDirectionalMetrics(results, commandArguments, startTime, endTime, direction: null, isClientParser: this.IsInClientRole, telemetryContext: telemetryContext); + } + + /// + /// Logs direction-tagged workload metrics to telemetry. + /// + /// The raw XML results from the NTttcp process. + /// The command line arguments used. + /// The start time of the process. + /// The end time of the process. + /// The direction label (Send, Receive) or null for half-duplex. + /// True if parsing sender XML (ntttcps root), false for receiver XML (ntttcpr root). + /// The telemetry context. + protected void CaptureDirectionalMetrics(string results, string commandArguments, DateTime startTime, DateTime endTime, string direction, bool isClientParser, EventContext telemetryContext) { if (!string.IsNullOrWhiteSpace(results)) { @@ -387,7 +595,7 @@ protected override void CaptureMetrics(string results, string commandArguments, toolVersion: null); EventContext relatedContext = telemetryContext.Clone(); - MetricsParser parser = new NTttcpMetricsParser(results, this.IsInClientRole); + MetricsParser parser = new NTttcpMetricsParser(results, isClientParser); IList metrics = parser.Parse(); if (parser.Metadata.Any()) @@ -403,11 +611,21 @@ protected override void CaptureMetrics(string results, string commandArguments, } } + if (direction != null) + { + relatedContext.Properties["direction"] = direction; + relatedContext.Properties["duplexMode"] = "Full"; + } + this.MetadataContract.Apply(relatedContext); + string scenarioName = direction != null + ? $"{this.Scenario} {this.Role} {direction}" + : this.Name; + this.Logger.LogMetrics( this.Tool.ToString(), - this.Name, + scenarioName, startTime, endTime, metrics, @@ -431,6 +649,76 @@ protected override void CaptureMetrics(string results, string commandArguments, } } + /// + /// Returns the command line arguments for the send direction in full-duplex mode. + /// Client sends on the forward port (to server's receiver). + /// Server sends on the reverse port (to client's receiver). + /// + protected string GetFullDuplexSendCommandLineArguments() + { + // Client sends on forward port, Server sends on reverse port + int sendPort = this.IsInClientRole ? this.Port : this.ReversePort; + + if (this.Platform == PlatformID.Win32NT) + { + return this.GetWindowsSpecificCommandLine(isSender: true, port: sendPort, resultsPath: this.SendResultsPath); + } + + return this.GetLinuxSpecificCommandLine(isSender: true, port: sendPort, resultsPath: this.SendResultsPath); + } + + /// + /// Returns the command line arguments for the receive direction in full-duplex mode. + /// Client receives on the reverse port (from server's sender). + /// Server receives on the forward port (from client's sender). + /// + protected string GetFullDuplexReceiveCommandLineArguments() + { + // Client receives on reverse port, Server receives on forward port + int recvPort = this.IsInClientRole ? this.ReversePort : this.Port; + + if (this.Platform == PlatformID.Win32NT) + { + return this.GetWindowsSpecificCommandLine(isSender: false, port: recvPort, resultsPath: this.ReceiveResultsPath); + } + + return this.GetLinuxSpecificCommandLine(isSender: false, port: recvPort, resultsPath: this.ReceiveResultsPath); + } + + /// + /// Waits for results at a specific file path. + /// + protected async Task WaitForResultsAsync(TimeSpan timeout, EventContext telemetryContext, string resultsPath) + { + string results = null; + IFile fileAccess = this.SystemManagement.FileSystem.File; + DateTime pollingTimeout = DateTime.UtcNow.Add(timeout); + + while (DateTime.UtcNow < pollingTimeout) + { + if (fileAccess.Exists(resultsPath)) + { + try + { + results = await this.SystemManagement.FileSystem.File.ReadAllTextAsync(resultsPath); + + if (!string.IsNullOrWhiteSpace(results)) + { + break; + } + } + catch + { + // File may still be written to. + } + } + + await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false); + } + + return results; + } + private async Task DeleteResultsFileAsync() { if (this.SystemManagement.FileSystem.File.Exists(this.ResultsPath)) @@ -440,49 +728,82 @@ await this.SystemManagement.FileSystem.File.DeleteAsync(this.ResultsPath) } } + private async Task DeleteFullDuplexResultsFilesAsync() + { + if (this.SystemManagement.FileSystem.File.Exists(this.SendResultsPath)) + { + await this.SystemManagement.FileSystem.File.DeleteAsync(this.SendResultsPath) + .ConfigureAwait(false); + } + + if (this.SystemManagement.FileSystem.File.Exists(this.ReceiveResultsPath)) + { + await this.SystemManagement.FileSystem.File.DeleteAsync(this.ReceiveResultsPath) + .ConfigureAwait(false); + } + } + private string GetWindowsSpecificCommandLine() + { + return this.GetWindowsSpecificCommandLine(isSender: this.IsInClientRole, port: this.Port, resultsPath: this.ResultsPath); + } + + private string GetWindowsSpecificCommandLine(bool isSender, int port, string resultsPath) { string clientIPAddress = this.GetLayoutClientInstances(ClientRole.Client).First().IPAddress; string serverIPAddress = this.GetLayoutClientInstances(ClientRole.Server).First().IPAddress; - return $"{(this.IsInClientRole ? "-s" : "-r")} " + - $"-m {this.ThreadCount},*,{serverIPAddress} " + + + // For NTttcp, -m always specifies the receiver's IP address. + // Forward direction (port = this.Port): receiver is the server + // Reverse direction (port = this.ReversePort): receiver is the client + bool isReverseDirection = port != this.Port; + string receiverIPAddress = isReverseDirection ? clientIPAddress : serverIPAddress; + + return $"{(isSender ? "-s" : "-r")} " + + $"-m {this.ThreadCount},*,{receiverIPAddress} " + $"-wu {NTttcpExecutor.DefaultWarmupTime.TotalSeconds} " + $"-cd {NTttcpExecutor.DefaultCooldownTime.TotalSeconds} " + $"-t {this.TestDuration.TotalSeconds} " + - $"-l {(this.IsInClientRole ? $"{this.BufferSizeClient}" : $"{this.BufferSizeServer}")} " + - $"-p {this.Port} " + - $"-xml {this.ResultsPath} " + + $"-l {(isSender ? $"{this.BufferSizeClient}" : $"{this.BufferSizeServer}")} " + + $"-p {port} " + + $"-xml {resultsPath} " + $"{(this.Protocol.ToLowerInvariant() == "udp" ? "-u" : string.Empty)} " + $"{(this.NoSyncEnabled == true ? "-ns" : string.Empty)} " + - $"{(this.IsInClientRole ? $"-nic {clientIPAddress}" : string.Empty)}".Trim(); + $"{(isSender && this.IsInClientRole ? $"-nic {clientIPAddress}" : string.Empty)}".Trim(); } private string GetLinuxSpecificCommandLine() { + return this.GetLinuxSpecificCommandLine(isSender: this.IsInClientRole, port: this.Port, resultsPath: this.ResultsPath); + } + + private string GetLinuxSpecificCommandLine(bool isSender, int port, string resultsPath) + { + string clientIPAddress = this.GetLayoutClientInstances(ClientRole.Client).First().IPAddress; string serverIPAddress = this.GetLayoutClientInstances(ClientRole.Server).First().IPAddress; - string commandLine = $"{(this.IsInClientRole ? "-s" : "-r")} " + + // For NTttcp, -m always specifies the receiver's IP address. + // Forward direction (port = this.Port): receiver is the server + // Reverse direction (port = this.ReversePort): receiver is the client + bool isReverseDirection = port != this.Port; + string receiverIPAddress = isReverseDirection ? clientIPAddress : serverIPAddress; + + return $"{(isSender ? "-s" : "-r")} " + $"-V " + - $"-m {this.ThreadCount},*,{serverIPAddress} " + + $"-m {this.ThreadCount},*,{receiverIPAddress} " + $"-W {NTttcpExecutor.DefaultWarmupTime.TotalSeconds} " + $"-C {NTttcpExecutor.DefaultCooldownTime.TotalSeconds} " + $"-t {this.TestDuration.TotalSeconds} " + - $"-b {(this.IsInClientRole ? $"{this.BufferSizeClient}" : $"{this.BufferSizeServer}")} " + - $"-x {this.ResultsPath} " + - $"-p {this.Port} " + + $"-b {(isSender ? $"{this.BufferSizeClient}" : $"{this.BufferSizeServer}")} " + + $"-x {resultsPath} " + + $"-p {port} " + $"{(this.Protocol.ToLowerInvariant() == "udp" ? "-u" : string.Empty)} " + - $"{((this.IsInClientRole && this.SenderLastClient == true) ? "-L" : string.Empty)} " + - $"{((this.IsInServerRole && this.ReceiverMultiClientMode == true) ? "-M" : string.Empty)} " + - $"{((this.IsInClientRole && this.ThreadsPerServerPort != null) ? $"-n {this.ThreadsPerServerPort}" : string.Empty)} " + - $"{((this.IsInClientRole && this.ConnectionsPerThread != null) ? $"-l {this.ConnectionsPerThread}" : string.Empty)} " + + $"{((isSender && this.SenderLastClient == true) ? "-L" : string.Empty)} " + + $"{((!isSender && this.ReceiverMultiClientMode == true) ? "-M" : string.Empty)} " + + $"{((isSender && this.ThreadsPerServerPort != null) ? $"-n {this.ThreadsPerServerPort}" : string.Empty)} " + + $"{((isSender && this.ConnectionsPerThread != null) ? $"-l {this.ConnectionsPerThread}" : string.Empty)} " + $"{(this.NoSyncEnabled == true ? "-N" : string.Empty)} " + - $"{((this.DevInterruptsDifferentiator != null) ? $"--show-dev-interrupts {this.DevInterruptsDifferentiator}" : string.Empty)}".Trim(); - - if (this.IsInClientRole && this.Protocol.ToLowerInvariant() == "tcp") - { - commandLine += " --show-tcp-retrans"; - } - - return commandLine.Trim(); + $"{((this.DevInterruptsDifferentiator != null) ? $"--show-dev-interrupts {this.DevInterruptsDifferentiator}" : string.Empty)} " + + $"{(isSender && this.Protocol.ToLowerInvariant() == "tcp" ? "--show-tcp-retrans" : string.Empty)}".Trim(); } } } diff --git a/src/VirtualClient/VirtualClient.Actions/Network/NetworkingWorkload/NetworkingWorkloadExecutor.cs b/src/VirtualClient/VirtualClient.Actions/Network/NetworkingWorkload/NetworkingWorkloadExecutor.cs index b642730dd5..f5cedd800b 100644 --- a/src/VirtualClient/VirtualClient.Actions/Network/NetworkingWorkload/NetworkingWorkloadExecutor.cs +++ b/src/VirtualClient/VirtualClient.Actions/Network/NetworkingWorkload/NetworkingWorkloadExecutor.cs @@ -418,6 +418,24 @@ public bool? NoSyncEnabled } } + /// + /// Duplex mode for the network workload. Valid values are "Half" (default) and "Full". + /// In full-duplex mode, each node runs both a sender and receiver process simultaneously. + /// + public string DuplexMode + { + get + { + this.Parameters.TryGetValue(nameof(NetworkingWorkloadExecutor.DuplexMode), out IConvertible duplexMode); + return duplexMode?.ToString(); + } + + set + { + this.Parameters[nameof(NetworkingWorkloadExecutor.DuplexMode)] = value; + } + } + /// /// Cancellation Token Source for Server. /// @@ -816,6 +834,7 @@ protected void OnInstructionsReceived(object sender, JObject instructions) this.ProfilingPeriod = serverInstructions.ProfilingPeriod; this.ProfilingWarmUpPeriod = serverInstructions.ProfilingWarmUpPeriod; this.NoSyncEnabled = serverInstructions.NoSyncEnabled; + this.DuplexMode = serverInstructions.DuplexMode; if (serverInstructions.Metadata?.Any() == true) { @@ -985,7 +1004,8 @@ await this.ClientExecutionRetryPolicy.ExecuteAsync(async () => this.ProfilingPeriod.ToString(), this.ProfilingWarmUpPeriod.ToString(), this.NoSyncEnabled, - requestId); + requestId, + duplexMode: this.DuplexMode); Item instructions = new Item(nameof(NetworkingWorkloadState), workloadInstructions); relatedContext.AddContext("instructions", instructions); diff --git a/src/VirtualClient/VirtualClient.Actions/Network/NetworkingWorkload/NetworkingWorkloadState.cs b/src/VirtualClient/VirtualClient.Actions/Network/NetworkingWorkload/NetworkingWorkloadState.cs index 955835d76f..8b994ce740 100644 --- a/src/VirtualClient/VirtualClient.Actions/Network/NetworkingWorkload/NetworkingWorkloadState.cs +++ b/src/VirtualClient/VirtualClient.Actions/Network/NetworkingWorkload/NetworkingWorkloadState.cs @@ -136,7 +136,8 @@ public NetworkingWorkloadState( string profilingWarmUpPeriod = null, bool? noSyncEnabled = null, Guid? clientRequestId = null, - IDictionary metadata = null) + IDictionary metadata = null, + string duplexMode = null) { packageName.ThrowIfNull(nameof(packageName)); scenario.ThrowIfNull(nameof(scenario)); @@ -170,6 +171,7 @@ public NetworkingWorkloadState( this.Properties[nameof(this.ProfilingPeriod)] = profilingPeriod; this.Properties[nameof(this.ProfilingWarmUpPeriod)] = profilingWarmUpPeriod; this.Properties[nameof(this.NoSyncEnabled)] = noSyncEnabled; + this.Properties[nameof(this.DuplexMode)] = duplexMode; this.ClientRequestId = clientRequestId; this.Metadata = new Dictionary(StringComparer.OrdinalIgnoreCase); @@ -545,6 +547,24 @@ public double? ConfidenceLevel } } + /// + /// Duplex mode for the test (Half or Full). Half-duplex is the default. + /// In full-duplex mode, both nodes run sender and receiver simultaneously. + /// + public string DuplexMode + { + get + { + this.Properties.TryGetValue(nameof(this.DuplexMode), out IConvertible duplexMode); + return duplexMode?.ToString(); + } + + set + { + this.Properties[nameof(this.DuplexMode)] = value; + } + } + /// /// Metadata associated with the component. /// diff --git a/src/VirtualClient/VirtualClient.Main/profiles/PERF-NETWORK-NTTTCP.json b/src/VirtualClient/VirtualClient.Main/profiles/PERF-NETWORK-NTTTCP.json index 7beb2f1824..199421addd 100644 --- a/src/VirtualClient/VirtualClient.Main/profiles/PERF-NETWORK-NTTTCP.json +++ b/src/VirtualClient/VirtualClient.Main/profiles/PERF-NETWORK-NTTTCP.json @@ -11,7 +11,8 @@ "NTttcpDuration": "00:01:00", "ProfilingEnabled": false, "ProfilingMode": "None", - "TestDuration": "00:01:00" + "TestDuration": "00:01:00", + "DuplexMode": "Half" }, "Actions": [ { @@ -28,7 +29,8 @@ "Port": "$.Parameters.NTttcpPort", "ProfilingScenario": "NTttcp_TCP_4K_Buffer_T1", "ProfilingEnabled": "$.Parameters.ProfilingEnabled", - "ProfilingMode": "$.Parameters.ProfilingMode" + "ProfilingMode": "$.Parameters.ProfilingMode", + "DuplexMode": "$.Parameters.DuplexMode" } }, { @@ -45,7 +47,8 @@ "Port": "$.Parameters.NTttcpPort", "ProfilingScenario": "NTttcp_TCP_64K_Buffer_T1", "ProfilingEnabled": "$.Parameters.ProfilingEnabled", - "ProfilingMode": "$.Parameters.ProfilingMode" + "ProfilingMode": "$.Parameters.ProfilingMode", + "DuplexMode": "$.Parameters.DuplexMode" } }, { @@ -62,7 +65,8 @@ "Port": "$.Parameters.NTttcpPort", "ProfilingScenario": "NTttcp_TCP_256K_Buffer_T1", "ProfilingEnabled": "$.Parameters.ProfilingEnabled", - "ProfilingMode": "$.Parameters.ProfilingMode" + "ProfilingMode": "$.Parameters.ProfilingMode", + "DuplexMode": "$.Parameters.DuplexMode" } }, { @@ -79,7 +83,8 @@ "Port": "$.Parameters.NTttcpPort", "ProfilingScenario": "NTttcp_TCP_4K_Buffer_T32", "ProfilingEnabled": "$.Parameters.ProfilingEnabled", - "ProfilingMode": "$.Parameters.ProfilingMode" + "ProfilingMode": "$.Parameters.ProfilingMode", + "DuplexMode": "$.Parameters.DuplexMode" } }, { @@ -96,7 +101,8 @@ "Port": "$.Parameters.NTttcpPort", "ProfilingScenario": "NTttcp_TCP_64K_Buffer_T32", "ProfilingEnabled": "$.Parameters.ProfilingEnabled", - "ProfilingMode": "$.Parameters.ProfilingMode" + "ProfilingMode": "$.Parameters.ProfilingMode", + "DuplexMode": "$.Parameters.DuplexMode" } }, { @@ -113,7 +119,8 @@ "Port": "$.Parameters.NTttcpPort", "ProfilingScenario": "NTttcp_TCP_256K_Buffer_T32", "ProfilingEnabled": "$.Parameters.ProfilingEnabled", - "ProfilingMode": "$.Parameters.ProfilingMode" + "ProfilingMode": "$.Parameters.ProfilingMode", + "DuplexMode": "$.Parameters.DuplexMode" } }, { @@ -130,7 +137,8 @@ "Port": "$.Parameters.NTttcpPort", "ProfilingScenario": "NTttcp_TCP_4K_Buffer_T256", "ProfilingEnabled": "$.Parameters.ProfilingEnabled", - "ProfilingMode": "$.Parameters.ProfilingMode" + "ProfilingMode": "$.Parameters.ProfilingMode", + "DuplexMode": "$.Parameters.DuplexMode" } }, { @@ -147,7 +155,8 @@ "Port": "$.Parameters.NTttcpPort", "ProfilingScenario": "NTttcp_TCP_64K_Buffer_T256", "ProfilingEnabled": "$.Parameters.ProfilingEnabled", - "ProfilingMode": "$.Parameters.ProfilingMode" + "ProfilingMode": "$.Parameters.ProfilingMode", + "DuplexMode": "$.Parameters.DuplexMode" } }, { @@ -164,7 +173,8 @@ "Port": "$.Parameters.NTttcpPort", "ProfilingScenario": "NTttcp_TCP_256K_Buffer_T256", "ProfilingEnabled": "$.Parameters.ProfilingEnabled", - "ProfilingMode": "$.Parameters.ProfilingMode" + "ProfilingMode": "$.Parameters.ProfilingMode", + "DuplexMode": "$.Parameters.DuplexMode" } }, { @@ -181,7 +191,8 @@ "Port": "$.Parameters.NTttcpPort", "ProfilingScenario": "NTttcp_UDP_1400B_Buffer_T1", "ProfilingEnabled": "$.Parameters.ProfilingEnabled", - "ProfilingMode": "$.Parameters.ProfilingMode" + "ProfilingMode": "$.Parameters.ProfilingMode", + "DuplexMode": "$.Parameters.DuplexMode" } }, { @@ -198,7 +209,8 @@ "Port": "$.Parameters.NTttcpPort", "ProfilingScenario": "NTttcp_UDP_1400B_Buffer_T32", "ProfilingEnabled": "$.Parameters.ProfilingEnabled", - "ProfilingMode": "$.Parameters.ProfilingMode" + "ProfilingMode": "$.Parameters.ProfilingMode", + "DuplexMode": "$.Parameters.DuplexMode" } } ], diff --git a/website/docs/workloads/network-suite/network-ntttcp-fullduplex.md b/website/docs/workloads/network-suite/network-ntttcp-fullduplex.md new file mode 100644 index 0000000000..2f630768a2 --- /dev/null +++ b/website/docs/workloads/network-suite/network-ntttcp-fullduplex.md @@ -0,0 +1,82 @@ +# NTTTCP Full-Duplex Mode +The existing `PERF-NETWORK-NTTTCP.json` profile supports an optional `DuplexMode` parameter that enables bidirectional network throughput testing. +In full-duplex mode, both nodes simultaneously send and receive traffic, producing separate TX and RX throughput metrics per direction. + +* [Network Suite Workload Details](./network-suite.md) +* [Client/Server Workloads](../../guides/0020-client-server.md) + +## How Full-Duplex Differs from Half-Duplex +In the standard (half-duplex) mode, one node sends while the other receives. In full-duplex mode, each node runs **two NTttcp processes +simultaneously**: one sender and one receiver. This results in 4 NTttcp processes total across the two systems. + +| Direction | Port | Client | Server | +|-----------|------|--------|--------| +| Forward | base port (default 5500) | Sender (`-s`) | Receiver (`-r`) | +| Reverse | base port + 100 (default 5600) | Receiver (`-r`) | Sender (`-s`) | + +## Usage + +``` bash +# Half-duplex (default, unchanged behavior) +VirtualClient.exe --profile=PERF-NETWORK-NTTTCP.json --clientId=Client01 --layout-path=/path/to/layout.json + +# Full-duplex (override DuplexMode parameter) +VirtualClient.exe --profile=PERF-NETWORK-NTTTCP.json --clientId=Client01 --layout-path=/path/to/layout.json --parameters=DuplexMode=Full +``` + +## PERF-NETWORK-NTTTCP.json +The standard NTttcp profile now includes the `DuplexMode` parameter (default: `Half`). All existing scenarios work unchanged. +Override with `--parameters=DuplexMode=Full` to enable full-duplex. + +* [Workload Profile](https://github.com/microsoft/VirtualClient/blob/main/src/VirtualClient/VirtualClient.Main/profiles/PERF-NETWORK-NTTTCP.json) + +* **Supported Platform/Architectures** + * linux-x64 + * linux-arm64 + * win-x64 + * win-arm64 + +* **Supports Disconnected Scenarios** + * No. Internet connection required. + +* **Dependencies** + Same as the standard PERF-NETWORK-NTTTCP.json profile: + * Internet connection. + * The IP addresses defined in the environment layout for the Client and Server systems must be correct. + * The name of the Client and Server instances defined in the environment layout must match the agent/client IDs supplied on the command line. + * Ports 5500 (forward direction) and 5600 (reverse direction) must be available on both systems when using full-duplex mode. + +* **Profile Parameters** + The following parameter is added for full-duplex support. All other parameters remain unchanged. + + | Parameter | Purpose | Default | + |-----------|---------|---------| + | DuplexMode | Set to "Full" for bidirectional testing. Any other value (or unset) uses standard unidirectional testing. | Half | + +* **Scenarios** + All existing NTTTCP scenarios support full-duplex mode. When `DuplexMode=Full`, each scenario runs both send and receive + processes on each node. The existing scenario names are unchanged — the `direction` and `duplexMode` metadata in telemetry + distinguish full-duplex results from half-duplex. + +## Workload Metrics +Each scenario produces **four sets of metrics** — sender and receiver metrics on both client and server. The scenario name includes the direction: + +| Metric Name | Unit | Relativity | +|-------------|------|------------| +| ThroughputMbps | mbps | Higher is better | +| TotalBytesMB | MB | Higher is better | +| AvgBytesPerCompl | B | - | +| AvgFrameSize | B | - | +| AvgPacketsPerInterrupt | packets/interrupt | - | +| InterruptsPerSec | count/sec | - | +| PacketsRetransmitted | count | Lower is better | +| Errors | count | Lower is better | +| CyclesPerByte | cycles/byte | Lower is better | +| AvgCpuPercentage | % | - | +| TcpAverageRtt | ms | Lower is better | + +**Telemetry scenario names** follow the pattern: +* `{Scenario} Client Send` — Client's sender metrics (forward direction) +* `{Scenario} Client Receive` — Client's receiver metrics (reverse direction) +* `{Scenario} Server Send` — Server's sender metrics (reverse direction) +* `{Scenario} Server Receive` — Server's receiver metrics (forward direction) From 64648f4d94c236911c4de2697998c4c646341731 Mon Sep 17 00:00:00 2001 From: Rudra Pratap SIngh Date: Tue, 12 May 2026 13:01:42 +0530 Subject: [PATCH 2/3] Fix NTttcp full-duplex tests failing on Linux CI Use Path.Combine instead of mockFixture.Combine for reading actual test example files from disk. When the mock platform is Win32NT, mockFixture.Combine produces backslash paths that fail on Linux CI. --- .../Network/NTttcp/NTttcpFullDuplexTests.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/VirtualClient/VirtualClient.Actions.UnitTests/Network/NTttcp/NTttcpFullDuplexTests.cs b/src/VirtualClient/VirtualClient.Actions.UnitTests/Network/NTttcp/NTttcpFullDuplexTests.cs index d2b45aa750..8159d84c87 100644 --- a/src/VirtualClient/VirtualClient.Actions.UnitTests/Network/NTttcp/NTttcpFullDuplexTests.cs +++ b/src/VirtualClient/VirtualClient.Actions.UnitTests/Network/NTttcp/NTttcpFullDuplexTests.cs @@ -57,8 +57,8 @@ public void SetupTest(PlatformID platform = PlatformID.Unix) this.mockFixture.Parameters["DevInterruptsDifferentiator"] = "mlx"; this.mockFixture.Parameters["DuplexMode"] = "Full"; - string clientResults = File.ReadAllText(this.mockFixture.Combine(NTttcpFullDuplexTests.ExamplesDirectory, "ClientOutput.xml")); - string serverResults = File.ReadAllText(this.mockFixture.Combine(NTttcpFullDuplexTests.ExamplesDirectory, "ServerOutput.xml")); + string clientResults = File.ReadAllText(Path.Combine(NTttcpFullDuplexTests.ExamplesDirectory, "ClientOutput.xml")); + string serverResults = File.ReadAllText(Path.Combine(NTttcpFullDuplexTests.ExamplesDirectory, "ServerOutput.xml")); this.mockFixture.FileSystem.Setup(rt => rt.File.ReadAllTextAsync(It.IsAny(), It.IsAny())) .ReturnsAsync((string path, CancellationToken ct) => From ff06685634ef1966e073e5748b8c3d845d4054f0 Mon Sep 17 00:00:00 2001 From: Rudra Pratap SIngh Date: Tue, 12 May 2026 18:14:42 +0530 Subject: [PATCH 3/3] Fix full-duplex race condition with cross-machine handshake In full-duplex mode, both client and server run a receiver and a sender simultaneously. The server starts first (triggered by START instructions), then the client starts after polling detects the server state is Running. Race condition: The server starts its receiver, sets state=Running via ConfirmProcessRunningAsync, then starts its sender 2 seconds later. The sender connects to the client's reverse port (Port+100). However, the client only starts its receiver after detecting Running via the 10-second polling interval. By the time the client's receiver begins listening, the server's sender has already been attempting to connect for ~8+ seconds. Linux ntttcp sender has an ~11-second retry budget (warmup period). When this budget is exhausted before the client receiver is ready, the connection fails with 'Unexpected disconnect from NTttcp.exe'. This caused ~86% failure rate (6/7 iterations) when Windows=Client and Linux=Server, because Linux's sender is less tolerant of connection delays than Windows. Fix: Add a cross-machine handshake using the existing VirtualClient state API to ensure both receivers are listening before either sender starts: 1. Server starts receiver, signals Running (existing behavior) 2. Client detects Running, starts its receiver on the reverse port 3. Client signals FullDuplexClientReceiverReady to server via state API 4. Server polls for this signal, then starts its sender 5. Client starts its sender (server receiver is already listening) 6. Handshake state is cleaned up in the finally block Verified on Azure VMs: success rate improved from 14% (1/7) to 100% (5/5) for the previously-failing Win=Client, Linux=Server configuration. --- .../NTttcp/NTttcpExecutor.cs | 83 ++++++++++++++++++- 1 file changed, 82 insertions(+), 1 deletion(-) diff --git a/src/VirtualClient/VirtualClient.Actions/Network/NetworkingWorkload/NTttcp/NTttcpExecutor.cs b/src/VirtualClient/VirtualClient.Actions/Network/NetworkingWorkload/NTttcp/NTttcpExecutor.cs index 21a75a3d66..40ca85b281 100644 --- a/src/VirtualClient/VirtualClient.Actions/Network/NetworkingWorkload/NTttcp/NTttcpExecutor.cs +++ b/src/VirtualClient/VirtualClient.Actions/Network/NetworkingWorkload/NTttcp/NTttcpExecutor.cs @@ -8,12 +8,16 @@ namespace VirtualClient.Actions.NetworkPerformance using System.Globalization; using System.IO.Abstractions; using System.Linq; + using System.Net.Http; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; + using Newtonsoft.Json.Linq; using Polly; + using VirtualClient; using VirtualClient.Common; + using VirtualClient.Common.Contracts; using VirtualClient.Common.Extensions; using VirtualClient.Common.Telemetry; using VirtualClient.Contracts; @@ -28,6 +32,7 @@ public class NTttcpExecutor : NetworkingWorkloadToolExecutor private const string SendOutputFileName = "ntttcp-results-send.xml"; private const string ReceiveOutputFileName = "ntttcp-results-recv.xml"; private const int ReversePortOffset = 100; + private const string FullDuplexClientReceiverReadyStateId = "FullDuplexClientReceiverReady"; private static readonly TimeSpan DefaultWarmupTime = TimeSpan.FromSeconds(10); private static readonly TimeSpan DefaultCooldownTime = TimeSpan.FromSeconds(10); @@ -470,9 +475,22 @@ await this.ProcessStartRetryPolicy.ExecuteAsync(async () => // Start receiver first to ensure it is listening before sender connects. Task receiveTask = receiveProcess.StartAndWaitAsync(cancellationToken, timeout); - // Brief delay to allow receiver to bind. + // Brief delay to allow receiver to bind locally. await Task.Delay(TimeSpan.FromSeconds(2), cancellationToken).ConfigureAwait(false); + // Full-duplex cross-machine handshake: ensure both receivers are + // ready before either sender starts. Without this, the server's + // sender can start connecting to the client's reverse port before + // the client's receiver is listening, causing connection failures. + if (this.IsInServerRole) + { + await this.WaitForClientReceiverReadyAsync(relatedContext, cancellationToken); + } + else if (this.IsInClientRole) + { + await this.SignalClientReceiverReadyAsync(relatedContext, cancellationToken); + } + Task sendTask = sendProcess.StartAndWaitAsync(cancellationToken, timeout); await Task.WhenAll(receiveTask, sendTask).ConfigureAwait(false); @@ -560,6 +578,7 @@ await this.LogProcessDetailsAsync( { sendProcess.SafeKill(this.Logger); receiveProcess.SafeKill(this.Logger); + await this.CleanupFullDuplexHandshakeStateAsync(cancellationToken); } } }); @@ -743,6 +762,68 @@ await this.SystemManagement.FileSystem.File.DeleteAsync(this.ReceiveResultsPath) } } + /// + /// Signals to the server that the client's receiver is ready for incoming connections. + /// Used in full-duplex mode to prevent the server's sender from starting before + /// the client's receiver is listening. + /// + private async Task SignalClientReceiverReadyAsync(EventContext telemetryContext, CancellationToken cancellationToken) + { + this.Logger.LogTraceMessage($"Synchronization: {this.TypeName} signaling client receiver ready..."); + + State readyState = new State(); + readyState.Properties["Ready"] = true; + + Item stateItem = new Item(NTttcpExecutor.FullDuplexClientReceiverReadyStateId, readyState); + HttpResponseMessage response = await NetworkingWorkloadExecutor.ServerApiClient.UpdateStateAsync( + NTttcpExecutor.FullDuplexClientReceiverReadyStateId, + JObject.FromObject(stateItem), + cancellationToken).ConfigureAwait(false); + + response.ThrowOnError(); + } + + /// + /// Waits for the client to signal that its receiver is ready. + /// Used on the server side in full-duplex mode to delay starting the sender + /// until the client's receiver is listening. + /// + private async Task WaitForClientReceiverReadyAsync(EventContext telemetryContext, CancellationToken cancellationToken) + { + this.Logger.LogTraceMessage($"Synchronization: {this.TypeName} waiting for client receiver ready..."); + + await NetworkingWorkloadExecutor.LocalApiClient.PollForExpectedStateAsync( + NTttcpExecutor.FullDuplexClientReceiverReadyStateId, + (state) => true, + TimeSpan.FromMinutes(2), + cancellationToken, + pollingInterval: TimeSpan.FromSeconds(1)); + + this.Logger.LogTraceMessage($"Synchronization: {this.TypeName} client receiver ready confirmed."); + } + + /// + /// Cleans up the full-duplex handshake state after the test completes. + /// + private async Task CleanupFullDuplexHandshakeStateAsync(CancellationToken cancellationToken) + { + try + { + if (this.IsInServerRole) + { + HttpResponseMessage response = await NetworkingWorkloadExecutor.LocalApiClient.DeleteStateAsync( + NTttcpExecutor.FullDuplexClientReceiverReadyStateId, + cancellationToken).ConfigureAwait(false); + + response?.Dispose(); + } + } + catch + { + // Best effort cleanup. + } + } + private string GetWindowsSpecificCommandLine() { return this.GetWindowsSpecificCommandLine(isSender: this.IsInClientRole, port: this.Port, resultsPath: this.ResultsPath);