Skip to content

Commit

Permalink
feat(BitswapApi): support peer ledger
Browse files Browse the repository at this point in the history
  • Loading branch information
richardschneider committed Jun 17, 2019
1 parent ab20edf commit 305de1b
Show file tree
Hide file tree
Showing 5 changed files with 197 additions and 26 deletions.
163 changes: 157 additions & 6 deletions src/BlockExchange/Bitswap.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,29 @@ public class Bitswap : IService
static ILog log = LogManager.GetLogger(typeof(Bitswap));

ConcurrentDictionary<Cid, WantedBlock> wants = new ConcurrentDictionary<Cid, WantedBlock>();
ConcurrentDictionary<Peer, BitswapLedger> peerLedgers = new ConcurrentDictionary<Peer, BitswapLedger>();

IBitswapProtocol[] protocols;

/// <summary>
/// The number of blocks sent by other peers.
/// </summary>
public ulong BlocksReceived;
ulong BlocksReceived;

/// <summary>
/// The number of bytes sent by other peers.
/// </summary>
public ulong DataReceived;
ulong DataReceived;

/// <summary>
/// The number of blocks sent to other peers.
/// </summary>
public ulong BlocksSent;
ulong BlocksSent;

/// <summary>
/// The number of bytes sent to other peers.
/// </summary>
public ulong DataSent;
ulong DataSent;

/// <summary>
/// The number of duplicate blocks sent by other peers.
Expand All @@ -49,7 +51,7 @@ public class Bitswap : IService
/// A duplicate block is a block that is already stored in the
/// local repository.
/// </remarks>
public ulong DupBlksReceived;
ulong DupBlksReceived;

/// <summary>
/// The number of duplicate bytes sent by other peers.
Expand All @@ -58,7 +60,7 @@ public class Bitswap : IService
/// A duplicate block is a block that is already stored in the
/// local repository.
/// </remarks>
public ulong DupDataReceived;
ulong DupDataReceived;

/// <summary>
/// Creates a new instance of the <see cref="Bitswap"/> class.
Expand Down Expand Up @@ -104,6 +106,26 @@ public BitswapData Statistics
};
}
}

/// <summary>
/// Gets the bitswap ledger for the specified peer.
/// </summary>
/// <param name="peer">
/// The peer to get information on. If the peer is unknown, then a ledger
/// with zeros is returned.
/// </param>
/// <returns>
/// Statistics on the bitswap blocks exchanged with the peer.
/// </returns>
/// <seealso cref="Ipfs.CoreApi.IBitswapApi.LedgerAsync(Peer, CancellationToken)"/>
public BitswapLedger PeerLedger(Peer peer)
{
if (peerLedgers.TryGetValue(peer, out BitswapLedger ledger))
{
return ledger;
}
return new BitswapLedger { Peer = peer };
}

/// <summary>
/// Raised when a blocked is needed.
Expand All @@ -124,6 +146,9 @@ public Task StartAsync()
}
Swarm.ConnectionEstablished += Swarm_ConnectionEstablished;

// TODO: clear the stats.
peerLedgers.Clear();

return Task.CompletedTask;
}

Expand Down Expand Up @@ -270,6 +295,132 @@ public void Unwant(Cid id)
}
}

/// <summary>
/// Indicate that a remote peer sent a block.
/// </summary>
/// <param name="remote">
/// The peer that sent the block.
/// </param>
/// <param name="block">
/// The data for the block.
/// </param>
/// <returns>
/// A task that represents the asynchronous operation.
/// </returns>
/// <remarks>
/// <para>
/// Updates the statistics.
/// </para>
/// <para>
/// If the block is acceptable then the <paramref name="block"/> is added to local cache
/// via the <see cref="BlockService"/>.
/// </para>
/// </remarks>
public Task OnBlockReceivedAsync(Peer remote, byte[] block)
{
return OnBlockReceivedAsync(remote, block, Cid.DefaultContentType, MultiHash.DefaultAlgorithmName);
}

