Skip to content

Commit

Permalink
chore: comply to threading analyzer
Browse files Browse the repository at this point in the history
  • Loading branch information
richardschneider committed Aug 23, 2019
1 parent ec7ba4a commit 6fa15cc
Show file tree
Hide file tree
Showing 17 changed files with 75 additions and 64 deletions.
62 changes: 30 additions & 32 deletions src/AutoDialer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public void Dispose()
/// </remarks>
public int MinConnections { get; set; } = DefaultMinConnections;

#pragma warning disable VSTHRD100 // Avoid async void methods
/// <summary>
/// Called when the swarm has a new peer.
/// </summary>
Expand All @@ -91,31 +92,30 @@ public void Dispose()
/// If the <see cref="MinConnections"/> is not reached, then the
/// <paramref name="peer"/> is dialed.
/// </remarks>
void OnPeerDiscovered(object sender, Peer peer)
async void OnPeerDiscovered(object sender, Peer peer)
#pragma warning restore VSTHRD100 // Avoid async void methods
{
var n = swarm.Manager.Connections.Count() + pendingConnects;
if (swarm.IsRunning && n < MinConnections)
{
Interlocked.Increment(ref pendingConnects);
Task.Run(async () =>
log.Debug($"Dialing new {peer}");
try
{
await swarm.ConnectAsync(peer).ConfigureAwait(false);
}
catch(Exception)
{
log.Debug($"Dialing new {peer}");
try
{
await swarm.ConnectAsync(peer).ConfigureAwait(false);
}
catch(Exception)
{
log.Warn($"Failed to dial {peer}");
}
finally
{
Interlocked.Decrement(ref pendingConnects);
}
});
log.Warn($"Failed to dial {peer}");
}
finally
{
Interlocked.Decrement(ref pendingConnects);
}
}
}

#pragma warning disable VSTHRD100 // Avoid async void methods
/// <summary>
/// Called when the swarm has lost a connection to a peer.
/// </summary>
Expand All @@ -129,7 +129,8 @@ void OnPeerDiscovered(object sender, Peer peer)
/// If the <see cref="MinConnections"/> is not reached, then another
/// peer is dialed.
/// </remarks>
void OnPeerDisconnected(object sender, Peer disconnectedPeer)
async void OnPeerDisconnected(object sender, Peer disconnectedPeer)
#pragma warning restore VSTHRD100 // Avoid async void methods
{
var n = swarm.Manager.Connections.Count() + pendingConnects;
if (!swarm.IsRunning || n >= MinConnections)
Expand All @@ -146,22 +147,19 @@ void OnPeerDisconnected(object sender, Peer disconnectedPeer)
var peer = peers[rng.Next(peers.Count())];

Interlocked.Increment(ref pendingConnects);
Task.Run(async () =>
log.Debug($"Dialing {peer}");
try
{
log.Debug($"Dialing {peer}");
try
{
await swarm.ConnectAsync(peer).ConfigureAwait(false);
}
catch (Exception)
{
log.Warn($"Failed to dial {peer}");
}
finally
{
Interlocked.Decrement(ref pendingConnects);
}
});
await swarm.ConnectAsync(peer).ConfigureAwait(false);
}
catch (Exception)
{
log.Warn($"Failed to dial {peer}");
}
finally
{
Interlocked.Decrement(ref pendingConnects);
}
}

}
Expand Down
4 changes: 4 additions & 0 deletions src/Multiplex/Substream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ public void NoMoreData()
/// <inheritdoc />
public override int Read(byte[] buffer, int offset, int count)
{
#pragma warning disable VSTHRD002
return ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
#pragma warning restore VSTHRD002
}

/// <inheritdoc />
Expand Down Expand Up @@ -164,7 +166,9 @@ public override async Task<int> ReadAsync(byte[] buffer, int offset, int count,
/// <inheritdoc />
public override void Flush()
{
#pragma warning disable VSTHRD002
FlushAsync().GetAwaiter().GetResult();
#pragma warning restore VSTHRD002
}

/// <inheritdoc />
Expand Down
8 changes: 4 additions & 4 deletions src/PeerConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,10 @@ public async Task InitiateAsync(
Initiator = true,
Connection = this
};
muxer.SubstreamCreated += (s, e) => ReadMessages(e, CancellationToken.None);
muxer.SubstreamCreated += (s, e) => _ = ReadMessagesAsync(e, CancellationToken.None);
this.MuxerEstablished.SetResult(muxer);

var _ = muxer.ProcessRequestsAsync();
_ = muxer.ProcessRequestsAsync();
}

