Skip to content

Commit

Permalink
feat(storage): Add support of streamed <In|Out|RA>Stream
Browse files Browse the repository at this point in the history
  • Loading branch information
dr1rrb committed Oct 9, 2020
1 parent cd2b766 commit aecc0f1
Show file tree
Hide file tree
Showing 14 changed files with 904 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,33 @@
#pragma warning disable 114 // new keyword hiding
namespace Windows.Storage
{
#if __ANDROID__ || __IOS__ || NET461 || __WASM__ || __SKIA__ || __NETSTD_REFERENCE__ || __MACOS__
#if false
[global::Uno.NotImplemented]
#endif
public partial class StreamedFileDataRequest : global::Windows.Storage.Streams.IOutputStream,global::System.IDisposable,global::Windows.Storage.IStreamedFileDataRequest
{
#if __ANDROID__ || __IOS__ || NET461 || __WASM__ || __SKIA__ || __NETSTD_REFERENCE__ || __MACOS__
#if false
[global::Uno.NotImplemented("__ANDROID__", "__IOS__", "NET461", "__WASM__", "__SKIA__", "__NETSTD_REFERENCE__", "__MACOS__")]
public global::Windows.Foundation.IAsyncOperationWithProgress<uint, uint> WriteAsync( global::Windows.Storage.Streams.IBuffer buffer)
{
throw new global::System.NotImplementedException("The member IAsyncOperationWithProgress<uint, uint> StreamedFileDataRequest.WriteAsync(IBuffer buffer) is not implemented in Uno.");
}
#endif
#if __ANDROID__ || __IOS__ || NET461 || __WASM__ || __SKIA__ || __NETSTD_REFERENCE__ || __MACOS__
#if false
[global::Uno.NotImplemented("__ANDROID__", "__IOS__", "NET461", "__WASM__", "__SKIA__", "__NETSTD_REFERENCE__", "__MACOS__")]
public global::Windows.Foundation.IAsyncOperation<bool> FlushAsync()
{
throw new global::System.NotImplementedException("The member IAsyncOperation<bool> StreamedFileDataRequest.FlushAsync() is not implemented in Uno.");
}
#endif
#if __ANDROID__ || __IOS__ || NET461 || __WASM__ || __SKIA__ || __NETSTD_REFERENCE__ || __MACOS__
#if false
[global::Uno.NotImplemented("__ANDROID__", "__IOS__", "NET461", "__WASM__", "__SKIA__", "__NETSTD_REFERENCE__", "__MACOS__")]
public void Dispose()
{
global::Windows.Foundation.Metadata.ApiInformation.TryRaiseNotImplemented("Windows.Storage.StreamedFileDataRequest", "void StreamedFileDataRequest.Dispose()");
}
#endif
#if __ANDROID__ || __IOS__ || NET461 || __WASM__ || __SKIA__ || __NETSTD_REFERENCE__ || __MACOS__
#if false
[global::Uno.NotImplemented("__ANDROID__", "__IOS__", "NET461", "__WASM__", "__SKIA__", "__NETSTD_REFERENCE__", "__MACOS__")]
public void FailAndClose( global::Windows.Storage.StreamedFileFailureMode failureMode)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#pragma warning disable 114 // new keyword hiding
namespace Windows.Storage
{
#if __ANDROID__ || __IOS__ || NET461 || __WASM__ || __SKIA__ || __NETSTD_REFERENCE__ || __MACOS__
#if false
public delegate void StreamedFileDataRequestedHandler(global::Windows.Storage.StreamedFileDataRequest @stream);
#endif
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#pragma warning disable 114 // new keyword hiding
namespace Windows.Storage
{
#if __ANDROID__ || __IOS__ || NET461 || __WASM__ || __SKIA__ || __NETSTD_REFERENCE__ || __MACOS__
#if false
#if __ANDROID__ || __IOS__ || NET461 || __WASM__ || __SKIA__ || __NETSTD_REFERENCE__ || __MACOS__
[global::Uno.NotImplemented]
#endif
Expand Down
58 changes: 58 additions & 0 deletions src/Uno.UWP/Storage/StreamedFileDataRequest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#nullable enable

using System;
using System.IO;
using System.Threading;
using Windows.ApplicationModel.Appointments;
using Windows.Foundation;
using Windows.Storage.Streams;

namespace Windows.Storage
{
public sealed partial class StreamedFileDataRequest : IOutputStream, IDisposable, IStreamedFileDataRequest
{
private readonly StreamedRandomAccessStream _owner;
private readonly Stream _tempFile;
private readonly CancellationTokenSource _ct;

internal StreamedFileDataRequest(StreamedRandomAccessStream owner, Stream tempFile)
{
_owner = owner;
_tempFile = tempFile;
_ct = new CancellationTokenSource();
}

internal CancellationToken CancellationToken => _ct.Token;

internal void Abort() => _ct.Cancel();

public IAsyncOperationWithProgress<uint, uint> WriteAsync(IBuffer buffer)
=> new AsyncOperationWithProgress<uint, uint>(async (ct, op) =>
{
var write = _tempFile.WriteAsync(buffer);
write.Progress = (snd, p) => op.NotifyProgress(p);
var written = await write.AsTask(ct);
// We make sure to write the data to the disk before allow read to access it
_tempFile.FlushAsync(ct);
//_owner.OnDataLoadProgress(written);
return written;
});

public IAsyncOperation<bool> FlushAsync()
=> _tempFile.FlushAsyncOp(); // This is actually useless as we flush on each Write.

public void FailAndClose(StreamedFileFailureMode failureMode)
{
//_owner.OnDataLoadCompleted(failureMode);
}

public void Dispose()
{
_ct.Dispose();
//_owner.OnDataLoadCompleted(default);
}
}
}
80 changes: 80 additions & 0 deletions src/Uno.UWP/Storage/StreamedFileDataRequestedHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#nullable enable

using System;
using System.IO;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Uno;
using Uno.Extensions;

namespace Windows.Storage
{
public delegate void StreamedFileDataRequestedHandler(StreamedFileDataRequest stream);

//internal static class StreamedFileDataRequestedHandlerHelper
//{
// public static StreamedFileDataRequestedHandler CreateFromUri(
// Uri uri,
// HttpMethod? method = null,
// HttpClient? client = null,
// ActionAsync<HttpResponseMessage?, Exception?>? onReady = null)
// {
// method ??= HttpMethod.Get;
// client ??= new HttpClient();

// return req => Task.Run(() => FetchAsync(req, uri, method, client, onReady, req.CancellationToken));
// }

// private static async Task FetchAsync(
// StreamedFileDataRequest req,
// Uri uri,
// HttpMethod method,
// HttpClient client,
// ActionAsync<HttpResponseMessage?, Exception?>? onReady,
// CancellationToken ct)
// {
// try
// {
// HttpResponseMessage response;
// try
// {
// var request = new HttpRequestMessage(method, uri);
// response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, ct);

// response.EnsureSuccessStatusCode();

// if (onReady is {})
// {
// await onReady(ct, response, null);
// }
// }
// catch (Exception e) when (onReady is {})
// {
// await onReady(ct, null, e);
// throw;
// }

// if (response.Content is { } content)
// {
// var responseStream = await content.ReadAsStreamAsync();
// await responseStream.CopyToAsync(req.AsStreamForWrite(), 8192, ct);
// }
// }
// catch (Exception e)
// {
// if (req.Log().IsEnabled(LogLevel.Warning))
// {
// req.Log().LogWarning("Failed to load content", e);
// }

// req.FailAndClose(StreamedFileFailureMode.Failed);
// }
// finally
// {
// req.Dispose();
// }
// }
//}
}
22 changes: 22 additions & 0 deletions src/Uno.UWP/Storage/StreamedFileFailureMode.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#pragma warning disable 108 // new keyword hiding
#pragma warning disable 114 // new keyword hiding
namespace Windows.Storage
{
#if __ANDROID__ || __IOS__ || NET461 || __WASM__ || __SKIA__ || __NETSTD_REFERENCE__ || __MACOS__
#if __ANDROID__ || __IOS__ || NET461 || __WASM__ || __SKIA__ || __NETSTD_REFERENCE__ || __MACOS__
[global::Uno.NotImplemented]
#endif
public enum StreamedFileFailureMode
{
#if __ANDROID__ || __IOS__ || NET461 || __WASM__ || __SKIA__ || __NETSTD_REFERENCE__ || __MACOS__
Failed,
#endif
#if __ANDROID__ || __IOS__ || NET461 || __WASM__ || __SKIA__ || __NETSTD_REFERENCE__ || __MACOS__
CurrentlyUnavailable,
#endif
#if __ANDROID__ || __IOS__ || NET461 || __WASM__ || __SKIA__ || __NETSTD_REFERENCE__ || __MACOS__
Incomplete,
#endif
}
#endif
}
46 changes: 46 additions & 0 deletions src/Uno.UWP/Storage/Streams/IStreamedDataLoader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#nullable enable

using System;
using System.Linq;
using Windows.Foundation;

namespace Windows.Storage.Streams
{
/// <summary>
/// This is responsible to asynchronously load the content of a remote content into a temporary file
/// </summary>
/// <remarks>
/// The temporary file belong to this downloader.
/// It's is responsibility to delete it on dispose.
/// Users have to keep an active reference on this downloader to maintain the file alive.
/// It might however has to share the file with an <see cref="IStreamedDataUploader"/>.
/// </remarks>
internal interface IStreamedDataLoader
{
/// <summary>
/// An event raised when some data has been saved into the temporary file.
/// </summary>
public event TypedEventHandler<IStreamedDataLoader, object?>? DataUpdated;

/// <summary>
/// Gets the temporary file in which data is loaded
/// </summary>
TemporaryFile File { get; }

/// <summary>
/// The content type of the loaded data
/// </summary>
string? ContentType { get; }

/// <summary>
/// Throws an exception if the load failed
/// </summary>
void CheckState();

/// <summary>
/// Indicates if the given position has been or not yet,
/// **or** the load is now completed and the given position will never be present.
/// </summary>
bool CanRead(ulong position);
}
}
36 changes: 36 additions & 0 deletions src/Uno.UWP/Storage/Streams/IStreamedDataUploader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#nullable enable

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Windows.Storage.Streams
{
/// <summary>
/// This is responsible to asynchronously upload the content of a remote
/// </summary>
/// <remarks>
/// The temporary file belong to this uploader.
/// It's is responsibility to delete it on dispose.
/// Users have to keep an active reference on this uploader to maintain the file alive.
/// It might however has to share the file with an <see cref="IStreamedDataLoader"/>.
/// </remarks>
internal interface IStreamedDataUploader
{
/// <summary>
/// Gets the temporary file in which data is loaded
/// </summary>
TemporaryFile File { get; }

/// <summary>
/// Throws an exception if the load failed
/// </summary>
void CheckState();

/// <summary>
/// Send a chunk of data to the remote
/// </summary>
public Task<bool> Push(ulong index, ulong length, CancellationToken ct);
}
}
92 changes: 92 additions & 0 deletions src/Uno.UWP/Storage/Streams/StreamedInputStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#nullable enable

using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Windows.Foundation;

namespace Windows.Storage.Streams
{
internal class StreamedInputStream : IInputStream
{
private readonly IStreamedDataLoader _loader;
private readonly Stream _stream;
private readonly bool _hasStreamOwnership;

private ulong? _initialPosition;

public StreamedInputStream(IStreamedDataLoader loader, ulong position)
{
_loader = loader;
_stream = loader.File.Open(FileAccess.Read);
_hasStreamOwnership = true;
_initialPosition = position;
}

public StreamedInputStream(Stream stream, IStreamedDataLoader loader)
{
_loader = loader;
_stream = stream;
_hasStreamOwnership = false;
}

public IAsyncOperationWithProgress<IBuffer, uint> ReadAsync(IBuffer buffer, uint count, InputStreamOptions options)
=> new AsyncOperationWithProgress<IBuffer, uint>(async (ct, op) =>
{
var endPosition = (ulong)_stream.Position + (_initialPosition ?? 0) + count;
if (!_loader.CanRead(endPosition))
{
// The data is not ready yet.
// We have to wait for the data to be written into the temporary file before reading the value.
var asyncLoad = new TaskCompletionSource<object?>();
try
{
_loader.DataUpdated += OnDataLoaded;
if (!_loader.CanRead(endPosition))
{
using var ctReg = ct.Register(() => asyncLoad.TrySetCanceled(ct));
await asyncLoad.Task;
}
}
finally
{
_loader.DataUpdated -= OnDataLoaded;
}
void OnDataLoaded(IStreamedDataLoader loader, object? _)
{
if (_loader.CanRead(endPosition))
{
asyncLoad.TrySetResult(_);
}
}
}
if (_initialPosition.HasValue)
{
_stream.Seek((long)_initialPosition.Value, SeekOrigin.Begin);
_initialPosition = null;
}
var read = _stream.ReadAsync(buffer, count, options);
read.Progress = (_, progress) => op.NotifyProgress(progress);
return await read.AsTask(ct);
});

public void Dispose()
{
// Note: We DO NOT dispose the _loader as it might been used by some other stream (it's designed to be self-disposable)

if (_hasStreamOwnership)
{
_stream.Dispose();
}
}

~StreamedInputStream()
=> Dispose();
}
}
Loading

0 comments on commit aecc0f1

Please sign in to comment.