Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[151] Runner executor #404

Merged
merged 83 commits into from
Oct 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
83 commits
Select commit Hold shift + click to select a range
1e625a3
initial commit
giventocode Sep 7, 2023
b549b02
initial implementation
giventocode Sep 13, 2023
0992248
test refactoring
giventocode Sep 14, 2023
6ede9ab
test cleanup and initial basic implementation
giventocode Sep 15, 2023
092ff37
mrege and refactoring
giventocode Sep 19, 2023
feef1e4
enable runner execution
giventocode Sep 19, 2023
a630114
script changes
giventocode Sep 19, 2023
49ff02c
fixed image tag setting
giventocode Sep 19, 2023
d2f2e78
escaping fixes
giventocode Sep 19, 2023
22fbf8e
node task converter refactoring
giventocode Sep 20, 2023
4fcf692
converter refactoring
giventocode Sep 21, 2023
e6baefb
test refactoring
giventocode Sep 21, 2023
09985da
node execution command.
giventocode Sep 21, 2023
31347c1
refactoring and wget line fixes
giventocode Sep 21, 2023
96e8db0
add script ending and test
giventocode Sep 22, 2023
d9a0a04
logging updates
giventocode Sep 22, 2023
be60f8f
Merge branch 'ja-nodetask-builder' of https://github.com/microsoft/ga…
giventocode Sep 22, 2023
3cc4ccc
updated metrics nmae
giventocode Sep 22, 2023
3892a09
run command fixes.
giventocode Sep 22, 2023
3fefd2a
additional tests, formatting and refactoring
giventocode Sep 23, 2023
35900c2
refactoring and additional tests
giventocode Sep 24, 2023
15ce271
pr feedback
giventocode Sep 26, 2023
24ccc91
test fixes
giventocode Sep 26, 2023
6c3d171
Update Azure.Identity, Azure.Core, and Microsoft.Azure.Batch nuget pa…
MattMcL4475 Sep 26, 2023
a8971f6
Use Storage Blob Data Contributor instead of Reader
MattMcL4475 Sep 26, 2023
950b304
handle external storage accounts
giventocode Sep 27, 2023
d7c7493
arm strategy skips urls with sas tokens
giventocode Sep 27, 2023
650163d
enabling setting the client id for mi
giventocode Sep 28, 2023
26f8605
Merge branch 'ja-nodetask-builder' of https://github.com/microsoft/ga…
giventocode Sep 28, 2023
ff72ece
merge fix
giventocode Sep 28, 2023
fbc2602
resourceid for mi, and digest image name
giventocode Sep 28, 2023
0406f64
pull retries and uses global identity as fallback
giventocode Sep 29, 2023
80a1435
logging statements
giventocode Sep 29, 2023
18fb6f3
join fix
giventocode Sep 29, 2023
4ba6ae3
creds manager with retries
giventocode Sep 29, 2023
36d03c5
uploads tes task
giventocode Sep 29, 2023
ab01511
test fixes
giventocode Sep 29, 2023
b58532e
external accounts as local paths
giventocode Sep 29, 2023
1a90a36
formatting
giventocode Sep 29, 2023
9148ff8
multiplexed stream reading
giventocode Sep 30, 2023
6ffdbcf
code clean up
giventocode Oct 1, 2023
f2db9df
initial commit
giventocode Sep 7, 2023
71a2b87
initial implementation
giventocode Sep 13, 2023
11e00b0
test refactoring
giventocode Sep 14, 2023
46ff703
test cleanup and initial basic implementation
giventocode Sep 15, 2023
75c3a35
mrege and refactoring
giventocode Sep 19, 2023
97fa979
enable runner execution
giventocode Sep 19, 2023
6ddf722
script changes
giventocode Sep 19, 2023
5c6e166
fixed image tag setting
giventocode Sep 19, 2023
d877176
escaping fixes
giventocode Sep 19, 2023
e6ce03e
node task converter refactoring
giventocode Sep 20, 2023
85439a0
converter refactoring
giventocode Sep 21, 2023
4be59a2
test refactoring
giventocode Sep 21, 2023
b512de5
node execution command.
giventocode Sep 21, 2023
d6d87ec
logging updates
giventocode Sep 22, 2023
a77791b
refactoring and wget line fixes
giventocode Sep 21, 2023
1238315
add script ending and test
giventocode Sep 22, 2023
6f55577
updated metrics nmae
giventocode Sep 22, 2023
827fc57
run command fixes.
giventocode Sep 22, 2023
a3282f1
additional tests, formatting and refactoring
giventocode Sep 23, 2023
fb46a4b
refactoring and additional tests
giventocode Sep 24, 2023
8b9c520
pr feedback
giventocode Sep 26, 2023
e3404e9
test fixes
giventocode Sep 26, 2023
6a79134
enabling setting the client id for mi
giventocode Sep 28, 2023
eb92c69
Update Azure.Identity, Azure.Core, and Microsoft.Azure.Batch nuget pa…
MattMcL4475 Sep 26, 2023
a8244f7
Use Storage Blob Data Contributor instead of Reader
MattMcL4475 Sep 26, 2023
fbd86e0
handle external storage accounts
giventocode Sep 27, 2023
4138391
arm strategy skips urls with sas tokens
giventocode Sep 27, 2023
cbc2a3c
merge fix
giventocode Sep 28, 2023
42c7ffa
resourceid for mi, and digest image name
giventocode Sep 28, 2023
8dd7898
pull retries and uses global identity as fallback
giventocode Sep 29, 2023
ea082cf
logging statements
giventocode Sep 29, 2023
8222077
join fix
giventocode Sep 29, 2023
29f5f3e
creds manager with retries
giventocode Sep 29, 2023
fe4ec37
uploads tes task
giventocode Sep 29, 2023
1e4721f
test fixes
giventocode Sep 29, 2023
3cd6c80
external accounts as local paths
giventocode Sep 29, 2023
9ddcc39
formatting
giventocode Sep 29, 2023
abf1438
multiplexed stream reading
giventocode Sep 30, 2023
0aca93c
code clean up
giventocode Oct 1, 2023
760348a
merge fixes
giventocode Oct 1, 2023
0980bf9
Merge branch 'ja-nodetask-builder' of https://github.com/microsoft/ga…
giventocode Oct 1, 2023
5c92fe4
Merge commit 'e1ae065f0ea6fe998f8b3f18599abb22b76b3e2b' into ja-nodet…
giventocode Oct 1, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion src/CommonUtilities/Models/NodeTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ namespace Tes.Runner.Models
{
public class NodeTask
{
public string? Id { get; set; }
public string? WorkflowId { get; set; }
public string? ImageTag { get; set; }
public string? ImageName { get; set; }
public string? ContainerWorkDir { get; set; }
public List<string>? CommandsToExecute { get; set; }
public List<FileInput>? Inputs { get; set; }
public List<FileOutput>? Outputs { get; set; }
Expand All @@ -32,12 +35,20 @@ public class FileInput
public string? Path { get; set; }
public string? MountParentDirectory { get; set; }
public string? SourceUrl { get; set; }
public TransformationStrategy? SasStrategy { get; set; }
public TransformationStrategy? TransformationStrategy { get; set; }
}

public class RuntimeOptions
{
public TerraRuntimeOptions? Terra { get; set; }

public string? NodeManagedIdentityResourceId { get; set; }
}

public class DockerCleanUpOptions
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is no longer used, please remove.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should have been line 48

{
public bool ExecuteRmi { get; set; }
public bool ExecutePrune { get; set; }
}


Expand Down
2 changes: 1 addition & 1 deletion src/GenerateBatchVmSkus/GenerateBatchVmSkus.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Azure.Identity" Version="1.8.2" />
<PackageReference Include="Azure.Identity" Version="1.10.1" />
<PackageReference Include="Azure.ResourceManager.Batch" Version="1.1.1" />
<PackageReference Include="Azure.ResourceManager.Compute" Version="1.1.0" />
<PackageReference Include="Azure.Security.KeyVault.Secrets" Version="4.4.0" />
Expand Down
5 changes: 2 additions & 3 deletions src/Tes.ApiClients/TerraWsmApiClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System.Text.Json;
using System.Text.Json.Serialization;
using Azure.Core;
using Azure.Identity;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Logging;
using Tes.ApiClients.Models.Terra;
Expand Down Expand Up @@ -35,13 +34,13 @@ public class TerraWsmApiClient : TerraApiClient

}

