Skip to content

Commit

Permalink
Protocol: add API to control whether AddMatch emits exceptions on dis…
Browse files Browse the repository at this point in the history
…pose. (#229)

* Protocol: add API to control whether AddMatch emits exceptions on dispose.

* Rename AddMatchFlags to ObserverFlags.

* Rename MatchActionException to ActionException.
  • Loading branch information
tmds committed Jan 31, 2024
1 parent 1999cdf commit a4c5f3d
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace Tmds.DBus.Protocol;

public static class MatchActionException
public static class ActionException
{
// Exception used when the IDisposable returned by AddMatchAsync gets disposed.
public static bool IsObserverDisposed(Exception exception)
Expand Down
11 changes: 8 additions & 3 deletions src/Tmds.DBus.Protocol/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,16 @@ public async Task<T> CallMethodAsync<T>(MessageBuffer message, MessageValueReade
return await connection.CallMethodAsync(message, reader, readerState).ConfigureAwait(false);
}

public async ValueTask<IDisposable> AddMatchAsync<T>(MatchRule rule, MessageValueReader<T> reader, Action<Exception?, T, object?, object?> handler, object? readerState = null, object? handlerState = null, bool emitOnCapturedContext = true, bool subscribe = true)
public ValueTask<IDisposable> AddMatchAsync<T>(MatchRule rule, MessageValueReader<T> reader, Action<Exception?, T, object?, object?> handler, object? readerState = null, object? handlerState = null, bool emitOnCapturedContext = true, bool subscribe = true)
=> AddMatchAsync(rule, reader, handler, readerState, handlerState, emitOnCapturedContext, ObserverFlags.EmitOnConnectionDispose | ObserverFlags.EmitOnObserverDispose | (!subscribe ? ObserverFlags.NoSubscribe : default));

public ValueTask<IDisposable> AddMatchAsync<T>(MatchRule rule, MessageValueReader<T> reader, Action<Exception?, T, object?, object?> handler, object? readerState, object? handlerState, bool emitOnCapturedContext, ObserverFlags flags)
=> AddMatchAsync(rule, reader, handler, readerState, handlerState, emitOnCapturedContext ? SynchronizationContext.Current : null, flags);

public async ValueTask<IDisposable> AddMatchAsync<T>(MatchRule rule, MessageValueReader<T> reader, Action<Exception?, T, object?, object?> handler, object? readerState , object? handlerState, SynchronizationContext? synchronizationContext, ObserverFlags flags)
{
SynchronizationContext? synchronizationContext = emitOnCapturedContext ? SynchronizationContext.Current : null;
DBusConnection connection = await ConnectCoreAsync().ConfigureAwait(false);
return await connection.AddMatchAsync(synchronizationContext, rule, reader, handler, readerState, handlerState, subscribe).ConfigureAwait(false);
return await connection.AddMatchAsync(synchronizationContext, rule, reader, handler, readerState, handlerState, flags).ConfigureAwait(false);
}

public void AddMethodHandler(IMethodHandler methodHandler)
Expand Down
29 changes: 19 additions & 10 deletions src/Tmds.DBus.Protocol/DBusConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,10 @@ public void Dispose()
{
foreach (var observer in matchMaker.Observers)
{
observer.Dispose(new DisconnectedException(disconnectReason), removeObserver: false);
bool emitException = !object.ReferenceEquals(disconnectReason, Connection.DisposedException) ||
observer.EmitOnConnectionDispose;
Exception? exception = emitException ? new DisconnectedException(disconnectReason) : null;
observer.Dispose(exception, removeObserver: false);
}
}
_matchMakers.Clear();
Expand Down Expand Up @@ -746,7 +749,7 @@ MessageBuffer CreateMessage(IEnumerable<string>? rules)
}
}

public ValueTask<IDisposable> AddMatchAsync<T>(SynchronizationContext? synchronizationContext, MatchRule rule, MessageValueReader<T> valueReader, Action<Exception?, T, object?, object?> valueHandler, object? readerState, object? handlerState, bool subscribe)
public ValueTask<IDisposable> AddMatchAsync<T>(SynchronizationContext? synchronizationContext, MatchRule rule, MessageValueReader<T> valueReader, Action<Exception?, T, object?, object?> valueHandler, object? readerState, object? handlerState, ObserverFlags flags)
{
MessageHandlerDelegate4 fn = static (Exception? exception, Message message, object? reader, object? handler, object? rs, object? hs) =>
{
Expand All @@ -763,16 +766,17 @@ public ValueTask<IDisposable> AddMatchAsync<T>(SynchronizationContext? synchroni
}
};

return AddMatchAsync(synchronizationContext, rule, new(fn, valueReader, valueHandler, readerState, handlerState), subscribe);
return AddMatchAsync(synchronizationContext, rule, new(fn, valueReader, valueHandler, readerState, handlerState), flags);
}

