From 948befeaaf7bdb63f63a8106cf5258329b5fe26e Mon Sep 17 00:00:00 2001
From: Aditya Jaishankar
Date: Wed, 14 Feb 2024 12:42:26 -0500
Subject: [PATCH 1/3] add chunking to describe volumes call

---
 sync/awsdatabricks.py | 42 ++++++++++++++++++++++++++++--------------
 1 file changed, 28 insertions(+), 14 deletions(-)

diff --git a/sync/awsdatabricks.py b/sync/awsdatabricks.py
index a243916..04c00e1 100644
--- a/sync/awsdatabricks.py
+++ b/sync/awsdatabricks.py
@@ -2,7 +2,7 @@
 import logging
 from pathlib import Path
 from time import sleep
-from typing import List, Tuple
+from typing import Iterator, List, Tuple
 from urllib.parse import urlparse
 
 import boto3 as boto
@@ -481,28 +481,42 @@ def _get_ebs_volumes_for_instances(
 ) -> List[dict]:
     """Get all ebs volumes associated with a list of instance reservations"""
 
+    def get_chunk(instance_ids: list, chunk_size: int) -> Iterator[list]:
+        """
+        Splits the instance_ids list into chunks of size determined by chunk_size.
+        This function exists to respect thresholds required by the call to
+        ec2_client.describe_volumes below.
+        """
+        for idx in range(0, len(instance_ids), chunk_size):
+            yield instance_ids[idx : idx + chunk_size]
+
     instance_ids = []
     if instances:
         for instance in instances:
             instance_ids.append(instance.get("InstanceId"))
 
     volumes = []
-    if instance_ids:
-        filters = [
-            {"Name": "tag:Vendor", "Values": ["Databricks"]},
-            {"Name": "attachment.instance-id", "Values": instance_ids},
-        ]
-
-        response = ec2_client.describe_volumes(Filters=filters)
-        volumes = response.get("Volumes", [])
-        next_token = response.get("NextToken")
+    # Largest number of instance ids sent in a single describe_volumes call.
+    MAX_CHUNK_SIZE = 199
 
-        while next_token:
-            response = ec2_client.describe_volumes(Filters=filters, NextToken=next_token)
-            volumes += response.get("Volumes", [])
+    if instance_ids:
+        for chunk in get_chunk(instance_ids, MAX_CHUNK_SIZE):
+            filters = [
+                {"Name": "tag:Vendor", "Values": ["Databricks"]},
+                {"Name": "attachment.instance-id", "Values": chunk},
+            ]
+
+            response = ec2_client.describe_volumes(Filters=filters)
+            # Extend rather than rebind: rebinding here would discard the
+            # volumes already collected for earlier chunks.
+            volumes += response.get("Volumes", [])
             next_token = response.get("NextToken")
 
+            while next_token:
+                response = ec2_client.describe_volumes(Filters=filters, NextToken=next_token)
+                volumes += response.get("Volumes", [])
+                next_token = response.get("NextToken")
+
     num_vol = len(volumes)
     logger.info(f"Identified {num_vol} ebs volumes in cluster")
-
     return volumes

From 3744f6b17f55f6434d58cf9d502eefbb2f83d6ed Mon Sep 17 00:00:00 2001
From: Aditya Jaishankar
Date: Wed, 14 Feb 2024 12:42:44 -0500
Subject: [PATCH 2/3] bump version number

---
 sync/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sync/__init__.py b/sync/__init__.py
index e03d156..510dcb0 100644
--- a/sync/__init__.py
+++ b/sync/__init__.py
@@ -1,4 +1,4 @@
 """Library for leveraging the power of Sync"""
-__version__ = "1.0.0"
+__version__ = "1.0.1"
 
 TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

From 25a093adf0fe7a19e9c8e7c8c6c012103faadecb Mon Sep 17 00:00:00 2001
From: Aditya Jaishankar
Date: Wed, 21 Feb 2024 09:15:43 -0500
Subject: [PATCH 3/3] update type hints

---
 sync/awsdatabricks.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sync/awsdatabricks.py b/sync/awsdatabricks.py
index 8bc90f8..7cd7792 100644
--- a/sync/awsdatabricks.py
+++ b/sync/awsdatabricks.py
@@ -2,7 +2,7 @@
 import logging
 from pathlib import Path
 from time import sleep
-from typing import Iterator, List, Tuple
+from typing import Generator, List, Tuple
 from urllib.parse import urlparse
 
 import boto3 as boto
@@ -481,7 +481,7 @@ def _get_ebs_volumes_for_instances(
 ) -> List[dict]:
     """Get all ebs volumes associated with a list of instance reservations"""
 
-    def get_chunk(instance_ids: list, chunk_size: int) -> Iterator[list]:
+    def get_chunk(instance_ids: List[str], chunk_size: int) -> Generator[List[str], None, None]:
         """
         Splits the instance_ids list into chunks of size determined by chunk_size.
         This function exists to respect thresholds required by the call to