/// <summary>
/// Indicate that a remote peer sent a block.
/// </summary>
/// <param name="remote">
/// The peer that sent the block.
/// </param>
/// <param name="block">
/// The data for the block.
/// </param>
/// <param name="contentType">
/// The <see cref="Cid.ContentType"/> of the block.
/// </param>
/// <param name="multiHash">
/// The multihash algorithm name of the block.
/// </param>
/// <returns>
/// A task that represents the asynchronous operation.
/// </returns>
/// <remarks>
/// <para>
/// Updates the statistics.
/// </para>
/// <para>
/// If the block is acceptable then the <paramref name="block"/> is added to local cache
/// via the <see cref="BlockService"/>.
/// </para>
/// </remarks>
public async Task OnBlockReceivedAsync(Peer remote, byte[] block, string contentType, string multiHash)
{
// Update statistics.
++BlocksReceived;
DataReceived += (ulong)block.LongLength;
peerLedgers.AddOrUpdate(remote,
(peer) => new BitswapLedger
{
Peer = peer,
BlocksExchanged = 1,
DataReceived = (ulong)block.LongLength
},
(peer, ledger) =>
{
++ledger.BlocksExchanged;
DataReceived += (ulong)block.LongLength;
return ledger;
});

// TODO: Detect if duplicate and update stats
var isDuplicate = false;
if (isDuplicate)
{
++DupBlksReceived;
DupDataReceived += (ulong)block.Length;
}

// TODO: Determine if we should accept the block from the remote.
var acceptble = true;
if (acceptble)
{
await BlockService.PutAsync(
data: block,
contentType: contentType,
multiHash: multiHash,
pin: false)
.ConfigureAwait(false);
}
}

/// <summary>
/// Indicate that the local peer sent a block to a remote peer.
/// </summary>
/// <param name="remote">
/// The peer that sent the block.
/// </param>
/// <param name="block">
/// The data for the block.
/// </param>
/// <returns>
/// A task that represents the asynchronous operation.
/// </returns>
public Task OnBlockSentAsync(Peer remote, IDataBlock block)
{
++BlocksSent;
DataSent += (ulong)block.Size;
peerLedgers.AddOrUpdate(remote,
(peer) => new BitswapLedger
{
Peer = peer,
BlocksExchanged = 1,
DataSent = (ulong)block.Size
},
(peer, ledger) =>
{
++ledger.BlocksExchanged;
DataSent += (ulong)block.Size;
return ledger;
});

return Task.CompletedTask;
}

/// <summary>
/// Indicate that a block is found.
/// </summary>
Expand Down
9 changes: 2 additions & 7 deletions src/BlockExchange/Bitswap1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,7 @@ public async Task ProcessMessageAsync(PeerConnection connection, Stream stream,
log.Debug("got some blocks");
foreach (var sentBlock in request.blocks)
{
++Bitswap.BlocksReceived;
Bitswap.DataReceived += (ulong)sentBlock.Length;
await Bitswap.BlockService.PutAsync(sentBlock, pin: false).ConfigureAwait(false);
// TODO: Detect if duplicate and update stats
await Bitswap.OnBlockReceivedAsync(connection.RemotePeer, sentBlock);
}
}
}
Expand All @@ -108,7 +105,7 @@ async Task GetBlockAsync(Cid cid, Peer remotePeer, CancellationToken cancel)
{
await SendAsync(stream, block, cancel).ConfigureAwait(false);
}