public static TerraWsmApiClient CreateTerraWsmApiClient(string apiUrl)
public static TerraWsmApiClient CreateTerraWsmApiClient(string apiUrl, TokenCredential tokenCredential)
{
var retryPolicyOptions = new RetryPolicyOptions();
var cacheRetryHandler = new CachingRetryHandler(sharedMemoryCache,
Microsoft.Extensions.Options.Options.Create(retryPolicyOptions));

return new TerraWsmApiClient(apiUrl, new DefaultAzureCredential(), cacheRetryHandler, ApiClientsLoggerFactory.Create<TerraWsmApiClient>());
return new TerraWsmApiClient(apiUrl, tokenCredential, cacheRetryHandler, ApiClientsLoggerFactory.Create<TerraWsmApiClient>());
}

/// <summary>
Expand Down
4 changes: 2 additions & 2 deletions src/Tes.ApiClients/Tes.ApiClients.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Azure.Core" Version="1.31.0" />
<PackageReference Include="Azure.Identity" Version="1.8.2" />
<PackageReference Include="Azure.Core" Version="1.35.0" />
<PackageReference Include="Azure.Identity" Version="1.10.1" />
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="7.0.0" />
Expand Down
6 changes: 3 additions & 3 deletions src/Tes.Runner.Test/ResolutionPolicyHandlerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ public async Task ApplyResolutionPolicyAsync_WhenTestTaskInputsIsNotEmpty_Return
{
var testTaskInputs = new List<FileInput>
{
new FileInput(){Path = "file", SourceUrl = "http://foo.bar", SasStrategy = TransformationStrategy.None},
new FileInput(){Path = "file1", SourceUrl = "http://foo1.bar", SasStrategy = TransformationStrategy.None},
new FileInput(){Path = "file2", SourceUrl = "http://foo2.bar", SasStrategy = TransformationStrategy.None}
new FileInput(){Path = "file", SourceUrl = "http://foo.bar", TransformationStrategy = TransformationStrategy.None},
new FileInput(){Path = "file1", SourceUrl = "http://foo1.bar", TransformationStrategy = TransformationStrategy.None},
new FileInput(){Path = "file2", SourceUrl = "http://foo2.bar", TransformationStrategy = TransformationStrategy.None}
};
var result = await resolutionPolicyHandler.ApplyResolutionPolicyAsync(testTaskInputs);
Assert.IsNotNull(result);
Expand Down
15 changes: 15 additions & 0 deletions src/Tes.Runner.Test/Storage/ArmUrlTransformationStrategyTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public class ArmUrlTransformationStrategyTests
private ArmUrlTransformationStrategy armUrlTransformationStrategy = null!;
private UserDelegationKey userDelegationKey = null!;
const string StorageAccountName = "foo";
const string SasToken = "sv=2019-12-12&ss=bfqt&srt=sco&spr=https&st=2023-09-27T17%3A32%3A57Z&se=2023-09-28T17%3A32%3A57Z&sp=rwdlacupx&sig=SIGNATURE";

[TestInitialize]
public void SetUp()
Expand Down Expand Up @@ -56,6 +57,20 @@ public async Task TransformUrlWithStrategyAsync_InvalidBlobStorageUrl_UrlIsRetur
Assert.AreEqual(blobUri.AbsoluteUri, transformedUrl.AbsoluteUri);
}

