Skip to content

Commit

Permalink
Using new TimeProvider #2084 (#2108)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tratcher committed Apr 26, 2023
1 parent 444005a commit be58d84
Show file tree
Hide file tree
Showing 53 changed files with 541 additions and 743 deletions.
1 change: 1 addition & 0 deletions eng/Versions.props
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
</PropertyGroup>
<!--Package versions-->
<PropertyGroup>
<MicrosoftBclTimeProviderVersion>8.0.0-preview.4.23220.7</MicrosoftBclTimeProviderVersion>
<MicrosoftDotNetXUnitExtensionsPackageVersion>8.0.0-beta.23224.1</MicrosoftDotNetXUnitExtensionsPackageVersion>
</PropertyGroup>
</Project>
4 changes: 2 additions & 2 deletions global.json
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
{
"sdk": {
"version": "8.0.100-preview.3.23178.7"
"version": "8.0.100-preview.4.23221.1"
},
"tools": {
"dotnet": "8.0.100-preview.3.23178.7",
"dotnet": "8.0.100-preview.4.23221.1",
"runtimes": {
"dotnet": [
"6.0.14",
Expand Down
1 change: 1 addition & 0 deletions samples/KubernetesIngress.Sample/Combined/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ COPY ["src/Kubernetes.Controller/Yarp.Kubernetes.Controller.csproj", "src/Kubern
COPY ["src/Directory.Build.props", "src/"]
COPY ["Directory.Build.*", "./"]
COPY ["NuGet.config", ""]
COPY ["eng/Versions.props", "eng/"]

# Build a cache layer with all of the nuget packages
RUN /root/.dotnet/dotnet restore samples/KubernetesIngress.Sample/Combined/Yarp.Kubernetes.IngressController.csproj
Expand Down
1 change: 1 addition & 0 deletions samples/KubernetesIngress.Sample/Ingress/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ COPY ["src/Kubernetes.Controller/Yarp.Kubernetes.Controller.csproj", "src/Kubern
COPY ["src/Directory.Build.props", "src/"]
COPY ["Directory.Build.*", "./"]
COPY ["NuGet.config", ""]
COPY ["eng/Versions.props", "eng/"]

# Build a cache layer with all of the nuget packages
RUN /root/.dotnet/dotnet restore samples/KubernetesIngress.Sample/Ingress/Yarp.Kubernetes.Ingress.csproj
Expand Down
1 change: 1 addition & 0 deletions samples/KubernetesIngress.Sample/Monitor/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ COPY ["src/Kubernetes.Controller/Yarp.Kubernetes.Controller.csproj", "src/Kubern
COPY ["src/Directory.Build.props", "src/"]
COPY ["Directory.Build.*", "./"]
COPY ["NuGet.config", ""]
COPY ["eng/Versions.props", "eng/"]

# Build a cache layer with all of the nuget packages
RUN /root/.dotnet/dotnet restore samples/KubernetesIngress.Sample/Monitor/Yarp.Kubernetes.Monitor.csproj
Expand Down
20 changes: 10 additions & 10 deletions src/Kubernetes.Controller/Rate/Limiter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class Limiter
{
private readonly object _sync = new object();
private readonly Limit _limit;
private readonly ISystemClock _clock;
private readonly TimeProvider _timeProvider;
private readonly int _burst;
private double _tokens;

Expand All @@ -63,12 +63,12 @@ public class Limiter
/// </summary>
/// <param name="limit">The count per second which is allowed.</param>
/// <param name="burst">The burst.</param>
/// <param name="systemClock">Accessor for the current UTC time.</param>
public Limiter(Limit limit, int burst, ISystemClock systemClock = default)
/// <param name="timeProvider">Accessor for the current UTC time.</param>
public Limiter(Limit limit, int burst, TimeProvider timeProvider = default)
{
_limit = limit;
_burst = burst;
_clock = systemClock ?? new SystemClock();
_timeProvider = timeProvider ?? TimeProvider.System;
}

/// <summary>
Expand All @@ -77,7 +77,7 @@ public Limiter(Limit limit, int burst, ISystemClock systemClock = default)
/// <returns><c>true</c> if a token is available and used, <c>false</c> otherwise.</returns>
public bool Allow()
{
return AllowN(_clock.UtcNow, 1);
return AllowN(_timeProvider.GetUtcNow(), 1);
}

/// <summary>
Expand All @@ -98,7 +98,7 @@ public bool AllowN(DateTimeOffset now, int number)
/// <returns>Reservation.</returns>
public Reservation Reserve()
{
return Reserve(_clock.UtcNow, 1);
return Reserve(_timeProvider.GetUtcNow(), 1);
}

/// <summary>
Expand Down Expand Up @@ -165,7 +165,7 @@ public async Task WaitAsync(int count, CancellationToken cancellationToken)

while (true)
{
var now = _clock.UtcNow;
var now = _timeProvider.GetUtcNow();
var r = ReserveImpl(now, count, waitLimit);
if (r.Ok)
{
Expand Down Expand Up @@ -198,7 +198,7 @@ private Reservation ReserveImpl(DateTimeOffset now, int number, TimeSpan maxFutu
if (_limit == Limit.Max)
{
return new Reservation(
clock: _clock,
timeProvider: _timeProvider,
limiter: this,
ok: true,
tokens: number,
Expand All @@ -225,7 +225,7 @@ private Reservation ReserveImpl(DateTimeOffset now, int number, TimeSpan maxFutu
if (ok)
{
var reservation = new Reservation(
clock: _clock,
timeProvider: _timeProvider,
limiter: this,
ok: true,
tokens: number,
Expand All @@ -241,7 +241,7 @@ private Reservation ReserveImpl(DateTimeOffset now, int number, TimeSpan maxFutu
else
{
var reservation = new Reservation(
clock: _clock,
timeProvider: _timeProvider,
limiter: this,
ok: false,
limit: _limit);
Expand Down
10 changes: 5 additions & 5 deletions src/Kubernetes.Controller/Rate/Reservation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,29 @@ namespace Yarp.Kubernetes.Controller.Rate;
/// </summary>
public class Reservation
{
private readonly ISystemClock _clock;
private readonly TimeProvider _timeProvider;
private readonly Limiter _limiter;
private readonly Limit _limit;
private readonly double _tokens;

/// <summary>
/// Initializes a new instance of the <see cref="Reservation"/> class.
/// </summary>
/// <param name="clock">A system clock.</param>
/// <param name="timeProvider">Gets the system time.</param>
/// <param name="limiter">The limiter.</param>
/// <param name="ok">if set to <c>true</c> [ok].</param>
/// <param name="tokens">The tokens.</param>
/// <param name="timeToAct">The time to act.</param>
/// <param name="limit">The limit.</param>
public Reservation(
ISystemClock clock,
TimeProvider timeProvider,
Limiter limiter,
bool ok,
double tokens = default,
DateTimeOffset timeToAct = default,
Limit limit = default)
{
_clock = clock;
_timeProvider = timeProvider;
_limiter = limiter;
Ok = ok;
_tokens = tokens;
Expand All @@ -63,7 +63,7 @@ public class Reservation
/// <returns>TimeSpanOffset.</returns>
public TimeSpan Delay()
{
return DelayFrom(_clock.UtcNow);
return DelayFrom(_timeProvider.GetUtcNow());
}

/// <summary>
Expand Down
14 changes: 7 additions & 7 deletions src/ReverseProxy/Forwarder/HttpForwarder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ internal sealed class HttpForwarder : IHttpForwarder
private static readonly Version DefaultVersion = HttpVersion.Version20;
private static readonly HttpVersionPolicy DefaultVersionPolicy = HttpVersionPolicy.RequestVersionOrLower;
private readonly ILogger _logger;
private readonly IClock _clock;
private readonly TimeProvider _timeProvider;

public HttpForwarder(ILogger<HttpForwarder> logger, IClock clock)
public HttpForwarder(ILogger<HttpForwarder> logger, TimeProvider timeProvider)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_clock = clock ?? throw new ArgumentNullException(nameof(clock));
_timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
}

/// <summary>
Expand Down Expand Up @@ -577,7 +577,7 @@ private void FixupUpgradeRequestHeaders(HttpContext context, HttpRequestMessage
return new StreamCopyHttpContent(
request: request,
autoFlushHttpClientOutgoingStream: isStreamingRequest,
clock: _clock,
timeProvider: _timeProvider,
activityToken);
}

Expand Down Expand Up @@ -737,10 +737,10 @@ private static ValueTask<bool> CopyResponseStatusAndHeadersAsync(HttpResponseMes
// :: Step 7-A-2: Copy duplex streams
using var destinationStream = await destinationResponse.Content.ReadAsStreamAsync(activityCancellationSource.Token);

var requestTask = StreamCopier.CopyAsync(isRequest: true, clientStream, destinationStream, StreamCopier.UnknownLength, _clock, activityCancellationSource,
var requestTask = StreamCopier.CopyAsync(isRequest: true, clientStream, destinationStream, StreamCopier.UnknownLength, _timeProvider, activityCancellationSource,
// HTTP/2 HttpClient request streams buffer by default.
autoFlush: destinationResponse.Version == HttpVersion.Version20, activityCancellationSource.Token).AsTask();
var responseTask = StreamCopier.CopyAsync(isRequest: false, destinationStream, clientStream, StreamCopier.UnknownLength, _clock, activityCancellationSource, activityCancellationSource.Token).AsTask();
var responseTask = StreamCopier.CopyAsync(isRequest: false, destinationStream, clientStream, StreamCopier.UnknownLength, _timeProvider, activityCancellationSource, activityCancellationSource.Token).AsTask();

// Make sure we report the first failure.
var firstTask = await Task.WhenAny(requestTask, responseTask);
Expand Down Expand Up @@ -864,7 +864,7 @@ private ForwarderError FixupUpgradeResponseHeaders(HttpContext context, HttpResp
{
using var destinationResponseStream = await destinationResponseContent.ReadAsStreamAsync(activityCancellationSource.Token);
// The response content-length is enforced by the server.
return await StreamCopier.CopyAsync(isRequest: false, destinationResponseStream, clientResponseStream, StreamCopier.UnknownLength, _clock, activityCancellationSource, activityCancellationSource.Token);
return await StreamCopier.CopyAsync(isRequest: false, destinationResponseStream, clientResponseStream, StreamCopier.UnknownLength, _timeProvider, activityCancellationSource, activityCancellationSource.Token);
}

return (StreamCopyResult.Success, null);
Expand Down
56 changes: 27 additions & 29 deletions src/ReverseProxy/Forwarder/StreamCopier.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,19 @@ internal static class StreamCopier
private const int DefaultBufferSize = 65536;
public const long UnknownLength = -1;

public static ValueTask<(StreamCopyResult, Exception?)> CopyAsync(bool isRequest, Stream input, Stream output, long promisedContentLength, IClock clock, ActivityCancellationTokenSource activityToken, CancellationToken cancellation)
=> CopyAsync(isRequest, input, output, promisedContentLength, clock, activityToken, autoFlush: false, cancellation);
public static ValueTask<(StreamCopyResult, Exception?)> CopyAsync(bool isRequest, Stream input, Stream output, long promisedContentLength, TimeProvider timeProvider, ActivityCancellationTokenSource activityToken, CancellationToken cancellation)
=> CopyAsync(isRequest, input, output, promisedContentLength, timeProvider, activityToken, autoFlush: false, cancellation);

public static ValueTask<(StreamCopyResult, Exception?)> CopyAsync(bool isRequest, Stream input, Stream output, long promisedContentLength, IClock clock, ActivityCancellationTokenSource activityToken, bool autoFlush, CancellationToken cancellation)
public static ValueTask<(StreamCopyResult, Exception?)> CopyAsync(bool isRequest, Stream input, Stream output, long promisedContentLength, TimeProvider timeProvider, ActivityCancellationTokenSource activityToken, bool autoFlush, CancellationToken cancellation)
{
Debug.Assert(input is not null);
Debug.Assert(output is not null);
Debug.Assert(clock is not null);
Debug.Assert(timeProvider is not null);
Debug.Assert(activityToken is not null);

// Avoid capturing 'isRequest' and 'clock' in the state machine when telemetry is disabled
// Avoid capturing 'isRequest' and 'timeProvider' in the state machine when telemetry is disabled
var telemetry = ForwarderTelemetry.Log.IsEnabled(EventLevel.Informational, EventKeywords.All)
? new StreamCopierTelemetry(isRequest, clock)
? new StreamCopierTelemetry(isRequest, timeProvider)
: null;

return CopyAsync(input, output, promisedContentLength, telemetry, activityToken, autoFlush, cancellation);
Expand Down Expand Up @@ -142,48 +142,46 @@ private static async ValueTask<(StreamCopyResult, Exception?)> CopyAsync(Stream

private sealed class StreamCopierTelemetry
{
private static readonly TimeSpan _timeBetweenTransferringEvents = TimeSpan.FromSeconds(1);

private readonly bool _isRequest;
private readonly IClock _clock;
private readonly TimeProvider _timeProvider;
private long _contentLength;
private long _iops;
private TimeSpan _readTime;
private TimeSpan _writeTime;
private TimeSpan _firstReadTime;
private TimeSpan _lastTime;
private TimeSpan _nextTransferringEvent;
private long _readTime;
private long _writeTime;
private long _firstReadTime;
private long _lastTime;
private long _nextTransferringEvent;

public StreamCopierTelemetry(bool isRequest, IClock clock)
public StreamCopierTelemetry(bool isRequest, TimeProvider timeProvider)
{
_isRequest = isRequest;
_clock = clock ?? throw new ArgumentNullException(nameof(clock));
_firstReadTime = new TimeSpan(-1);
_timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
_firstReadTime = -1;

ForwarderTelemetry.Log.ForwarderStage(isRequest ? ForwarderStage.RequestContentTransferStart : ForwarderStage.ResponseContentTransferStart);

_lastTime = clock.GetStopwatchTime();
_nextTransferringEvent = _lastTime + _timeBetweenTransferringEvents;
_lastTime = timeProvider.GetTimestamp();
_nextTransferringEvent = _lastTime + _timeProvider.TimestampFrequency;
}

public void AfterRead(long contentLength)
{
_contentLength = contentLength;
_iops++;

var readStop = _clock.GetStopwatchTime();
var readStop = _timeProvider.GetTimestamp();
var currentReadTime = readStop - _lastTime;
_lastTime = readStop;
_readTime += currentReadTime;
if (_firstReadTime.Ticks < 0)
if (_firstReadTime < 0)
{
_firstReadTime = currentReadTime;
}
}

public void AfterWrite()
{
var writeStop = _clock.GetStopwatchTime();
var writeStop = _timeProvider.GetTimestamp();
_writeTime += writeStop - _lastTime;
_lastTime = writeStop;

Expand All @@ -193,12 +191,12 @@ public void AfterWrite()
_isRequest,
_contentLength,
_iops,
_readTime.Ticks,
_writeTime.Ticks);
_timeProvider.GetElapsedTime(0, _readTime).Ticks,
_timeProvider.GetElapsedTime(0, _writeTime).Ticks);

// Avoid attributing the time taken by logging ContentTransferring to the next read call
_lastTime = _clock.GetStopwatchTime();
_nextTransferringEvent = _lastTime + _timeBetweenTransferringEvents;
_lastTime = _timeProvider.GetTimestamp();
_nextTransferringEvent = _lastTime + _timeProvider.TimestampFrequency;
}
}

Expand All @@ -208,9 +206,9 @@ public void Stop()
_isRequest,
_contentLength,
_iops,
_readTime.Ticks,
_writeTime.Ticks,
Math.Max(0, _firstReadTime.Ticks));
_timeProvider.GetElapsedTime(0, _readTime).Ticks,
_timeProvider.GetElapsedTime(0, _writeTime).Ticks,
_timeProvider.GetElapsedTime(0, Math.Max(0, _firstReadTime)).Ticks);
}
}
}
8 changes: 4 additions & 4 deletions src/ReverseProxy/Forwarder/StreamCopyHttpContent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,16 @@ internal sealed class StreamCopyHttpContent : HttpContent
// HttpClient's machinery keeps an internal buffer that doesn't get flushed to the socket on every write.
// Some protocols (e.g. gRPC) may rely on specific bytes being sent, and HttpClient's buffering would prevent it.
private readonly bool _autoFlushHttpClientOutgoingStream;
private readonly IClock _clock;
private readonly TimeProvider _timeProvider;
private readonly ActivityCancellationTokenSource _activityToken;
private readonly TaskCompletionSource<(StreamCopyResult, Exception?)> _tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
private int _started;

public StreamCopyHttpContent(HttpRequest request, bool autoFlushHttpClientOutgoingStream, IClock clock, ActivityCancellationTokenSource activityToken)
public StreamCopyHttpContent(HttpRequest request, bool autoFlushHttpClientOutgoingStream, TimeProvider timeProvider, ActivityCancellationTokenSource activityToken)
{
_request = request ?? throw new ArgumentNullException(nameof(request));
_autoFlushHttpClientOutgoingStream = autoFlushHttpClientOutgoingStream;
_clock = clock ?? throw new ArgumentNullException(nameof(clock));
_timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));

_activityToken = activityToken;
}
Expand Down Expand Up @@ -164,7 +164,7 @@ protected override async Task SerializeToStreamAsync(Stream stream, TransportCon
}

// Check that the content-length matches the request body size. This can be removed in .NET 7 now that SocketsHttpHandler enforces this: https://github.com/dotnet/runtime/issues/62258.
var (result, error) = await StreamCopier.CopyAsync(isRequest: true, _request.Body, stream, Headers.ContentLength ?? StreamCopier.UnknownLength, _clock, _activityToken, _autoFlushHttpClientOutgoingStream, cancellationToken);
var (result, error) = await StreamCopier.CopyAsync(isRequest: true, _request.Body, stream, Headers.ContentLength ?? StreamCopier.UnknownLength, _timeProvider, _activityToken, _autoFlushHttpClientOutgoingStream, cancellationToken);
_tcs.TrySetResult((result, error));

// Check for errors that weren't the result of the destination failing.
Expand Down
4 changes: 2 additions & 2 deletions src/ReverseProxy/Health/ActiveHealthCheckMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ internal partial class ActiveHealthCheckMonitor : IActiveHealthCheckMonitor, ICl
IOptions<ActiveHealthCheckMonitorOptions> monitorOptions,
IEnumerable<IActiveHealthCheckPolicy> policies,
IProbingRequestFactory probingRequestFactory,
ITimerFactory timerFactory,
TimeProvider timeProvider,
ILogger<ActiveHealthCheckMonitor> logger)
{
_monitorOptions = monitorOptions?.Value ?? throw new ArgumentNullException(nameof(monitorOptions));
_policies = policies?.ToDictionaryByUniqueId(p => p.Name) ?? throw new ArgumentNullException(nameof(policies));
_probingRequestFactory = probingRequestFactory ?? throw new ArgumentNullException(nameof(probingRequestFactory));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
Scheduler = new EntityActionScheduler<ClusterState>(cluster => ProbeCluster(cluster), autoStart: false, runOnce: false, timerFactory);
Scheduler = new EntityActionScheduler<ClusterState>(cluster => ProbeCluster(cluster), autoStart: false, runOnce: false, timeProvider);
}

public bool InitialProbeCompleted { get; private set; }
Expand Down

0 comments on commit be58d84

Please sign in to comment.