From 3c22931a5fa24f9fa5fa89797f03787fefb45f6d Mon Sep 17 00:00:00 2001 From: Asim Biswal Date: Thu, 19 Oct 2023 23:21:39 +0000 Subject: [PATCH 01/26] attempting aws auth propagation --- skyplane/api/client.py | 13 ++++++++++++- skyplane/api/pipeline.py | 10 ++++++++-- skyplane/api/transfer_job.py | 8 ++++++-- skyplane/compute/aws/aws_cloud_provider.py | 4 ++-- skyplane/compute/aws/aws_server.py | 4 ++-- 5 files changed, 30 insertions(+), 9 deletions(-) diff --git a/skyplane/api/client.py b/skyplane/api/client.py index a1a43143a..78cb0c6a0 100644 --- a/skyplane/api/client.py +++ b/skyplane/api/client.py @@ -93,8 +93,19 @@ def copy(self, src: str, dst: str, recursive: bool = False, max_instances: Optio :param max_instances: The maximum number of instances to use per region (default: 1) :type max_instances: int """ + provider_src, bucket_src, _ = parse_path(self.src) + src_iface = ObjectStoreInterface.create(f"{provider_src}:infer", bucket_src, aws_auth=self.aws_auth, azure_auth=self.azure_auth, gcp_auth=self.gcp_auth) - pipeline = self.pipeline(max_instances=max_instances, debug=debug) + if isinstance(dst, str): + provider_dst, bucket_dst, _ = parse_path(self.dst) + dst_ifaces = [StorageInterface.create(f"{provider_dst}:infer", bucket_dst, aws_auth=self.aws_auth, azure_auth=self.azure_auth, gcp_auth=self.gcp_auth)] + else: + dst_ifaces = [] + for path in self.dst: + provider_dst, bucket_dst, _ = parse_path(path) + dst_ifaces.append(StorageInterface.create(f"{provider_dst}:infer", bucket_dst, aws_auth=self.aws_auth, azure_auth=self.azure_auth, gcp_auth=self.gcp_auth)) + + pipeline = self.pipeline(max_instances=max_instances, debug=debug, src_iface = src_iface, dst_ifaces=dst_ifaces) pipeline.queue_copy(src, dst, recursive=recursive) pipeline.start(progress=True) diff --git a/skyplane/api/pipeline.py b/skyplane/api/pipeline.py index 6fa3face4..3099a0f5c 100644 --- a/skyplane/api/pipeline.py +++ b/skyplane/api/pipeline.py @@ -33,6 +33,8 @@ def __init__( max_instances: Optional[int] = 1, n_connections: Optional[int] = 64, planning_algorithm: Optional[str] = "direct", + src_iface: Optional[ObjectStoreInterface] = None, + dst_ifaces: Optional[List[ObjectStoreInterface]] = None, debug: Optional[bool] = False, ): """ @@ -70,6 +72,10 @@ def __init__( else: raise ValueError(f"No such planning algorithm {planning_algorithm}") + # obj store interfaces + self.src_iface = src_iface + self.dst_ifaces = dst_ifaces + # transfer logs self.transfer_dir = tmp_log_dir / "transfer_logs" / datetime.now().strftime("%Y%m%d_%H%M%S") self.transfer_dir.mkdir(exist_ok=True, parents=True) @@ -146,7 +152,7 @@ def queue_copy( """ if isinstance(dst, str): dst = [dst] - job = CopyJob(src, dst, recursive, requester_pays=self.transfer_config.requester_pays) + job = CopyJob(src, dst, recursive, requester_pays=self.transfer_config.requester_pays, src_iface=self.src_iface, dst_ifaces=self.dst_ifaces) logger.fs.debug(f"[SkyplaneClient] Queued copy job {job}") self.jobs_to_dispatch.append(job) return job.uuid @@ -169,7 +175,7 @@ def queue_sync( """ if isinstance(dst, str): dst = [dst] - job = SyncJob(src, dst, requester_pays=self.transfer_config.requester_pays) + job = SyncJob(src, dst, requester_pays=self.transfer_config.requester_pays, src_iface=self.src_iface, dst_ifaces=self.dst_ifaces) logger.fs.debug(f"[SkyplaneClient] Queued sync job {job}") self.jobs_to_dispatch.append(job) return job.uuid diff --git a/skyplane/api/transfer_job.py b/skyplane/api/transfer_job.py index cf3322985..96f725df1 100644 --- a/skyplane/api/transfer_job.py +++ b/skyplane/api/transfer_job.py @@ -468,6 +468,8 @@ def __init__( recursive: bool = False, requester_pays: bool = False, job_id: Optional[str] = None, + src_iface: Optional[ObjectStoreInterface] = None, + dst_ifaces: Optional[List[ObjectStoreInterface]] = None ): self.src_path = src_path self.dst_paths = dst_paths @@ -477,6 +479,8 @@ def __init__( self.uuid = str(uuid.uuid4()) else: self.uuid = job_id + self._src_iface = src_iface + self._dst_ifaces = dst_ifaces @property def transfer_type(self) -> str: @@ -495,7 +499,7 @@ def src_prefix(self) -> Optional[str]: @property def src_iface(self) -> StorageInterface: """Return the source object store interface""" - if not hasattr(self, "_src_iface"): + if not self._src_iface: provider_src, bucket_src, _ = parse_path(self.src_path) self._src_iface = ObjectStoreInterface.create(f"{provider_src}:infer", bucket_src) if self.requester_pays: @@ -515,7 +519,7 @@ def dst_prefixes(self) -> List[str]: @property def dst_ifaces(self) -> List[StorageInterface]: """Return the destination object store interface""" - if not hasattr(self, "_dst_iface"): + if not self._dst_ifaces: if self.transfer_type == "unicast": provider_dst, bucket_dst, _ = parse_path(self.dst_paths[0]) self._dst_ifaces = [StorageInterface.create(f"{provider_dst}:infer", bucket_dst)] diff --git a/skyplane/compute/aws/aws_cloud_provider.py b/skyplane/compute/aws/aws_cloud_provider.py index d82733f63..0613dd967 100644 --- a/skyplane/compute/aws/aws_cloud_provider.py +++ b/skyplane/compute/aws/aws_cloud_provider.py @@ -56,7 +56,7 @@ def get_instance_list(exceptions, self, region: str) -> List[AWSServer]: except exceptions.ClientError as e: logger.error(f"error provisioning in {region}: {e}") return [] - return [AWSServer(f"aws:{region}", i) for i in instance_ids] + return [AWSServer(f"aws:{region}", i, auth=self.auth) for i in instance_ids] def setup_global(self, iam_name: str = "skyplane_gateway", attach_policy_arn: Optional[str] = None): # Create IAM role if it doesn't exist and grant managed role if given. @@ -246,4 +246,4 @@ def start_instance(subnet_id: str): logger.fs.warning(f"Terminating instance {instance[0].id} due to keyboard interrupt") instance[0].terminate() raise - return AWSServer(f"aws:{region}", instance[0].id) + return AWSServer(f"aws:{region}", instance[0].id, auth=self.auth) diff --git a/skyplane/compute/aws/aws_server.py b/skyplane/compute/aws/aws_server.py index a2c1d2a22..5161b8b1b 100644 --- a/skyplane/compute/aws/aws_server.py +++ b/skyplane/compute/aws/aws_server.py @@ -21,10 +21,10 @@ class AWSServer(Server): """AWS Server class to support basic SSH operations""" - def __init__(self, region_tag, instance_id, log_dir=None): + def __init__(self, region_tag, instance_id, log_dir=None, auth: Optional[compute.AWSAuthentication] = None): super().__init__(region_tag, log_dir=log_dir) assert self.region_tag.split(":")[0] == "aws" - self.auth = AWSAuthentication() + self.auth = AWSAuthentication() if auth is None else auth self.key_manager = AWSKeyManager(self.auth) self.aws_region = self.region_tag.split(":")[1] self.instance_id = instance_id From 160cbd60ac2ab5b7b73cc23b5ad9b80838d50425 Mon Sep 17 00:00:00 2001 From: Asim Biswal Date: Thu, 19 Oct 2023 23:29:55 +0000 Subject: [PATCH 02/26] added option to disable clouds --- skyplane/api/client.py | 8 ++++++++ skyplane/api/provisioner.py | 24 ++++++++++++++++-------- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/skyplane/api/client.py b/skyplane/api/client.py index 78cb0c6a0..9f0d5ebe0 100644 --- a/skyplane/api/client.py +++ b/skyplane/api/client.py @@ -28,6 +28,10 @@ def __init__( ibmcloud_config: Optional["IBMCloudConfig"] = None, transfer_config: Optional[TransferConfig] = None, log_dir: Optional[str] = None, + disable_aws: False, + disable_azure: False, + disable_gcp: False, + disable_ibm: False ): """ :param aws_config: aws cloud configurations @@ -66,6 +70,10 @@ def __init__( azure_auth=self.azure_auth, gcp_auth=self.gcp_auth, ibmcloud_auth=self.ibmcloud_auth, + disable_aws=disable_aws, + disable_azure=disable_azure, + disable_gcp=disable_gcp, + disable_ibm=disable_ibm ) def pipeline(self, planning_algorithm: Optional[str] = "direct", max_instances: Optional[int] = 1, debug=False): diff --git a/skyplane/api/provisioner.py b/skyplane/api/provisioner.py index 09e89cf52..52bf5fc3d 100644 --- a/skyplane/api/provisioner.py +++ b/skyplane/api/provisioner.py @@ -52,6 +52,10 @@ def __init__( gcp_auth: Optional[compute.GCPAuthentication] = None, host_uuid: Optional[str] = None, ibmcloud_auth: Optional[compute.IBMCloudAuthentication] = None, + disable_aws: False, + disable_azure: False, + disable_gcp: False, + disable_ibm: False, ): """ :param aws_auth: authentication information for aws @@ -70,7 +74,7 @@ def __init__( self.gcp_auth = gcp_auth self.host_uuid = host_uuid self.ibmcloud_auth = ibmcloud_auth - self._make_cloud_providers() + self._make_cloud_providers(disable_aws, disable_azure, disable_gcp, disable_ibm) self.temp_nodes: Set[compute.Server] = set() # temporary area to store nodes that should be terminated upon exit self.pending_provisioner_tasks: List[ProvisionerTask] = [] self.provisioned_vms: Dict[str, compute.Server] = {} @@ -78,13 +82,17 @@ def __init__( # store GCP firewall rules to be deleted upon exit self.gcp_firewall_rules: Set[str] = set() - def _make_cloud_providers(self): - self.aws = compute.AWSCloudProvider( - key_prefix=f"skyplane{'-'+self.host_uuid.replace('-', '') if self.host_uuid else ''}", auth=self.aws_auth - ) - self.azure = compute.AzureCloudProvider(auth=self.azure_auth) - self.gcp = compute.GCPCloudProvider(auth=self.gcp_auth) - self.ibmcloud = compute.IBMCloudProvider(auth=self.ibmcloud_auth) + def _make_cloud_providers(self, disable_aws, disable_azure, disable_gcp, disable_ibm): + if not disable_aws: + self.aws = compute.AWSCloudProvider( + key_prefix=f"skyplane{'-'+self.host_uuid.replace('-', '') if self.host_uuid else ''}", auth=self.aws_auth + ) + if not disable_azure: + self.azure = compute.AzureCloudProvider(auth=self.azure_auth) + if not disable_gcp: + self.gcp = compute.GCPCloudProvider(auth=self.gcp_auth) + if not disable_ibm: + self.ibmcloud = compute.IBMCloudProvider(auth=self.ibmcloud_auth) def init_global(self, aws: bool = True, azure: bool = True, gcp: bool = True, ibmcloud: bool = True): """ From 3c1225e8437e2852599ec22ce349f2a1d7bd0d5c Mon Sep 17 00:00:00 2001 From: Asim Biswal Date: Thu, 19 Oct 2023 23:39:13 +0000 Subject: [PATCH 03/26] bug fix --- skyplane/compute/aws/aws_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/skyplane/compute/aws/aws_server.py b/skyplane/compute/aws/aws_server.py index 5161b8b1b..3226e94b7 100644 --- a/skyplane/compute/aws/aws_server.py +++ b/skyplane/compute/aws/aws_server.py @@ -21,7 +21,7 @@ class AWSServer(Server): """AWS Server class to support basic SSH operations""" - def __init__(self, region_tag, instance_id, log_dir=None, auth: Optional[compute.AWSAuthentication] = None): + def __init__(self, region_tag, instance_id, log_dir=None, auth: Optional[AWSAuthentication] = None): super().__init__(region_tag, log_dir=log_dir) assert self.region_tag.split(":")[0] == "aws" self.auth = AWSAuthentication() if auth is None else auth From 6efda7bbc1e51f29d2bd33334b4133e37e2d9dbd Mon Sep 17 00:00:00 2001 From: Asim Biswal Date: Thu, 19 Oct 2023 23:42:00 +0000 Subject: [PATCH 04/26] fixing argument ordering --- skyplane/api/client.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/skyplane/api/client.py b/skyplane/api/client.py index 9f0d5ebe0..e65344094 100644 --- a/skyplane/api/client.py +++ b/skyplane/api/client.py @@ -22,16 +22,16 @@ class SkyplaneClient: def __init__( self, + disable_aws: False, + disable_azure: False, + disable_gcp: False, + disable_ibm: False, aws_config: Optional["AWSConfig"] = None, azure_config: Optional["AzureConfig"] = None, gcp_config: Optional["GCPConfig"] = None, ibmcloud_config: Optional["IBMCloudConfig"] = None, transfer_config: Optional[TransferConfig] = None, - log_dir: Optional[str] = None, - disable_aws: False, - disable_azure: False, - disable_gcp: False, - disable_ibm: False + log_dir: Optional[str] = None ): """ :param aws_config: aws cloud configurations From 92591f5ff3055683379f03bdb7dd351e0c9db702 Mon Sep 17 00:00:00 2001 From: Asim Biswal Date: Thu, 19 Oct 2023 23:44:49 +0000 Subject: [PATCH 05/26] more bug fixing --- skyplane/api/provisioner.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/skyplane/api/provisioner.py b/skyplane/api/provisioner.py index 52bf5fc3d..5b1e7477e 100644 --- a/skyplane/api/provisioner.py +++ b/skyplane/api/provisioner.py @@ -47,15 +47,15 @@ class Provisioner: def __init__( self, + disable_aws: False, + disable_azure: False, + disable_gcp: False, + disable_ibm: False, aws_auth: Optional[compute.AWSAuthentication] = None, azure_auth: Optional[compute.AzureAuthentication] = None, gcp_auth: Optional[compute.GCPAuthentication] = None, host_uuid: Optional[str] = None, ibmcloud_auth: Optional[compute.IBMCloudAuthentication] = None, - disable_aws: False, - disable_azure: False, - disable_gcp: False, - disable_ibm: False, ): """ :param aws_auth: authentication information for aws From 6b1252fdcfed2b46389b096b15224af8d3e7029b Mon Sep 17 00:00:00 2001 From: Asim Biswal Date: Thu, 19 Oct 2023 23:50:19 +0000 Subject: [PATCH 06/26] bugs --- skyplane/api/pipeline.py | 1 + 1 file changed, 1 insertion(+) diff --git a/skyplane/api/pipeline.py b/skyplane/api/pipeline.py index 3099a0f5c..ab5d7a600 100644 --- a/skyplane/api/pipeline.py +++ b/skyplane/api/pipeline.py @@ -10,6 +10,7 @@ from skyplane.api.transfer_job import CopyJob, SyncJob, TransferJob from skyplane.api.config import TransferConfig +from skyplane.obj_store.obj_store import ObjectStore from skyplane.planner.planner import MulticastDirectPlanner, DirectPlannerSourceOneSided, DirectPlannerDestOneSided from skyplane.planner.topology import TopologyPlanGateway from skyplane.utils import logger From 0f514428826acec3107a6b5dc40865f10609b2d6 Mon Sep 17 00:00:00 2001 From: Asim Biswal Date: Thu, 19 Oct 2023 23:52:13 +0000 Subject: [PATCH 07/26] naming error --- skyplane/api/pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/skyplane/api/pipeline.py b/skyplane/api/pipeline.py index ab5d7a600..3013dc104 100644 --- a/skyplane/api/pipeline.py +++ b/skyplane/api/pipeline.py @@ -10,7 +10,7 @@ from skyplane.api.transfer_job import CopyJob, SyncJob, TransferJob from skyplane.api.config import TransferConfig -from skyplane.obj_store.obj_store import ObjectStore +from skyplane.api.obj_store import ObjectStore from skyplane.planner.planner import MulticastDirectPlanner, DirectPlannerSourceOneSided, DirectPlannerDestOneSided from skyplane.planner.topology import TopologyPlanGateway from skyplane.utils import logger From 191512ab7e84e870b0c1a4c9533ec0174b0782dd Mon Sep 17 00:00:00 2001 From: Asim Biswal Date: Thu, 19 Oct 2023 23:56:49 +0000 Subject: [PATCH 08/26] import errors --- skyplane/api/client.py | 2 ++ skyplane/api/pipeline.py | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/skyplane/api/client.py b/skyplane/api/client.py index e65344094..523c5f07c 100644 --- a/skyplane/api/client.py +++ b/skyplane/api/client.py @@ -7,6 +7,8 @@ from skyplane.api.config import TransferConfig from skyplane.api.provisioner import Provisioner from skyplane.api.obj_store import ObjectStore +from skyplane.obj_store.object_store_interface import ObjectStoreInterface +from skyplane.obj_store.storage_interface import StorageInterface from skyplane.api.usage import get_clientid from skyplane.utils import logger from skyplane.utils.definitions import tmp_log_dir diff --git a/skyplane/api/pipeline.py b/skyplane/api/pipeline.py index 3013dc104..19aab51fb 100644 --- a/skyplane/api/pipeline.py +++ b/skyplane/api/pipeline.py @@ -10,7 +10,7 @@ from skyplane.api.transfer_job import CopyJob, SyncJob, TransferJob from skyplane.api.config import TransferConfig -from skyplane.api.obj_store import ObjectStore +from skyplane.obj_store.object_store_interface import ObjectStoreInterface from skyplane.planner.planner import MulticastDirectPlanner, DirectPlannerSourceOneSided, DirectPlannerDestOneSided from skyplane.planner.topology import TopologyPlanGateway from skyplane.utils import logger From 5d37d85bca99a9e9b6809099bf074e24f2c4816a Mon Sep 17 00:00:00 2001 From: Asim Biswal Date: Fri, 20 Oct 2023 00:00:06 +0000 Subject: [PATCH 09/26] fixing args for client --- skyplane/api/client.py | 8 ++++---- skyplane/api/provisioner.py | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/skyplane/api/client.py b/skyplane/api/client.py index 523c5f07c..31dc333c3 100644 --- a/skyplane/api/client.py +++ b/skyplane/api/client.py @@ -24,16 +24,16 @@ class SkyplaneClient: def __init__( self, - disable_aws: False, - disable_azure: False, - disable_gcp: False, - disable_ibm: False, aws_config: Optional["AWSConfig"] = None, azure_config: Optional["AzureConfig"] = None, gcp_config: Optional["GCPConfig"] = None, ibmcloud_config: Optional["IBMCloudConfig"] = None, transfer_config: Optional[TransferConfig] = None, log_dir: Optional[str] = None + disable_aws=False, + disable_azure=False, + disable_gcp=False, + disable_ibm=False ): """ :param aws_config: aws cloud configurations diff --git a/skyplane/api/provisioner.py b/skyplane/api/provisioner.py index 5b1e7477e..c5e9891e3 100644 --- a/skyplane/api/provisioner.py +++ b/skyplane/api/provisioner.py @@ -47,15 +47,15 @@ class Provisioner: def __init__( self, - disable_aws: False, - disable_azure: False, - disable_gcp: False, - disable_ibm: False, aws_auth: Optional[compute.AWSAuthentication] = None, azure_auth: Optional[compute.AzureAuthentication] = None, gcp_auth: Optional[compute.GCPAuthentication] = None, host_uuid: Optional[str] = None, ibmcloud_auth: Optional[compute.IBMCloudAuthentication] = None, + disable_aws=False, + disable_azure=False, + disable_gcp=False, + disable_ibm=False ): """ :param aws_auth: authentication information for aws From c232cf4fdca68474097db7a362aaa7ce07993e62 Mon Sep 17 00:00:00 2001 From: Asim Biswal Date: Fri, 20 Oct 2023 00:01:28 +0000 Subject: [PATCH 10/26] minor bug --- skyplane/api/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/skyplane/api/client.py b/skyplane/api/client.py index 31dc333c3..c5ec95b3f 100644 --- a/skyplane/api/client.py +++ b/skyplane/api/client.py @@ -29,7 +29,7 @@ def __init__( gcp_config: Optional["GCPConfig"] = None, ibmcloud_config: Optional["IBMCloudConfig"] = None, transfer_config: Optional[TransferConfig] = None, - log_dir: Optional[str] = None + log_dir: Optional[str] = None, disable_aws=False, disable_azure=False, disable_gcp=False, From af866f9cb34ee40db5d9031690e217364d4f9b0a Mon Sep 17 00:00:00 2001 From: Asim Biswal Date: Fri, 20 Oct 2023 00:06:06 +0000 Subject: [PATCH 11/26] argument typing --- skyplane/api/client.py | 8 ++++---- skyplane/api/provisioner.py | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/skyplane/api/client.py b/skyplane/api/client.py index c5ec95b3f..8a6082088 100644 --- a/skyplane/api/client.py +++ b/skyplane/api/client.py @@ -30,10 +30,10 @@ def __init__( ibmcloud_config: Optional["IBMCloudConfig"] = None, transfer_config: Optional[TransferConfig] = None, log_dir: Optional[str] = None, - disable_aws=False, - disable_azure=False, - disable_gcp=False, - disable_ibm=False + disable_aws: bool = False, + disable_azure: bool = False, + disable_gcp: bool = False, + disable_ibm: bool = False ): """ :param aws_config: aws cloud configurations diff --git a/skyplane/api/provisioner.py b/skyplane/api/provisioner.py index c5e9891e3..49735ef17 100644 --- a/skyplane/api/provisioner.py +++ b/skyplane/api/provisioner.py @@ -52,10 +52,10 @@ def __init__( gcp_auth: Optional[compute.GCPAuthentication] = None, host_uuid: Optional[str] = None, ibmcloud_auth: Optional[compute.IBMCloudAuthentication] = None, - disable_aws=False, - disable_azure=False, - disable_gcp=False, - disable_ibm=False + disable_aws: bool = False, + disable_azure: bool = False, + disable_gcp: bool = False, + disable_ibm: bool = False ): """ :param aws_auth: authentication information for aws From 655b2ef3655348d28d27692d378b60477be51550 Mon Sep 17 00:00:00 2001 From: Asim Biswal Date: Fri, 20 Oct 2023 00:08:29 +0000 Subject: [PATCH 12/26] adding import --- skyplane/api/client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/skyplane/api/client.py b/skyplane/api/client.py index 8a6082088..f70550866 100644 --- a/skyplane/api/client.py +++ b/skyplane/api/client.py @@ -12,6 +12,7 @@ from skyplane.api.usage import get_clientid from skyplane.utils import logger from skyplane.utils.definitions import tmp_log_dir +from skyplane.utils.path import parse_path from skyplane.api.pipeline import Pipeline From fd32c1ea10cf1b7166eda4ba51f77c16c113f477 Mon Sep 17 00:00:00 2001 From: Asim Biswal Date: Fri, 20 Oct 2023 00:15:30 +0000 Subject: [PATCH 13/26] bug in client --- skyplane/api/client.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/skyplane/api/client.py b/skyplane/api/client.py index f70550866..54e40f8d4 100644 --- a/skyplane/api/client.py +++ b/skyplane/api/client.py @@ -104,15 +104,15 @@ def copy(self, src: str, dst: str, recursive: bool = False, max_instances: Optio :param max_instances: The maximum number of instances to use per region (default: 1) :type max_instances: int """ - provider_src, bucket_src, _ = parse_path(self.src) + provider_src, bucket_src, _ = parse_path(src) src_iface = ObjectStoreInterface.create(f"{provider_src}:infer", bucket_src, aws_auth=self.aws_auth, azure_auth=self.azure_auth, gcp_auth=self.gcp_auth) if isinstance(dst, str): - provider_dst, bucket_dst, _ = parse_path(self.dst) + provider_dst, bucket_dst, _ = parse_path(dst) dst_ifaces = [StorageInterface.create(f"{provider_dst}:infer", bucket_dst, aws_auth=self.aws_auth, azure_auth=self.azure_auth, gcp_auth=self.gcp_auth)] else: dst_ifaces = [] - for path in self.dst: + for path in dst: provider_dst, bucket_dst, _ = parse_path(path) dst_ifaces.append(StorageInterface.create(f"{provider_dst}:infer", bucket_dst, aws_auth=self.aws_auth, azure_auth=self.azure_auth, gcp_auth=self.gcp_auth)) From 52809a0ed2b1657e48b57c790dd4493ea6255b45 Mon Sep 17 00:00:00 2001 From: Asim Biswal Date: Fri, 20 Oct 2023 00:25:21 +0000 Subject: [PATCH 14/26] trying import before call --- skyplane/api/client.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/skyplane/api/client.py b/skyplane/api/client.py index 54e40f8d4..6e4894bf2 100644 --- a/skyplane/api/client.py +++ b/skyplane/api/client.py @@ -12,7 +12,6 @@ from skyplane.api.usage import get_clientid from skyplane.utils import logger from skyplane.utils.definitions import tmp_log_dir -from skyplane.utils.path import parse_path from skyplane.api.pipeline import Pipeline @@ -104,6 +103,8 @@ def copy(self, src: str, dst: str, recursive: bool = False, max_instances: Optio :param max_instances: The maximum number of instances to use per region (default: 1) :type max_instances: int """ + from skyplane.utils.path import parse_path + provider_src, bucket_src, _ = parse_path(src) src_iface = ObjectStoreInterface.create(f"{provider_src}:infer", bucket_src, aws_auth=self.aws_auth, azure_auth=self.azure_auth, gcp_auth=self.gcp_auth) From f149b422c806e72cd061eddcac746c739fe8f776 Mon Sep 17 00:00:00 2001 From: Asim Biswal Date: Fri, 20 Oct 2023 00:42:52 +0000 Subject: [PATCH 15/26] trying different import --- skyplane/api/client.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/skyplane/api/client.py b/skyplane/api/client.py index 6e4894bf2..372023c92 100644 --- a/skyplane/api/client.py +++ b/skyplane/api/client.py @@ -103,18 +103,18 @@ def copy(self, src: str, dst: str, recursive: bool = False, max_instances: Optio :param max_instances: The maximum number of instances to use per region (default: 1) :type max_instances: int """ - from skyplane.utils.path import parse_path - - provider_src, bucket_src, _ = parse_path(src) + from skyplane.utils import path + + provider_src, bucket_src, _ = path.parse_path(src) src_iface = ObjectStoreInterface.create(f"{provider_src}:infer", bucket_src, aws_auth=self.aws_auth, azure_auth=self.azure_auth, gcp_auth=self.gcp_auth) if isinstance(dst, str): - provider_dst, bucket_dst, _ = parse_path(dst) + provider_dst, bucket_dst, _ = path.parse_path(dst) dst_ifaces = [StorageInterface.create(f"{provider_dst}:infer", bucket_dst, aws_auth=self.aws_auth, azure_auth=self.azure_auth, gcp_auth=self.gcp_auth)] else: dst_ifaces = [] - for path in dst: - provider_dst, bucket_dst, _ = parse_path(path) + for dst_path in dst: + provider_dst, bucket_dst, _ = path.parse_path(dst_path) dst_ifaces.append(StorageInterface.create(f"{provider_dst}:infer", bucket_dst, aws_auth=self.aws_auth, azure_auth=self.azure_auth, gcp_auth=self.gcp_auth)) pipeline = self.pipeline(max_instances=max_instances, debug=debug, src_iface = src_iface, dst_ifaces=dst_ifaces) From 4ae66ca379197c3062489f998e1d278bb3d3865f Mon Sep 17 00:00:00 2001 From: Asim Biswal Date: Fri, 20 Oct 2023 00:55:28 +0000 Subject: [PATCH 16/26] temporary fix --- skyplane/api/client.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/skyplane/api/client.py b/skyplane/api/client.py index 372023c92..72d8a4108 100644 --- a/skyplane/api/client.py +++ b/skyplane/api/client.py @@ -103,18 +103,23 @@ def copy(self, src: str, dst: str, recursive: bool = False, max_instances: Optio :param max_instances: The maximum number of instances to use per region (default: 1) :type max_instances: int """ - from skyplane.utils import path + def parse(path): + provider, parsed = path[:2], path[5:] + bucket, *keys = parsed.split("/", 1) + provider = "aws" if provider == "s3" else "gcp" + return provider, bucket, key + + provider_src, bucket_src, _ = parse(src) - provider_src, bucket_src, _ = path.parse_path(src) src_iface = ObjectStoreInterface.create(f"{provider_src}:infer", bucket_src, aws_auth=self.aws_auth, azure_auth=self.azure_auth, gcp_auth=self.gcp_auth) if isinstance(dst, str): - provider_dst, bucket_dst, _ = path.parse_path(dst) + provider_dst, bucket_dst, _ = parse(dst) dst_ifaces = [StorageInterface.create(f"{provider_dst}:infer", bucket_dst, aws_auth=self.aws_auth, azure_auth=self.azure_auth, gcp_auth=self.gcp_auth)] else: dst_ifaces = [] for dst_path in dst: - provider_dst, bucket_dst, _ = path.parse_path(dst_path) + provider_dst, bucket_dst, _ = parse(dst_path) dst_ifaces.append(StorageInterface.create(f"{provider_dst}:infer", bucket_dst, aws_auth=self.aws_auth, azure_auth=self.azure_auth, gcp_auth=self.gcp_auth)) pipeline = self.pipeline(max_instances=max_instances, debug=debug, src_iface = src_iface, dst_ifaces=dst_ifaces) From 891df5361b428f08727c03154e8730ab4d308931 Mon Sep 17 00:00:00 2001 From: Asim Biswal Date: Fri, 20 Oct 2023 01:05:32 +0000 Subject: [PATCH 17/26] attempting revert --- skyplane/api/client.py | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/skyplane/api/client.py b/skyplane/api/client.py index 72d8a4108..6cd11ad3c 100644 --- a/skyplane/api/client.py +++ b/skyplane/api/client.py @@ -12,6 +12,7 @@ from skyplane.api.usage import get_clientid from skyplane.utils import logger from skyplane.utils.definitions import tmp_log_dir +from skyplane.utils.path import parse_path from skyplane.api.pipeline import Pipeline @@ -103,23 +104,17 @@ def copy(self, src: str, dst: str, recursive: bool = False, max_instances: Optio :param max_instances: The maximum number of instances to use per region (default: 1) :type max_instances: int """ - def parse(path): - provider, parsed = path[:2], path[5:] - bucket, *keys = parsed.split("/", 1) - provider = "aws" if provider == "s3" else "gcp" - return provider, bucket, key - - provider_src, bucket_src, _ = parse(src) + provider_src, bucket_src, _ = parse_path(src) src_iface = ObjectStoreInterface.create(f"{provider_src}:infer", bucket_src, aws_auth=self.aws_auth, azure_auth=self.azure_auth, gcp_auth=self.gcp_auth) if isinstance(dst, str): - provider_dst, bucket_dst, _ = parse(dst) + provider_dst, bucket_dst, _ = parse_path(dst) dst_ifaces = [StorageInterface.create(f"{provider_dst}:infer", bucket_dst, aws_auth=self.aws_auth, azure_auth=self.azure_auth, gcp_auth=self.gcp_auth)] else: dst_ifaces = [] for dst_path in dst: - provider_dst, bucket_dst, _ = parse(dst_path) + provider_dst, bucket_dst, _ = parse_path(dst_path) dst_ifaces.append(StorageInterface.create(f"{provider_dst}:infer", bucket_dst, aws_auth=self.aws_auth, azure_auth=self.azure_auth, gcp_auth=self.gcp_auth)) pipeline = self.pipeline(max_instances=max_instances, debug=debug, src_iface = src_iface, dst_ifaces=dst_ifaces) From a7bcac047749b692781d7548b33e00382bc4e51b Mon Sep 17 00:00:00 2001 From: Asim Biswal Date: Fri, 20 Oct 2023 01:14:14 +0000 Subject: [PATCH 18/26] iface propagation --- skyplane/api/client.py | 4 +++- skyplane/api/transfer_job.py | 8 +++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/skyplane/api/client.py b/skyplane/api/client.py index 6cd11ad3c..14874f12f 100644 --- a/skyplane/api/client.py +++ b/skyplane/api/client.py @@ -79,7 +79,7 @@ def __init__( disable_ibm=disable_ibm ) - def pipeline(self, planning_algorithm: Optional[str] = "direct", max_instances: Optional[int] = 1, debug=False): + def pipeline(self, planning_algorithm: Optional[str] = "direct", max_instances: Optional[int] = 1, src_iface: Optional[ObjectStoreInterface] = None, dst_ifaces: Optional[List[ObjectStoreInterface]] = None, debug=False): """Create a pipeline object to queue jobs""" return Pipeline( planning_algorithm=planning_algorithm, @@ -87,6 +87,8 @@ def pipeline(self, planning_algorithm: Optional[str] = "direct", max_instances: clientid=self.clientid, provisioner=self.provisioner, transfer_config=self.transfer_config, + src_iface=src_iface, + dst_ifaces=dst_ifaces, debug=debug, ) diff --git a/skyplane/api/transfer_job.py b/skyplane/api/transfer_job.py index 96f725df1..acd7f1cdb 100644 --- a/skyplane/api/transfer_job.py +++ b/skyplane/api/transfer_job.py @@ -574,8 +574,10 @@ def __init__( recursive: bool = False, requester_pays: bool = False, job_id: Optional[str] = None, + src_iface: Optional[ObjectStoreInterface] = None, + dst_ifaces: Optional[List[ObjectStoreInterface]] = None ): - super().__init__(src_path, dst_paths, recursive, requester_pays, job_id) + super().__init__(src_path, dst_paths, recursive, requester_pays, job_id, src_iface, dst_ifaces) self.transfer_list = [] self.multipart_transfer_list = [] @@ -776,8 +778,8 @@ def size_gb(self): class SyncJob(CopyJob): """sync job that copies the source objects that does not exist in the destination bucket to the destination""" - def __init__(self, src_path: str, dst_paths: List[str] or str, requester_pays: bool = False, job_id: Optional[str] = None): - super().__init__(src_path, dst_paths, True, requester_pays, job_id) + def __init__(self, src_path: str, dst_paths: List[str] or str, requester_pays: bool = False, job_id: Optional[str] = None, src_iface: Optional[ObjectStoreInterface] = None, dst_ifaces: Optional[List[ObjectStoreInterface]] = None): + super().__init__(src_path, dst_paths, True, requester_pays, job_id, src_iface, dst_ifaces) self.transfer_list = [] self.multipart_transfer_list = [] From 2a2fd1d4d289c0de10aed86fed8fa02310dae435 Mon Sep 17 00:00:00 2001 From: Asim Biswal Date: Fri, 20 Oct 2023 01:17:03 +0000 Subject: [PATCH 19/26] list import --- skyplane/api/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/skyplane/api/client.py b/skyplane/api/client.py index 14874f12f..d09f1b513 100644 --- a/skyplane/api/client.py +++ b/skyplane/api/client.py @@ -2,7 +2,7 @@ import typer from datetime import datetime from pathlib import Path -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, List, Optional from skyplane.api.config import TransferConfig from skyplane.api.provisioner import Provisioner From 23b14c966ba9ca421d712a6b143f048159a105b4 Mon Sep 17 00:00:00 2001 From: Asim Biswal Date: Fri, 20 Oct 2023 01:25:10 +0000 Subject: [PATCH 20/26] config path safeguard --- skyplane/planner/planner.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/skyplane/planner/planner.py b/skyplane/planner/planner.py index 2bb21e0ac..d716a4f9d 100644 --- a/skyplane/planner/planner.py +++ b/skyplane/planner/planner.py @@ -28,9 +28,12 @@ class Planner: - def __init__(self, transfer_config: TransferConfig, quota_limits_file: Optional[str] = None): + def __init__(self, transfer_config: TransferConfig, config: Optional[SkyplaneConfig] = None, quota_limits_file: Optional[str] = None): self.transfer_config = transfer_config - self.config = SkyplaneConfig.load_config(config_path) + if config_path.exists(): + self.config = SkyplaneConfig.load_config(config_path) + else: + self.config = SkyplaneConfig.default_config() self.n_instances = self.config.get_flag("max_instances") # Loading the quota information, add ibm cloud when it is supported From 654ca94a26d5f030d9a76b5c14afca080946fb70 Mon Sep 17 00:00:00 2001 From: Asim Biswal Date: Fri, 20 Oct 2023 04:52:52 +0000 Subject: [PATCH 21/26] attempting to write config file on client creation --- skyplane/api/client.py | 32 ++++++++++++++++++++++++++++++++ skyplane/config.py | 12 ++++++++++++ 2 files changed, 44 insertions(+) diff --git a/skyplane/api/client.py b/skyplane/api/client.py index d09f1b513..3d654171c 100644 --- a/skyplane/api/client.py +++ b/skyplane/api/client.py @@ -7,6 +7,7 @@ from skyplane.api.config import TransferConfig from skyplane.api.provisioner import Provisioner from skyplane.api.obj_store import ObjectStore +from skyplane.config_paths import config_path from skyplane.obj_store.object_store_interface import ObjectStoreInterface from skyplane.obj_store.storage_interface import StorageInterface from skyplane.api.usage import get_clientid @@ -79,6 +80,37 @@ def __init__( disable_ibm=disable_ibm ) + self.config = SkyplaneConfig() + if not disable_aws: + self.config.aws_enabled = True + if aws_config: + self.config.aws_access_key = aws_config.aws_access_key + self.config.aws_secret_key = aws_config.aws_secret_key + if not disable_azure: + self.config.azure_enabled = True + if azure_config: + self.config.azure_subscription_id=azure_config.azure_subscription_id + self.config.azure_resource_group=azure_config.azure_resource_group + self.config.azure_principal_id=azure_config.azure_umi_id + self.config.azure_umi_name=azure_config.azure_umi_name + self.config.azure_client_id=azure_config.azure_umi_client_id + if not disable_gcp: + self.config.gcp_enabled = True + if gcp_config: + self.config.gcp_project_id=gcp_config.gcp_project_id + if not disable_ibm: + self.config.ibm_enabled = True + if ibm_config: + self.config.ibmcloud_access_id=ibm_config.ibmcloud_access_id + self.config.ibmcloud_secret_key=ibm_config.ibmcloud_secret_key + self.config.ibmcloud_iam_key=ibm_config.ibmcloud_iam_key + self.config.ibmcloud_iam_endpoint=ibm_config.ibmcloud_iam_endpoint + self.config.ibmcloud_useragent=ibm_config.ibmcloud_useragent + self.config.ibmcloud_resource_group_id=ibm_config.ibmcloud_resource_group_id + + self.config.to_config_file(config_path) + typer.secho(f"\nConfig file saved to {config_path}", fg="green") + def pipeline(self, planning_algorithm: Optional[str] = "direct", max_instances: Optional[int] = 1, src_iface: Optional[ObjectStoreInterface] = None, dst_ifaces: Optional[List[ObjectStoreInterface]] = None, debug=False): """Create a pipeline object to queue jobs""" return Pipeline( diff --git a/skyplane/config.py b/skyplane/config.py index f1210d8a9..8e5fecd19 100644 --- a/skyplane/config.py +++ b/skyplane/config.py @@ -104,6 +104,8 @@ class SkyplaneConfig: cloudflare_enabled: bool ibmcloud_enabled: bool anon_clientid: str + aws_access_key: Optional[str] = None + aws_secret_key: Optional[str] = None azure_principal_id: Optional[str] = None azure_subscription_id: Optional[str] = None azure_resource_group: Optional[str] = None @@ -151,6 +153,10 @@ def load_config(cls, path) -> "SkyplaneConfig": if "aws" in config: if "aws_enabled" in config["aws"]: aws_enabled = config.getboolean("aws", "aws_enabled") + if "aws_access_key" in config["aws"]: + aws_access_key = config.get("aws", "aws_access_key") + if "aws_secret_key" in config["aws"]: + aws_secret_key = config.getboolean("aws", "aws_secret_key") azure_enabled = False azure_subscription_id = None @@ -215,6 +221,8 @@ def load_config(cls, path) -> "SkyplaneConfig": gcp_enabled=gcp_enabled, ibmcloud_enabled=ibmcloud_enabled, cloudflare_enabled=cloudflare_enabled, + aws_access_key=aws_access_key, + aws_secret_key=aws_secret_key, anon_clientid=anon_clientid, azure_principal_id=azure_principal_id, azure_subscription_id=azure_subscription_id, @@ -248,6 +256,10 @@ def to_config_file(self, path): if "aws" not in config: config.add_section("aws") config.set("aws", "aws_enabled", str(self.aws_enabled)) + if self.aws_access_key: + config.set("aws", "aws_access_key", self.aws_access_key) + if self.aws_secret_key: + config.set("aws", "aws_secret_key", self.aws_secret_key) if "ibmcloud" not in config: config.add_section("ibmcloud") From aaacaf5ff20dbef9fc7d30838c9f2a946c00da9b Mon Sep 17 00:00:00 2001 From: Asim Biswal Date: Fri, 20 Oct 2023 04:55:41 +0000 Subject: [PATCH 22/26] import error --- skyplane/api/client.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/skyplane/api/client.py b/skyplane/api/client.py index 3d654171c..d597fc432 100644 --- a/skyplane/api/client.py +++ b/skyplane/api/client.py @@ -7,6 +7,7 @@ from skyplane.api.config import TransferConfig from skyplane.api.provisioner import Provisioner from skyplane.api.obj_store import ObjectStore +from skyplane.config import SkyplaneConfig from skyplane.config_paths import config_path from skyplane.obj_store.object_store_interface import ObjectStoreInterface from skyplane.obj_store.storage_interface import StorageInterface @@ -80,7 +81,7 @@ def __init__( disable_ibm=disable_ibm ) - self.config = SkyplaneConfig() + self.config = SkyplaneConfig.default_config() if not disable_aws: self.config.aws_enabled = True if aws_config: From a5e52fa1e7451069fa049d5090b832ad4ff7b057 Mon Sep 17 00:00:00 2001 From: Asim Biswal Date: Fri, 20 Oct 2023 05:00:09 +0000 Subject: [PATCH 23/26] config error --- skyplane/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/skyplane/config.py b/skyplane/config.py index 8e5fecd19..b73fe2dd5 100644 --- a/skyplane/config.py +++ b/skyplane/config.py @@ -156,7 +156,7 @@ def load_config(cls, path) -> "SkyplaneConfig": if "aws_access_key" in config["aws"]: aws_access_key = config.get("aws", "aws_access_key") if "aws_secret_key" in config["aws"]: - aws_secret_key = config.getboolean("aws", "aws_secret_key") + aws_secret_key = config.get("aws", "aws_secret_key") azure_enabled = False azure_subscription_id = None From e557fad04224286e9578109093f98ceb3066a6e2 Mon Sep 17 00:00:00 2001 From: Asim Biswal Date: Fri, 20 Oct 2023 05:16:16 +0000 Subject: [PATCH 24/26] auth change --- skyplane/compute/aws/aws_auth.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/skyplane/compute/aws/aws_auth.py b/skyplane/compute/aws/aws_auth.py index 532c01e20..45552d72e 100644 --- a/skyplane/compute/aws/aws_auth.py +++ b/skyplane/compute/aws/aws_auth.py @@ -18,6 +18,10 @@ def __init__(self, config: Optional[SkyplaneConfig] = None, access_key: Optional self.config_mode = "manual" self._access_key = access_key self._secret_key = secret_key + elif self.config.aws_access_key and self.config.aws_secret_key: + self.config_mode = "manual" + self._access_key = access_key + self._secret_key = secret_key else: self.config_mode = "iam_inferred" self._access_key = None From bbeccfe827a28473c10f44d4235d60ffd20a6d0d Mon Sep 17 00:00:00 2001 From: Asim Biswal Date: Fri, 20 Oct 2023 05:24:32 +0000 Subject: [PATCH 25/26] auth config bug --- skyplane/compute/aws/aws_auth.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/skyplane/compute/aws/aws_auth.py b/skyplane/compute/aws/aws_auth.py index 45552d72e..ba249b040 100644 --- a/skyplane/compute/aws/aws_auth.py +++ b/skyplane/compute/aws/aws_auth.py @@ -20,8 +20,8 @@ def __init__(self, config: Optional[SkyplaneConfig] = None, access_key: Optional self._secret_key = secret_key elif self.config.aws_access_key and self.config.aws_secret_key: self.config_mode = "manual" - self._access_key = access_key - self._secret_key = secret_key + self._access_key = self.config.aws_access_key + self._secret_key = self.config.aws_secret_key else: self.config_mode = "iam_inferred" self._access_key = None From 139203747310deb647ec72526859701ec376941f Mon Sep 17 00:00:00 2001 From: Asim Biswal Date: Fri, 20 Oct 2023 06:50:36 +0000 Subject: [PATCH 26/26] fixing config.py --- skyplane/config.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/skyplane/config.py b/skyplane/config.py index b73fe2dd5..49993ee48 100644 --- a/skyplane/config.py +++ b/skyplane/config.py @@ -150,6 +150,8 @@ def load_config(cls, path) -> "SkyplaneConfig": anon_clientid = cls.generate_machine_id() aws_enabled = False + aws_access_key = None + aws_secret_access_key = None if "aws" in config: if "aws_enabled" in config["aws"]: aws_enabled = config.getboolean("aws", "aws_enabled")