Skip to content

Fix build warnings in the StreamDemuxerTests #259

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/KubernetesClient/StreamDemuxer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ public Stream GetStream(byte? inputIndex, byte? outputIndex)

protected async Task RunLoop(CancellationToken cancellationToken)
{
// This is a background task. Immediately yield to the caller.
await Task.Yield();

// Get a 1KB buffer
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024 * 1024);
// This maps remembers bytes skipped for each stream.
Expand Down
11 changes: 7 additions & 4 deletions tests/KubernetesClient.Tests/Mock/MockWebSocket.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using Nito.AsyncEx;
using System;
using System.Collections.Concurrent;
using System.Net.WebSockets;
Expand All @@ -13,7 +14,7 @@ public class MockWebSocket : WebSocket
private WebSocketState state;
private string subProtocol;
private ConcurrentQueue<MessageData> receiveBuffers = new ConcurrentQueue<MessageData>();
private AutoResetEvent receiveEvent = new AutoResetEvent(false);
private AsyncAutoResetEvent receiveEvent = new AsyncAutoResetEvent(false);

public MockWebSocket(string subProtocol = null)
{
Expand Down Expand Up @@ -78,12 +79,13 @@ public override void Dispose()
this.receiveEvent.Set();
}

public override Task<WebSocketReceiveResult> ReceiveAsync(ArraySegment<byte> buffer, CancellationToken cancellationToken)
public override async Task<WebSocketReceiveResult> ReceiveAsync(ArraySegment<byte> buffer, CancellationToken cancellationToken)
{
if (this.receiveBuffers.Count == 0)
{
this.receiveEvent.WaitOne();
await this.receiveEvent.WaitAsync(cancellationToken).ConfigureAwait(false);
}

int bytesReceived = 0;
bool endOfMessage = true;
WebSocketMessageType messageType = WebSocketMessageType.Close;
Expand All @@ -107,7 +109,8 @@ public override Task<WebSocketReceiveResult> ReceiveAsync(ArraySegment<byte> buf
received.Buffer = received.Buffer.Slice(buffer.Count);
}
}
return Task.FromResult(new WebSocketReceiveResult(bytesReceived, messageType, endOfMessage));

return new WebSocketReceiveResult(bytesReceived, messageType, endOfMessage);
}

public override Task SendAsync(ArraySegment<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken)
Expand Down
29 changes: 14 additions & 15 deletions tests/KubernetesClient.Tests/StreamDemuxerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ public StreamDemuxerTests(ITestOutputHelper testOutput)
public async Task SendDataRemoteCommand()
{
using (MockWebSocket ws = new MockWebSocket())
using (StreamDemuxer demuxer = new StreamDemuxer(ws))
{
List<byte> sentBuffer = new List<byte>();
ws.MessageSent += (sender, args) =>
{
sentBuffer.AddRange(args.Data.Buffer);
};

StreamDemuxer demuxer = new StreamDemuxer(ws);
Task.Run(() => demuxer.Start());
demuxer.Start();

byte channelIndex = 12;
var stream = demuxer.GetStream(channelIndex, channelIndex);
Expand All @@ -50,15 +50,15 @@ public async Task SendDataRemoteCommand()
public async Task SendMultipleDataRemoteCommand()
{
using (MockWebSocket ws = new MockWebSocket())
using (StreamDemuxer demuxer = new StreamDemuxer(ws))
{
List<byte> sentBuffer = new List<byte>();
ws.MessageSent += (sender, args) =>
{
sentBuffer.AddRange(args.Data.Buffer);
};

StreamDemuxer demuxer = new StreamDemuxer(ws);
Task.Run(() => demuxer.Start());
demuxer.Start();

byte channelIndex = 12;
var stream = demuxer.GetStream(channelIndex, channelIndex);
Expand All @@ -80,9 +80,9 @@ public async Task SendMultipleDataRemoteCommand()
public async Task ReceiveDataRemoteCommand()
{
using (MockWebSocket ws = new MockWebSocket())
using (StreamDemuxer demuxer = new StreamDemuxer(ws))
{
StreamDemuxer demuxer = new StreamDemuxer(ws);
Task.Run(() => demuxer.Start());
demuxer.Start();

List<byte> receivedBuffer = new List<byte>();
byte channelIndex = 12;
Expand Down Expand Up @@ -129,9 +129,9 @@ public async Task ReceiveDataRemoteCommand()
public async Task ReceiveDataPortForward()
{
using (MockWebSocket ws = new MockWebSocket())
using (StreamDemuxer demuxer = new StreamDemuxer(ws, StreamType.PortForward))
{
StreamDemuxer demuxer = new StreamDemuxer(ws, StreamType.PortForward);
Task.Run(() => demuxer.Start());
demuxer.Start();

List<byte> receivedBuffer = new List<byte>();
byte channelIndex = 12;
Expand Down Expand Up @@ -179,9 +179,9 @@ public async Task ReceiveDataPortForward()
public async Task ReceiveDataPortForwardOneByteMessage()
{
using (MockWebSocket ws = new MockWebSocket())
using (StreamDemuxer demuxer = new StreamDemuxer(ws, StreamType.PortForward))
{
StreamDemuxer demuxer = new StreamDemuxer(ws, StreamType.PortForward);
Task.Run(() => demuxer.Start());
demuxer.Start();

List<byte> receivedBuffer = new List<byte>();
byte channelIndex = 12;
Expand Down Expand Up @@ -227,9 +227,9 @@ public async Task ReceiveDataPortForwardOneByteMessage()
public async Task ReceiveDataRemoteCommandMultipleStream()
{
using (MockWebSocket ws = new MockWebSocket())
using (StreamDemuxer demuxer = new StreamDemuxer(ws))
{
StreamDemuxer demuxer = new StreamDemuxer(ws);
Task.Run(() => demuxer.Start());
demuxer.Start();

List<byte> receivedBuffer1 = new List<byte>();
byte channelIndex1 = 1;
Expand Down Expand Up @@ -304,9 +304,9 @@ public async Task ReceiveDataRemoteCommandMultipleStream()
public async Task ReceiveDataPortForwardMultipleStream()
{
using (MockWebSocket ws = new MockWebSocket())
using (StreamDemuxer demuxer = new StreamDemuxer(ws, StreamType.PortForward))
{
StreamDemuxer demuxer = new StreamDemuxer(ws, StreamType.PortForward);
Task.Run(() => demuxer.Start());
demuxer.Start();

List<byte> receivedBuffer1 = new List<byte>();
byte channelIndex1 = 1;
Expand Down Expand Up @@ -379,7 +379,6 @@ public async Task ReceiveDataPortForwardMultipleStream()
}
}


private static byte[] GenerateRandomBuffer(int length, byte channelIndex, byte content, bool portForward)
{
var buffer = GenerateRandomBuffer(length, content);
Expand Down