private async ValueTask<IDisposable> AddMatchAsync(SynchronizationContext? synchronizationContext, MatchRule rule, MessageHandler4 handler, bool subscribe)
private async ValueTask<IDisposable> AddMatchAsync(SynchronizationContext? synchronizationContext, MatchRule rule, MessageHandler4 handler, ObserverFlags flags)
{
MatchRuleData data = rule.Data;
MatchMaker? matchMaker;
string ruleString;
Observer observer;
MessageBuffer? addMatchMessage = null;
bool subscribe;

lock (_gate)
{
Expand All @@ -782,7 +786,7 @@ private async ValueTask<IDisposable> AddMatchAsync(SynchronizationContext? synch
}
if (!RemoteIsBus)
{
subscribe = false;
flags |= ObserverFlags.NoSubscribe;
}
if (_isMonitor)
{
Expand All @@ -797,9 +801,10 @@ private async ValueTask<IDisposable> AddMatchAsync(SynchronizationContext? synch
_matchMakers.Add(ruleString, matchMaker);
}

observer = new Observer(synchronizationContext, matchMaker, handler, subscribe);
observer = new Observer(synchronizationContext, matchMaker, handler, flags);
matchMaker.Observers.Add(observer);

subscribe = observer.Subscribes;
bool sendMessage = subscribe && matchMaker.AddMatchTcs is null;
if (sendMessage)
{
Expand Down Expand Up @@ -869,19 +874,23 @@ sealed class Observer : IDisposable
private readonly SynchronizationContext? _synchronizationContext;
private readonly MatchMaker _matchMaker;
private readonly MessageHandler4 _messageHandler;
private readonly ObserverFlags _flags;
private bool _disposed;

public bool Subscribes { get; }
public bool Subscribes => (_flags & ObserverFlags.NoSubscribe) == 0;
public bool EmitOnConnectionDispose => (_flags & ObserverFlags.EmitOnConnectionDispose) != 0;
public bool EmitOnObserverDispose => (_flags & ObserverFlags.EmitOnObserverDispose) != 0;

public Observer(SynchronizationContext? synchronizationContext, MatchMaker matchMaker, in MessageHandler4 messageHandler, bool subscribes)
public Observer(SynchronizationContext? synchronizationContext, MatchMaker matchMaker, in MessageHandler4 messageHandler, ObserverFlags flags)
{
_synchronizationContext = synchronizationContext;
_matchMaker = matchMaker;
_messageHandler = messageHandler;
Subscribes = subscribes;
_flags = flags;
}

public void Dispose() => Dispose(ObserverDisposedException);
public void Dispose() =>
Dispose(EmitOnObserverDispose ? ObserverDisposedException : null);

public void Dispose(Exception? exception, bool removeObserver = true)
{
Expand Down
10 changes: 10 additions & 0 deletions src/Tmds.DBus.Protocol/ObserverFlags.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
namespace Tmds.DBus.Protocol;

[Flags]
public enum ObserverFlags
{
None = 0,
EmitOnConnectionDispose = 1,
EmitOnObserverDispose = 2,
NoSubscribe = 4
}
66 changes: 62 additions & 4 deletions test/Tmds.DBus.Protocol.Tests/ExceptionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ public async Task ObserverDisposed()
disposable.Dispose();

Exception ex = await tcs.Task;
Assert.True(MatchActionException.IsObserverDisposed(ex));
Assert.True(MatchActionException.IsDisposed(ex));
Assert.True(ActionException.IsObserverDisposed(ex));
Assert.True(ActionException.IsDisposed(ex));
}

[Fact]
Expand All @@ -46,8 +46,66 @@ public async Task ConnectionDisposed()
conn1.Dispose();

Exception ex = await tcs.Task;
Assert.True(MatchActionException.IsConnectionDisposed(ex));
Assert.True(MatchActionException.IsDisposed(ex));
Assert.True(ActionException.IsConnectionDisposed(ex));
Assert.True(ActionException.IsDisposed(ex));
}

[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task CanOptOutObserverDisposedEmit(bool optIn)
{
var connections = PairedConnection.CreatePair();
using var conn1 = connections.Item1;
using var conn2 = connections.Item2;

Exception? exception = null;
var disposable = await conn1.AddMatchAsync(
new MatchRule(), (Message message, object? state) => "", (Exception? ex, string s, object? s1, object? s2) =>
{
exception ??= ex;
}, null, null, synchronizationContext: null, optIn ? ObserverFlags.EmitOnObserverDispose : ObserverFlags.None);

disposable.Dispose();

if (optIn)
{
Assert.NotNull(exception);
}
else
{
Assert.Null(exception);
}
}

[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task CanOptOutConnectionDisposedEmit(bool optIn)
{
var connections = PairedConnection.CreatePair();
using var conn1 = connections.Item1;
using var conn2 = connections.Item2;

Exception? exception = null;
TaskCompletionSource<Exception> tcs = new();

var disposable = await conn1.AddMatchAsync(
new MatchRule(), (Message message, object? state) => "", (Exception? ex, string s, object? s1, object? s2) =>
{
exception ??= ex;
}, null, null, synchronizationContext: null, optIn ? ObserverFlags.EmitOnConnectionDispose : ObserverFlags.None);

conn1.Dispose();

if (optIn)
{
Assert.NotNull(exception);
}
else
{
Assert.Null(exception);
}
}
}
}

0 comments on commit a4c5f3d

Please sign in to comment.