/// <summary>
Expand Down Expand Up @@ -267,7 +267,7 @@ public Task EstablishProtocolAsync(string name, CancellationToken cancel)
/// <summary>
/// Starts reading messages from the remote peer.
/// </summary>
public async void ReadMessages(CancellationToken cancel)
public async Task ReadMessagesAsync(CancellationToken cancel)
{
log.Debug($"start reading messsages from {RemoteAddress}");

Expand Down Expand Up @@ -310,7 +310,7 @@ public async void ReadMessages(CancellationToken cancel)
/// <summary>
/// Starts reading messages from the remote peer on the specified stream.
/// </summary>
public async void ReadMessages(Stream stream, CancellationToken cancel)
public async Task ReadMessagesAsync(Stream stream, CancellationToken cancel)
{
IPeerProtocol protocol = new Multistream1();
try
Expand Down
10 changes: 3 additions & 7 deletions src/PeerManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,8 @@ public Task StartAsync()
Swarm.ConnectionEstablished += Swarm_ConnectionEstablished;
Swarm.PeerNotReachable += Swarm_PeerNotReachable;

var thread = new Thread(Phoenix)
{
IsBackground = true
};
cancel = new CancellationTokenSource();
thread.Start();
var _ = PhoenixAsync(cancel.Token);

log.Debug("started");
return Task.CompletedTask;
Expand Down Expand Up @@ -145,9 +141,9 @@ void Swarm_ConnectionEstablished(object sender, PeerConnection connection)
/// <summary>
/// Background process to try reconnecting to a dead peer.
/// </summary>
async void Phoenix()
async Task PhoenixAsync(CancellationToken cancellation)
{
while (!cancel.IsCancellationRequested)
while (!cancellation.IsCancellationRequested)
{
try
{
Expand Down
8 changes: 5 additions & 3 deletions src/PeerTalk.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,12 @@
<PackageReference Include="semver" Version="2.0.4" />
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.9.0" />
<PackageReference Include="Portable.BouncyCastle" Version="1.8.5" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0-beta2-19270-01">
<PrivateAssets>all</PrivateAssets>
</PackageReference>
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0-beta2-19270-01" PrivateAssets="All" />
<PackageReference Include="Microsoft.VisualStudio.Threading.Analyzers" Version="16.3.13" PrivateAssets="All" />
</ItemGroup>

<ItemGroup Condition=" '$(TargetFramework)' == 'netstandard14'">
<PackageReference Include="System.Net.NameResolution" Version="4.3.0.0" />
</ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion src/Protocols/Identify1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public override string ToString()
/// </param>
/// <param name="cancel"></param>
/// <returns></returns>
public async Task<Peer> GetRemotePeer(PeerConnection connection, CancellationToken cancel)
public async Task<Peer> GetRemotePeerAsync(PeerConnection connection, CancellationToken cancel)
{
var muxer = await connection.MuxerEstablished.Task.ConfigureAwait(false);
log.Debug("Get remote identity");
Expand Down
2 changes: 1 addition & 1 deletion src/Protocols/Mplex67.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public override string ToString()
Connection = connection,
Receiver = true
};
muxer.SubstreamCreated += (s, e) => connection.ReadMessages(e, CancellationToken.None);
muxer.SubstreamCreated += (s, e) => _ = connection.ReadMessagesAsync(e, CancellationToken.None);

// Attach muxer to the connection. It now becomes the message reader.
connection.MuxerEstablished.SetResult(muxer);
Expand Down
2 changes: 2 additions & 0 deletions src/PubSub/FloodRouter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ async Task SendAsync(byte[] message, Peer peer, CancellationToken cancel)
}
}

#pragma warning disable VSTHRD100 // Avoid async void methods
/// <summary>
/// Raised when a connection is established to a remote peer.
/// </summary>
Expand All @@ -250,6 +251,7 @@ async Task SendAsync(byte[] message, Peer peer, CancellationToken cancel)
/// all topics that are of interest to the local peer.
/// </remarks>
async void Swarm_ConnectionEstablished(object sender, PeerConnection connection)
#pragma warning restore VSTHRD100 // Avoid async void methods
{
if (localTopics.Count == 0)
return;
Expand Down
2 changes: 1 addition & 1 deletion src/PubSub/NotificationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ void Router_MessageReceived(object sender, PublishedMessage msg)
}

// Tell other message routers.
Task.WhenAll(Routers
_ = Task.WhenAll(Routers
.Where(r => r != sender)
.Select(r => r.PublishAsync(msg, CancellationToken.None))
);
Expand Down
4 changes: 2 additions & 2 deletions src/Routing/Dht1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ public async Task<IEnumerable<Peer>> FindProvidersAsync(
/// </remarks>
public void Advertise(Cid cid)
{
Task.Run(async () =>
_ = Task.Run(async () =>
{
int advertsNeeded = 4;
var message = new DhtMessage
Expand Down Expand Up @@ -278,7 +278,7 @@ public void Advertise(Cid cid)
using (var stream = await Swarm.DialAsync(peer, this.ToString()))
{
ProtoBuf.Serializer.SerializeWithLengthPrefix(stream, message, PrefixStyle.Base128);
stream.Flush();
await stream.FlushAsync();
}
if (--advertsNeeded == 0)
break;
Expand Down
1 change: 1 addition & 0 deletions src/SecureCommunication/Secio1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public override string ToString()
await EncryptAsync(connection, cancel).ConfigureAwait(false);
}

#pragma warning disable VSTHRD103
/// <inheritdoc />
public async Task<Stream> EncryptAsync(PeerConnection connection, CancellationToken cancel = default(CancellationToken))
{
Expand Down
8 changes: 6 additions & 2 deletions src/SecureCommunication/Secio1Stream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ public override void SetLength(long value)
/// <inheritdoc />
public override int Read(byte[] buffer, int offset, int count)
{
#pragma warning disable VSTHRD002
return ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
#pragma warning restore VSTHRD002
}

/// <inheritdoc />
Expand Down Expand Up @@ -202,7 +204,9 @@ async Task<byte[]> ReadPacketBytesAsync(int count, CancellationToken cancel)
/// <inheritdoc />
public override void Flush()
{
#pragma warning disable VSTHRD002
FlushAsync().GetAwaiter().GetResult();
#pragma warning restore VSTHRD002
}

/// <inheritdoc />
Expand All @@ -225,8 +229,8 @@ public override async Task FlushAsync(CancellationToken cancel)
stream.WriteByte((byte)(length >> 16));
stream.WriteByte((byte)(length >> 8));
stream.WriteByte((byte)(length));
stream.Write(data, 0, data.Length);
stream.Write(mac, 0, mac.Length);
await stream.WriteAsync(data, 0, data.Length);
await stream.WriteAsync(mac, 0, mac.Length);
await stream.FlushAsync(cancel).ConfigureAwait(false);

outStream.SetLength(0);
Expand Down
2 changes: 1 addition & 1 deletion src/StatsStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class StatsStream : Stream

static StatsStream()
{
Task.Run(async () =>
_ = Task.Run(async () =>
{
while (true)
{
Expand Down
12 changes: 7 additions & 5 deletions src/Swarm.cs
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ public async Task StopAsync()
using (var cts = CancellationTokenSource.CreateLinkedTokenSource(swarmCancellation.Token, cancel))
{
return await pendingConnections
.GetOrAdd(peer, (key) => new AsyncLazy<PeerConnection>(() => Dial(peer, peer.Addresses, cts.Token)))
.GetOrAdd(peer, (key) => new AsyncLazy<PeerConnection>(() => DialAsync(peer, peer.Addresses, cts.Token)))
.ConfigureAwait(false);
}
}
Expand Down Expand Up @@ -569,7 +569,7 @@ public async Task StopAsync()
/// <param name="addrs"></param>
/// <param name="cancel"></param>
/// <returns></returns>
async Task<PeerConnection> Dial(Peer remote, IEnumerable<MultiAddress> addrs, CancellationToken cancel)
async Task<PeerConnection> DialAsync(Peer remote, IEnumerable<MultiAddress> addrs, CancellationToken cancel)
{
log.Debug($"Dialing {remote}");

Expand Down Expand Up @@ -638,7 +638,7 @@ async Task<PeerConnection> Dial(Peer remote, IEnumerable<MultiAddress> addrs, Ca
{
identify = protocols.OfType<Identify1>().First();
}
await identify.GetRemotePeer(connection, cancel).ConfigureAwait(false);
await identify.GetRemotePeerAsync(connection, cancel).ConfigureAwait(false);
}
catch (Exception)
{
Expand Down Expand Up @@ -843,6 +843,7 @@ public Task<MultiAddress> StartListeningAsync(MultiAddress address)
return Task.FromResult(addresses.First());
}

#pragma warning disable VSTHRD100 // Avoid async void methods
/// <summary>
/// Called when a remote peer is connecting to the local peer.
/// </summary>
Expand All @@ -860,6 +861,7 @@ public Task<MultiAddress> StartListeningAsync(MultiAddress address)
/// logged as warning.
/// </remarks>
async void OnRemoteConnect(Stream stream, MultiAddress local, MultiAddress remote)
#pragma warning restore VSTHRD100 // Avoid async void methods
{
if (!IsRunning)
{
Expand Down Expand Up @@ -920,7 +922,7 @@ async void OnRemoteConnect(Stream stream, MultiAddress local, MultiAddress remot

// Start the handshake
// TODO: Isn't connection cancel token required.
connection.ReadMessages(default(CancellationToken));
_ = connection.ReadMessagesAsync(default(CancellationToken));

// Wait for security to be established.
await connection.SecurityEstablished.Task.ConfigureAwait(false);
Expand All @@ -935,7 +937,7 @@ async void OnRemoteConnect(Stream stream, MultiAddress local, MultiAddress remot
{
identify = protocols.OfType<Identify1>().First();
}
connection.RemotePeer = await identify.GetRemotePeer(connection, default(CancellationToken)).ConfigureAwait(false);
connection.RemotePeer = await identify.GetRemotePeerAsync(connection, default(CancellationToken)).ConfigureAwait(false);

connection.RemotePeer = RegisterPeer(connection.RemotePeer);
connection.RemoteAddress = new MultiAddress($"{remote}/ipfs/{connection.RemotePeer.Id}");
Expand Down
2 changes: 1 addition & 1 deletion src/TaskHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public static async Task<T> WhenAnyResult<T>(
var winner = await Task.WhenAny(running).ConfigureAwait(false);
if (!winner.IsCanceled && !winner.IsFaulted)
{
return winner.Result;
return await winner;
}
if (winner.IsFaulted)
{
Expand Down
Loading

0 comments on commit 6fa15cc

Please sign in to comment.