Skip to content

Commit

Permalink
Adds Func<Stream, Task> to replace Action<Stream> for GetObjectAsync() (
Browse files Browse the repository at this point in the history
  • Loading branch information
ebozduman committed Jan 7, 2023
1 parent c06d104 commit afa9dad
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 47 deletions.
126 changes: 94 additions & 32 deletions Minio.Functional.Tests/FunctionalTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Reflection;
using System.Runtime.InteropServices;
using System.Security.Cryptography;
using System.Text;
Expand Down Expand Up @@ -273,6 +271,23 @@ public static string GetRandomName(int length = 5)
return "minio-dotnet-example-" + result;
}

internal static void generateRandomFile(string fileName)
{
using (var fs = new FileStream(fileName, FileMode.Create, FileAccess.Write, FileShare.None, 4096, true))
{
var fileSize = 3L * 1024 * 1024 * 1024;
var segments = fileSize / 10000;
var last_seg = fileSize % 10000;
var br = new BinaryWriter(fs);

for (long i = 0; i < segments; i++)
br.Write(new byte[10000]);

br.Write(new byte[last_seg]);
br.Close();
}
}

// Return true if running in Mint mode
public static bool IsMintEnv()
{
Expand Down Expand Up @@ -4023,36 +4038,6 @@ internal static async Task CopyObject_Test7(MinioClient minio)
}
}

public static void objPrint(object obj)
{
foreach (PropertyDescriptor descriptor in TypeDescriptor.GetProperties(obj))
{
var name = descriptor.Name;
var value = descriptor.GetValue(obj);
Console.WriteLine("{0}={1}", name, value);
}
}

public static void Print(object obj)
{
foreach (var prop in obj.GetType()
.GetProperties(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance))
{
var value = prop.GetValue(obj, new object[] { });
Console.WriteLine("{0} = {1}", prop.Name, value);
}

Console.WriteLine("DONE!\n\n");
}

public static void printDict(Dictionary<string, string> d)
{
if (d != null)
foreach (var kv in d)
Console.WriteLine(" {0} = {1}", kv.Key, kv.Value);
Console.WriteLine("DONE!\n\n");
}

internal static async Task CopyObject_Test8(MinioClient minio)
{
var startTime = DateTime.Now;
Expand Down Expand Up @@ -4788,6 +4773,83 @@ internal static async Task GetObject_3_OffsetLength_Tests(MinioClient minio)
}
}

internal static async Task GetObject_AsyncCallback_Test1(MinioClient minio)
{
var startTime = DateTime.Now;
var bucketName = GetRandomName(15);
var objectName = GetRandomObjectName(10);
string contentType = null;
var fileName = GetRandomName(10);
var destFileName = GetRandomName(10);
var args = new Dictionary<string, string>
{
{ "bucketName", bucketName },
{ "objectName", objectName },
{ "contentType", contentType }
};

try
{
// Create a large local file
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) generateRandomFile(fileName);
else Bash("truncate -s 2G " + fileName);

// Create the bucket
await Setup_Test(minio, bucketName);

using (var filestream = new FileStream(File.OpenHandle(fileName), FileAccess.Read))
{
// Upload the large file, "fileName", into the bucket
var size = filestream.Length;
long file_read_size = 0;
var putObjectArgs = new PutObjectArgs()
.WithBucket(bucketName)
.WithObject(objectName)
.WithStreamData(filestream)
.WithObjectSize(filestream.Length)
.WithContentType(contentType);

await minio.PutObjectAsync(putObjectArgs).ConfigureAwait(false);

var callbackAsync = async delegate(Stream stream, CancellationToken cancellationToken)
{
using (var dest = new FileStream(destFileName, FileMode.Create, FileAccess.Write))
{
await stream.CopyToAsync(dest);
}
};

var getObjectArgs = new GetObjectArgs()
.WithBucket(bucketName)
.WithObject(objectName)
.WithCallbackStream(async (stream, cancellationToken) => await callbackAsync(stream, default));

await minio.GetObjectAsync(getObjectArgs).ConfigureAwait(false);
var writtenInfo = new FileInfo(destFileName);
file_read_size = writtenInfo.Length;
Assert.AreEqual(size, file_read_size);

new MintLogger("GetObject_LargeFile_Test0", getObjectSignature,
"Tests whether GetObject as stream works",
TestStatus.PASS, DateTime.Now - startTime, args: args).Log();
}
}
catch (Exception ex)
{
new MintLogger("GetObject_LargeFile_Test0", getObjectSignature, "Tests whether GetObject as stream works",
TestStatus.FAIL, DateTime.Now - startTime, ex.Message, ex.ToString(), args: args).Log();
throw;
}
finally
{
if (File.Exists(fileName))
File.Delete(fileName);
if (File.Exists(destFileName))
File.Delete(destFileName);
await TearDown(minio, bucketName);
}
}

internal static async Task FGetObject_Test1(MinioClient minio)
{
var startTime = DateTime.Now;
Expand Down
2 changes: 2 additions & 0 deletions Minio.Functional.Tests/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ public static void Main(string[] args)
// and length parameters. Tests will be reported as GetObject_Test3,
// GetObject_Test4 and GetObject_Test5.
FunctionalTest.GetObject_3_OffsetLength_Tests(minioClient).Wait();
// Test async callback function to download an object
FunctionalTest.GetObject_AsyncCallback_Test1(minioClient).Wait();

// Test File GetObject and PutObject functions
FunctionalTest.FGetObject_Test1(minioClient).Wait();
Expand Down
16 changes: 14 additions & 2 deletions Minio/DataModel/ObjectOperationsArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
using System.Net.Http;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;
using System.Xml.Linq;
using Minio.DataModel;
Expand Down Expand Up @@ -509,6 +511,7 @@ public GetObjectArgs()
}

