Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 85 additions & 29 deletions Storage/Extensions/HttpClientProgress.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
using System.Threading.Tasks;
using BirdMessenger;
using BirdMessenger.Collections;
using BirdMessenger.Delegates;
using BirdMessenger.Infrastructure;
using Newtonsoft.Json;
using Supabase.Storage.Exceptions;

Expand Down Expand Up @@ -175,7 +177,7 @@ public static async Task<HttpResponseMessage> UploadAsync(
}
}

var response = await client.PostAsync(uri, content, cancellationToken);
var response = await client.PostAsync(uri, content, cancellationToken);

if (!response.IsSuccessStatusCode)
{
Expand All @@ -199,8 +201,8 @@ public static Task<HttpResponseMessage> UploadOrContinueFileAsync(
this HttpClient client,
Uri uri,
string filePath,
MetadataCollection metadata,
Dictionary<string, string>? headers = null,
MetadataCollection? metadata = null,
Progress<float>? progress = null,
CancellationToken cancellationToken = default
)
Expand All @@ -210,8 +212,8 @@ public static Task<HttpResponseMessage> UploadOrContinueFileAsync(
client,
uri,
fileStream,
headers,
metadata,
headers,
progress,
cancellationToken
);
Expand All @@ -221,8 +223,8 @@ public static Task<HttpResponseMessage> UploadOrContinueByteAsync(
this HttpClient client,
Uri uri,
byte[] data,
MetadataCollection metadata,
Dictionary<string, string>? headers = null,
MetadataCollection? metadata = null,
Progress<float>? progress = null,
CancellationToken cancellationToken = default
)
Expand All @@ -232,8 +234,8 @@ public static Task<HttpResponseMessage> UploadOrContinueByteAsync(
client,
uri,
stream,
headers,
metadata,
headers,
progress,
cancellationToken
);
Expand All @@ -243,8 +245,8 @@ private static async Task<HttpResponseMessage> ResumableUploadAsync(
this HttpClient client,
Uri uri,
Stream fileStream,
MetadataCollection metadata,
Dictionary<string, string>? headers = null,
MetadataCollection? metadata = null,
IProgress<float>? progress = null,
CancellationToken cancellationToken = default
)
Expand All @@ -266,32 +268,51 @@ private static async Task<HttpResponseMessage> ResumableUploadAsync(
}
}

var createOption = new TusCreateRequestOption()
var cacheKey =
$"{metadata["bucketName"]}/{metadata["objectName"]}/{metadata["contentType"]}";

UploadMemoryCache.TryGet(cacheKey, out var upload);
Uri? fileLocation = null;
if (upload == null)
{
Endpoint = uri,
Metadata = metadata,
UploadLength = fileStream.Length,
};
var createOption = new TusCreateRequestOption()
{
Endpoint = uri,
Metadata = metadata,
UploadLength = fileStream.Length,
};

var responseCreate = await client.TusCreateAsync(createOption, cancellationToken);
try
{
var responseCreate = await client.TusCreateAsync(
createOption,
cancellationToken
);

fileLocation = responseCreate.FileLocation;
UploadMemoryCache.Set(cacheKey, fileLocation.ToString());
}
catch (TusException error)
{
throw await HandleResponseError(error);
}
}

if (upload != null)
fileLocation = new Uri(upload);

var patchOption = new TusPatchRequestOption
{
FileLocation = responseCreate.FileLocation,
FileLocation = fileLocation,
Stream = fileStream,
UploadBufferSize = 6 * 1024 * 1024,
UploadType = UploadType.Chunk,
OnProgressAsync = x =>
OnProgressAsync = x => ReportProgressAsync(progress, x),
OnCompletedAsync = _ =>
{
if (progress == null)
return Task.CompletedTask;

var uploadedProgress = (float)x.UploadedSize / x.TotalSize * 100f;
progress.Report(uploadedProgress);

UploadMemoryCache.Remove(cacheKey);
return Task.CompletedTask;
},
OnCompletedAsync = _ => Task.CompletedTask,
OnFailedAsync = _ => Task.CompletedTask,
};

Expand All @@ -300,19 +321,54 @@ private static async Task<HttpResponseMessage> ResumableUploadAsync(
if (responsePatch.OriginResponseMessage.IsSuccessStatusCode)
return responsePatch.OriginResponseMessage;

var httpContent = await responsePatch.OriginResponseMessage.Content.ReadAsStringAsync();
throw await HandleResponseError(responsePatch.OriginResponseMessage);
}

private static Task ReportProgressAsync(
IProgress<float>? progress,
UploadProgressEvent progressInfo
)
{
if (progress == null)
return Task.CompletedTask;

var uploadedProgress = (float)progressInfo.UploadedSize / progressInfo.TotalSize * 100f;
progress.Report(uploadedProgress);

return Task.CompletedTask;
}

private static async Task<SupabaseStorageException> HandleResponseError(
HttpResponseMessage response
)
{
var httpContent = await response.Content.ReadAsStringAsync();
var errorResponse = JsonConvert.DeserializeObject<ErrorResponse>(httpContent);
var e = new SupabaseStorageException(errorResponse?.Message ?? httpContent)
var error = new SupabaseStorageException(errorResponse?.Message ?? httpContent)
{
Content = httpContent,
Response = response,
StatusCode = errorResponse?.StatusCode ?? (int)response.StatusCode,
};
error.AddReason();

return error;
}

