Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Support for retention policies on containers #3501

Merged
merged 27 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
f0b6e0c
Support for retention policies on containers
Porges Sep 12, 2023
c50c5c4
Silence analysis
Porges Sep 12, 2023
6c2b4c1
"namespace" metadata
Porges Sep 14, 2023
054a19f
Define retention periods from CLI
Porges Sep 14, 2023
03e1109
Remove additional value from model
Porges Sep 14, 2023
bdab6f0
Unused import
Porges Sep 14, 2023
5fe6658
Add feature flag and perform update concurrently
Porges Sep 14, 2023
97f00c5
ContainerTemplate should be a template-only concept
Porges Sep 14, 2023
1818cb2
Use container_name more consistently
Porges Sep 14, 2023
afa426e
Reformat
Porges Sep 14, 2023
10e2dca
Typing fixes
Porges Sep 14, 2023
0ed0c95
Missing return type
Porges Sep 14, 2023
d7ba687
Update examples
Porges Sep 14, 2023
f7bade6
Bump CLI Python version
Porges Sep 14, 2023
8649e3e
Bump others as well
Porges Sep 14, 2023
8ea092f
quotes
Porges Sep 14, 2023
0844174
Actually need 3.11 for Self
Porges Sep 15, 2023
6f362fe
Revert "Actually need 3.11 for Self"
Porges Sep 15, 2023
1122b21
Forward reference instead of Self
Porges Sep 15, 2023
fc64e60
ugh
Porges Sep 15, 2023
b0744eb
Merge branch 'main' into container-retention-policy
Porges Sep 15, 2023
30a3550
Default-enable retention policy feature flag
Porges Sep 17, 2023
cb560e2
Merge branch 'main' into container-retention-policy
Porges Sep 17, 2023
54434c0
Add additional log method
Porges Sep 18, 2023
17d733c
Renaming
Porges Sep 18, 2023
386aabb
Better logging
Porges Sep 18, 2023
d45c1f3
Merge branch 'main' into container-retention-policy
Porges Sep 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/ApiService/ApiService/FeatureFlags.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ public static class FeatureFlagConstants {
public const string EnableBlobRetentionPolicy = "EnableBlobRetentionPolicy";
public const string EnableDryRunBlobRetention = "EnableDryRunBlobRetention";
public const string EnableWorkItemCreation = "EnableWorkItemCreation";
public const string EnableContainerRetentionPolicies = "EnableContainerRetentionPolicies";
Porges marked this conversation as resolved.
Show resolved Hide resolved
}
39 changes: 35 additions & 4 deletions src/ApiService/ApiService/Functions/QueueFileChanges.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Text.Json;
using System.Text.Json.Nodes;
using System.Threading.Tasks;
using Azure.Core;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -54,14 +55,16 @@ public class QueueFileChanges {
return;
}

var storageAccount = new ResourceIdentifier(topicElement.GetString()!);

try {
// Setting isLastRetryAttempt to false will rethrow any exceptions
// With the intention that the azure functions runtime will handle requeing
// the message for us. The difference is for the poison queue, we're handling the
// requeuing ourselves because azure functions doesn't support retry policies
// for queue based functions.

var result = await FileAdded(fileChangeEvent, isLastRetryAttempt: false);
var result = await FileAdded(storageAccount, fileChangeEvent, isLastRetryAttempt: false);
if (!result.IsOk && result.ErrorV.Code == ErrorCode.ADO_WORKITEM_PROCESSING_DISABLED) {
await RequeueMessage(msg, TimeSpan.FromDays(1));
}
Expand All @@ -71,16 +74,44 @@ public class QueueFileChanges {
}
}

