Skip to content

Commit 08926f0

Browse files
[aws][fix] Collection S3 cloudwatch metrics in the corresponding regions (#2052)
Signed-off-by: Lukas Lösche <lukas@some.engineering> Co-authored-by: Matthias Veit <aquamatthias@users.noreply.github.com>
1 parent c663eaf commit 08926f0

File tree

1 file changed

+67
-53
lines changed
  • plugins/aws/fix_plugin_aws/resource

1 file changed

+67
-53
lines changed

plugins/aws/fix_plugin_aws/resource/s3.py

Lines changed: 67 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@
55
from attr import field
66
from attrs import define
77
from datetime import timedelta
8+
from concurrent.futures import wait as futures_wait
89

910
from fix_plugin_aws.aws_client import AwsClient
10-
from fix_plugin_aws.resource.base import AwsResource, AwsApiSpec, GraphBuilder, parse_json
11+
from fix_plugin_aws.resource.base import AwsRegion, AwsResource, AwsApiSpec, GraphBuilder, parse_json
1112
from fix_plugin_aws.resource.cloudwatch import (
1213
AwsCloudwatchMetricData,
1314
AwsCloudwatchQuery,
@@ -298,18 +299,25 @@ def add_bucket_location(bck: AwsS3Bucket) -> None:
298299
bucket_location = str(raw_location)
299300
bck.bucket_location = bucket_location
300301

302+
bucket_location_futures = []
303+
buckets = []
301304
for js in json:
302305
if bucket := cls.from_api(js, builder):
306+
buckets.append(bucket)
303307
bucket.set_arn(builder=builder, region="", account="", resource=bucket.safe_name)
304308
builder.add_node(bucket, js)
305-
builder.submit_work(service_name, add_tags, bucket)
306-
builder.submit_work(service_name, add_bucket_encryption, bucket)
307-
builder.submit_work(service_name, add_bucket_policy, bucket)
308-
builder.submit_work(service_name, add_bucket_versioning, bucket)
309-
builder.submit_work(service_name, add_public_access, bucket)
310-
builder.submit_work(service_name, add_acls, bucket)
311-
builder.submit_work(service_name, add_bucket_logging, bucket)
312-
builder.submit_work(service_name, add_bucket_location, bucket)
309+
bucket_location_futures.append(builder.submit_work(service_name, add_bucket_location, bucket))
310+
for bucket in buckets:
311+
builder.submit_work(service_name, add_tags, bucket)
312+
builder.submit_work(service_name, add_bucket_encryption, bucket)
313+
builder.submit_work(service_name, add_bucket_policy, bucket)
314+
builder.submit_work(service_name, add_bucket_versioning, bucket)
315+
builder.submit_work(service_name, add_public_access, bucket)
316+
builder.submit_work(service_name, add_acls, bucket)
317+
builder.submit_work(service_name, add_bucket_logging, bucket)
318+
319+
# wait for all bucket location futures to complete to block collect() before calling collect_usage_metrics()
320+
futures_wait(bucket_location_futures)
313321

314322
def _set_tags(self, client: AwsClient, tags: Dict[str, str]) -> bool:
315323
tag_set = [{"Key": k, "Value": v} for k, v in tags.items()]
@@ -335,52 +343,58 @@ def _get_tags(self, client: AwsClient) -> Dict[str, str]:
335343

336344
@classmethod
337345
def collect_usage_metrics(cls: Type[AwsResource], builder: GraphBuilder) -> None:
338-
s3s = {s3.id: s3 for s3 in builder.nodes(clazz=AwsS3Bucket) if s3.region().id == builder.region.id}
339-
queries = []
340-
delta = timedelta(days=1)
341-
now = builder.created_at
342-
start = now - timedelta(days=2)
343-
344-
for s3_id, s3 in s3s.items():
345-
queries.append(
346-
AwsCloudwatchQuery.create(
347-
metric_name="NumberOfObjects",
348-
namespace="AWS/S3",
349-
period=delta,
350-
ref_id=s3_id,
351-
stat="Average",
352-
unit="Count",
353-
BucketName=s3.name or s3.safe_name,
354-
StorageType="AllStorageTypes",
346+
for region in {
347+
s3_bucket.bucket_location
348+
for s3_bucket in builder.nodes(clazz=AwsS3Bucket)
349+
if s3_bucket.bucket_location is not None
350+
}:
351+
s3s = {s3_bucket.id: s3_bucket for s3_bucket in builder.nodes(clazz=AwsS3Bucket, bucket_location=region)}
352+
queries = []
353+
delta = timedelta(days=1)
354+
now = builder.created_at
355+
start = now - timedelta(days=2)
356+
357+
for s3_id, s3 in s3s.items():
358+
queries.append(
359+
AwsCloudwatchQuery.create(
360+
metric_name="NumberOfObjects",
361+
namespace="AWS/S3",
362+
period=delta,
363+
ref_id=s3_id,
364+
stat="Average",
365+
unit="Count",
366+
BucketName=s3.name or s3.safe_name,
367+
StorageType="AllStorageTypes",
368+
)
355369
)
356-
)
357-
queries.append(
358-
AwsCloudwatchQuery.create(
359-
metric_name="BucketSizeBytes",
360-
namespace="AWS/S3",
361-
period=delta,
362-
ref_id=s3_id,
363-
stat="Average",
364-
unit="Bytes",
365-
BucketName=s3.name or s3.safe_name,
366-
StorageType="StandardStorage",
370+
queries.append(
371+
AwsCloudwatchQuery.create(
372+
metric_name="BucketSizeBytes",
373+
namespace="AWS/S3",
374+
period=delta,
375+
ref_id=s3_id,
376+
stat="Average",
377+
unit="Bytes",
378+
BucketName=s3.name or s3.safe_name,
379+
StorageType="StandardStorage",
380+
)
367381
)
368-
)
369-
metric_normalizers = {
370-
"BucketSizeBytes": MetricNormalization(
371-
metric_name=MetricName.BucketSizeBytes,
372-
unit=MetricUnit.Bytes,
373-
normalize_value=lambda x: round(x, ndigits=4),
374-
),
375-
"NumberOfObjects": MetricNormalization(
376-
metric_name=MetricName.NumberOfObjects,
377-
unit=MetricUnit.Count,
378-
normalize_value=lambda x: round(x, ndigits=4),
379-
),
380-
}
381-
382-
cloudwatch_result = AwsCloudwatchMetricData.query_for(builder, queries, start, now)
383-
update_resource_metrics(s3s, cloudwatch_result, metric_normalizers)
382+
metric_normalizers = {
383+
"BucketSizeBytes": MetricNormalization(
384+
metric_name=MetricName.BucketSizeBytes,
385+
unit=MetricUnit.Bytes,
386+
normalize_value=lambda x: round(x, ndigits=4),
387+
),
388+
"NumberOfObjects": MetricNormalization(
389+
metric_name=MetricName.NumberOfObjects,
390+
unit=MetricUnit.Count,
391+
normalize_value=lambda x: round(x, ndigits=4),
392+
),
393+
}
394+
395+
region_builder = builder.for_region(AwsRegion(id=region, name=region))
396+
cloudwatch_result = AwsCloudwatchMetricData.query_for(region_builder, queries, start, now)
397+
update_resource_metrics(s3s, cloudwatch_result, metric_normalizers)
384398

385399
def update_resource_tag(self, client: AwsClient, key: str, value: str) -> bool:
386400
tags = self._get_tags(client)

0 commit comments

Comments
 (0)