Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions src/PwrDrvr.LambdaDispatch.Extension/Function.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ private static async Task Main()

_logger.LogDebug("Lambda Extension Registering");
// The Extension API will not return so we cannot await this
await RegisterLambdaExtension().ConfigureAwait(false);
await RegisterLambdaExtension();
_logger.LogDebug("Lambda Extension Registered");

if (!_staticResponse)
{
_logger.LogDebug("Contained App - Skipping Startup, Waiting for Healthy");
// Wait for the health endpoint to return OK
await AwaitChildAppHealthy().ConfigureAwait(false);
await AwaitChildAppHealthy();
_logger.LogInformation("Contained App - Healthy");
}
else
Expand All @@ -80,7 +80,7 @@ private static async Task Main()
}
#if !SKIP_METRICS
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(() => MetricsRegistry.PrintMetrics(ctsShutdown.Token)).ConfigureAwait(false);
Task.Run(() => MetricsRegistry.PrintMetrics(ctsShutdown.Token));
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
#endif
Func<WaiterRequest, ILambdaContext, Task<WaiterResponse>> handler = FunctionHandler;
Expand Down Expand Up @@ -133,14 +133,14 @@ private static async Task RegisterLambdaExtension()
var execName = Path.GetFileName(Process.GetCurrentProcess().MainModule?.FileName ?? "lambda-dispatch");
_logger.LogInformation("Registering Lambda Extension with name: {}", execName);
registerRequest.Headers.Add("Lambda-Extension-Name", execName);
var registerResponse = await client.SendAsync(registerRequest).ConfigureAwait(false);
var registerResponse = await client.SendAsync(registerRequest);
if (!registerResponse.IsSuccessStatusCode)
{
throw new Exception($"Failed to register extension: {registerResponse.StatusCode}");
}
var extensionId = registerResponse.Headers.GetValues("Lambda-Extension-Identifier").FirstOrDefault();
// Discard the response body
await registerResponse.Content.CopyToAsync(Stream.Null).ConfigureAwait(false);
await registerResponse.Content.CopyToAsync(Stream.Null);
if (string.IsNullOrEmpty(extensionId))
{
throw new Exception($"Failed to get extension id");
Expand All @@ -153,7 +153,7 @@ private static async Task RegisterLambdaExtension()
{
var nextEventRequest = new HttpRequestMessage(HttpMethod.Get, nextEventUrl);
nextEventRequest.Headers.Add("Lambda-Extension-Identifier", extensionId);
var nextEventResponse = await client.SendAsync(nextEventRequest).ConfigureAwait(false);
var nextEventResponse = await client.SendAsync(nextEventRequest);
if (!nextEventResponse.IsSuccessStatusCode)
{
throw new Exception($"Failed to get next event: {nextEventResponse.StatusCode}");
Expand Down Expand Up @@ -196,7 +196,7 @@ private static async Task AwaitChildAppHealthy()
{
try
{
var response = await client.GetAsync(healthCheckUrl).ConfigureAwait(false);
var response = await client.GetAsync(healthCheckUrl);
if (response.StatusCode == HttpStatusCode.OK)
{
_logger.LogInformation("Contained App - Got OK result");
Expand All @@ -209,7 +209,7 @@ private static async Task AwaitChildAppHealthy()
_logger.LogDebug("Contained App - Healthcheck failed");
}

await Task.Delay(250).ConfigureAwait(false); // Wait for a second before polling again
await Task.Delay(250); // Wait for a second before polling again
}
while (true);

Expand Down Expand Up @@ -302,7 +302,7 @@ public static async Task<WaiterResponse> FunctionHandler(WaiterRequest request,
_logger.LogDebug("Getting request from Router");

(var outerStatus, var receivedRequest, var requestForResponse, var requestStreamForResponse, var duplexContent)
= await reverseRequester.GetRequest(channelId).ConfigureAwait(false);
= await reverseRequester.GetRequest(channelId);

lastWakeupTime = DateTime.Now;

Expand Down Expand Up @@ -361,7 +361,7 @@ public static async Task<WaiterResponse> FunctionHandler(WaiterRequest request,
{
// Return after headers are received
_logger.LogDebug("Sending request to Contained App");
using var response = await appHttpClient.SendAsync(receivedRequest, HttpCompletionOption.ResponseHeadersRead).ConfigureAwait(false);
using var response = await appHttpClient.SendAsync(receivedRequest, HttpCompletionOption.ResponseHeadersRead);

_logger.LogDebug("Got response from Contained App");

Expand All @@ -371,23 +371,23 @@ public static async Task<WaiterResponse> FunctionHandler(WaiterRequest request,
}

// Send the response back
await reverseRequester.SendResponse(response, requestForResponse, requestStreamForResponse, duplexContent, channelId).ConfigureAwait(false);
await reverseRequester.SendResponse(response, requestForResponse, requestStreamForResponse, duplexContent, channelId);
}
else
{
// NOTE: Static response is only for testing
// Read the bytes off the request body, if any
if (receivedRequest.Content != null)
{
await receivedRequest.Content.CopyToAsync(Stream.Null).ConfigureAwait(false);
await receivedRequest.Content.CopyToAsync(Stream.Null);
}

using var response = new HttpResponseMessage(System.Net.HttpStatusCode.OK)
{
Content = new StringContent($"Hello from LambdaLB")
};

await reverseRequester.SendResponse(response, requestForResponse, requestStreamForResponse, duplexContent, channelId).ConfigureAwait(false);
await reverseRequester.SendResponse(response, requestForResponse, requestStreamForResponse, duplexContent, channelId);
}

#if !SKIP_METRICS
Expand All @@ -407,7 +407,7 @@ public static async Task<WaiterResponse> FunctionHandler(WaiterRequest request,
{
Content = new StringContent(ex.Message)
};
await reverseRequester.SendResponse(response, requestForResponse, requestStreamForResponse, duplexContent, channelId).ConfigureAwait(false);
await reverseRequester.SendResponse(response, requestForResponse, requestStreamForResponse, duplexContent, channelId);
}
catch (Exception ex2)
{
Expand Down Expand Up @@ -517,7 +517,7 @@ public static async Task<WaiterResponse> FunctionHandler(WaiterRequest request,
{
try
{
var completedTask = await Task.WhenAny(tcsShutdown.Task, Task.Delay(TimeSpan.FromSeconds(5), pingCts.Token)).ConfigureAwait(false);
var completedTask = await Task.WhenAny(tcsShutdown.Task, Task.Delay(TimeSpan.FromSeconds(5), pingCts.Token));

if (completedTask == tcsShutdown.Task)
{
Expand Down
2 changes: 1 addition & 1 deletion src/PwrDrvr.LambdaDispatch.Extension/HttpDuplexContent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ protected override async Task SerializeToStreamAsync(Stream stream, TransportCon
stream.Flush();
_waitForCompletion = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
_waitForStream.SetResult(stream);
await _waitForCompletion.Task.ConfigureAwait(false);
await _waitForCompletion.Task;
}

public Task<Stream> WaitForStreamAsync()
Expand Down
18 changes: 9 additions & 9 deletions src/PwrDrvr.LambdaDispatch.Extension/HttpReverseRequester.cs
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,10 @@ public ValueTask DisposeAsync()
// Read the Response before sending the Request
// TODO: We can await the pair of the Response or Request closing to avoid deadlocks
//
var response = await _client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead).ConfigureAwait(false);
var response = await _client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);

// Get the stream that we can write the response to
Stream requestStreamForResponse = await duplexContent.WaitForStreamAsync().ConfigureAwait(false);
Stream requestStreamForResponse = await duplexContent.WaitForStreamAsync();
if (response.StatusCode != HttpStatusCode.OK)
{
_logger.LogWarning("CLOSING - Got a {status} on the outer request LambdaId: {id}, ChannelId: {channelId}", response.StatusCode, _id, channelId);
Expand All @@ -173,7 +173,7 @@ public ValueTask DisposeAsync()
var headerBuffer = ArrayPool<byte>.Shared.Rent(32 * 1024);
try
{
var requestStream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false);
var requestStream = await response.Content.ReadAsStreamAsync();

// Read up to max headers size of data
// Read until we fill the bufer OR we get an EOF
Expand All @@ -189,7 +189,7 @@ public ValueTask DisposeAsync()
break;
}

var bytesRead = await requestStream.ReadAsync(headerBuffer, totalBytesRead, headerBuffer.Length - totalBytesRead).ConfigureAwait(false);
var bytesRead = await requestStream.ReadAsync(headerBuffer, totalBytesRead, headerBuffer.Length - totalBytesRead);
if (bytesRead == 0)
{
// Done reading
Expand Down Expand Up @@ -445,12 +445,12 @@ public async Task SendResponse(HttpResponseMessage response, HttpRequestMessage
offset += endOfHeadersBytes.Length;

// Write the headers to the stream
await requestStreamForResponse.WriteAsync(headerBuffer.AsMemory(0, offset)).ConfigureAwait(false);
await requestStreamForResponse.WriteAsync(headerBuffer.AsMemory(0, offset));

// Copy the body from the request to the response
// NOTE: CopyToAsync will only start sending when EOF is read on the response stream
#if false
await response.Content.CopyToAsync(requestStreamForResponse).ConfigureAwait(false);
await response.Content.CopyToAsync(requestStreamForResponse);
#else
var bytes = ArrayPool<byte>.Shared.Rent(128 * 1024);
try
Expand All @@ -469,7 +469,7 @@ public async Task SendResponse(HttpResponseMessage response, HttpRequestMessage
ArrayPool<byte>.Shared.Return(bytes);
}
#endif
await requestStreamForResponse.FlushAsync().ConfigureAwait(false);
await requestStreamForResponse.FlushAsync();
requestStreamForResponse.Close();
duplexContent.Complete();
}
Expand Down Expand Up @@ -504,7 +504,7 @@ public async Task CloseInstance()
request.Headers.Host = $"lambdadispatch.local:{_uri.Port}";
request.Headers.Add("X-Lambda-Id", _id);

using var response = await _client.SendAsync(request).ConfigureAwait(false);
using var response = await _client.SendAsync(request);

if (response.StatusCode != HttpStatusCode.OK)
{
Expand Down Expand Up @@ -539,7 +539,7 @@ public async Task<bool> Ping()
request.Headers.Host = $"lambdadispatch.local:{_uri.Port}";
request.Headers.Add("X-Lambda-Id", _id);

using var response = await _client.SendAsync(request).ConfigureAwait(false);
using var response = await _client.SendAsync(request);

if (response.StatusCode != HttpStatusCode.OK)
{
Expand Down
4 changes: 2 additions & 2 deletions src/PwrDrvr.LambdaDispatch.Router/Dispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public async Task AddRequest(HttpRequest incomingRequest, HttpResponse incomingR
}

// TODO: If we want to cancel we need to pass a token in here
runRequestResult = await lambdaConnection.RunRequest(incomingRequest, incomingResponse, accessLogProps, debugMode).ConfigureAwait(false);
runRequestResult = await lambdaConnection.RunRequest(incomingRequest, incomingResponse, accessLogProps, debugMode);
accessLogProps.StatusCode = incomingResponse.StatusCode;

if (debugMode)
Expand Down Expand Up @@ -770,7 +770,7 @@ private bool TryBackgroundDispatchOne(PendingRequest pendingRequest, ILambdaConn
"-"
);
}
}).ConfigureAwait(false);
});

return startedRequest;
}
Expand Down
24 changes: 12 additions & 12 deletions src/PwrDrvr.LambdaDispatch.Router/LambdaConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ private async Task<long> ProxyRequestToLambda(HttpRequest incomingRequest, Acces
offset += 2;

// Send the headers to the Lambda
await Response.BodyWriter.WriteAsync(headerBuffer.AsMemory(0, offset), CTS.Token).ConfigureAwait(false);
await Response.BodyWriter.WriteAsync(headerBuffer.AsMemory(0, offset), CTS.Token);

if (debugMode)
{
Expand Down Expand Up @@ -353,14 +353,14 @@ private async Task<long> ProxyRequestToLambda(HttpRequest incomingRequest, Acces
ArrayPool<byte>.Shared.Return(bytes);
}

await incomingRequest.BodyReader.CompleteAsync().ConfigureAwait(false);
await incomingRequest.BodyReader.CompleteAsync();

_logger.LogDebug("Finished sending incoming request body to Lambda");
}

// Mark that the Request has been sent on the LambdaInstances
await Response.BodyWriter.CompleteAsync().ConfigureAwait(false);
await Response.CompleteAsync().ConfigureAwait(false);
await Response.BodyWriter.CompleteAsync();
await Response.CompleteAsync();

// Get the response from the lambda request and relay it back to the caller
_logger.LogDebug("Finished sending entire request to Lambda");
Expand Down Expand Up @@ -444,7 +444,7 @@ public async Task<RunRequestResult> RunRequest(HttpRequest incomingRequest, Http
// Wait for both to finish
// This allows us to continue sending request body while receiving
// and relaying the response body (duplex)
var completedTask = await Task.WhenAny(proxyRequestTask, proxyResponseTask).ConfigureAwait(false);
var completedTask = await Task.WhenAny(proxyRequestTask, proxyResponseTask);
if (completedTask.Exception != null)
{
_logger.LogError(completedTask.Exception, "{Method} {Url} {Protocol} {RemoteIP} {UserAgent} - {} Status - {} Bytes Received - {} Bytes Sent - RUNREQUEST - EXCEPTION",
Expand All @@ -464,14 +464,14 @@ public async Task<RunRequestResult> RunRequest(HttpRequest incomingRequest, Http
{
// ProxyRequestToLambda finished first
// Wait for RelayResponseFromLambda to finish
totalBytesWritten = await proxyResponseTask.ConfigureAwait(false);
totalBytesWritten = await proxyResponseTask;
totalBytesRead = completedTask.Result;
}
else
{
// RelayResponseFromLambda finished first
// Wait for ProxyRequestToLambda to finish
totalBytesRead = await proxyRequestTask.ConfigureAwait(false);
totalBytesRead = await proxyRequestTask;
totalBytesWritten = completedTask.Result;
}
}
Expand Down Expand Up @@ -555,7 +555,7 @@ public async Task<RunRequestResult> RunRequest(HttpRequest incomingRequest, Http

try
{
await incomingResponse.CompleteAsync().ConfigureAwait(false);
await incomingResponse.CompleteAsync();
}
catch { }

Expand Down Expand Up @@ -595,7 +595,7 @@ private async Task<long> RelayResponseFromLambda(HttpResponse incomingResponse,
break;
}

var bytesRead = await Request.Body.ReadAsync(headerBuffer.AsMemory(totalBytesRead, headerBuffer.Length - totalBytesRead), CTS.Token).ConfigureAwait(false);
var bytesRead = await Request.Body.ReadAsync(headerBuffer.AsMemory(totalBytesRead, headerBuffer.Length - totalBytesRead), CTS.Token);
if (bytesRead == 0)
{
// Done reading
Expand Down Expand Up @@ -734,7 +734,7 @@ private async Task<long> RelayResponseFromLambda(HttpResponse incomingResponse,
// There are bytes left in the buffer
// Copy them to the response
totalBodyBytesRead += totalBytesRead - startOfNextLine;
await incomingResponse.BodyWriter.WriteAsync(headerBuffer.AsMemory(startOfNextLine, totalBytesRead - startOfNextLine), CTS.Token).ConfigureAwait(false);
await incomingResponse.BodyWriter.WriteAsync(headerBuffer.AsMemory(startOfNextLine, totalBytesRead - startOfNextLine), CTS.Token);
totalBodyBytesWritten += totalBytesRead - startOfNextLine;
}
}
Expand Down Expand Up @@ -785,7 +785,7 @@ private async Task<long> RelayResponseFromLambda(HttpResponse incomingResponse,
{
try
{
bytesRead = await responseStream.ReadAsync(bytes, CTS.Token).ConfigureAwait(false);
bytesRead = await responseStream.ReadAsync(bytes, CTS.Token);
}
catch (Exception ex)
{
Expand All @@ -809,7 +809,7 @@ private async Task<long> RelayResponseFromLambda(HttpResponse incomingResponse,

try
{
await incomingResponse.Body.WriteAsync(bytes.AsMemory(0, bytesRead), CTS.Token).ConfigureAwait(false);
await incomingResponse.Body.WriteAsync(bytes.AsMemory(0, bytesRead), CTS.Token);
totalBodyBytesWritten += bytesRead;
}
catch (Exception ex)
Expand Down
2 changes: 1 addition & 1 deletion src/PwrDrvr.LambdaDispatch.Router/LambdaInstanceManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public async Task<AddConnectionResult> AddConnectionForLambda(HttpRequest reques
{
// Add the connection to the instance
// The instance will eventually get rebalanced in the least outstanding queue
var result = await instance.AddConnection(request, response, channelId, dispatchMode).ConfigureAwait(false);
var result = await instance.AddConnection(request, response, channelId, dispatchMode);

if (result.WasRejected || result.Connection == null)
{
Expand Down
4 changes: 2 additions & 2 deletions src/PwrDrvr.LambdaDispatch.Router/LeastOutstandingQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ private async Task RebalanceQueue()
// Wait for a short period before checking again
try
{
await Task.Delay(TimeSpan.FromMilliseconds(100), cancellationTokenSource.Token).ConfigureAwait(false);
await Task.Delay(TimeSpan.FromMilliseconds(100), cancellationTokenSource.Token);
}
catch (TaskCanceledException)
{
Expand Down Expand Up @@ -443,7 +443,7 @@ private async Task LogQueueSizes()
// Wait a bit
try
{
await Task.Delay(TimeSpan.FromSeconds(10), cancellationTokenSource.Token).ConfigureAwait(false);
await Task.Delay(TimeSpan.FromSeconds(10), cancellationTokenSource.Token);
}
catch (TaskCanceledException)
{
Expand Down
4 changes: 2 additions & 2 deletions src/PwrDrvr.LambdaDispatch.Router/MetricsRegistry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,8 @@ public async Task PrintMetrics()
{
while (true)
{
await Task.WhenAll(this.Metrics.ReportRunner.RunAllAsync()).ConfigureAwait(false);
await Task.Delay(TimeSpan.FromSeconds(5)).ConfigureAwait(false);
await Task.WhenAll(this.Metrics.ReportRunner.RunAllAsync());
await Task.Delay(TimeSpan.FromSeconds(5));
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/PwrDrvr.LambdaDispatch.Router/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void ConfigureServices(IServiceCollection services)
#if !SKIP_METRICS
if (_config.LogPeriodicMetrics)
{
Task.Run(metricsRegistry.PrintMetrics).ConfigureAwait(false);
Task.Run(metricsRegistry.PrintMetrics);
}
#endif
}
Expand Down
Loading
Loading