private static async Task<SupabaseStorageException> HandleResponseError(
TusException response
)
{
var httpContent = await response.OriginHttpResponse.Content.ReadAsStringAsync();
var error = new SupabaseStorageException(httpContent)
{
Content = httpContent,
Response = responsePatch.OriginResponseMessage,
StatusCode =
errorResponse?.StatusCode
?? (int)responsePatch.OriginResponseMessage.StatusCode,
Response = response.OriginHttpResponse,
StatusCode = (int)response.OriginHttpResponse.StatusCode,
};
error.AddReason();

e.AddReason();
throw e;
return error;
}
}
}
4 changes: 2 additions & 2 deletions Storage/StorageFileApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -744,8 +744,8 @@ private async Task UploadOrContinue(
await Helpers.HttpUploadClient!.UploadOrContinueFileAsync(
uri,
localPath,
headers,
metadata,
headers,
progress,
cancellationToken
);
Expand Down Expand Up @@ -792,8 +792,8 @@ private async Task UploadOrContinue(
await Helpers.HttpUploadClient!.UploadOrContinueByteAsync(
uri,
data,
headers,
metadata,
headers,
progress,
cancellationToken
);
Expand Down
139 changes: 139 additions & 0 deletions Storage/UploadMemoryCache.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
using System;
using System.Collections.Concurrent;
using System.Threading;

namespace Supabase.Storage;

/// <summary>
/// Provides thread-safe in-memory caching for resumable upload URLs with sliding expiration.
/// </summary>
public class UploadMemoryCache
{
private static readonly ConcurrentDictionary<string, CacheEntry> _cache = new();

private static TimeSpan _defaultTtl = TimeSpan.FromMinutes(60);

private static long _version; // helps with testing/observability if needed

private sealed class CacheEntry
{
public string Url { get; }
public DateTimeOffset Expiration { get; private set; }
public TimeSpan Ttl { get; }

public CacheEntry(string url, TimeSpan ttl)
{
Url = url;
Ttl = ttl <= TimeSpan.Zero ? TimeSpan.FromMinutes(5) : ttl;
Touch();
}

public void Touch()
{
Expiration = DateTimeOffset.UtcNow.Add(Ttl);
}

public bool IsExpired() => DateTimeOffset.UtcNow >= Expiration;
}

/// <summary>
/// Sets the default time-to-live duration for future cache entries.
/// </summary>
/// <param name="ttl">The time-to-live duration. If less than or equal to zero, defaults to 5 minutes.</param>
public static void SetDefaultTtl(TimeSpan ttl)
{
_defaultTtl = ttl <= TimeSpan.Zero ? TimeSpan.FromMinutes(5) : ttl;
}

// Store or upate the resumable upload URL for the provided key.
/// <summary>
/// Stores or updates a resumable upload URL in the cache for the specified key.
/// </summary>
/// <param name="key">The unique identifier for the cached URL.</param>
/// <param name="url">The resumable upload URL to cache.</param>
/// <param name="ttl">Optional time-to-live duration. If not specified, uses the default TTL.</param>
/// <exception cref="ArgumentException">Thrown when key or url is null, empty, or whitespace.</exception>
public static void Set(string key, string url, TimeSpan? ttl = null)
{
if (string.IsNullOrWhiteSpace(key))
throw new ArgumentException("Key must be provided.", nameof(key));
if (string.IsNullOrWhiteSpace(url))
throw new ArgumentException("Url must be provided.", nameof(url));

var entryTtl = ttl.GetValueOrDefault(_defaultTtl);
_cache.AddOrUpdate(
key,
_ => new CacheEntry(url, entryTtl),
(_, existing) => new CacheEntry(url, entryTtl)
);

Interlocked.Increment(ref _version);
CleanupIfNeeded();
}

/// <summary>
/// Attempts to retrieve a cached URL by its key. Updates the sliding expiration on successful retrieval.
/// </summary>
/// <param name="key">The unique identifier for the cached URL.</param>
/// <param name="url">When this method returns, contains the cached URL if found; otherwise, null.</param>
/// <returns>True if the URL was found in the cache; otherwise, false.</returns>
public static bool TryGet(string key, out string? url)
{
url = null;
if (string.IsNullOrWhiteSpace(key))
return false;

if (!_cache.TryGetValue(key, out var entry)) return false;
if (entry.IsExpired())
{
_cache.TryRemove(key, out _);
return false;
}

entry.Touch();
url = entry.Url;
return true;

}

/// <summary>
/// Removes a cached URL by its key.
/// </summary>
/// <param name="key">The unique identifier for the cached URL to remove.</param>
/// <returns>True if the URL was successfully removed; otherwise, false.</returns>
public static bool Remove(string key)
{
if (string.IsNullOrWhiteSpace(key))
return false;

var removed = _cache.TryRemove(key, out _);
if (removed)
Interlocked.Increment(ref _version);
return removed;
}

/// <summary>
/// Removes all cached URLs from the cache.
/// </summary>
public static void Clear()
{
_cache.Clear();
Interlocked.Increment(ref _version);
}

/// <summary>
/// Gets the current number of entries in the cache.
/// </summary>
public static int Count => _cache.Count;

private static void CleanupIfNeeded()
{
foreach (var kvp in _cache)
{
if (kvp.Value.IsExpired())
{
_cache.TryRemove(kvp.Key, out _);
}
}
}
}
5 changes: 4 additions & 1 deletion StorageTests/StorageFileTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ public async Task UploadOrResumeByteWithInterruptionAndResume()

var options = new FileOptions { Duplex = "duplex", Metadata = metadata };

using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(300));
using var cts = new CancellationTokenSource();

try
{
Expand All @@ -280,6 +280,9 @@ await _bucket.UploadOrResume(
options,
(_, progress) =>
{
if (progress > 20)
cts.Cancel();

Console.WriteLine($"First upload progress: {progress}");
firstUploadProgressTriggered.TrySetResult(true);
},
Expand Down
Loading