[TestMethod]
[DataRow($"https://{StorageAccountName}.blob.core.windows.net?{SasToken}")]
[DataRow($"https://{StorageAccountName}.blob.core.windows.net/?{SasToken}")]
[DataRow($"https://{StorageAccountName}.blob.core.windows.net/cont?{SasToken}")]
[DataRow($"https://{StorageAccountName}.blob.core.windows.net/cont/blob?{SasToken}")]
public async Task TransformUrlWithStrategyAsync_BlobStorageUrlWithSasToken_UrlIsReturnAsIs(string sourceUrl)
{
var transformedUrl = await armUrlTransformationStrategy.TransformUrlWithStrategyAsync(sourceUrl, BlobSasPermissions.Read);

Assert.IsNotNull(transformedUrl);
var blobUri = new Uri(sourceUrl);
Assert.AreEqual(blobUri.AbsoluteUri, transformedUrl.AbsoluteUri);
}

[TestMethod]
public async Task TransformUrlWithStrategyAsync_CallTwiceForSameStorageAccount_CachesKey()
{
Expand Down
2 changes: 1 addition & 1 deletion src/Tes.Runner.Test/Storage/FileOperationResolverTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public void SetUp()
{
Path = "/foo/bar",
SourceUrl = "https://foo.bar/cont?sig=sasToken",
SasStrategy = TransformationStrategy.None,
TransformationStrategy = TransformationStrategy.None,
};

singleFileOutput = new FileOutput
Expand Down
64 changes: 64 additions & 0 deletions src/Tes.Runner/Authentication/CredentialsManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using Azure.Core;
using Azure.Identity;
using Microsoft.Extensions.Logging;
using Polly;
using Polly.Retry;
using Tes.Runner.Models;
using Tes.Runner.Transfer;

namespace Tes.Runner.Authentication
{
public class CredentialsManager
{
private readonly ILogger logger = PipelineLoggerFactory.Create<CredentialsManager>();

private readonly RetryPolicy retryPolicy;
private const int MaxRetryCount = 5;
private const int ExponentialBackOffExponent = 2;

public CredentialsManager()
{
retryPolicy = Policy
.Handle<Exception>()
.WaitAndRetry(MaxRetryCount,
SleepDurationHandler);
}

private TimeSpan SleepDurationHandler(int attempt)
{
logger.LogInformation($"Attempt {attempt} to get token credential");
var duration = TimeSpan.FromSeconds(Math.Pow(ExponentialBackOffExponent, attempt));
logger.LogInformation($"Waiting {duration} before retrying");
return duration;
}

public TokenCredential GetTokenCredential(RuntimeOptions runtimeOptions)
{
return retryPolicy.Execute(() => GetTokenCredentialImpl(runtimeOptions));
}
private TokenCredential GetTokenCredentialImpl(RuntimeOptions runtimeOptions)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need a blank line here

{
try
{
if (!string.IsNullOrWhiteSpace(runtimeOptions.NodeManagedIdentityResourceId))
{
logger.LogInformation($"Token credentials with Managed Identity and resource ID: {runtimeOptions.NodeManagedIdentityResourceId}");

return new ManagedIdentityCredential(new ResourceIdentifier(runtimeOptions.NodeManagedIdentityResourceId));
}

logger.LogInformation("Token credentials with DefaultAzureCredential");

return new DefaultAzureCredential();
}
catch (Exception e)
{
logger.LogError(e, "Failed to get token credential");
throw;
}
}
}
}
33 changes: 33 additions & 0 deletions src/Tes.Runner/Docker/ConsoleStreamLogReader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

namespace Tes.Runner.Docker;

public class ConsoleStreamLogReader : MultiplexedStreamLogReader
{

public ConsoleStreamLogReader()
{

}

public override Task AppendStandardOutputAsync(string data)
{
Console.Write(data);
return Task.CompletedTask;
}

public override Task AppendStandardErrAsync(string data)
{
Console.Write(data);
return Task.CompletedTask;
}

public override void OnComplete(Exception? err)
{
if (err != null)
{
Console.Write(err.ToString());
}
}
}
2 changes: 1 addition & 1 deletion src/Tes.Runner/Docker/ContainerExecutionResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@

namespace Tes.Runner.Docker
{
public record ContainerExecutionResult(string Id, string? Error, long StatusCode, MultiplexedStream Logs);
public record ContainerExecutionResult(string Id, string? Error, long StatusCode);
}
66 changes: 48 additions & 18 deletions src/Tes.Runner/Docker/DockerExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
using Docker.DotNet;
using Docker.DotNet.Models;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Tes.ApiClients;
using Tes.ApiClients.Options;
using Tes.Runner.Transfer;

namespace Tes.Runner.Docker
Expand All @@ -13,32 +16,43 @@ public class DockerExecutor
private readonly IDockerClient dockerClient;
private readonly ILogger logger = PipelineLoggerFactory.Create<DockerExecutor>();
private readonly NetworkUtility networkUtility = new NetworkUtility();
private readonly RetryHandler retryHandler = new RetryHandler(Options.Create(new RetryPolicyOptions()));
private readonly MultiplexedStreamLogReader multiplexedStreamLogReader;

const int LogStreamingMaxWaitTimeInSeconds = 30;

public DockerExecutor(Uri dockerHost)
{
dockerClient = new DockerClientConfiguration(dockerHost)
.CreateClient();
multiplexedStreamLogReader = new ConsoleStreamLogReader();
}

public async Task<ContainerExecutionResult> RunOnContainerAsync(string? imageName, string? tag, List<string>? commandsToExecute, List<string>? volumeBindings)
public async Task<ContainerExecutionResult> RunOnContainerAsync(string? imageName, string? tag, List<string>? commandsToExecute, List<string>? volumeBindings, string? workingDir)
{
ArgumentException.ThrowIfNullOrEmpty(imageName);
ArgumentException.ThrowIfNullOrEmpty(tag);
ArgumentNullException.ThrowIfNull(commandsToExecute);

await PullImageAsync(imageName, tag);
await PullImageWithRetriesAsync(imageName, tag);

await ConfigureNetworkAsync();

var createResponse = await CreateContainerAsync(imageName, commandsToExecute, volumeBindings);
var createResponse = await CreateContainerAsync(imageName, tag, commandsToExecute, volumeBindings, workingDir);

var logs = await StartContainerWithStreamingOutput(createResponse);

logger.LogInformation("Starting to read output from container");

multiplexedStreamLogReader.StartReadingFromLogStream(logs);

var runResponse = await dockerClient.Containers.WaitContainerAsync(createResponse.ID);

return new ContainerExecutionResult(createResponse.ID, runResponse.Error?.Message, runResponse.StatusCode, logs);
await multiplexedStreamLogReader.WaitUntilAsync(TimeSpan.FromSeconds(LogStreamingMaxWaitTimeInSeconds));

return new ContainerExecutionResult(createResponse.ID, runResponse.Error?.Message, runResponse.StatusCode);
}


private async Task<MultiplexedStream> StartContainerWithStreamingOutput(CreateContainerResponse createResponse)
{
var logs = await StreamStdOutAndErrorAsync(createResponse.ID);
Expand All @@ -61,34 +75,50 @@ private async Task<MultiplexedStream> StreamStdOutAndErrorAsync(string container
});
}

