|
2 | 2 | import logging |
3 | 3 | from pathlib import Path |
4 | 4 | from time import sleep |
5 | | -from typing import List, Tuple |
| 5 | +from typing import Generator, List, Tuple |
6 | 6 | from urllib.parse import urlparse |
7 | 7 |
|
8 | 8 | import boto3 as boto |
@@ -481,28 +481,39 @@ def _get_ebs_volumes_for_instances( |
481 | 481 | ) -> List[dict]: |
482 | 482 | """Get all ebs volumes associated with a list of instance reservations""" |
483 | 483 |
|
def get_chunk(instance_ids: list[str], chunk_size: int) -> Generator[list[str], None, None]:
    """
    Yield consecutive slices of instance_ids, each at most chunk_size long.

    This function exists to respect thresholds required by the call to
    ec2_client.describe_volumes below.

    Args:
        instance_ids: The full list of instance ids to split.
        chunk_size: Maximum number of ids per yielded slice.

    Yields:
        Slices of instance_ids in their original order; the final slice may
        be shorter than chunk_size. Nothing is yielded for an empty list.
    """
    # All three Generator parameters are spelled out: the one-argument form
    # is only accepted by typing.Generator from Python 3.13 onwards and
    # raises TypeError when the annotation is evaluated on older versions.
    for idx in range(0, len(instance_ids), chunk_size):
        yield instance_ids[idx : idx + chunk_size]
| 492 | + |
484 | 493 | instance_ids = [] |
485 | 494 | if instances: |
486 | 495 | for instance in instances: |
487 | 496 | instance_ids.append(instance.get("InstanceId")) |
488 | 497 |
|
489 | 498 | volumes = [] |
490 | | - if instance_ids: |
491 | | - filters = [ |
492 | | - {"Name": "tag:Vendor", "Values": ["Databricks"]}, |
493 | | - {"Name": "attachment.instance-id", "Values": instance_ids}, |
494 | | - ] |
495 | | - |
496 | | - response = ec2_client.describe_volumes(Filters=filters) |
497 | | - volumes = response.get("Volumes", []) |
498 | | - next_token = response.get("NextToken") |
| 499 | + MAX_CHUNK_SIZE = 199 |
499 | 500 |
|
500 | | - while next_token: |
501 | | - response = ec2_client.describe_volumes(Filters=filters, NextToken=next_token) |
502 | | - volumes += response.get("Volumes", []) |
| 501 | + if instance_ids: |
| 502 | + for chunk in get_chunk(instance_ids, MAX_CHUNK_SIZE): |
| 503 | + filters = [ |
| 504 | + {"Name": "tag:Vendor", "Values": ["Databricks"]}, |
| 505 | + {"Name": "attachment.instance-id", "Values": chunk}, |
| 506 | + ] |
| 507 | + |
| 508 | + response = ec2_client.describe_volumes(Filters=filters) |
| 509 | + volumes += response.get("Volumes", []) |
503 | 510 | next_token = response.get("NextToken") |
504 | 511 |
|
| 512 | + while next_token: |
| 513 | + response = ec2_client.describe_volumes(Filters=filters, NextToken=next_token) |
| 514 | + volumes += response.get("Volumes", []) |
| 515 | + next_token = response.get("NextToken") |
| 516 | + |
505 | 517 | num_vol = len(volumes) |
506 | 518 | logger.info(f"Identified {num_vol} ebs volumes in cluster") |
507 | | - |
508 | 519 | return volumes |
0 commit comments