await Bitswap.OnBlockSentAsync(remotePeer, block);
}
catch (Exception e)
{
Expand Down Expand Up @@ -152,8 +149,6 @@ async Task GetBlockAsync(Cid cid, Peer remotePeer, CancellationToken cancel)
)
{
log.Debug($"Sending block {block.Id}");
++Bitswap.BlocksSent;
Bitswap.DataSent += (ulong)block.Size;

var message = new Message
{
Expand Down
13 changes: 2 additions & 11 deletions src/BlockExchange/Bitswap11.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,19 +78,12 @@ public async Task ProcessMessageAsync(PeerConnection connection, Stream stream,
log.Debug($"got block(s) from {connection.RemotePeer}");
foreach (var sentBlock in request.payload)
{
++Bitswap.BlocksReceived;
Bitswap.DataReceived += (ulong)sentBlock.data.Length;
using (var ms = new MemoryStream(sentBlock.prefix))
{
var version = ms.ReadVarint32();
var contentType = ms.ReadMultiCodec().Name;
var multiHash = MultiHash.GetHashAlgorithmName(ms.ReadVarint32());
await Bitswap.BlockService.PutAsync(
data: sentBlock.data,
contentType: contentType,
multiHash: multiHash,
pin: false).ConfigureAwait(false);
// TODO: Detect if duplicate and update stats
await Bitswap.OnBlockReceivedAsync(connection.RemotePeer, sentBlock.data, contentType, multiHash);
}
}
}
Expand All @@ -117,7 +110,7 @@ async Task GetBlockAsync(Cid cid, Peer remotePeer, CancellationToken cancel)
{
await SendAsync(stream, block, cancel).ConfigureAwait(false);
}

await Bitswap.OnBlockSentAsync(remotePeer, block).ConfigureAwait(false);
}
catch (TaskCanceledException)
{
Expand Down Expand Up @@ -166,8 +159,6 @@ async Task GetBlockAsync(Cid cid, Peer remotePeer, CancellationToken cancel)
)
{
log.Debug($"Sending block {block.Id}");
++Bitswap.BlocksSent;
Bitswap.DataSent += (ulong)block.Size;
var message = new Message
{
payload = new List<Block>
Expand Down
5 changes: 3 additions & 2 deletions src/CoreApi/BitswapApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ public async Task<IDataBlock> GetAsync(Cid id, CancellationToken cancel = defaul
return await bs.WantAsync(id, peer.Id, cancel).ConfigureAwait(false);
}

public Task<BitswapLedger> LedgerAsync(Peer peer, CancellationToken cancel = default(CancellationToken))
public async Task<BitswapLedger> LedgerAsync(Peer peer, CancellationToken cancel = default(CancellationToken))
{
throw new NotImplementedException();
var bs = await ipfs.BitswapService.ConfigureAwait(false);
return bs.PeerLedger(peer);
}

public async Task UnwantAsync(Cid id, CancellationToken cancel = default(CancellationToken))
Expand Down
33 changes: 33 additions & 0 deletions test/CoreApi/BitswapApiTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,22 @@ public async Task GetsBlock_OnConnect()
Assert.IsTrue(getTask.IsCompleted, "task not completed");
Assert.AreEqual(cid, block.Id);
CollectionAssert.AreEqual(data, block.DataBytes);

var otherPeer = await ipfsOther.LocalPeer;
var ledger = await ipfs.Bitswap.LedgerAsync(otherPeer);
Assert.AreEqual(otherPeer, ledger.Peer);
Assert.AreEqual(1UL, ledger.BlocksExchanged);
Assert.AreEqual((ulong)block.Size, ledger.DataReceived);
Assert.AreEqual(0UL, ledger.DataSent);
Assert.IsTrue(ledger.IsInDebt);

var localPeer = await ipfs.LocalPeer;
ledger = await ipfsOther.Bitswap.LedgerAsync(localPeer);
Assert.AreEqual(localPeer, ledger.Peer);
Assert.AreEqual(1UL, ledger.BlocksExchanged);
Assert.AreEqual(0UL, ledger.DataReceived);
Assert.AreEqual((ulong)block.Size, ledger.DataSent);
Assert.IsFalse(ledger.IsInDebt);
}
finally
{
Expand Down Expand Up @@ -215,6 +231,7 @@ public async Task GetsBlock_Cidv1()
await ipfs.StopAsync();
}
}

[TestMethod]
public async Task GetBlock_Timeout()
{
Expand All @@ -237,5 +254,21 @@ public async Task GetBlock_Timeout()
}
}

[TestMethod]
public async Task PeerLedger()
{
await ipfs.StartAsync();
try
{
var peer = await ipfsOther.LocalPeer;
var ledger = await ipfs.Bitswap.LedgerAsync(peer);
Assert.IsNotNull(ledger);
}
finally
{
await ipfs.StopAsync();
}
}

}
}

0 comments on commit 305de1b

Please sign in to comment.