private async Task<CreateContainerResponse> CreateContainerAsync(string imageName, List<string> commandsToExecute, List<string>? volumeBindings)
private async Task<CreateContainerResponse> CreateContainerAsync(string imageName, string? imageTag,
List<string> commandsToExecute, List<string>? volumeBindings, string? workingDir)
{
var imageWithTag = ToImageNameWithTag(imageName, imageTag);
logger.LogInformation($"Creating container with image name: {imageWithTag}");

var createResponse = await dockerClient.Containers.CreateContainerAsync(
new CreateContainerParameters
{
Image = imageName,
Entrypoint = commandsToExecute,
Image = imageWithTag,
Cmd = commandsToExecute,
AttachStdout = true,
AttachStderr = true,
WorkingDir = "/",
WorkingDir = workingDir,
HostConfig = new HostConfig
{
AutoRemove = true,
Binds = volumeBindings
}
});
return createResponse;
}

private async Task PullImageAsync(string imageName, string tag, AuthConfig? authConfig = null)
private static string ToImageNameWithTag(string imageName, string? imageTag)
{
await dockerClient.Images.CreateImageAsync(
new ImagesCreateParameters()
{
FromImage = imageName,
Tag = tag
},
authConfig,
new Progress<JSONMessage>(message => logger.LogInformation(message.Status)));
if (string.IsNullOrWhiteSpace(imageTag))
{
return imageName;
}

return $"{imageName}:{imageTag}";
}

private async Task PullImageWithRetriesAsync(string imageName, string? tag, AuthConfig? authConfig = null)
{
logger.LogInformation($"Pulling image name: {imageName} image tag: {tag}");

await retryHandler.AsyncRetryPolicy.ExecuteAsync(async () =>
{
await dockerClient.Images.CreateImageAsync(
new ImagesCreateParameters() { FromImage = imageName, Tag = tag },
authConfig,
new Progress<JSONMessage>(message => logger.LogDebug(message.Status)));
});
}

/// <summary>
Expand Down
Loading
Loading