internal Action<Stream> CallBack { get; private set; }
internal Func<Stream, CancellationToken, Task> FuncCallBack { get; private set; }
internal long ObjectOffset { get; private set; }
internal long ObjectLength { get; private set; }
internal string FileName { get; private set; }
Expand All @@ -517,7 +520,7 @@ public GetObjectArgs()
internal override void Validate()
{
base.Validate();
if (CallBack == null && string.IsNullOrEmpty(FileName))
if (CallBack == null && FuncCallBack == null && string.IsNullOrEmpty(FileName))
throw new MinioException("Atleast one of " + nameof(CallBack) + ", CallBack method or " + nameof(FileName) +
" file path to save need to be set for GetObject operation.");
if (OffsetLengthSet)
Expand Down Expand Up @@ -551,7 +554,10 @@ private void Populate()
internal override HttpRequestMessageBuilder BuildRequest(HttpRequestMessageBuilder requestMessageBuilder)
{
if (!string.IsNullOrEmpty(VersionId)) requestMessageBuilder.AddQueryParameter("versionId", $"{VersionId}");
requestMessageBuilder.ResponseWriter = CallBack;

if (CallBack is not null) requestMessageBuilder.ResponseWriter = CallBack;
else requestMessageBuilder.FunctionResponseWriter = FuncCallBack;

if (Headers.ContainsKey(S3ZipExtractKey))
requestMessageBuilder.AddQueryParameter(S3ZipExtractKey, Headers[S3ZipExtractKey]);

Expand All @@ -565,6 +571,12 @@ public GetObjectArgs WithCallbackStream(Action<Stream> cb)
return this;
}

public GetObjectArgs WithCallbackStream(Func<Stream, CancellationToken, Task> cb)
{
FuncCallBack = cb;
return this;
}

public GetObjectArgs WithOffsetAndLength(long offset, long length)
{
OffsetLengthSet = true;
Expand Down
50 changes: 37 additions & 13 deletions Minio/Helper/OperationsHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ private async Task<ObjectStat> getObjectHelper(GetObjectArgs args, CancellationT
args.Validate();
if (args.FileName != null)
await getObjectFileAsync(args, objStat, cancellationToken);
else
else if (args.CallBack is not null)
await getObjectStreamAsync(args, objStat, args.CallBack, cancellationToken);
else await getObjectStreamAsync(args, objStat, args.FuncCallBack, cancellationToken);
return objStat;
}

Expand All @@ -70,26 +71,26 @@ private async Task<ObjectStat> getObjectHelper(GetObjectArgs args, CancellationT
var length = objectStat.Size;
var etag = objectStat.ETag;

long tempFileSize = 0;
var tempFileName = $"{args.FileName}.{etag}.part.minio";
if (!string.IsNullOrEmpty(args.VersionId)) tempFileName = $"{args.FileName}.{etag}.{args.VersionId}.part.minio";
if (File.Exists(args.FileName)) File.Delete(args.FileName);

utils.ValidateFile(tempFileName);
if (File.Exists(tempFileName)) File.Delete(tempFileName);

args = args.WithCallbackStream(stream =>
var callbackAsync = async delegate(Stream stream, CancellationToken cancellationToken)
{
var fileStream = File.Create(tempFileName);
stream.CopyTo(fileStream);
fileStream.Dispose();
var writtenInfo = new FileInfo(tempFileName);
var writtenSize = writtenInfo.Length;
if (writtenSize != length - tempFileSize)
throw new IOException(tempFileName +
": Unexpected data written. Expected = "
+ (length - tempFileSize)
+ ", written = " + writtenSize);
using (var dest = new FileStream(tempFileName, FileMode.Create, FileAccess.Write))
{
await stream.CopyToAsync(dest);
}
};

var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromMilliseconds(15));
args.WithCallbackStream(async (stream, cancellationToken) =>
{
await callbackAsync(stream, cts.Token);
utils.MoveWithReplace(tempFileName, args.FileName);
});
return getObjectStreamAsync(args, objectStat, null, cancellationToken);
Expand All @@ -114,6 +115,29 @@ private async Task<ObjectStat> getObjectHelper(GetObjectArgs args, CancellationT
.ConfigureAwait(false);
}

/// <summary>
/// private helper method. It returns the specified portion or full object from the bucket
/// </summary>
/// <param name="args">GetObjectArgs Arguments Object encapsulates information like - bucket name, object name etc </param>
/// <param name="objectStat">
/// ObjectStat object encapsulates information like - object name, size, etag etc, represents
/// Object Information
/// </param>
/// <param name="cb">
/// Callback function to send/process Object contents using
/// async Func object which takes Stream and CancellationToken as input
/// and Task as output, if assigned
/// </param>
/// <param name="cancellationToken">Optional cancellation token to cancel the operation</param>
private async Task getObjectStreamAsync(GetObjectArgs args, ObjectStat objectStat,
Func<Stream, CancellationToken, Task> cb,
CancellationToken cancellationToken = default)
{
var requestMessageBuilder = await CreateRequest(args).ConfigureAwait(false);
using var response = await ExecuteTaskAsync(NoErrorHandlers, requestMessageBuilder, cancellationToken)
.ConfigureAwait(false);
}

/// <summary>
/// private helper method to remove list of objects from bucket
/// </summary>
Expand Down

0 comments on commit afa9dad

Please sign in to comment.