Skip to content

Commit

Permalink
Async fixups (#118)
Browse files Browse the repository at this point in the history
* fix: avoid potential deadlocks

* fix: use an async method when available

* fix: always use 'Async' suffix for an async method name

* fix: always use 'Async' suffix for an async method name
  • Loading branch information
richardschneider committed Jun 14, 2019
1 parent 4bea391 commit 3fa73f8
Show file tree
Hide file tree
Showing 23 changed files with 82 additions and 76 deletions.
4 changes: 3 additions & 1 deletion src/BlockExchange/Bitswap.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ public Task StartAsync()

// When a connection is established
// (1) Send the local peer's want list to the remote
#pragma warning disable VSTHRD100 // Avoid async void methods
async void Swarm_ConnectionEstablished(object sender, PeerConnection connection)
#pragma warning restore VSTHRD100 // Avoid async void methods
{
try
{
Expand Down Expand Up @@ -206,7 +208,7 @@ public IEnumerable<Cid> PeerWants(MultiHash peer)
/// <see cref="Unwant"/> method will also cancel the operation.
/// </para>
/// </remarks>
public Task<IDataBlock> Want(Cid id, MultiHash peer, CancellationToken cancel)
public Task<IDataBlock> WantAsync(Cid id, MultiHash peer, CancellationToken cancel)
{
log.Trace($"{peer} wants {id}");

Expand Down
2 changes: 1 addition & 1 deletion src/BlockExchange/Bitswap1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ async Task GetBlockAsync(Cid cid, Peer remotePeer, CancellationToken cancel)
}
else
{
block = await Bitswap.Want(cid, remotePeer.Id, cancel).ConfigureAwait(false);
block = await Bitswap.WantAsync(cid, remotePeer.Id, cancel).ConfigureAwait(false);
}

// Send block to remote.
Expand Down
2 changes: 1 addition & 1 deletion src/BlockExchange/Bitswap11.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ async Task GetBlockAsync(Cid cid, Peer remotePeer, CancellationToken cancel)
}
else
{
block = await Bitswap.Want(cid, remotePeer.Id, cancel).ConfigureAwait(false);
block = await Bitswap.WantAsync(cid, remotePeer.Id, cancel).ConfigureAwait(false);
}

// Send block to remote.
Expand Down
2 changes: 1 addition & 1 deletion src/CoreApi/BitswapApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public async Task<IDataBlock> GetAsync(Cid id, CancellationToken cancel = defaul
{
var bs = await ipfs.BitswapService.ConfigureAwait(false);
var peer = await ipfs.LocalPeer.ConfigureAwait(false);
return await bs.Want(id, peer.Id, cancel).ConfigureAwait(false);
return await bs.WantAsync(id, peer.Id, cancel).ConfigureAwait(false);
}

public async Task UnwantAsync(Cid id, CancellationToken cancel = default(CancellationToken))
Expand Down
4 changes: 2 additions & 2 deletions src/CoreApi/BlockApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public async Task<IDataBlock> GetAsync(Cid id, CancellationToken cancel = defaul
id: id,
limit: 20, // TODO: remove this
cancel: queryCancel.Token,
action: (peer) => { var __ = ProviderFound(peer, queryCancel.Token).ConfigureAwait(false); }
action: (peer) => { var __ = ProviderFoundAsync(peer, queryCancel.Token).ConfigureAwait(false); }
);

var got = await bitswapGet;
Expand All @@ -138,7 +138,7 @@ public async Task<IDataBlock> GetAsync(Cid id, CancellationToken cancel = defaul
}
}

async Task ProviderFound(Peer peer, CancellationToken cancel)
async Task ProviderFoundAsync(Peer peer, CancellationToken cancel)
{
if (cancel.IsCancellationRequested)
return;
Expand Down
4 changes: 2 additions & 2 deletions src/CoreApi/DagApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace Ipfs.Engine.CoreApi
{
class DagApi : IDagApi
{
static PODOptions podOptions = new PODOptions
static readonly PODOptions podOptions = new PODOptions
(
removeIsPrefix: false,
useCamelCase: false
Expand Down Expand Up @@ -96,7 +96,7 @@ public DagApi(IpfsEngine ipfs)
using (var sw = new StreamWriter(ms))
using (var writer = new JsonTextWriter(sw))
{
data.WriteTo(writer);
await data.WriteToAsync(writer);
writer.Flush();
ms.Position = 0;
var format = GetDataFormat(contentType);
Expand Down
6 changes: 3 additions & 3 deletions src/CoreApi/DnsApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ public async Task<string> ResolveAsync(string name, bool recursive = false, Canc
{
var attempts = new Task<string>[]
{
Find(name, cts.Token),
Find("_dnslink." + name, cts.Token)
FindAsync(name, cts.Token),
FindAsync("_dnslink." + name, cts.Token)
};
link = await TaskHelper.WhenAnyResult(attempts, cancel).ConfigureAwait(false);
cts.Cancel();
Expand All @@ -52,7 +52,7 @@ public async Task<string> ResolveAsync(string name, bool recursive = false, Canc
throw new NotSupportedException($"Cannot resolve '{link}'.");
}

async Task<string> Find(string name, CancellationToken cancel)
async Task<string> FindAsync(string name, CancellationToken cancel)
{
var response = await ipfs.Options.Dns.QueryAsync(name, DnsType.TXT, cancel).ConfigureAwait(false);
var link = response.Answers
Expand Down
8 changes: 4 additions & 4 deletions src/CoreApi/FileSystemApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public FileSystemApi(IpfsEngine ipfs)
if (options.Trickle) throw new NotImplementedException("Trickle");

var blockService = GetBlockService(options);
var keyChain = await ipfs.KeyChain(cancel).ConfigureAwait(false);
var keyChain = await ipfs.KeyChainAsync(cancel).ConfigureAwait(false);

var chunker = new SizeChunker();
var nodes = await chunker.ChunkAsync(stream, name, options, blockService, keyChain, cancel).ConfigureAwait(false);
Expand Down Expand Up @@ -255,8 +255,8 @@ public async Task<string> ReadAllTextAsync(string path, CancellationToken cancel
public async Task<Stream> ReadFileAsync(string path, CancellationToken cancel = default(CancellationToken))
{
var cid = await ipfs.ResolveIpfsPathToCidAsync(path, cancel).ConfigureAwait(false);
var keyChain = await ipfs.KeyChain(cancel).ConfigureAwait(false);
return await FileSystem.CreateReadStream(cid, ipfs.Block, keyChain, cancel).ConfigureAwait(false);
var keyChain = await ipfs.KeyChainAsync(cancel).ConfigureAwait(false);
return await FileSystem.CreateReadStreamAsync(cid, ipfs.Block, keyChain, cancel).ConfigureAwait(false);
}

public async Task<Stream> ReadFileAsync(string path, long offset, long count = 0, CancellationToken cancel = default(CancellationToken))
Expand Down Expand Up @@ -317,7 +317,7 @@ async Task AddTarNodeAsync(Cid cid, string name, TarOutputStream tar, Cancellati
header.TypeFlag = TarHeader.LF_NORMAL;
header.Size = content.Length;
tar.PutNextEntry(entry);
content.CopyTo(tar);
await content.CopyToAsync(tar);
tar.CloseEntry();
}

Expand Down
12 changes: 6 additions & 6 deletions src/CoreApi/KeyApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,37 +19,37 @@ public KeyApi(IpfsEngine ipfs)

public async Task<IKey> CreateAsync(string name, string keyType, int size, CancellationToken cancel = default(CancellationToken))
{
var keyChain = await ipfs.KeyChain(cancel).ConfigureAwait(false);
var keyChain = await ipfs.KeyChainAsync(cancel).ConfigureAwait(false);
return await keyChain.CreateAsync(name, keyType, size, cancel).ConfigureAwait(false);
}

public async Task<string> ExportAsync(string name, char[] password, CancellationToken cancel = default(CancellationToken))
{
var keyChain = await ipfs.KeyChain(cancel).ConfigureAwait(false);
var keyChain = await ipfs.KeyChainAsync(cancel).ConfigureAwait(false);
return await keyChain.ExportAsync(name, password, cancel).ConfigureAwait(false);
}

public async Task<IKey> ImportAsync(string name, string pem, char[] password = null, CancellationToken cancel = default(CancellationToken))
{
var keyChain = await ipfs.KeyChain(cancel).ConfigureAwait(false);
var keyChain = await ipfs.KeyChainAsync(cancel).ConfigureAwait(false);
return await keyChain.ImportAsync(name, pem, password, cancel).ConfigureAwait(false);
}

public async Task<IEnumerable<IKey>> ListAsync(CancellationToken cancel = default(CancellationToken))
{
var keyChain = await ipfs.KeyChain(cancel).ConfigureAwait(false);
var keyChain = await ipfs.KeyChainAsync(cancel).ConfigureAwait(false);
return await keyChain.ListAsync(cancel).ConfigureAwait(false);
}

public async Task<IKey> RemoveAsync(string name, CancellationToken cancel = default(CancellationToken))
{
var keyChain = await ipfs.KeyChain(cancel).ConfigureAwait(false);
var keyChain = await ipfs.KeyChainAsync(cancel).ConfigureAwait(false);
return await keyChain.RemoveAsync(name, cancel).ConfigureAwait(false);
}

public async Task<IKey> RenameAsync(string oldName, string newName, CancellationToken cancel = default(CancellationToken))
{
var keyChain = await ipfs.KeyChain(cancel).ConfigureAwait(false);
var keyChain = await ipfs.KeyChainAsync(cancel).ConfigureAwait(false);
return await keyChain.RenameAsync(oldName, newName, cancel).ConfigureAwait(false);
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/Cryptography/Cms.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public partial class KeyChain
/// is used to digitally sign, digest, authenticate, and/or encrypt
/// arbitrary message content.
/// </remarks>
public async Task<byte[]> CreateProtectedData(
public async Task<byte[]> CreateProtectedDataAsync(
string keyName,
byte[] plainText,
CancellationToken cancel = default(CancellationToken))
Expand Down Expand Up @@ -108,7 +108,7 @@ public partial class KeyChain
/// is used to digitally sign, digest, authenticate, and/or encrypt
/// arbitrary message content.
/// </remarks>
public async Task<byte[]> ReadProtectedData(
public async Task<byte[]> ReadProtectedDataAsync(
byte[] cipherText,
CancellationToken cancel = default(CancellationToken))
{
Expand Down
2 changes: 1 addition & 1 deletion src/Cryptography/KeyChain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ async Task<IKey> AddPrivateKeyAsync(string name, AsymmetricCipherKeyPair keyPair
};
var pw = new PemWriter(sw);
pw.WriteObject(pkcs8);
pw.Writer.Flush();
await pw.Writer.FlushAsync();
pem = sw.ToString();
}

Expand Down
9 changes: 6 additions & 3 deletions src/IpfsEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ void Init()
LocalPeer = new AsyncLazy<Peer>(async () =>
{
log.Debug("Building local peer");
var keyChain = await KeyChain().ConfigureAwait(false);
var keyChain = await KeyChainAsync().ConfigureAwait(false);
log.Debug("Getting key info about self");
var self = await keyChain.FindKeyByNameAsync("self").ConfigureAwait(false);
var localPeer = new Peer
Expand Down Expand Up @@ -151,7 +151,7 @@ void Init()
}
}
var peer = await LocalPeer.ConfigureAwait(false);
var keyChain = await KeyChain().ConfigureAwait(false);
var keyChain = await KeyChainAsync().ConfigureAwait(false);
var self = await keyChain.GetPrivateKeyAsync("self").ConfigureAwait(false);
var swarm = new Swarm
{
Expand Down Expand Up @@ -271,8 +271,9 @@ void Init()
/// A task that represents the asynchronous operation. The task's result is
/// the <see cref="KeyChain"/>.
/// </returns>
public async Task<KeyChain> KeyChain(CancellationToken cancel = default(CancellationToken))
public async Task<KeyChain> KeyChainAsync(CancellationToken cancel = default(CancellationToken))
{
// TODO: this should be a LazyAsync property.
if (keyChain == null)
{
lock (this)
Expand Down Expand Up @@ -597,6 +598,7 @@ public void Stop()
/// </summary>
public AsyncLazy<PeerTalk.Routing.Dht1> DhtService { get; private set; }

#pragma warning disable VSTHRD100 // Avoid async void methods
/// <summary>
/// Fired when a peer is discovered.
/// </summary>
Expand All @@ -606,6 +608,7 @@ public void Stop()
/// Registers the peer with the <see cref="SwarmService"/>.
/// </remarks>
async void OnPeerDiscovered(object sender, Peer peer)
#pragma warning restore VSTHRD100 // Avoid async void methods
{
try
{
Expand Down
1 change: 1 addition & 0 deletions src/IpfsEngine.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0-beta2-19270-01" PrivateAssets="All" />
<PackageReference Include="Microsoft.VisualStudio.Threading.Analyzers" Version="16.0.102" PrivateAssets="All" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'netstandard14'">
Expand Down
2 changes: 1 addition & 1 deletion src/RandomWalk.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ void RunQuery()
using (var timeout = new CancellationTokenSource(QueryTime))
using (var cts = CancellationTokenSource.CreateLinkedTokenSource(timeout.Token, cancel.Token))
{
var _ = Dht.FindPeerAsync(id, cts.Token).Result;
var _ = Dht.FindPeerAsync(id, cts.Token).ConfigureAwait(false).GetAwaiter().GetResult();
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/UnixFileSystem/ChunkedStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public override void Write(byte[] buffer, int offset, int count)
/// <inheritdoc />
public override int Read(byte[] buffer, int offset, int count)
{
return ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
return ReadAsync(buffer, offset, count).ConfigureAwait(false).GetAwaiter().GetResult();
}

/// <inheritdoc />
Expand All @@ -129,12 +129,12 @@ async Task<ArraySegment<byte>> GetBlockAsync (long position, CancellationToken c
var need = blocks.Last(b => b.Position <= position);
if (need != currentBlock)
{
var stream = await FileSystem.CreateReadStream(need.Id, BlockService, KeyChain, cancel).ConfigureAwait(false);
var stream = await FileSystem.CreateReadStreamAsync(need.Id, BlockService, KeyChain, cancel).ConfigureAwait(false);
currentBlock = need;
currentData = new byte[stream.Length];
for (int i = 0, n; i < stream.Length; i += n)
{
n = stream.Read(currentData, i, (int) stream.Length - i);
n = await stream.ReadAsync(currentData, i, (int) stream.Length - i);
}
}
int offset = (int)(position - currentBlock.Position);
Expand Down
4 changes: 2 additions & 2 deletions src/UnixFileSystem/FileSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public static class FileSystem
/// The id's <see cref="Cid.ContentType"/> is used to determine how to read
/// the conent.
/// </remarks>
public static Task<Stream> CreateReadStream(
public static Task<Stream> CreateReadStreamAsync(
Cid id,
IBlockApi blockService,
KeyChain keyChain,
Expand Down Expand Up @@ -104,7 +104,7 @@ public static class FileSystem
CancellationToken cancel)
{
var block = await blockService.GetAsync(id, cancel).ConfigureAwait(false);
var plain = await keyChain.ReadProtectedData(block.DataBytes, cancel).ConfigureAwait(false);
var plain = await keyChain.ReadProtectedDataAsync(block.DataBytes, cancel).ConfigureAwait(false);
return new MemoryStream(plain, false);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/UnixFileSystem/SizeChunker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public class SizeChunker
// TODO: Inefficent to copy chunk, use ArraySegment in DataMessage.Data
var plain = new byte[length];
Array.Copy(chunk, plain, length);
var cipher = await keyChain.CreateProtectedData(options.ProtectionKey, plain, cancel).ConfigureAwait(false);
var cipher = await keyChain.CreateProtectedDataAsync(options.ProtectionKey, plain, cancel).ConfigureAwait(false);
var cid = await blockService.PutAsync(
data: cipher,
contentType: "cms",
Expand Down
20 changes: 10 additions & 10 deletions test/BlockExchange/BitswapTest .cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public void WantList()

var cid = new DagNode(Encoding.UTF8.GetBytes("BitswapTest unknown block")).Id;
var cancel = new CancellationTokenSource();
var task = bitswap.Want(cid, self.Id, cancel.Token);
var task = bitswap.WantAsync(cid, self.Id, cancel.Token);
CollectionAssert.Contains(bitswap.PeerWants(self.Id).ToArray(), cid);

bitswap.Unwant(cid);
Expand All @@ -39,7 +39,7 @@ public void Want_Cancel()
var bitswap = new Bitswap { Swarm = new Swarm { LocalPeer = self } };
var cid = new DagNode(Encoding.UTF8.GetBytes("BitswapTest unknown block")).Id;
var cancel = new CancellationTokenSource();
var task = bitswap.Want(cid, self.Id, cancel.Token);
var task = bitswap.WantAsync(cid, self.Id, cancel.Token);
CollectionAssert.Contains(bitswap.PeerWants(self.Id).ToArray(), cid);

cancel.Cancel();
Expand All @@ -62,10 +62,10 @@ public void Block_Needed()
};
try
{
bitswap.Want(cid1, self.Id, cancel.Token);
bitswap.Want(cid1, self.Id, cancel.Token);
bitswap.Want(cid2, self.Id, cancel.Token);
bitswap.Want(cid2, self.Id, cancel.Token);
bitswap.WantAsync(cid1, self.Id, cancel.Token);
bitswap.WantAsync(cid1, self.Id, cancel.Token);
bitswap.WantAsync(cid2, self.Id, cancel.Token);
bitswap.WantAsync(cid2, self.Id, cancel.Token);
Assert.AreEqual(2, callCount);
}
finally
Expand All @@ -80,7 +80,7 @@ public void Want_Unwant()
var bitswap = new Bitswap { Swarm = new Swarm { LocalPeer = self } };
var cid = new DagNode(Encoding.UTF8.GetBytes("BitswapTest unknown block")).Id;
var cancel = new CancellationTokenSource();
var task = bitswap.Want(cid, self.Id, cancel.Token);
var task = bitswap.WantAsync(cid, self.Id, cancel.Token);
CollectionAssert.Contains(bitswap.PeerWants(self.Id).ToArray(), cid);

bitswap.Unwant(cid);
Expand All @@ -97,7 +97,7 @@ public void Found()
var a = new DagNode(Encoding.UTF8.GetBytes("BitswapTest found block a"));
var b = new DagNode(Encoding.UTF8.GetBytes("BitswapTest found block b"));
var cancel = new CancellationTokenSource();
var task = bitswap.Want(a.Id, self.Id, cancel.Token);
var task = bitswap.WantAsync(a.Id, self.Id, cancel.Token);
Assert.IsFalse(task.IsCompleted);
CollectionAssert.Contains(bitswap.PeerWants(self.Id).ToArray(), a.Id);

Expand All @@ -120,8 +120,8 @@ public void Found_Count()
Assert.AreEqual(0, bitswap.Found(a));

var cancel = new CancellationTokenSource();
var task1 = bitswap.Want(a.Id, self.Id, cancel.Token);
var task2 = bitswap.Want(a.Id, self.Id, cancel.Token);
var task1 = bitswap.WantAsync(a.Id, self.Id, cancel.Token);
var task2 = bitswap.WantAsync(a.Id, self.Id, cancel.Token);
Assert.AreEqual(2, bitswap.Found(a));

Assert.IsTrue(task1.IsCompleted);
Expand Down

0 comments on commit 3fa73f8

Please sign in to comment.