private async Async.Task<OneFuzzResultVoid> FileAdded(JsonDocument fileChangeEvent, bool isLastRetryAttempt) {
private async Async.Task<OneFuzzResultVoid> FileAdded(ResourceIdentifier storageAccount, JsonDocument fileChangeEvent, bool isLastRetryAttempt) {
var data = fileChangeEvent.RootElement.GetProperty("data");
var url = data.GetProperty("url").GetString()!;
var parts = url.Split("/").Skip(3).ToList();

var container = parts[0];
var container = Container.Parse(parts[0]);
var path = string.Join('/', parts.Skip(1));

_log.LogInformation("file added : {Container} - {Path}", container, path);
return await _notificationOperations.NewFiles(Container.Parse(container), path, isLastRetryAttempt);

var (_, result) = await (
ApplyRetentionPolicy(storageAccount, container, path),
_notificationOperations.NewFiles(container, path, isLastRetryAttempt));

return result;
}

private async Async.Task<bool> ApplyRetentionPolicy(ResourceIdentifier storageAccount, Container container, string path) {
if (await _context.FeatureManagerSnapshot.IsEnabledAsync(FeatureFlagConstants.EnableContainerRetentionPolicies)) {
// default retention period can be applied to the container
// if one exists, we will set the expiry date on the newly-created blob, if it doesn't already have one
var account = await _storage.GetBlobServiceClientForAccount(storageAccount);
var containerClient = account.GetBlobContainerClient(container.String);
var containerProps = await containerClient.GetPropertiesAsync();
var retentionPeriod = RetentionPolicyUtils.GetRetentionPeriodFromMetadata(containerProps.Value.Metadata);
if (retentionPeriod.HasValue) {
var blobClient = containerClient.GetBlobClient(path);
var tags = (await blobClient.GetTagsAsync()).Value.Tags;
var expiryDate = DateTime.UtcNow + retentionPeriod.Value;
var tag = RetentionPolicyUtils.CreateExpiryDateTag(DateOnly.FromDateTime(expiryDate));
if (tags.TryAdd(tag.Key, tag.Value)) {
_ = await blobClient.SetTagsAsync(tags);
return true;
}
}
}

return false;
}

private async Async.Task RequeueMessage(string msg, TimeSpan? visibilityTimeout = null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@ public NotificationOperations(ILogger<NotificationOperations> log, IOnefuzzConte

}
public async Async.Task<OneFuzzResultVoid> NewFiles(Container container, string filename, bool isLastRetryAttempt) {
var result = OneFuzzResultVoid.Ok;

// We don't want to store file added events for the events container because that causes an infinite loop
if (container == WellKnownContainers.Events) {
return result;
return Result.Ok();
}

var result = OneFuzzResultVoid.Ok;
var notifications = GetNotifications(container);
var hasNotifications = await notifications.AnyAsync();
var reportOrRegression = await _context.Reports.GetReportOrRegression(container, filename, expectReports: hasNotifications);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
namespace Microsoft.OneFuzz.Service;
using System.Xml;

namespace Microsoft.OneFuzz.Service;


public interface IRetentionPolicy {
Expand All @@ -21,4 +23,22 @@ public class RetentionPolicyUtils {
}

public static string CreateExpiredBlobTagFilter() => $@"""{EXPIRY_TAG}"" <= '{DateOnly.FromDateTime(DateTime.UtcNow)}'";

// NB: this must match the value used on the CLI side
public const string RETENTION_KEY = "onefuzz_retentionperiod";

public static TimeSpan? GetRetentionPeriodFromMetadata(IDictionary<string, string>? containerMetadata) {
if (containerMetadata is not null &&
containerMetadata.TryGetValue(RETENTION_KEY, out var retentionString) &&
!string.IsNullOrWhiteSpace(retentionString)) {
try {
return XmlConvert.ToTimeSpan(retentionString);
} catch {
// Log error: unable to convert xxx
return null;
}
}

return null;
}
}
91 changes: 74 additions & 17 deletions src/cli/onefuzz/templates/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import os
import tempfile
import zipfile
from datetime import timedelta
from typing import Any, Dict, List, Optional, Tuple
from uuid import uuid4

Expand All @@ -22,6 +23,28 @@ class StoppedEarly(Exception):
pass


class ContainerTemplate:
def __init__(
self,
name: Container,
exists: bool,
*,
retention_period: Optional[timedelta] = None,
):
self.name = name
self.retention_period = retention_period
# TODO: exists is not yet used/checked
self.exists = exists

@staticmethod
def existing(name: Container):
return ContainerTemplate(name, True)

@staticmethod
def fresh(name: Container, *, retention_period: Optional[timedelta] = None):
return ContainerTemplate(name, False, retention_period=retention_period)


class JobHelper:
def __init__(
self,
Expand Down Expand Up @@ -59,7 +82,7 @@ def __init__(

self.wait_for_running: bool = False
self.wait_for_stopped: bool = False
self.containers: Dict[ContainerType, Container] = {}
self.containers: Dict[ContainerType, ContainerTemplate] = {}
self.tags: Dict[str, str] = {"project": project, "name": name, "build": build}
if job is None:
self.onefuzz.versions.check()
Expand All @@ -71,6 +94,20 @@ def __init__(
else:
self.job = job

def add_existing_container(
self, container_type: ContainerType, container: Container
) -> None:
self.containers[container_type] = ContainerTemplate.existing(container)

def container_name(self, container_type: ContainerType) -> Container:
return self.containers[container_type].name

def container_names(self) -> Dict[ContainerType, Container]:
return {
container_type: container.name
for (container_type, container) in self.containers.items()
}

def define_containers(self, *types: ContainerType) -> None:
"""
Define default container set based on provided types
Expand All @@ -79,13 +116,23 @@ def define_containers(self, *types: ContainerType) -> None:
"""

for container_type in types:
self.containers[container_type] = self.onefuzz.utils.build_container_name(
container_name = self.onefuzz.utils.build_container_name(
container_type=container_type,
project=self.project,
name=self.name,
build=self.build,
platform=self.platform,
)
self.containers[container_type] = ContainerTemplate.fresh(
container_name,
retention_period=JobHelper._default_retention_period(container_type),
)

@staticmethod
def _default_retention_period(container_type: ContainerType) -> Optional[timedelta]:
if container_type == ContainerType.crashdumps:
return timedelta(days=90)
return None

def get_unique_container_name(self, container_type: ContainerType) -> Container:
return Container(
Expand All @@ -97,11 +144,17 @@ def get_unique_container_name(self, container_type: ContainerType) -> Container:
)

def create_containers(self) -> None:
for container_type, container_name in self.containers.items():
self.logger.info("using container: %s", container_name)
self.onefuzz.containers.create(
container_name, metadata={"container_type": container_type.name}
)
for container_type, container in self.containers.items():
self.logger.info("using container: %s", container.name)
metadata = {"container_type": container_type.name}
if container.retention_period is not None:
# format as ISO8601 period
# NB: this must match the value used on the server side
metadata[
"onefuzz_retentionperiod"
] = f"P{container.retention_period.days}D"

self.onefuzz.containers.create(container.name, metadata=metadata)

def delete_container(self, container_name: Container) -> None:
self.onefuzz.containers.delete(container_name)
Expand All @@ -110,7 +163,7 @@ def setup_notifications(self, config: Optional[NotificationConfig]) -> None:
if not config:
return

containers: List[Container] = []
containers: List[ContainerTemplate] = []
if ContainerType.unique_reports in self.containers:
containers.append(self.containers[ContainerType.unique_reports])
else:
Expand All @@ -121,7 +174,9 @@ def setup_notifications(self, config: Optional[NotificationConfig]) -> None:

for container in containers:
self.logger.info("creating notification config for %s", container)
self.onefuzz.notifications.create(container, config, replace_existing=True)
self.onefuzz.notifications.create(
container.name, config, replace_existing=True
)

def upload_setup(
self,
Expand All @@ -141,33 +196,35 @@ def upload_setup(

self.logger.info("uploading setup dir `%s`" % setup_dir)
self.onefuzz.containers.files.upload_dir(
self.containers[ContainerType.setup], setup_dir
self.containers[ContainerType.setup].name, setup_dir
)
else:
self.logger.info("uploading target exe `%s`" % target_exe)
self.onefuzz.containers.files.upload_file(
self.containers[ContainerType.setup], target_exe
self.containers[ContainerType.setup].name, target_exe
)

pdb_path = os.path.splitext(target_exe)[0] + ".pdb"
if os.path.exists(pdb_path):
pdb_name = os.path.basename(pdb_path)
self.onefuzz.containers.files.upload_file(
self.containers[ContainerType.setup], pdb_path, pdb_name
self.containers[ContainerType.setup].name, pdb_path, pdb_name
)
if setup_files:
for filename in setup_files:
self.logger.info("uploading %s", filename)
self.onefuzz.containers.files.upload_file(
self.containers[ContainerType.setup], filename
self.containers[ContainerType.setup].name, filename
)

def upload_inputs(self, path: Directory, read_only: bool = False) -> None:
self.logger.info("uploading inputs: `%s`" % path)
container_type = ContainerType.inputs
if read_only:
container_type = ContainerType.readonly_inputs
self.onefuzz.containers.files.upload_dir(self.containers[container_type], path)
self.onefuzz.containers.files.upload_dir(
self.containers[container_type].name, path
)

def upload_inputs_zip(self, path: File) -> None:
with tempfile.TemporaryDirectory() as tmp_dir:
Expand All @@ -176,7 +233,7 @@ def upload_inputs_zip(self, path: File) -> None:

self.logger.info("uploading inputs from zip: `%s`" % path)
self.onefuzz.containers.files.upload_dir(
self.containers[ContainerType.inputs], Directory(tmp_dir)
self.containers[ContainerType.inputs].name, Directory(tmp_dir)
)

@classmethod
Expand All @@ -195,8 +252,8 @@ def wait_on(
wait_for_files = []

self.to_monitor = {
self.containers[x]: len(
self.onefuzz.containers.files.list(self.containers[x]).files
self.containers[x].name: len(
self.onefuzz.containers.files.list(self.containers[x].name).files
)
for x in wait_for_files
}
Expand Down
22 changes: 11 additions & 11 deletions src/cli/onefuzz/templates/afl.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def basic(

if existing_inputs:
self.onefuzz.containers.get(existing_inputs)
helper.containers[ContainerType.inputs] = existing_inputs
helper.add_existing_container(ContainerType.inputs, existing_inputs)
else:
helper.define_containers(ContainerType.inputs)

Expand All @@ -112,7 +112,7 @@ def basic(
if (
len(
self.onefuzz.containers.files.list(
helper.containers[ContainerType.inputs]
helper.containers[ContainerType.inputs].name
).files
)
== 0
Expand All @@ -131,16 +131,16 @@ def basic(

containers = [
(ContainerType.tools, afl_container),
(ContainerType.setup, helper.containers[ContainerType.setup]),
(ContainerType.crashes, helper.containers[ContainerType.crashes]),
(ContainerType.inputs, helper.containers[ContainerType.inputs]),
(ContainerType.setup, helper.container_name(ContainerType.setup)),
(ContainerType.crashes, helper.container_name(ContainerType.crashes)),
(ContainerType.inputs, helper.container_name(ContainerType.inputs)),
]

if extra_setup_container is not None:
containers.append(
(
ContainerType.extra_setup,
helper.containers[ContainerType.extra_setup],
extra_setup_container,
Porges marked this conversation as resolved.
Show resolved Hide resolved
)
)

Expand Down Expand Up @@ -169,20 +169,20 @@ def basic(
)

report_containers = [
(ContainerType.setup, helper.containers[ContainerType.setup]),
(ContainerType.crashes, helper.containers[ContainerType.crashes]),
(ContainerType.reports, helper.containers[ContainerType.reports]),
(ContainerType.setup, helper.container_name(ContainerType.setup)),
(ContainerType.crashes, helper.container_name(ContainerType.crashes)),
(ContainerType.reports, helper.container_name(ContainerType.reports)),
(
ContainerType.unique_reports,
helper.containers[ContainerType.unique_reports],
helper.container_name(ContainerType.unique_reports),
),
]

if extra_setup_container is not None:
report_containers.append(
(
ContainerType.extra_setup,
helper.containers[ContainerType.extra_setup],
helper.container_name(ContainerType.extra_setup),
)
)

Expand Down
Loading
Loading