Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using new TimeProvider #2108

Merged
merged 9 commits into from
Apr 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions eng/Versions.props
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
</PropertyGroup>
<!--Package versions-->
<PropertyGroup>
<MicrosoftBclTimeProviderVersion>8.0.0-preview.4.23220.7</MicrosoftBclTimeProviderVersion>
<MicrosoftDotNetXUnitExtensionsPackageVersion>8.0.0-beta.23224.1</MicrosoftDotNetXUnitExtensionsPackageVersion>
</PropertyGroup>
</Project>
4 changes: 2 additions & 2 deletions global.json
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
{
"sdk": {
"version": "8.0.100-preview.3.23178.7"
"version": "8.0.100-preview.4.23221.1"
},
"tools": {
"dotnet": "8.0.100-preview.3.23178.7",
"dotnet": "8.0.100-preview.4.23221.1",
"runtimes": {
"dotnet": [
"6.0.14",
Expand Down
1 change: 1 addition & 0 deletions samples/KubernetesIngress.Sample/Combined/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ COPY ["src/Kubernetes.Controller/Yarp.Kubernetes.Controller.csproj", "src/Kubern
COPY ["src/Directory.Build.props", "src/"]
COPY ["Directory.Build.*", "./"]
COPY ["NuGet.config", ""]
COPY ["eng/Versions.props", "eng/"]

# Build a cache layer with all of the nuget packages
RUN /root/.dotnet/dotnet restore samples/KubernetesIngress.Sample/Combined/Yarp.Kubernetes.IngressController.csproj
Expand Down
1 change: 1 addition & 0 deletions samples/KubernetesIngress.Sample/Ingress/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ COPY ["src/Kubernetes.Controller/Yarp.Kubernetes.Controller.csproj", "src/Kubern
COPY ["src/Directory.Build.props", "src/"]
COPY ["Directory.Build.*", "./"]
COPY ["NuGet.config", ""]
COPY ["eng/Versions.props", "eng/"]

# Build a cache layer with all of the nuget packages
RUN /root/.dotnet/dotnet restore samples/KubernetesIngress.Sample/Ingress/Yarp.Kubernetes.Ingress.csproj
Expand Down
1 change: 1 addition & 0 deletions samples/KubernetesIngress.Sample/Monitor/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ COPY ["src/Kubernetes.Controller/Yarp.Kubernetes.Controller.csproj", "src/Kubern
COPY ["src/Directory.Build.props", "src/"]
COPY ["Directory.Build.*", "./"]
COPY ["NuGet.config", ""]
COPY ["eng/Versions.props", "eng/"]

# Build a cache layer with all of the nuget packages
RUN /root/.dotnet/dotnet restore samples/KubernetesIngress.Sample/Monitor/Yarp.Kubernetes.Monitor.csproj
Expand Down
20 changes: 10 additions & 10 deletions src/Kubernetes.Controller/Rate/Limiter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class Limiter
{
private readonly object _sync = new object();
private readonly Limit _limit;
private readonly ISystemClock _clock;
private readonly TimeProvider _timeProvider;
private readonly int _burst;
private double _tokens;

Expand All @@ -63,12 +63,12 @@ public class Limiter
/// </summary>
/// <param name="limit">The count per second which is allowed.</param>
/// <param name="burst">The burst.</param>
/// <param name="systemClock">Accessor for the current UTC time.</param>
public Limiter(Limit limit, int burst, ISystemClock systemClock = default)
/// <param name="timeProvider">Accessor for the current UTC time.</param>
public Limiter(Limit limit, int burst, TimeProvider timeProvider = default)
{
_limit = limit;
_burst = burst;
_clock = systemClock ?? new SystemClock();
_timeProvider = timeProvider ?? TimeProvider.System;
}

/// <summary>
Expand All @@ -77,7 +77,7 @@ public Limiter(Limit limit, int burst, ISystemClock systemClock = default)
/// <returns><c>true</c> if a token is available and used, <c>false</c> otherwise.</returns>
public bool Allow()
{
return AllowN(_clock.UtcNow, 1);
return AllowN(_timeProvider.GetUtcNow(), 1);
}

/// <summary>
Expand All @@ -98,7 +98,7 @@ public bool AllowN(DateTimeOffset now, int number)
/// <returns>Reservation.</returns>
public Reservation Reserve()
{
return Reserve(_clock.UtcNow, 1);
return Reserve(_timeProvider.GetUtcNow(), 1);
}

/// <summary>
Expand Down Expand Up @@ -165,7 +165,7 @@ public async Task WaitAsync(int count, CancellationToken cancellationToken)

while (true)
{
var now = _clock.UtcNow;
var now = _timeProvider.GetUtcNow();
var r = ReserveImpl(now, count, waitLimit);
if (r.Ok)
{
Expand Down Expand Up @@ -198,7 +198,7 @@ private Reservation ReserveImpl(DateTimeOffset now, int number, TimeSpan maxFutu
if (_limit == Limit.Max)
{
return new Reservation(
clock: _clock,
timeProvider: _timeProvider,
limiter: this,
ok: true,
tokens: number,
Expand All @@ -225,7 +225,7 @@ private Reservation ReserveImpl(DateTimeOffset now, int number, TimeSpan maxFutu
if (ok)
{
var reservation = new Reservation(
clock: _clock,
timeProvider: _timeProvider,
limiter: this,
ok: true,
tokens: number,
Expand All @@ -241,7 +241,7 @@ private Reservation ReserveImpl(DateTimeOffset now, int number, TimeSpan maxFutu
else
{
var reservation = new Reservation(
clock: _clock,
timeProvider: _timeProvider,
limiter: this,
ok: false,
limit: _limit);
Expand Down
10 changes: 5 additions & 5 deletions src/Kubernetes.Controller/Rate/Reservation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,29 @@ namespace Yarp.Kubernetes.Controller.Rate;
/// </summary>
public class Reservation
{
private readonly ISystemClock _clock;
private readonly TimeProvider _timeProvider;
private readonly Limiter _limiter;
private readonly Limit _limit;
private readonly double _tokens;

/// <summary>
/// Initializes a new instance of the <see cref="Reservation"/> class.
/// </summary>
/// <param name="clock">A system clock.</param>
/// <param name="timeProvider">Gets the system time.</param>
/// <param name="limiter">The limiter.</param>
/// <param name="ok">if set to <c>true</c> [ok].</param>
/// <param name="tokens">The tokens.</param>
/// <param name="timeToAct">The time to act.</param>
/// <param name="limit">The limit.</param>
public Reservation(
ISystemClock clock,
TimeProvider timeProvider,
Limiter limiter,
bool ok,
double tokens = default,
DateTimeOffset timeToAct = default,
Limit limit = default)
{
_clock = clock;
_timeProvider = timeProvider;
_limiter = limiter;
Ok = ok;
_tokens = tokens;
Expand All @@ -63,7 +63,7 @@ public class Reservation
/// <returns>TimeSpanOffset.</returns>
public TimeSpan Delay()
{
return DelayFrom(_clock.UtcNow);
return DelayFrom(_timeProvider.GetUtcNow());
}

/// <summary>
Expand Down
14 changes: 7 additions & 7 deletions src/ReverseProxy/Forwarder/HttpForwarder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ internal sealed class HttpForwarder : IHttpForwarder
private static readonly Version DefaultVersion = HttpVersion.Version20;
private static readonly HttpVersionPolicy DefaultVersionPolicy = HttpVersionPolicy.RequestVersionOrLower;
private readonly ILogger _logger;
private readonly IClock _clock;
private readonly TimeProvider _timeProvider;

public HttpForwarder(ILogger<HttpForwarder> logger, IClock clock)
public HttpForwarder(ILogger<HttpForwarder> logger, TimeProvider timeProvider)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_clock = clock ?? throw new ArgumentNullException(nameof(clock));
_timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
}

/// <summary>
Expand Down Expand Up @@ -577,7 +577,7 @@ private void FixupUpgradeRequestHeaders(HttpContext context, HttpRequestMessage
return new StreamCopyHttpContent(
request: request,
autoFlushHttpClientOutgoingStream: isStreamingRequest,
clock: _clock,
timeProvider: _timeProvider,
activityToken);
}

Expand Down Expand Up @@ -737,10 +737,10 @@ private static ValueTask<bool> CopyResponseStatusAndHeadersAsync(HttpResponseMes
// :: Step 7-A-2: Copy duplex streams
using var destinationStream = await destinationResponse.Content.ReadAsStreamAsync(activityCancellationSource.Token);

var requestTask = StreamCopier.CopyAsync(isRequest: true, clientStream, destinationStream, StreamCopier.UnknownLength, _clock, activityCancellationSource,
var requestTask = StreamCopier.CopyAsync(isRequest: true, clientStream, destinationStream, StreamCopier.UnknownLength, _timeProvider, activityCancellationSource,
// HTTP/2 HttpClient request streams buffer by default.
autoFlush: destinationResponse.Version == HttpVersion.Version20, activityCancellationSource.Token).AsTask();
var responseTask = StreamCopier.CopyAsync(isRequest: false, destinationStream, clientStream, StreamCopier.UnknownLength, _clock, activityCancellationSource, activityCancellationSource.Token).AsTask();
var responseTask = StreamCopier.CopyAsync(isRequest: false, destinationStream, clientStream, StreamCopier.UnknownLength, _timeProvider, activityCancellationSource, activityCancellationSource.Token).AsTask();

// Make sure we report the first failure.
var firstTask = await Task.WhenAny(requestTask, responseTask);
Expand Down Expand Up @@ -864,7 +864,7 @@ private ForwarderError FixupUpgradeResponseHeaders(HttpContext context, HttpResp
{
using var destinationResponseStream = await destinationResponseContent.ReadAsStreamAsync(activityCancellationSource.Token);
// The response content-length is enforced by the server.
return await StreamCopier.CopyAsync(isRequest: false, destinationResponseStream, clientResponseStream, StreamCopier.UnknownLength, _clock, activityCancellationSource, activityCancellationSource.Token);
return await StreamCopier.CopyAsync(isRequest: false, destinationResponseStream, clientResponseStream, StreamCopier.UnknownLength, _timeProvider, activityCancellationSource, activityCancellationSource.Token);
}

return (StreamCopyResult.Success, null);
Expand Down
56 changes: 27 additions & 29 deletions src/ReverseProxy/Forwarder/StreamCopier.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,19 @@ internal static class StreamCopier
private const int DefaultBufferSize = 65536;
public const long UnknownLength = -1;

public static ValueTask<(StreamCopyResult, Exception?)> CopyAsync(bool isRequest, Stream input, Stream output, long promisedContentLength, IClock clock, ActivityCancellationTokenSource activityToken, CancellationToken cancellation)
=> CopyAsync(isRequest, input, output, promisedContentLength, clock, activityToken, autoFlush: false, cancellation);
public static ValueTask<(StreamCopyResult, Exception?)> CopyAsync(bool isRequest, Stream input, Stream output, long promisedContentLength, TimeProvider timeProvider, ActivityCancellationTokenSource activityToken, CancellationToken cancellation)
=> CopyAsync(isRequest, input, output, promisedContentLength, timeProvider, activityToken, autoFlush: false, cancellation);

public static ValueTask<(StreamCopyResult, Exception?)> CopyAsync(bool isRequest, Stream input, Stream output, long promisedContentLength, IClock clock, ActivityCancellationTokenSource activityToken, bool autoFlush, CancellationToken cancellation)
public static ValueTask<(StreamCopyResult, Exception?)> CopyAsync(bool isRequest, Stream input, Stream output, long promisedContentLength, TimeProvider timeProvider, ActivityCancellationTokenSource activityToken, bool autoFlush, CancellationToken cancellation)
{
Debug.Assert(input is not null);
Debug.Assert(output is not null);
Debug.Assert(clock is not null);
Debug.Assert(timeProvider is not null);
Debug.Assert(activityToken is not null);

// Avoid capturing 'isRequest' and 'clock' in the state machine when telemetry is disabled
// Avoid capturing 'isRequest' and 'timeProvider' in the state machine when telemetry is disabled
var telemetry = ForwarderTelemetry.Log.IsEnabled(EventLevel.Informational, EventKeywords.All)
? new StreamCopierTelemetry(isRequest, clock)
? new StreamCopierTelemetry(isRequest, timeProvider)
: null;

return CopyAsync(input, output, promisedContentLength, telemetry, activityToken, autoFlush, cancellation);
Expand Down Expand Up @@ -142,48 +142,46 @@ private static async ValueTask<(StreamCopyResult, Exception?)> CopyAsync(Stream

private sealed class StreamCopierTelemetry
{
private static readonly TimeSpan _timeBetweenTransferringEvents = TimeSpan.FromSeconds(1);

private readonly bool _isRequest;
private readonly IClock _clock;
private readonly TimeProvider _timeProvider;
private long _contentLength;
private long _iops;
private TimeSpan _readTime;
private TimeSpan _writeTime;
private TimeSpan _firstReadTime;
private TimeSpan _lastTime;
private TimeSpan _nextTransferringEvent;
private long _readTime;
private long _writeTime;
private long _firstReadTime;
private long _lastTime;
private long _nextTransferringEvent;

public StreamCopierTelemetry(bool isRequest, IClock clock)
public StreamCopierTelemetry(bool isRequest, TimeProvider timeProvider)
{
_isRequest = isRequest;
_clock = clock ?? throw new ArgumentNullException(nameof(clock));
_firstReadTime = new TimeSpan(-1);
_timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
_firstReadTime = -1;

ForwarderTelemetry.Log.ForwarderStage(isRequest ? ForwarderStage.RequestContentTransferStart : ForwarderStage.ResponseContentTransferStart);

_lastTime = clock.GetStopwatchTime();
_nextTransferringEvent = _lastTime + _timeBetweenTransferringEvents;
_lastTime = timeProvider.GetTimestamp();
Tratcher marked this conversation as resolved.
Show resolved Hide resolved
_nextTransferringEvent = _lastTime + _timeProvider.TimestampFrequency;
}

public void AfterRead(long contentLength)
{
_contentLength = contentLength;
_iops++;

var readStop = _clock.GetStopwatchTime();
var readStop = _timeProvider.GetTimestamp();
var currentReadTime = readStop - _lastTime;
_lastTime = readStop;
_readTime += currentReadTime;
if (_firstReadTime.Ticks < 0)
if (_firstReadTime < 0)
{
_firstReadTime = currentReadTime;
}
}

public void AfterWrite()
{
var writeStop = _clock.GetStopwatchTime();
var writeStop = _timeProvider.GetTimestamp();
_writeTime += writeStop - _lastTime;
_lastTime = writeStop;

Expand All @@ -193,12 +191,12 @@ public void AfterWrite()
_isRequest,
_contentLength,
_iops,
_readTime.Ticks,
_writeTime.Ticks);
_timeProvider.GetElapsedTime(0, _readTime).Ticks,
_timeProvider.GetElapsedTime(0, _writeTime).Ticks);

// Avoid attributing the time taken by logging ContentTransferring to the next read call
_lastTime = _clock.GetStopwatchTime();
_nextTransferringEvent = _lastTime + _timeBetweenTransferringEvents;
_lastTime = _timeProvider.GetTimestamp();
_nextTransferringEvent = _lastTime + _timeProvider.TimestampFrequency;
}
}

Expand All @@ -208,9 +206,9 @@ public void Stop()
_isRequest,
_contentLength,
_iops,
_readTime.Ticks,
_writeTime.Ticks,
Math.Max(0, _firstReadTime.Ticks));
_timeProvider.GetElapsedTime(0, _readTime).Ticks,
_timeProvider.GetElapsedTime(0, _writeTime).Ticks,
_timeProvider.GetElapsedTime(0, Math.Max(0, _firstReadTime)).Ticks);
}
}
}
8 changes: 4 additions & 4 deletions src/ReverseProxy/Forwarder/StreamCopyHttpContent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,16 @@ internal sealed class StreamCopyHttpContent : HttpContent
// HttpClient's machinery keeps an internal buffer that doesn't get flushed to the socket on every write.
// Some protocols (e.g. gRPC) may rely on specific bytes being sent, and HttpClient's buffering would prevent it.
private readonly bool _autoFlushHttpClientOutgoingStream;
private readonly IClock _clock;
private readonly TimeProvider _timeProvider;
private readonly ActivityCancellationTokenSource _activityToken;
private readonly TaskCompletionSource<(StreamCopyResult, Exception?)> _tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
private int _started;

public StreamCopyHttpContent(HttpRequest request, bool autoFlushHttpClientOutgoingStream, IClock clock, ActivityCancellationTokenSource activityToken)
public StreamCopyHttpContent(HttpRequest request, bool autoFlushHttpClientOutgoingStream, TimeProvider timeProvider, ActivityCancellationTokenSource activityToken)
{
_request = request ?? throw new ArgumentNullException(nameof(request));
_autoFlushHttpClientOutgoingStream = autoFlushHttpClientOutgoingStream;
_clock = clock ?? throw new ArgumentNullException(nameof(clock));
_timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));

_activityToken = activityToken;
}
Expand Down Expand Up @@ -164,7 +164,7 @@ protected override async Task SerializeToStreamAsync(Stream stream, TransportCon
}

// Check that the content-length matches the request body size. This can be removed in .NET 7 now that SocketsHttpHandler enforces this: https://github.com/dotnet/runtime/issues/62258.
var (result, error) = await StreamCopier.CopyAsync(isRequest: true, _request.Body, stream, Headers.ContentLength ?? StreamCopier.UnknownLength, _clock, _activityToken, _autoFlushHttpClientOutgoingStream, cancellationToken);
var (result, error) = await StreamCopier.CopyAsync(isRequest: true, _request.Body, stream, Headers.ContentLength ?? StreamCopier.UnknownLength, _timeProvider, _activityToken, _autoFlushHttpClientOutgoingStream, cancellationToken);
_tcs.TrySetResult((result, error));

// Check for errors that weren't the result of the destination failing.
Expand Down
4 changes: 2 additions & 2 deletions src/ReverseProxy/Health/ActiveHealthCheckMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ internal partial class ActiveHealthCheckMonitor : IActiveHealthCheckMonitor, ICl
IOptions<ActiveHealthCheckMonitorOptions> monitorOptions,
IEnumerable<IActiveHealthCheckPolicy> policies,
IProbingRequestFactory probingRequestFactory,
ITimerFactory timerFactory,
TimeProvider timeProvider,
ILogger<ActiveHealthCheckMonitor> logger)
{
_monitorOptions = monitorOptions?.Value ?? throw new ArgumentNullException(nameof(monitorOptions));
_policies = policies?.ToDictionaryByUniqueId(p => p.Name) ?? throw new ArgumentNullException(nameof(policies));
_probingRequestFactory = probingRequestFactory ?? throw new ArgumentNullException(nameof(probingRequestFactory));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
Scheduler = new EntityActionScheduler<ClusterState>(cluster => ProbeCluster(cluster), autoStart: false, runOnce: false, timerFactory);
Scheduler = new EntityActionScheduler<ClusterState>(cluster => ProbeCluster(cluster), autoStart: false, runOnce: false, timeProvider);
}

public bool InitialProbeCompleted { get; private set; }
Expand Down