diff --git a/sync/__init__.py b/sync/__init__.py index 974f2cd..916984a 100644 --- a/sync/__init__.py +++ b/sync/__init__.py @@ -1,4 +1,4 @@ """Library for leveraging the power of Sync""" -__version__ = "1.0.2" +__version__ = "1.0.3" TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ" diff --git a/sync/awsdatabricks.py b/sync/awsdatabricks.py index 3af7f1c..7cd7792 100644 --- a/sync/awsdatabricks.py +++ b/sync/awsdatabricks.py @@ -2,7 +2,7 @@ import logging from pathlib import Path from time import sleep -from typing import List, Tuple +from typing import Generator, List, Tuple from urllib.parse import urlparse import boto3 as boto @@ -481,28 +481,39 @@ def _get_ebs_volumes_for_instances( ) -> List[dict]: """Get all ebs volumes associated with a list of instance reservations""" + def get_chunk(instance_ids: list[str], chunk_size: int) -> Generator[list[str]]: + """ + Splits the instance_ids list into chunks of size determined by chunk_size. + This function exists to respect thresholds required by the call to + ec2_client.describe_volumes below. + """ + for idx in range(0, len(instance_ids), chunk_size): + yield instance_ids[idx : idx + chunk_size] + instance_ids = [] if instances: for instance in instances: instance_ids.append(instance.get("InstanceId")) volumes = [] - if instance_ids: - filters = [ - {"Name": "tag:Vendor", "Values": ["Databricks"]}, - {"Name": "attachment.instance-id", "Values": instance_ids}, - ] - - response = ec2_client.describe_volumes(Filters=filters) - volumes = response.get("Volumes", []) - next_token = response.get("NextToken") + MAX_CHUNK_SIZE = 199 - while next_token: - response = ec2_client.describe_volumes(Filters=filters, NextToken=next_token) - volumes += response.get("Volumes", []) + if instance_ids: + for chunk in get_chunk(instance_ids, MAX_CHUNK_SIZE): + filters = [ + {"Name": "tag:Vendor", "Values": ["Databricks"]}, + {"Name": "attachment.instance-id", "Values": chunk}, + ] + + response = ec2_client.describe_volumes(Filters=filters) + volumes = response.get("Volumes", []) next_token = response.get("NextToken") + while next_token: + response = ec2_client.describe_volumes(Filters=filters, NextToken=next_token) + volumes += response.get("Volumes", []) + next_token = response.get("NextToken") + num_vol = len(volumes) logger.info(f"Identified {num_vol} ebs volumes in cluster") - return volumes