Skip to content

Commit

Permalink
Protocol: add basic Connection tests. (#163)
Browse files Browse the repository at this point in the history
  • Loading branch information
tmds committed May 11, 2022
1 parent 6fd8741 commit 94940ff
Show file tree
Hide file tree
Showing 6 changed files with 270 additions and 8 deletions.
14 changes: 10 additions & 4 deletions docs/protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,21 @@ class AddProxy

class AddImplementation : IMethodHandler
{
private const string Interface = "org.example.Adder";
public string Path => "/org/example/Adder";

public async ValueTask<bool> TryHandleMethodAsync(Connection connection, Message message)
{
switch ((message.MemberAsString, message.SignatureAsString))
switch (message.InterfaceAsString)
{
case ("Add", "ii"):
Add(connection, message);
return true;
case Interface:
switch ((message.MemberAsString, message.SignatureAsString))
{
case ("Add", "ii"):
Add(connection, message);
return true;
}
break;
}

return false;
Expand Down
4 changes: 2 additions & 2 deletions src/Tmds.DBus.Protocol/AddressReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ public struct AddressEntry
public static bool TryGetNextEntry(string addresses, ref AddressEntry address)
{
int offset = address.String is null ? 0 : address.Offset + address.Count + 1;
ReadOnlySpan<char> span = addresses.AsSpan().Slice(offset);
if (span.Length == 0)
if (offset >= addresses.Length - 1)
{
return false;
}
ReadOnlySpan<char> span = addresses.AsSpan().Slice(offset);
int length = span.IndexOf(';');
if (length == -1)
{
Expand Down
21 changes: 19 additions & 2 deletions src/Tmds.DBus.Protocol/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ public Connection(ConnectionOptions connectionOptions)
_connectionOptions = (ClientConnectionOptions)connectionOptions;
}

// For tests.
internal void Connect(IMessageStream stream)
{
_connection = new DBusConnection(this);
_connection.Connect(stream);
_state = ConnectionState.Connected;
}

public async ValueTask ConnectAsync()
{
await ConnectCoreAsync(autoConnect: false).ConfigureAwait(false);
Expand Down Expand Up @@ -110,7 +118,7 @@ private async Task<DBusConnection> DoConnectAsync()
{
ThrowHelper.ThrowIfDisposed(_disposed, this);

if (_connection == connection)
if (_connection == connection && _state == ConnectionState.Connecting)
{
_connectingTask = null;
_connectCts = null;
Expand All @@ -128,9 +136,18 @@ private async Task<DBusConnection> DoConnectAsync()
{
Disconnect(exception, connection);

// Prefer throwing ObjectDisposedException.
ThrowHelper.ThrowIfDisposed(_disposed, this);

throw;
// Throw DisconnectedException or ConnectException.
if (exception is DisconnectedException || exception is ConnectException)
{
throw;
}
else
{
throw new ConnectException(exception.Message, exception);
}
}
}

Expand Down
12 changes: 12 additions & 0 deletions src/Tmds.DBus.Protocol/DBusConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,18 @@ public DBusConnection(Connection parent)
_pathHandlers = new();
}

// For tests.
internal void Connect(IMessageStream stream)
{
_messageStream = stream;

stream.ReceiveMessages(
static (Exception? exception, Message message, DBusConnection connection) =>
connection.HandleMessages(exception, message), this);

_state = ConnectionState.Connected;
}

public async ValueTask ConnectAsync(string address, string? userId, bool supportsFdPassing, CancellationToken cancellationToken)
{
_state = ConnectionState.Connecting;
Expand Down
130 changes: 130 additions & 0 deletions test/Tmds.DBus.Protocol.Tests/ConnectionTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
using System.Threading.Tasks;
using Xunit;

namespace Tmds.DBus.Protocol.Tests
{
public class ConnectionTests
{
[Fact]
public async Task MethodAsync()
{
var connections = PairedConnection.CreatePair();
using var conn1 = connections.Item1;
using var conn2 = connections.Item2;
conn2.AddMethodHandler(new StringOperations());
var proxy = new StringOperationsProxy(conn1, "servicename");
var reply = await proxy.ConcatAsync("hello ", "world");
Assert.Equal("hello world", reply);
}

[InlineData("tcp:host=localhost,port=1")]
[InlineData("unix:path=/does/not/exist")]
[Theory]
public async Task UnreachableAddressAsync(string address)
{
using (var connection = new Connection(address))
{
await Assert.ThrowsAsync<ConnectException>(() => connection.ConnectAsync().AsTask());
}
}

static class StringOperationsConstants
{
public const string Path = "/tmds/dbus/tests/stringoperations";
public const string Concat = "Concat";
public const string Interface = "tmds.dbus.tests.StringOperations";
}

class StringOperationsProxy
{
private readonly Connection _connection;
private readonly string _peer;

public StringOperationsProxy(Connection connection, string peer)
{
_connection = connection;
_peer = peer;
}

public Task<string> ConcatAsync(string lhs, string rhs)
{
return _connection.CallMethodAsync(
CreateAddMessage(),
(Message message, object? state) =>
{
return message.GetBodyReader().ReadString();
});

MessageBuffer CreateAddMessage()
{
using var writer = _connection.GetMessageWriter();

writer.WriteMethodCallHeader(
destination: _peer,
path: StringOperationsConstants.Path,
@interface: StringOperationsConstants.Interface,
signature: "ss",
member: StringOperationsConstants.Concat);

writer.WriteString(lhs);
writer.WriteString(rhs);

return writer.CreateMessage();
}
}
}

class StringOperations : IMethodHandler
{
public string Path => "/tmds/dbus/tests/stringoperations";

public const string ConcatMember = "Concat";

public bool RunMethodHandlerSynchronously(Message message) => true;

public ValueTask<bool> TryHandleMethodAsync(Connection connection, Message message)
{
switch (message.InterfaceAsString)
{
case StringOperationsConstants.Interface:
switch ((message.MemberAsString, message.SignatureAsString))
{
case (ConcatMember, "ss"):
Concat(connection, message);
return ValueTask.FromResult(true);
}
break;
}

return ValueTask.FromResult(true);
}

private void Concat(Connection connection, Message message)
{
var reader = message.GetBodyReader();

string lhs = reader.ReadString();
string rhs = reader.ReadString();

string result = lhs + rhs;

connection.TrySendMessage(CreateResponseMessage(connection, message, result));

static MessageBuffer CreateResponseMessage(Connection connection, Message message, string result)
{
using var writer = connection.GetMessageWriter();

writer.WriteMethodReturnHeader(
replySerial: message.Serial,
destination: message.Sender,
signature: "s"
);

writer.WriteString(result);

return writer.CreateMessage();
}
}
}
}
}
97 changes: 97 additions & 0 deletions test/Tmds.DBus.Protocol.Tests/PairedConnection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
using System;
using System.Buffers;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace Tmds.DBus.Protocol.Tests
{
class PairedConnection
{
public static (Connection, Connection) CreatePair()
{
var streams = PairedMessageStream.CreatePair();
var conn1 = new Connection("conn1-address");
conn1.Connect(streams.Item1);
var conn2 = new Connection("conn2-address");
conn2.Connect(streams.Item2);
return (conn1, conn2);
}
}

class PairedMessageStream : IMessageStream
{
SemaphoreSlim _readSemaphore;
ConcurrentQueue<MessageBuffer?> _readQueue;
SemaphoreSlim _writeSemaphore;
ConcurrentQueue<MessageBuffer?> _writeQueue;

public static Tuple<IMessageStream, IMessageStream> CreatePair()
{
var sem1 = new SemaphoreSlim(0);
var sem2 = new SemaphoreSlim(0);
var queue1 = new ConcurrentQueue<MessageBuffer?>();
var queue2 = new ConcurrentQueue<MessageBuffer?>();
return Tuple.Create<IMessageStream, IMessageStream>(
new PairedMessageStream(queue1, queue2, sem1, sem2),
new PairedMessageStream(queue2, queue1, sem2, sem1)
);
}

private PairedMessageStream(ConcurrentQueue<MessageBuffer?> readQueue, ConcurrentQueue<MessageBuffer?> writeQueue,
SemaphoreSlim readSemaphore, SemaphoreSlim writeSemaphore)
{
_readSemaphore = readSemaphore;
_writeSemaphore = writeSemaphore;
_writeQueue = writeQueue;
_readQueue = readQueue;
}

public async void ReceiveMessages<T>(IMessageStream.MessageReceivedHandler<T> handler, T state)
{
MessagePool pool = new();
try
{
while (true)
{
await _readSemaphore.WaitAsync();
if (_readQueue.TryDequeue(out MessageBuffer? messageBuffer))
{
if (messageBuffer is null)
{
throw new IOException("Connection closed by peer");
}
ReadOnlySequence<byte> data = messageBuffer.AsReadOnlySequence();
Message? message = Message.TryReadMessage(pool, ref data, messageBuffer.Handles);
if (message is null)
{
throw new ProtocolException("Cannot parse message.");
}
if (data.Length != 0)
{
throw new ProtocolException("Message buffer contains more than one message.");
}
handler(closeReason: null, message, state);
}
}
}
catch (Exception ex)
{
handler(closeReason: ex, null!, state);
}
}

public ValueTask<bool> TrySendMessageAsync(MessageBuffer message)
{
_writeQueue.Enqueue(message);
_writeSemaphore.Release();
return ValueTask.FromResult(true);
}

public void Close(Exception closeReason)
{
TrySendMessageAsync(null!); // Use null as EOF.
}
}
}

0 comments on commit 94940ff

Please sign in to comment.