Skip to content

Commit

Permalink
Make Receiver's HttpClient extensible (#2043)
Browse files Browse the repository at this point in the history
* Try to make Receiver's HttpClient extensible

* Add ClientTimeout and ClientHttpHandler

* use HttpMessageInvoker instead of HttpClient

* Use ResponseHeadersRead

* Revert ResponseHeadersRead change

---------

Co-authored-by: Miha Zupan <mihazupan.zupan1@gmail.com>
  • Loading branch information
catcherwong and MihaZupan committed Feb 28, 2023
1 parent ac2721d commit 754ceec
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 3 deletions.
22 changes: 19 additions & 3 deletions src/Kubernetes.Controller/Protocol/Receiver.cs
Expand Up @@ -2,7 +2,9 @@
// Licensed under the MIT License.

using System;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Text;
using System.Threading;
Expand All @@ -13,6 +15,7 @@
using Yarp.Kubernetes.Controller.Configuration;
using Yarp.Kubernetes.Controller.Hosting;
using Yarp.Kubernetes.Controller.Rate;
using Yarp.ReverseProxy.Forwarder;

namespace Yarp.Kubernetes.Protocol;

Expand All @@ -35,15 +38,26 @@ public class Receiver : BackgroundHostedService

_options = options.Value;

if (_options.Client == null)
{
_options.Client = new(new SocketsHttpHandler
{
UseProxy = false,
AllowAutoRedirect = false,
AutomaticDecompression = DecompressionMethods.None,
UseCookies = false,
ActivityHeadersPropagator = new ReverseProxyPropagator(DistributedContextPropagator.Current),
ConnectTimeout = TimeSpan.FromSeconds(15),
});
}

// two requests per second after third failure
_limiter = new Limiter(new Limit(2), 3);
_proxyConfigProvider = proxyConfigProvider;
}

public override async Task RunAsync(CancellationToken cancellationToken)
{
using var client = new HttpClient();

while (!cancellationToken.IsCancellationRequested)
{
await _limiter.WaitAsync(cancellationToken).ConfigureAwait(false);
Expand All @@ -53,7 +67,9 @@ public override async Task RunAsync(CancellationToken cancellationToken)

try
{
using var stream = await client.GetStreamAsync(_options.ControllerUrl, cancellationToken).ConfigureAwait(false);
var requestMessage = new HttpRequestMessage(HttpMethod.Get, _options.ControllerUrl);
var responseMessage = await _options.Client.SendAsync(requestMessage, cancellationToken).ConfigureAwait(false);
using var stream = await responseMessage.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
using var reader = new StreamReader(stream, Encoding.UTF8, leaveOpen: true);
using var cancellation = cancellationToken.Register(stream.Close);
while (true)
Expand Down
3 changes: 3 additions & 0 deletions src/Kubernetes.Controller/Protocol/ReceiverOptions.cs
Expand Up @@ -2,10 +2,13 @@
// Licensed under the MIT License.

using System;
using System.Net.Http;

namespace Yarp.Kubernetes.Protocol;

public class ReceiverOptions
{
public Uri ControllerUrl { get; set; }

public HttpMessageInvoker Client { get; set; }
}

0 comments on commit 754ceec

Please sign in to comment.