Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 22 additions & 10 deletions src/CoreApi/DagApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -154,23 +154,32 @@ public async Task<DagStatSummary> StatAsync(string cid, IProgress<DagStatSummary

public Task<Stream> ExportAsync(string path, CancellationToken cancellationToken = default)
{
return ipfs.DownloadAsync("dag/export", cancellationToken, path);
// Kubo expects POST for dag/export
return ipfs.PostDownloadAsync("dag/export", cancellationToken, path);
}

public async Task<CarImportOutput> ImportAsync(Stream stream, bool? pinRoots = null, bool stats = false, CancellationToken cancellationToken = default)
{
string[] options = [
$"pin-roots={pinRoots.ToString().ToLowerInvariant()}",
$"stats={stats.ToString().ToLowerInvariant()}"
];
// Respect Kubo default (pin roots = true) by omitting the flag when null.
var optionsList = new System.Collections.Generic.List<string>();
if (pinRoots.HasValue)
optionsList.Add($"pin-roots={pinRoots.Value.ToString().ToLowerInvariant()}");

optionsList.Add($"stats={stats.ToString().ToLowerInvariant()}");
var options = optionsList.ToArray();

using var resultStream = await ipfs.Upload2Async("dag/import", cancellationToken, stream, null, options);

// Read line-by-line
using var reader = new StreamReader(resultStream);

// First output is always of type CarImportOutput
// First output line may be absent on older Kubo when pin-roots=false
var json = await reader.ReadLineAsync();
if (string.IsNullOrEmpty(json))
{
return new CarImportOutput();
}

var res = JsonConvert.DeserializeObject<CarImportOutput>(json);
if (res is null)
throw new InvalidDataException($"The response did not deserialize to {nameof(CarImportOutput)}.");
Expand All @@ -179,11 +188,14 @@ public async Task<CarImportOutput> ImportAsync(Stream stream, bool? pinRoots = n
if (stats)
{
json = await reader.ReadLineAsync();
var importStats = JsonConvert.DeserializeObject<CarImportStats>(json);
if (importStats is null)
throw new InvalidDataException($"The response did not deserialize a {nameof(CarImportStats)}.");
if (!string.IsNullOrEmpty(json))
{
var importStats = JsonConvert.DeserializeObject<CarImportStats>(json);
if (importStats is null)
throw new InvalidDataException($"The response did not deserialize a {nameof(CarImportStats)}.");

res.Stats = importStats;
res.Stats = importStats;
}
}

return res;
Expand Down
9 changes: 6 additions & 3 deletions src/CoreApi/FileSystemApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,10 @@ private string[] ToApiOptions(AddFileOptions? options)
opts.Add($"nocopy={options.NoCopy.ToString().ToLowerInvariant()}");

if (options.Pin is not null)
opts.Add("pin=false");
opts.Add($"pin={options.Pin.ToString().ToLowerInvariant()}");

if (!string.IsNullOrEmpty(options.PinName))
opts.Add($"pin-name={options.PinName}");

if (options.Wrap is not null)
opts.Add($"wrap-with-directory={options.Wrap.ToString().ToLowerInvariant()}");
Expand All @@ -291,10 +294,10 @@ private string[] ToApiOptions(AddFileOptions? options)
opts.Add("progress=true");

if (options.Hash is not null)
opts.Add($"hash=${options.Hash}");
opts.Add($"hash={options.Hash}");

if (options.FsCache is not null)
opts.Add($"fscache={options.Wrap.ToString().ToLowerInvariant()}");
opts.Add($"fscache={options.FsCache.ToString().ToLowerInvariant()}");

if (options.ToFiles is not null)
opts.Add($"to-files={options.ToFiles}");
Expand Down
185 changes: 160 additions & 25 deletions src/CoreApi/PinApi.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
using Google.Protobuf;
using Ipfs.CoreApi;
using Newtonsoft.Json.Linq;
using Ipfs.CoreApi;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System;
using System.IO;
using Newtonsoft.Json;

#nullable enable
namespace Ipfs.Http
{
class PinApi : IPinApi
Expand All @@ -17,41 +19,174 @@ internal PinApi(IpfsClient ipfs)
this.ipfs = ipfs;
}

public async Task<IEnumerable<Cid>> AddAsync(string path, bool recursive = true, CancellationToken cancel = default(CancellationToken))
public async Task<IEnumerable<Cid>> AddAsync(string path, PinAddOptions options, CancellationToken cancel = default)
{
var opts = "recursive=" + recursive.ToString().ToLowerInvariant();
var json = await ipfs.DoCommandAsync("pin/add", cancel, path, opts);
return ((JArray)JObject.Parse(json)["Pins"])
.Select(p => (Cid)(string)p);
options ??= new PinAddOptions();
var optList = new List<string>
{
"recursive=" + options.Recursive.ToString().ToLowerInvariant()
};
if (!string.IsNullOrEmpty(options.Name))
{
optList.Add("name=" + options.Name);
}
var json = await ipfs.DoCommandAsync("pin/add", cancel, path, optList.ToArray());
var dto = JsonConvert.DeserializeObject<PinChangeResponseDto>(json);
var pins = dto?.Pins ?? new List<string>();
return pins.Select(p => (Cid)p);
}

public async Task<IEnumerable<Cid>> ListAsync(CancellationToken cancel = default(CancellationToken))
public async Task<IEnumerable<Cid>> AddAsync(string path, PinAddOptions options, IProgress<BlocksPinnedProgress> progress, CancellationToken cancel = default)
{
var json = await ipfs.DoCommandAsync("pin/ls", cancel);
var keys = (JObject)(JObject.Parse(json)["Keys"]);
return keys
.Properties()
.Select(p => (Cid)p.Name);
options ??= new PinAddOptions();
var optList = new List<string>
{
"recursive=" + options.Recursive.ToString().ToLowerInvariant(),
"progress=true"
};
if (!string.IsNullOrEmpty(options.Name))
{
optList.Add("name=" + options.Name);
}
var pinned = new List<Cid>();
var stream = await ipfs.PostDownloadAsync("pin/add", cancel, path, optList.ToArray());
using var sr = new StreamReader(stream);
while (!sr.EndOfStream && !cancel.IsCancellationRequested)
{
var line = await sr.ReadLineAsync();
if (string.IsNullOrWhiteSpace(line))
continue;
var dto = JsonConvert.DeserializeObject<PinChangeResponseDto>(line);
if (dto is null)
continue;
if (dto.Progress.HasValue)
{
progress?.Report(new BlocksPinnedProgress { BlocksPinned = dto.Progress.Value });
}
if (dto.Pins != null)
{
foreach (var p in dto.Pins)
{
pinned.Add((Cid)p);
}
}
}
return pinned;
}

public async Task<IEnumerable<Cid>> ListAsync(PinType type, CancellationToken cancel = default(CancellationToken))
public async IAsyncEnumerable<PinListItem> ListAsync([System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancel = default)
{
var typeOpt = type.ToString().ToLowerInvariant();
var json = await ipfs.DoCommandAsync("pin/ls", cancel,
null,
$"type={typeOpt}");
var keys = (JObject)(JObject.Parse(json)["Keys"]);
return keys
.Properties()
.Select(p => (Cid)p.Name);
// Default non-streaming, no names
foreach (var item in await ListItemsOnceAsync(null, new List<string>(), cancel))
{
yield return item;
}
}

public async IAsyncEnumerable<PinListItem> ListAsync(PinType type, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancel = default)
{
var opts = new List<string> { $"type={type.ToString().ToLowerInvariant()}" };
foreach (var item in await ListItemsOnceAsync(null, opts, cancel))
{
yield return item;
}
}

public async IAsyncEnumerable<PinListItem> ListAsync(PinListOptions options, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancel = default)
{
options ??= new PinListOptions();
var opts = new List<string>();
if (options.Type != PinType.All)
opts.Add($"type={options.Type.ToString().ToLowerInvariant()}");
if (!string.IsNullOrEmpty(options.Name))
{
opts.Add($"name={options.Name}");
opts.Add("names=true");
}
else if (options.Names)
{
opts.Add("names=true");
}

if (options.Stream)
{
await foreach (var item in ListItemsStreamAsync(null, opts, options.Names, cancel))
{
yield return item;
}
}
else
{
foreach (var item in await ListItemsOnceAsync(null, opts, cancel))
{
yield return item;
}
}
}

public async Task<IEnumerable<Cid>> RemoveAsync(Cid id, bool recursive = true, CancellationToken cancel = default(CancellationToken))
{
var opts = "recursive=" + recursive.ToString().ToLowerInvariant();
var json = await ipfs.DoCommandAsync("pin/rm", cancel, id, opts);
return ((JArray)JObject.Parse(json)["Pins"])
.Select(p => (Cid)(string)p);
var dto = JsonConvert.DeserializeObject<PinChangeResponseDto>(json);
var pins = dto?.Pins ?? new List<string>();
return pins.Select(p => (Cid)p);
}

// Internal helper used by ListAsync overloads

async IAsyncEnumerable<PinListItem> ListItemsStreamAsync(string? path, List<string> opts, bool includeNames, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancel)
{
opts = new List<string>(opts) { "stream=true" };
var stream = await ipfs.PostDownloadAsync("pin/ls", cancel, path, opts.ToArray());
using var sr = new StreamReader(stream);
while (!sr.EndOfStream && !cancel.IsCancellationRequested)
{
var line = await sr.ReadLineAsync();
if (string.IsNullOrWhiteSpace(line))
continue;
var dto = JsonConvert.DeserializeObject<PinLsObjectDto>(line);
if (dto is null || string.IsNullOrEmpty(dto.Cid))
continue;
yield return new PinListItem
{
Cid = (Cid)dto.Cid!,
Type = ParseType(dto.Type),
Name = dto.Name
};
}
}

async Task<IEnumerable<PinListItem>> ListItemsOnceAsync(string? path, List<string> opts, CancellationToken cancel)
{
var json = await ipfs.DoCommandAsync("pin/ls", cancel, path, opts.ToArray());
var root = JsonConvert.DeserializeObject<PinListResponseDto>(json);
var list = new List<PinListItem>();
if (root?.Keys != null)
{
foreach (var kv in root.Keys)
{
list.Add(new PinListItem
{
Cid = (Cid)kv.Key!,
Type = ParseType(kv.Value?.Type),
Name = string.IsNullOrEmpty(kv.Value?.Name) ? null : kv.Value!.Name
});
}
}
return list;
}

static PinType ParseType(string? t)
{
return t?.ToLowerInvariant() switch
{
"direct" => PinType.Direct,
"indirect" => PinType.Indirect,
"recursive" => PinType.Recursive,
"all" => PinType.All,
_ => PinType.All
};
}

}
Expand Down
41 changes: 41 additions & 0 deletions src/CoreApi/PinDto.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using System.Collections.Generic;

#nullable enable
namespace Ipfs.Http
{
/// <summary>
/// Non-streaming response DTO for /api/v0/pin/ls.
/// </summary>
internal record PinListResponseDto
{
public Dictionary<string, PinInfoDto>? Keys { get; init; }
}

/// <summary>
/// DTO for entry value in PinListResponseDto.Keys.
/// </summary>
internal record PinInfoDto
{
public string? Name { get; init; }
public string? Type { get; init; }
}

/// <summary>
/// Streaming response DTO for /api/v0/pin/ls?stream=true.
/// </summary>
internal record PinLsObjectDto
{
public string? Cid { get; init; }
public string? Name { get; init; }
public string? Type { get; init; }
}

/// <summary>
/// Response DTO for /api/v0/pin/add and /api/v0/pin/rm which both return a Pins array.
/// </summary>
internal record PinChangeResponseDto
{
public int? Progress { get; init; }
public List<string>? Pins { get; init; }
}
}
2 changes: 1 addition & 1 deletion src/IpfsHttpClient.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ Added missing IFileSystemApi.ListAsync. Doesn't fully replace the removed IFileS
</ItemGroup>

<ItemGroup>
<PackageReference Include="IpfsShipyard.Ipfs.Core" Version="0.7.0" />
<PackageReference Include="IpfsShipyard.Ipfs.Core" Version="0.8.0" />
<PackageReference Include="Microsoft.CSharp" Version="4.7.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<PackageReference Include="Multiformats.Base" Version="2.0.2" />
Expand Down
24 changes: 24 additions & 0 deletions test/AsyncEnumerableTestHelpers.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace Ipfs.Http
{
internal static class AsyncEnumerableTestHelpers
{
public static IEnumerable<T> ToEnumerable<T>(this IAsyncEnumerable<T> source)
{
return source.ToArrayAsync().GetAwaiter().GetResult();
}

public static async Task<T[]> ToArrayAsync<T>(this IAsyncEnumerable<T> source)
{
var list = new List<T>();
await foreach (var item in source)
{
list.Add(item);
}
return list.ToArray();
}
}
}
Loading