Skip to content

Commit

Permalink
Merge pull request #52 from richardschneider/core-api-50-0
Browse files Browse the repository at this point in the history
Upgrade to core api v0.50.0
  • Loading branch information
richardschneider committed May 10, 2019
2 parents 60e4b2f + d5da246 commit 0996705
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 7 deletions.
29 changes: 25 additions & 4 deletions src/CoreApi/DhtApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,33 @@ public Task<Peer> FindPeerAsync(MultiHash id, CancellationToken cancel = default
return ipfs.IdAsync(id, cancel);
}

public async Task<IEnumerable<Peer>> FindProvidersAsync(Cid id, int limit = 20, CancellationToken cancel = default(CancellationToken))
{
public async Task<IEnumerable<Peer>> FindProvidersAsync(Cid id, int limit = 20, Action<Peer> providerFound = null, CancellationToken cancel = default(CancellationToken))
{
// TODO: providerFound action
var stream = await ipfs.PostDownloadAsync("dht/findprovs", cancel, id, $"num-providers={limit}");
return ProviderFromStream(stream, limit);
}

}

public Task<byte[]> GetAsync(byte[] key, CancellationToken cancel = default(CancellationToken))
{
throw new NotImplementedException();
}

public Task ProvideAsync(Cid cid, bool advertise = true, CancellationToken cancel = default(CancellationToken))
{
throw new NotImplementedException();
}

public Task PutAsync(byte[] key, out byte[] value, CancellationToken cancel = default(CancellationToken))
{
throw new NotImplementedException();
}

public Task<bool> TryGetAsync(byte[] key, out byte[] value, CancellationToken cancel = default(CancellationToken))
{
throw new NotImplementedException();
}

IEnumerable<Peer> ProviderFromStream(Stream stream, int limit = int.MaxValue)
{
using (var sr = new StreamReader(stream))
Expand Down
21 changes: 21 additions & 0 deletions src/CoreApi/PubSubApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,27 @@ public async Task<IEnumerable<Peer>> PeersAsync(string topic = null, Cancellatio
return strings.Select(s => new Peer { Id = (string)s } );
}

public Task PublishAsync(string topic, byte[] message, CancellationToken cancel = default(CancellationToken))
{
var url = new StringBuilder();
url.Append("/api/v0/pubsub/pub");
url.Append("?arg=");
url.Append(System.Net.WebUtility.UrlEncode(topic));
url.Append("&arg=");
var data = Encoding.ASCII.GetString(System.Net.WebUtility.UrlEncodeToBytes(message, 0, message.Length));
url.Append(data);
return ipfs.DoCommandAsync(new Uri(ipfs.ApiUri, url.ToString()), cancel);
}

public Task PublishAsync(string topic, Stream message, CancellationToken cancel = default(CancellationToken))
{
using (MemoryStream ms = new MemoryStream())
{
message.CopyTo(ms);
return PublishAsync(topic, ms.ToArray(), cancel);
}
}

public async Task PublishAsync(string topic, string message, CancellationToken cancel = default(CancellationToken))
{
var _ = await ipfs.DoCommandAsync("pubsub/pub", cancel, topic, "arg=" + message);
Expand Down
15 changes: 15 additions & 0 deletions src/IpfsClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,21 @@ public async Task<string> DoCommandAsync(string command, CancellationToken cance
}
}

internal async Task DoCommandAsync(Uri url, CancellationToken cancel)
{
if (log.IsDebugEnabled)
log.Debug("POST " + url.ToString());
using (var response = await Api().PostAsync(url, null, cancel))
{
await ThrowOnErrorAsync(response);
var body = await response.Content.ReadAsStringAsync();
if (log.IsDebugEnabled)
log.Debug("RSP " + body);
return;
}
}


/// <summary>
/// Perform an <see href="https://ipfs.io/docs/api/">IPFS API command</see> returning
/// a specific <see cref="Type"/>.
Expand Down
4 changes: 2 additions & 2 deletions src/IpfsHttpClient.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Ipfs.Core" Version="0.40.0" />
<PackageReference Include="Newtonsoft.Json" Version="11.0.2" />
<PackageReference Include="Ipfs.Core" Version="0.50.0" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.1" />
<PackageReference Include="System.Net.Http" Version="4.3.3" Condition="'$(TargetFramework)' == 'netstandard14'" />
<PackageReference Include="System.Net.Http" Version="4.3.3" Condition="'$(TargetFramework)' == 'net45'" />
</ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion test/CoreApi/DhtApiTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public async Task FindProviders()
{
var ipfs = TestFixture.Ipfs;
var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2));
var providers = await ipfs.Dht.FindProvidersAsync(helloWorldID, 1, cts.Token);
var providers = await ipfs.Dht.FindProvidersAsync(helloWorldID, 1, cancel: cts.Token);
Assert.AreNotEqual(0, providers.Count());
}

Expand Down
55 changes: 55 additions & 0 deletions test/CoreApi/PubSubApiTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
Expand Down Expand Up @@ -173,5 +175,58 @@ public async Task Unsubscribe()
await Task.Delay(1000);
Assert.AreEqual(1, messageCount1);
}

[TestMethod]
public async Task Subscribe_BinaryMessage()
{
var messages = new List<IPublishedMessage>();
var expected = new byte[] { 0, 1, 2, 4, (byte)'a', (byte)'b', 0xfe, 0xff };
var ipfs = TestFixture.Ipfs;
var topic = "net-ipfs-http-client-test-" + Guid.NewGuid().ToString();
var cs = new CancellationTokenSource();
try
{
await ipfs.PubSub.SubscribeAsync(topic, msg =>
{
messages.Add(msg);
}, cs.Token);
await ipfs.PubSub.PublishAsync(topic, expected);

await Task.Delay(1000);
Assert.AreEqual(1, messages.Count);
CollectionAssert.AreEqual(expected, messages[0].DataBytes);
}
finally
{
cs.Cancel();
}
}

[TestMethod]
public async Task Subscribe_StreamMessage()
{
var messages = new List<IPublishedMessage>();
var expected = new byte[] { 0, 1, 2, 4, (byte)'a', (byte)'b', 0xfe, 0xff };
var ipfs = TestFixture.Ipfs;
var topic = "net-ipfs-http-client-test-" + Guid.NewGuid().ToString();
var cs = new CancellationTokenSource();
try
{
await ipfs.PubSub.SubscribeAsync(topic, msg =>
{
messages.Add(msg);
}, cs.Token);
var ms = new MemoryStream(expected, false);
await ipfs.PubSub.PublishAsync(topic, ms);

await Task.Delay(1000);
Assert.AreEqual(1, messages.Count);
CollectionAssert.AreEqual(expected, messages[0].DataBytes);
}
finally
{
cs.Cancel();
}
}
}
}

0 comments on commit 0996705

Please sign in to comment.