Skip to content

Commit

Permalink
Merge pull request #414 from oising/readconsoleinputstream-demo
Browse files Browse the repository at this point in the history
ReadConsoleInputStream demo
  • Loading branch information
miniksa committed Apr 25, 2019
2 parents 370cea5 + cfe3eb9 commit 2e0fd58
Show file tree
Hide file tree
Showing 6 changed files with 474 additions and 0 deletions.
58 changes: 58 additions & 0 deletions tools/ReadConsoleInputStream/ConcurrentBoundedQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

namespace Samples.Terminal
{
/// <summary>
/// Implements a bounded queue that won't block on overflow; instead the oldest item is discarded.
/// </summary>
/// <typeparam name="T"></typeparam>
public class ConcurrentBoundedQueue<T> : ConcurrentQueue<T>
{
public ConcurrentBoundedQueue(int capacity)
{
Capacity = GetAlignedCapacity(capacity);
}

public ConcurrentBoundedQueue(IEnumerable<T> collection, int capacity) : base(collection)
{
Capacity = GetAlignedCapacity(capacity);
}

private int GetAlignedCapacity(int n)
{
if (n < 2)
{
throw new ArgumentException("Capacity must be at least 2");
}

var f = Math.Log(n, 2);
var p = Math.Ceiling(f);

return (int) Math.Pow(2, p);
}

public new void Enqueue(T item)
{
// if we're about to overflow, dump oldest item
if (Count >= Capacity)
{
lock (this)
{
while (Count >= Capacity)
{
TryDequeue(out _);
}
}
}

base.Enqueue(item);
}

public int Capacity
{
get; private set;
}
}
}
31 changes: 31 additions & 0 deletions tools/ReadConsoleInputStream/NativeMethods.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using System;
using System.Diagnostics;
using System.Runtime.InteropServices;

namespace Samples.Terminal
{
internal static class NativeMethods
{
private static int MakeHRFromErrorCode(int errorCode)
{
// Don't convert it if it is already an HRESULT
if ((0xFFFF0000 & errorCode) != 0)
{
Debug.Assert(false, "errorCode is already HRESULT");
return errorCode;
}

return unchecked(((int)0x80070000) | errorCode);
}

internal static Exception GetExceptionForWin32Error(int errorCode)
{
return Marshal.GetExceptionForHR(MakeHRFromErrorCode(errorCode));
}

internal static Exception GetExceptionForLastWin32Error()
{
return GetExceptionForWin32Error(Marshal.GetLastWin32Error());
}
}
}
147 changes: 147 additions & 0 deletions tools/ReadConsoleInputStream/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* This is a demo that shows how we can have a stream-oriented view of characters from the console
* while also listening to console events like mouse, menu, focus, buffer/viewport(1) resize events.
*
* This has always been tricky to do because ReadConsoleW/A doesn't allow retrieving events.
* Only ReadConsoleInputW/A returns events, but isn't stream-oriented. Using both doesn't work because
* ReadConsoleW/A flushes the input queue, meaning calls to ReadConsoleInputW/A will wait forever.
*
* I do this by deriving a new Stream class which wraps ReadConsoleInputW and accepts a provider/consumer
* implementation of BlockingCollection<Kernel32.INPUT_RECORD>. This allows asynchronous monitoring of
* console events while simultaneously streaming the character input. I also use Mark Gravell's great
* System.IO.Pipelines utility classes (2) and David Hall's excellent P/Invoke wrappers (3) to make this
* demo cleaner to read; both are pulled from NuGet.
*
* (1) in versions of windows 10 prior to 1809, the buffer resize event only fires for enlarging
* the viewport, as this would cause the buffer to be enlarged too. Now it fires even when
* shrinking the viewport, which won't change the buffer size.
*
* (2) https://github.com/mgravell/Pipelines.Sockets.Unofficial
* https://www.nuget.org/packages/Pipelines.Sockets.Unofficial
*
* (3) https://github.com/dahall/Vanara
* https://www.nuget.org/packages/Vanara.Pinvoke.Kernel32
*
* Oisin Grehan - 2019/4/21
*
* https://twitter.com/oising
* https://github.com/oising
*/

using System;
using System.Collections.Concurrent;
using System.IO;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

using Pipelines.Sockets.Unofficial;
using Vanara.PInvoke;

namespace Samples.Terminal
{
internal class Program
{
private static async Task Main(string[] args)
{
// run for 90 seconds
var timeout = TimeSpan.FromSeconds(90);

// in reality this will likely never be reached, but it is useful to guard against
// conditions where the queue isn't drained, or not drained fast enough.
const int maxNonKeyEventRetention = 128;

var source = new CancellationTokenSource(timeout);
var token = source.Token;
var handle = Kernel32.GetStdHandle(Kernel32.StdHandleType.STD_INPUT_HANDLE);

if (!Kernel32.GetConsoleMode(handle, out Kernel32.CONSOLE_INPUT_MODE mode))
throw NativeMethods.GetExceptionForWin32Error(Marshal.GetLastWin32Error());

mode |= Kernel32.CONSOLE_INPUT_MODE.ENABLE_WINDOW_INPUT;
mode |= Kernel32.CONSOLE_INPUT_MODE.ENABLE_VIRTUAL_TERMINAL_INPUT;
mode &= ~Kernel32.CONSOLE_INPUT_MODE.ENABLE_ECHO_INPUT;
mode &= ~Kernel32.CONSOLE_INPUT_MODE.ENABLE_LINE_INPUT;

if (!Kernel32.SetConsoleMode(handle, mode))
throw NativeMethods.GetExceptionForLastWin32Error();

// base our provider/consumer on a bounded queue to keep memory usage under control
var events = new BlockingCollection<Kernel32.INPUT_RECORD>(
new ConcurrentBoundedQueue<Kernel32.INPUT_RECORD>(maxNonKeyEventRetention));

// Task that will consume non-key events asynchronously
var consumeEvents = Task.Run(() =>
{
Console.WriteLine("consumeEvents started");
try
{
while (!events.IsCompleted)
{
// blocking call
var record = events.Take(token);
Console.WriteLine("record: {0}",
Enum.GetName(typeof(Kernel32.EVENT_TYPE), record.EventType));
}
}
catch (OperationCanceledException)
{
// timeout
}
Console.WriteLine("consumeEvents ended");
}, token);

// Task that will watch for key events while feeding non-key events into our provider/consumer collection
var readInputAndProduceEvents = Task.Run(async () =>
{
//So, this is the key point - we cannot use the following or we lose all non-key events:
// Stream stdin = Console.OpenStandardInput();
// get a unicode character stream over console input
Stream stdin = new ReadConsoleInputStream(handle, events);
// wrap in a System.IO.Pipelines.PipeReader to get clean async and span/memory usage
var reader = StreamConnection.GetReader(stdin);
while (!token.IsCancellationRequested)
{
// blocking call
var result = await reader.ReadAsync(token);
if (result.IsCanceled)
break;
var sequence = result.Buffer;
var segment = sequence.Start;
while (sequence.TryGet(ref segment, out var mem))
{
// decode back from unicode
var datum = Encoding.Unicode.GetString(mem.Span);
Console.Write(datum);
}
reader.AdvanceTo(sequence.End);
}
}, token);

Console.WriteLine("Running");

try
{
await Task.WhenAll(consumeEvents, readInputAndProduceEvents);
}
catch (OperationCanceledException)
{
// timeout
}

Console.WriteLine("press any key...");
Console.ReadKey(true);
}
}
}
Loading

0 comments on commit 2e0fd58

Please sign in to comment.