diff --git a/sky/__init__.py b/sky/__init__.py index 61df3dbffbc..4f0adc2322b 100644 --- a/sky/__init__.py +++ b/sky/__init__.py @@ -85,6 +85,7 @@ def get_git_commit(): OCI = clouds.OCI RunPod = clouds.RunPod Vsphere = clouds.Vsphere +Fluidstack = clouds.Fluidstack optimize = Optimizer.optimize __all__ = [ @@ -101,6 +102,7 @@ def get_git_commit(): 'RunPod', 'SCP', 'Vsphere', + 'Fluidstack', 'Optimizer', 'OptimizeTarget', 'backends', diff --git a/sky/authentication.py b/sky/authentication.py index 299ff7f1347..8b7f94d14d1 100644 --- a/sky/authentication.py +++ b/sky/authentication.py @@ -43,6 +43,7 @@ from sky.adaptors import ibm from sky.adaptors import runpod from sky.clouds.utils import lambda_utils +from sky.provision.fluidstack import fluidstack_utils from sky.provision.kubernetes import utils as kubernetes_utils from sky.utils import common_utils from sky.utils import kubernetes_enums @@ -464,3 +465,17 @@ def setup_runpod_authentication(config: Dict[str, Any]) -> Dict[str, Any]: runpod.runpod().cli.groups.ssh.functions.add_ssh_key(public_key) return configure_ssh_info(config) + + +def setup_fluidstack_authentication(config: Dict[str, Any]) -> Dict[str, Any]: + + get_or_generate_keys() + + client = fluidstack_utils.FluidstackClient() + public_key_path = os.path.expanduser(PUBLIC_SSH_KEY_PATH) + public_key = None + with open(public_key_path, 'r', encoding='utf-8') as f: + public_key = f.read() + client.get_or_add_ssh_key(public_key) + config['auth']['ssh_public_key'] = PUBLIC_SSH_KEY_PATH + return configure_ssh_info(config) diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index 3ea636f341b..8d2837c175f 100644 --- a/sky/backends/backend_utils.py +++ b/sky/backends/backend_utils.py @@ -993,6 +993,8 @@ def _add_auth_to_cluster_config(cloud: clouds.Cloud, cluster_config_file: str): config = auth.setup_ibm_authentication(config) elif isinstance(cloud, clouds.RunPod): config = auth.setup_runpod_authentication(config) + elif isinstance(cloud, clouds.Fluidstack): + config = auth.setup_fluidstack_authentication(config) else: assert isinstance(cloud, clouds.Local), cloud # Local cluster case, authentication is already filled by the user diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index cfbbb59e294..70cf9e056a9 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -150,6 +150,7 @@ def _get_cluster_config_template(cloud): clouds.RunPod: 'runpod-ray.yml.j2', clouds.Kubernetes: 'kubernetes-ray.yml.j2', clouds.Vsphere: 'vsphere-ray.yml.j2', + clouds.Fluidstack: 'fluidstack-ray.yml.j2' } return cloud_to_template[type(cloud)] diff --git a/sky/clouds/__init__.py b/sky/clouds/__init__.py index 93171a050ce..93b67508ce1 100644 --- a/sky/clouds/__init__.py +++ b/sky/clouds/__init__.py @@ -13,6 +13,7 @@ from sky.clouds.aws import AWS from sky.clouds.azure import Azure from sky.clouds.cudo import Cudo +from sky.clouds.fluidstack import Fluidstack from sky.clouds.gcp import GCP from sky.clouds.ibm import IBM from sky.clouds.kubernetes import Kubernetes @@ -43,6 +44,7 @@ 'CLOUD_REGISTRY', 'ProvisionerVersion', 'StatusVersion', + 'Fluidstack', # Utility functions 'cloud_in_list', ] diff --git a/sky/clouds/fluidstack.py b/sky/clouds/fluidstack.py new file mode 100644 index 00000000000..67129b1726c --- /dev/null +++ b/sky/clouds/fluidstack.py @@ -0,0 +1,328 @@ +"""Fluidstack Cloud.""" +import json +import os +import typing +from typing import Dict, Iterator, List, Optional, Tuple + +import requests + 
+from sky import clouds
+from sky import status_lib
+from sky.clouds import service_catalog
+from sky.provision.fluidstack import fluidstack_utils
+from sky.utils.resources_utils import DiskTier
+
+_CREDENTIAL_FILES = [
+    # credential files for FluidStack,
+    fluidstack_utils.FLUIDSTACK_API_KEY_PATH,
+    fluidstack_utils.FLUIDSTACK_API_TOKEN_PATH,
+]
+if typing.TYPE_CHECKING:
+    # Renaming to avoid shadowing variables.
+    from sky import resources as resources_lib
+
+
+@clouds.CLOUD_REGISTRY.register
+class Fluidstack(clouds.Cloud):
+    """FluidStack GPU Cloud."""
+
+    _REPR = 'Fluidstack'
+
+    _MAX_CLUSTER_NAME_LEN_LIMIT = 57
+    # The features listed in _CLOUD_UNSUPPORTED_FEATURES below are
+    # currently not supported on FluidStack.
+    # STOP/AUTOSTOP: The FluidStack cloud
+    # provider does not support stopping VMs.
+
+    _CLOUD_UNSUPPORTED_FEATURES = {
+        clouds.CloudImplementationFeatures.STOP:
+            'Stopping clusters in FluidStack'
+            ' is not supported in SkyPilot.',
+        clouds.CloudImplementationFeatures.CLONE_DISK_FROM_CLUSTER:
+            'Migrating '
+            f'disk is not supported in {_REPR}.',
+        clouds.CloudImplementationFeatures.SPOT_INSTANCE:
+            'Spot instances are'
+            f' not supported in {_REPR}.',
+        clouds.CloudImplementationFeatures.IMAGE_ID:
+            'Specifying image ID '
+            f'is not supported for {_REPR}.',
+        clouds.CloudImplementationFeatures.CUSTOM_DISK_TIER:
+            'Custom disk tiers'
+            f' are not supported in {_REPR}.',
+        clouds.CloudImplementationFeatures.OPEN_PORTS:
+            'Opening ports '
+            f'is not supported in {_REPR}.',
+    }
+    # Using the latest SkyPilot provisioner API to provision and check status.
+    PROVISIONER_VERSION = clouds.ProvisionerVersion.SKYPILOT
+    STATUS_VERSION = clouds.StatusVersion.SKYPILOT
+
+    @classmethod
+    def _unsupported_features_for_resources(
+        cls, resources: 'resources_lib.Resources'
+    ) -> Dict[clouds.CloudImplementationFeatures, str]:
+        """The features not supported based on the resources provided.
+
+        This method is used by check_features_are_supported() to check if the
+        cloud implementation supports all the requested features.
+
+        Returns:
+            A dict of {feature: reason} for the features not supported by the
+            cloud implementation.
+        """
+        del resources  # unused
+        return cls._CLOUD_UNSUPPORTED_FEATURES
+
+    @classmethod
+    def _max_cluster_name_length(cls) -> Optional[int]:
+        return cls._MAX_CLUSTER_NAME_LEN_LIMIT
+
+    @classmethod
+    def regions_with_offering(cls, instance_type: str,
+                              accelerators: Optional[Dict[str, int]],
+                              use_spot: bool, region: Optional[str],
+                              zone: Optional[str]) -> List[clouds.Region]:
+        assert zone is None, 'FluidStack does not support zones.'
+ del accelerators, zone # unused + if use_spot: + return [] + regions = service_catalog.get_region_zones_for_instance_type( + instance_type, use_spot, 'fluidstack') + + if region is not None: + regions = [r for r in regions if r.name == region] + return regions + + @classmethod + def zones_provision_loop( + cls, + *, + region: str, + num_nodes: int, + instance_type: str, + accelerators: Optional[Dict[str, int]] = None, + use_spot: bool = False, + ) -> Iterator[None]: + del num_nodes # unused + regions = cls.regions_with_offering(instance_type, + accelerators, + use_spot, + region=region, + zone=None) + for r in regions: + assert r.zones is None, r + yield r.zones + + def instance_type_to_hourly_cost(self, + instance_type: str, + use_spot: bool, + region: Optional[str] = None, + zone: Optional[str] = None) -> float: + return service_catalog.get_hourly_cost(instance_type, + use_spot=use_spot, + region=region, + zone=zone, + clouds='fluidstack') + + def accelerators_to_hourly_cost(self, + accelerators: Dict[str, int], + use_spot: bool, + region: Optional[str] = None, + zone: Optional[str] = None) -> float: + del accelerators, use_spot, region, zone # unused + # Fluidstack includes accelerators as part of the instance type. + return 0.0 + + def get_egress_cost(self, num_gigabytes: float) -> float: + return 0.0 + + def __repr__(self): + return 'Fluidstack' + + def is_same_cloud(self, other: clouds.Cloud) -> bool: + # Returns true if the two clouds are the same cloud type. + return isinstance(other, Fluidstack) + + @classmethod + def get_default_instance_type( + cls, + cpus: Optional[str] = None, + memory: Optional[str] = None, + disk_tier: Optional[DiskTier] = None) -> Optional[str]: + return service_catalog.get_default_instance_type(cpus=cpus, + memory=memory, + disk_tier=disk_tier, + clouds='fluidstack') + + @classmethod + def get_accelerators_from_instance_type( + cls, + instance_type: str, + ) -> Optional[Dict[str, int]]: + return service_catalog.get_accelerators_from_instance_type( + instance_type, clouds='fluidstack') + + @classmethod + def get_vcpus_mem_from_instance_type( + cls, + instance_type: str, + ) -> Tuple[Optional[float], Optional[float]]: + return service_catalog.get_vcpus_mem_from_instance_type( + instance_type, clouds='fluidstack') + + @classmethod + def get_zone_shell_cmd(cls) -> Optional[str]: + return None + + def make_deploy_resources_variables( + self, + resources: 'resources_lib.Resources', + cluster_name_on_cloud: str, + region: clouds.Region, + zones: Optional[List[clouds.Zone]], + dryrun: bool = False, + ) -> Dict[str, Optional[str]]: + + assert zones is None, 'FluidStack does not support zones.' 
+
+        r = resources
+        acc_dict = self.get_accelerators_from_instance_type(r.instance_type)
+        if acc_dict is not None:
+            custom_resources = json.dumps(acc_dict, separators=(',', ':'))
+        else:
+            custom_resources = None
+        cuda_installation_commands = """
+        sudo wget https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64/cuda-keyring_1.1-1_all.deb -O /usr/local/cuda-keyring_1.1-1_all.deb;
+        sudo dpkg -i /usr/local/cuda-keyring_1.1-1_all.deb;
+        sudo apt-get update;
+        sudo apt-get -y install cuda-toolkit-12-3;
+        sudo apt-get install -y cuda-drivers;
+        sudo apt-get install -y python3-pip;
+        nvidia-smi || sudo reboot;"""
+        return {
+            'instance_type': resources.instance_type,
+            'custom_resources': custom_resources,
+            'region': region.name,
+            'fluidstack_username': self.default_username(region.name),
+            'cuda_installation_commands': cuda_installation_commands,
+        }
+
+    def _get_feasible_launchable_resources(
+            self, resources: 'resources_lib.Resources'):
+        if resources.instance_type is not None:
+            assert resources.is_launchable(), resources
+            # Accelerators are part of the instance type in Fluidstack Cloud
+            resources = resources.copy(accelerators=None)
+            return ([resources], [])
+
+        def _make(instance_list):
+            resource_list = []
+            for instance_type in instance_list:
+                r = resources.copy(
+                    cloud=Fluidstack(),
+                    instance_type=instance_type,
+                    # Setting this to None as
+                    # Fluidstack doesn't separately bill /
+                    # attach the accelerators.
+                    # Billed as part of the VM type.
+                    accelerators=None,
+                    cpus=None,
+                    memory=None,
+                )
+                resource_list.append(r)
+            return resource_list
+
+        # Currently, handle a filter on accelerators only.
+        accelerators = resources.accelerators
+        if accelerators is None:
+            # Return a default instance type with the given number of vCPUs.
+            default_instance_type = Fluidstack.get_default_instance_type(
+                cpus=resources.cpus,
+                memory=resources.memory,
+                disk_tier=resources.disk_tier)
+            if default_instance_type is None:
+                return ([], [])
+            else:
+                return (_make([default_instance_type]), [])
+
+        assert len(accelerators) == 1, resources
+        acc, acc_count = list(accelerators.items())[0]
+        (instance_list, fuzzy_candidate_list
+        ) = service_catalog.get_instance_type_for_accelerator(
+            acc,
+            acc_count,
+            use_spot=resources.use_spot,
+            cpus=resources.cpus,
+            memory=resources.memory,
+            region=resources.region,
+            zone=resources.zone,
+            clouds='fluidstack')
+        if instance_list is None:
+            return ([], fuzzy_candidate_list)
+        return (_make(instance_list), fuzzy_candidate_list)
+
+    @classmethod
+    def check_credentials(cls) -> Tuple[bool, Optional[str]]:
+        try:
+            assert os.path.exists(
+                os.path.expanduser(fluidstack_utils.FLUIDSTACK_API_KEY_PATH))
+            assert os.path.exists(
+                os.path.expanduser(fluidstack_utils.FLUIDSTACK_API_TOKEN_PATH))
+        except AssertionError:
+            return False, (
+                'Failed to access FluidStack Cloud credentials. '
+                'To configure credentials, go to:\n    '
+                '  https://console.fluidstack.io \n    '
+                'to obtain an API key and API Token, '
+                'then save the contents '
+                'to ~/.fluidstack/api_key and ~/.fluidstack/api_token \n')
+        except requests.exceptions.ConnectionError:
+            return False, ('Failed to verify FluidStack Cloud credentials. 
' + 'Check your network connection ' + 'and try again.') + return True, None + + def get_credential_file_mounts(self) -> Dict[str, str]: + return {filename: filename for filename in _CREDENTIAL_FILES} + + @classmethod + def get_current_user_identity(cls) -> Optional[List[str]]: + # TODO(mjibril): Implement get_current_user_identity for Fluidstack + return None + + def instance_type_exists(self, instance_type: str) -> bool: + return service_catalog.instance_type_exists(instance_type, 'fluidstack') + + def validate_region_zone(self, region: Optional[str], zone: Optional[str]): + return service_catalog.validate_region_zone(region, + zone, + clouds='fluidstack') + + @classmethod + def default_username(cls, region: str) -> str: + return { + 'norway_2_eu': 'fsuser', + 'calgary_1_canada': 'ubuntu', + 'norway_3_eu': 'ubuntu', + 'norway_4_eu': 'ubuntu', + 'india_2': 'root', + 'nevada_1_usa': 'fsuser', + 'generic_1_canada': 'ubuntu', + 'iceland_1_eu': 'ubuntu', + 'new_york_1_usa': 'fsuser', + 'illinois_1_usa': 'fsuser' + }.get(region, 'ubuntu') + + @classmethod + def query_status( + cls, + name: str, + tag_filters: Dict[str, str], + region: Optional[str], + zone: Optional[str], + **kwargs, + ) -> List[status_lib.ClusterStatus]: + return [] diff --git a/sky/clouds/service_catalog/__init__.py b/sky/clouds/service_catalog/__init__.py index 609555080cd..8b57b873b0f 100644 --- a/sky/clouds/service_catalog/__init__.py +++ b/sky/clouds/service_catalog/__init__.py @@ -16,7 +16,7 @@ CloudFilter = Optional[Union[List[str], str]] ALL_CLOUDS = ('aws', 'azure', 'gcp', 'ibm', 'lambda', 'scp', 'oci', - 'kubernetes', 'runpod', 'vsphere', 'cudo') + 'kubernetes', 'runpod', 'vsphere', 'cudo', 'fluidstack') def _map_clouds_catalog(clouds: CloudFilter, method_name: str, *args, **kwargs): diff --git a/sky/clouds/service_catalog/data_fetchers/fetch_fluidstack.py b/sky/clouds/service_catalog/data_fetchers/fetch_fluidstack.py new file mode 100644 index 00000000000..3dda75e2f07 --- /dev/null +++ b/sky/clouds/service_catalog/data_fetchers/fetch_fluidstack.py @@ -0,0 +1,158 @@ +"""A script that generates the Fluidstack catalog. 
+ +Usage: + python fetch_fluidstack_cloud.py +""" + +import copy +import csv +import json +import os +from typing import List + +import requests + +ENDPOINT = 'https://api.fluidstack.io/v1/plans' +DEFAULT_FLUIDSTACK_API_KEY_PATH = os.path.expanduser('~/.fluidstack/api_key') +DEFAULT_FLUIDSTACK_API_TOKEN_PATH = os.path.expanduser( + '~/.fluidstack/api_token') + +GPU_MAP = { + 'H100_PCIE_80GB': 'H100', + 'A100_SXM4_80GB': 'A100-80GB', + 'A100_PCIE_80GB': 'A100-80GB', + 'A100_SXM4_40GB': 'A100', + 'A100_PCIE_40GB': 'A100', + 'Tesla_V100_SXM2_16GB': 'V100', + 'Tesla_V100_PCIE_16GB': 'V100', + 'A10_PCIE_24GB': 'A10', + 'A30_24GB': 'A30', + 'A40_48GB': 'A40', + 'RTX_A6000_48GB': 'RTXA6000', + 'RTX_A5000_24GB': 'RTXA5000', + 'RTX_A4000_16GB': 'RTXA4000', + 'Quadro_RTX_5000_16GB': 'RTX5000', + 'Quadro_RTX_4000_8GB': 'RTX4000', + 'L40_48GB': 'L40', + 'Quadro_RTX_6000_16GB': 'RTX6000', + 'T4_16GB': 'T4', + 'RTX_3090_24GB': 'RTX3090', + 'RTX_3080_10GB': 'RTX3080', +} + +CUSTOM_PLANS_CONFIG = [ + dict(gpu_count=1, cpu_count=8, nvme_storage=750, ram=64), + dict(gpu_count=2, cpu_count=16, nvme_storage=1024, ram=128), + #dict(gpu_count=4, cpu_count=32, nvme_storage=1200, ram=160), + #dict(gpu_count=8, cpu_count=64, nvme_storage=1500, ram=200) +] + + +def get_regions(plans: List) -> dict: + """Return a list of regions where the plan is available.""" + regions = {} + for plan in plans: + for region in plan.get('regions', []): + regions[region['id']] = region['id'] + return regions + + +def plans_from_custom_plan(plan: dict) -> List[dict]: + prices = dict(cpu=plan['price']['cpu']['hourly'], + ram=plan['price']['ram']['hourly'], + storage=plan['price']['storage']['hourly'], + gpu=plan['price']['gpu']['hourly']) + new_plans = [] + for i, config in enumerate(CUSTOM_PLANS_CONFIG): + new_plan = copy.deepcopy(plan) + price = (prices['cpu'] * + config['cpu_count']) + (prices['ram'] * config['ram']) + ( + prices['storage'] * config['nvme_storage']) + ( + prices['gpu'] * config['gpu_count']) + new_plan['price']['hourly'] = price / config['gpu_count'] + new_plan['configuration']['core_count'] = config['cpu_count'] + new_plan['configuration']['ram'] = config['ram'] + new_plan['configuration']['gpu_count'] = config['gpu_count'] + new_plan['configuration']['nvme_storage'] = config['nvme_storage'] + new_plan['plan_id'] = f'custom:{i}:{plan["plan_id"]}' + new_plans.append(new_plan) + return new_plans + + +def create_catalog(output_dir: str) -> None: + response = requests.get(ENDPOINT) + plans = response.json() + custom_plans = [ + plan for plan in plans if plan['minimum_commitment'] == 'hourly' and + plan['type'] in ['custom'] and plan['gpu_type'] != 'NO GPU' + ] + #plans = [plan for plan in plans if len(plan['regions']) > 0] + plans = [ + plan for plan in plans if plan['minimum_commitment'] == 'hourly' and + plan['type'] in ['preconfigured'] and + plan['gpu_type'] not in ['NO GPU', 'RTX_3080_10GB', 'RTX_3090_24GB'] + ] + + plans = plans + [ + plan for plan in custom_plans for plan in plans_from_custom_plan(plan) + ] + with open(os.path.join(output_dir, 'vms.csv'), mode='w', + encoding='utf-8') as f: + writer = csv.writer(f, delimiter=',', quotechar='"') + writer.writerow([ + 'InstanceType', + 'AcceleratorName', + 'AcceleratorCount', + 'vCPUs', + 'MemoryGiB', + 'Price', + 'Region', + 'GpuInfo', + 'SpotPrice', + ]) + for plan in plans: + try: + gpu = GPU_MAP[plan['gpu_type']] + except KeyError: + print(f'Could not map {plan["gpu_type"]}') + continue + gpu_memory = int( + 
str(plan['configuration']['gpu_memory']).replace('GB', + '')) * 1024 + gpu_cnt = int(plan['configuration']['gpu_count']) + vcpus = float(plan['configuration']['core_count']) + mem = float(plan['configuration']['ram']) + price = float(plan['price']['hourly']) * gpu_cnt + gpuinfo = { + 'Gpus': [{ + 'Name': gpu, + 'Manufacturer': 'NVIDIA', + 'Count': gpu_cnt, + 'MemoryInfo': { + 'SizeInMiB': int(gpu_memory) + }, + }], + 'TotalGpuMemoryInMiB': int(gpu_memory * gpu_cnt), + } + gpuinfo = json.dumps(gpuinfo).replace('"', "'") # pylint: disable=invalid-string-quote + for r in plan.get('regions', []): + if r['id'] == 'india_2': + continue + writer.writerow([ + plan['plan_id'], + gpu, + gpu_cnt, + vcpus, + mem, + price, + r['id'], + gpuinfo, + '', + ]) + + +if __name__ == '__main__': + + os.makedirs('fluidstack', exist_ok=True) + create_catalog('fluidstack') + print('Fluidstack catalog saved to {}/vms.csv'.format('fluidstack')) diff --git a/sky/clouds/service_catalog/fluidstack_catalog.py b/sky/clouds/service_catalog/fluidstack_catalog.py new file mode 100644 index 00000000000..2f47a38df43 --- /dev/null +++ b/sky/clouds/service_catalog/fluidstack_catalog.py @@ -0,0 +1,129 @@ +"""Fluidstack Cloud Catalog. + +This module loads the service catalog file and can be used to query +instance types and pricing information for FluidStack. +""" +import typing +from typing import Dict, List, Optional, Tuple + +from sky.clouds.service_catalog import common +from sky.utils import ux_utils + +if typing.TYPE_CHECKING: + from sky.clouds import cloud + +_df = common.read_catalog('fluidstack/vms.csv') + +# Number of vCPUS for gpu_1x_a10 +_DEFAULT_NUM_VCPUS = 6 +_DEFAULT_MEMORY_CPU_RATIO = 4 + + +def instance_type_exists(instance_type: str) -> bool: + return common.instance_type_exists_impl(_df, instance_type) + + +def validate_region_zone( + region: Optional[str], + zone: Optional[str]) -> Tuple[Optional[str], Optional[str]]: + if zone is not None: + with ux_utils.print_exception_no_traceback(): + raise ValueError('FluidStack Cloud does not support zones.') + return common.validate_region_zone_impl('fluidstack', _df, region, zone) + + +def get_hourly_cost(instance_type: str, + use_spot: bool = False, + region: Optional[str] = None, + zone: Optional[str] = None) -> float: + """Returns the cost, or the cheapest cost among all zones for spot.""" + assert not use_spot, 'FluidStack Cloud does not support spot.' 
+ if zone is not None: + with ux_utils.print_exception_no_traceback(): + raise ValueError('FluidStack Cloud does not support zones.') + return common.get_hourly_cost_impl(_df, instance_type, use_spot, region, + zone) + + +def get_vcpus_mem_from_instance_type( + instance_type: str) -> Tuple[Optional[float], Optional[float]]: + return common.get_vcpus_mem_from_instance_type_impl(_df, instance_type) + + +def get_default_instance_type(cpus: Optional[str] = None, + memory: Optional[str] = None, + disk_tier: Optional[str] = None) -> Optional[str]: + del disk_tier # unused + if cpus is None and memory is None: + cpus = f'{_DEFAULT_NUM_VCPUS}+' + if memory is None: + memory_gb_or_ratio = f'{_DEFAULT_MEMORY_CPU_RATIO}x' + else: + memory_gb_or_ratio = memory + return common.get_instance_type_for_cpus_mem_impl(_df, cpus, + memory_gb_or_ratio) + + +def get_accelerators_from_instance_type( + instance_type: str) -> Optional[Dict[str, int]]: + return common.get_accelerators_from_instance_type_impl(_df, instance_type) + + +def get_instance_type_for_accelerator( + acc_name: str, + acc_count: int, + cpus: Optional[str] = None, + memory: Optional[str] = None, + use_spot: bool = False, + region: Optional[str] = None, + zone: Optional[str] = None) -> Tuple[Optional[List[str]], List[str]]: + """Returns a list of instance types satisfying the required count of + accelerators with sorted prices and a list of candidates with fuzzy search. + """ + if zone is not None: + with ux_utils.print_exception_no_traceback(): + raise ValueError('FluidStack Cloud does not support zones.') + return common.get_instance_type_for_accelerator_impl(df=_df, + acc_name=acc_name, + acc_count=acc_count, + cpus=cpus, + memory=memory, + use_spot=use_spot, + region=region, + zone=zone) + + +def regions() -> List['cloud.Region']: + return common.get_region_zones(_df, use_spot=False) + + +def get_region_zones_for_instance_type(instance_type: str, + use_spot: bool) -> List['cloud.Region']: + df = _df[_df['InstanceType'] == instance_type] + region_list = common.get_region_zones(df, use_spot) + # Hack: Enforce US regions are always tried first + us_region_list = [] + other_region_list = [] + for region in region_list: + if region.name.startswith('us-'): + us_region_list.append(region) + else: + other_region_list.append(region) + return us_region_list + other_region_list + + +def list_accelerators( + gpus_only: bool, + name_filter: Optional[str], + region_filter: Optional[str], + quantity_filter: Optional[int], + case_sensitive: bool = True, + all_regions: bool = False, + require_price: bool = True, +) -> Dict[str, List[common.InstanceTypeInfo]]: + """Returns all instance types in Fluidstack offering GPUs.""" + del require_price + return common.list_accelerators_impl('Fluidstack', _df, gpus_only, + name_filter, region_filter, + quantity_filter, case_sensitive, + all_regions) diff --git a/sky/provision/__init__.py b/sky/provision/__init__.py index 6705f5f25bc..9dc73a54a53 100644 --- a/sky/provision/__init__.py +++ b/sky/provision/__init__.py @@ -16,6 +16,7 @@ from sky.provision import azure from sky.provision import common from sky.provision import cudo +from sky.provision import fluidstack from sky.provision import gcp from sky.provision import kubernetes from sky.provision import runpod diff --git a/sky/provision/fluidstack/__init__.py b/sky/provision/fluidstack/__init__.py new file mode 100644 index 00000000000..af070bd20b4 --- /dev/null +++ b/sky/provision/fluidstack/__init__.py @@ -0,0 +1,10 @@ +"""Fluidstack provisioner module.""" + +from 
sky.provision.fluidstack.config import bootstrap_instances +from sky.provision.fluidstack.instance import cleanup_ports +from sky.provision.fluidstack.instance import get_cluster_info +from sky.provision.fluidstack.instance import query_instances +from sky.provision.fluidstack.instance import run_instances +from sky.provision.fluidstack.instance import stop_instances +from sky.provision.fluidstack.instance import terminate_instances +from sky.provision.fluidstack.instance import wait_instances diff --git a/sky/provision/fluidstack/config.py b/sky/provision/fluidstack/config.py new file mode 100644 index 00000000000..e89fa6cf8cc --- /dev/null +++ b/sky/provision/fluidstack/config.py @@ -0,0 +1,12 @@ +"""FluidStack configuration bootstrapping.""" + +from sky.provision import common + + +def bootstrap_instances( + region: str, cluster_name: str, + config: common.ProvisionConfig) -> common.ProvisionConfig: + """Bootstraps instances for the given cluster.""" + del region, cluster_name # unused + + return config diff --git a/sky/provision/fluidstack/fluidstack_utils.py b/sky/provision/fluidstack/fluidstack_utils.py new file mode 100644 index 00000000000..de3b2fae2c9 --- /dev/null +++ b/sky/provision/fluidstack/fluidstack_utils.py @@ -0,0 +1,241 @@ +"""FluidStack API client.""" + +import functools +import json +import os +from typing import Any, Dict, List, Optional +import uuid + +import requests + +from sky.clouds.service_catalog.data_fetchers import fetch_fluidstack + + +def get_key_suffix(): + return str(uuid.uuid4()).replace('-', '')[:8] + + +ENDPOINT = 'https://api.fluidstack.io/v1/' +FLUIDSTACK_API_KEY_PATH = '~/.fluidstack/api_key' +FLUIDSTACK_API_TOKEN_PATH = '~/.fluidstack/api_token' + + +def read_contents(path: str) -> str: + with open(path, mode='r', encoding='utf-8') as f: + return f.read().strip() + + +class FluidstackAPIError(Exception): + + def __init__(self, message: str, code: int = 400): + self.code = code + super().__init__(message) + + +def raise_fluidstack_error(response: requests.Response) -> None: + """Raise FluidstackAPIError if appropriate.""" + status_code = response.status_code + if response.ok: + return + try: + resp_json = response.json() + message = resp_json.get('error', response.text) + except (KeyError, json.decoder.JSONDecodeError) as e: + raise FluidstackAPIError( + f'Unexpected error. 
Status code: {status_code} \n {response.text}' + f'\n {str(e)}', + code=status_code) from e + raise FluidstackAPIError(f'{message}', status_code) + + +@functools.lru_cache() +def with_nvidia_drivers(region: str): + if region in ['norway_4_eu', 'generic_1_canada']: + return False + client = FluidstackClient() + plans = client.get_plans() + for plan in plans: + if region in [r['id'] for r in plan['regions']]: + if 'Ubuntu 20.04 LTS (Nvidia)' in plan['os_options']: + return True + return False + + +class FluidstackClient: + """FluidStack API Client""" + + def __init__(self): + self.api_key = read_contents( + os.path.expanduser(FLUIDSTACK_API_KEY_PATH)) + self.api_token = read_contents( + os.path.expanduser(FLUIDSTACK_API_TOKEN_PATH)) + + def get_plans(self): + response = requests.get(ENDPOINT + 'plans') + raise_fluidstack_error(response) + plans = response.json() + plans = [ + plan for plan in plans + if plan['minimum_commitment'] == 'hourly' and plan['type'] in + ['preconfigured', 'custom'] and plan['gpu_type'] != 'NO GPU' + ] + return plans + + def list_instances( + self, + tag_filters: Optional[Dict[str, + str]] = None) -> List[Dict[str, Any]]: + response = requests.get( + ENDPOINT + 'servers', + auth=(self.api_key, self.api_token), + ) + raise_fluidstack_error(response) + instances = response.json() + filtered_instances = [] + + for instance in instances: + if isinstance(instance['tags'], str): + instance['tags'] = json.loads(instance['tags']) + if not instance['tags']: + instance['tags'] = {} + if tag_filters: + for key in tag_filters: + if instance['tags'].get(key, None) != tag_filters[key]: + break + else: + filtered_instances.append(instance) + else: + filtered_instances.append(instance) + + return filtered_instances + + def create_instance( + self, + instance_type: str = '', + hostname: str = '', + region: str = '', + ssh_pub_key: str = '', + count: int = 1, + ) -> List[str]: + """Launch new instances.""" + + config = {} + plans = self.get_plans() + if 'custom' in instance_type: + values = instance_type.split(':') + index = values[1] + instance_type = values[2] + config = fetch_fluidstack.CUSTOM_PLANS_CONFIG[int(index)] + plan = [plan for plan in plans if plan['plan_id'] == instance_type + ][0] + config['gpu_model'] = plan['gpu_type'] + + regions = self.list_regions() + plans = [ + plan for plan in plans if plan['plan_id'] == instance_type and + region in [r['id'] for r in plan['regions']] + ] + if not plans: + raise FluidstackAPIError( + f'Plan {instance_type} out of stock in region {region}') + + ssh_key = self.get_or_add_ssh_key(ssh_pub_key) + os_id = 'Ubuntu 20.04 LTS' + body = dict(plan=None if config else instance_type, + region=regions[region], + os=os_id, + hostname=hostname, + ssh_keys=[ssh_key['id']], + multiplicity=count, + config=config) + + response = requests.post(ENDPOINT + 'server', + auth=(self.api_key, self.api_token), + json=body) + raise_fluidstack_error(response) + return response.json().get('multiple') + + def list_ssh_keys(self): + response = requests.get(ENDPOINT + 'ssh', + auth=(self.api_key, self.api_token)) + raise_fluidstack_error(response) + return response.json() + + def get_or_add_ssh_key(self, ssh_pub_key: str = '') -> Dict[str, str]: + """Add ssh key if not already added.""" + ssh_keys = self.list_ssh_keys() + for key in ssh_keys: + if key['public_key'].strip() == ssh_pub_key.strip(): + return { + 'id': key['id'], + 'name': key['name'], + 'ssh_key': ssh_pub_key + } + ssh_key_name = 'skypilot-' + get_key_suffix() + response = requests.post( + ENDPOINT 
+ 'ssh', + auth=(self.api_key, self.api_token), + json=dict(name=ssh_key_name, public_key=ssh_pub_key), + ) + raise_fluidstack_error(response) + key_id = response.json()['id'] + return {'id': key_id, 'name': ssh_key_name, 'ssh_key': ssh_pub_key} + + @functools.lru_cache() + def list_regions(self): + response = requests.get(ENDPOINT + 'plans') + raise_fluidstack_error(response) + plans = response.json() + plans = [ + plan for plan in plans + if plan['minimum_commitment'] == 'hourly' and plan['type'] in + ['preconfigured', 'custom'] and plan['gpu_type'] != 'NO GPU' + ] + + def get_regions(plans: List) -> dict: + """Return a list of regions where the plan is available.""" + regions = {} + for plan in plans: + for region in plan.get('regions', []): + regions[region['id']] = region['id'] + return regions + + regions = get_regions(plans) + return regions + + def delete(self, instance_id: str): + response = requests.delete(ENDPOINT + 'server/' + instance_id, + auth=(self.api_key, self.api_token)) + raise_fluidstack_error(response) + return response.json() + + def stop(self, instance_id: str): + response = requests.put(ENDPOINT + 'server/' + instance_id + '/stop', + auth=(self.api_key, self.api_token)) + raise_fluidstack_error(response) + return response.json() + + def restart(self, instance_id: str): + response = requests.post(ENDPOINT + 'server/' + instance_id + '/reboot', + auth=(self.api_key, self.api_token)) + raise_fluidstack_error(response) + return response.json() + + def info(self, instance_id: str): + response = requests.get(ENDPOINT + f'server/{instance_id}', + auth=(self.api_key, self.api_token)) + raise_fluidstack_error(response) + return response.json() + + def status(self, instance_id: str): + response = self.info(instance_id) + return response['status'] + + def add_tags(self, instance_id: str, tags: Dict[str, str]): + response = requests.patch( + ENDPOINT + f'server/{instance_id}/tag', + auth=(self.api_key, self.api_token), + json=dict(tags=json.dumps(tags)), + ) + raise_fluidstack_error(response) + return response.json() diff --git a/sky/provision/fluidstack/instance.py b/sky/provision/fluidstack/instance.py new file mode 100644 index 00000000000..68c656f2808 --- /dev/null +++ b/sky/provision/fluidstack/instance.py @@ -0,0 +1,276 @@ +"""FluidStack instance provisioning.""" +import time +from typing import Any, Dict, List, Optional + +from sky import authentication as auth +from sky import exceptions +from sky import sky_logging +from sky import status_lib +from sky.provision import common +from sky.provision.fluidstack import fluidstack_utils as utils +from sky.utils import command_runner +from sky.utils import common_utils +from sky.utils import subprocess_utils +from sky.utils import ux_utils + +_GET_INTERNAL_IP_CMD = ('ip -4 -br addr show | grep UP | grep -Eo ' + r'"(10\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)|' + r'172\.(1[6-9]|2[0-9]|3[0-1]))\.(25[0-5]|' + r'2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|' + r'2[0-4][0-9]|[01]?[0-9][0-9]?)"') +POLL_INTERVAL = 5 + +logger = sky_logging.init_logger(__name__) + + +def get_internal_ip(node_info: Dict[str, Any]) -> None: + node_info['internal_ip'] = node_info['ip_address'] + runner = command_runner.SSHCommandRunner( + node_info['ip_address'], + ssh_user=node_info['capabilities']['default_user_name'], + ssh_private_key=auth.PRIVATE_SSH_KEY_PATH) + result = runner.run(_GET_INTERNAL_IP_CMD, + require_outputs=True, + stream_logs=False) + + if result[0] != 0: + # Some DCs do not have internal IPs and can fail when getting + # the IP. 
We set the `internal_ip` to the same as + # external IP. It should be fine as the `ray cluster` + # will also get and use that external IP in that case. + logger.debug('Failed get obtain private IP from node') + else: + node_info['internal_ip'] = result[1].strip() + + +def _filter_instances(cluster_name_on_cloud: str, + status_filters: Optional[List[str]]) -> Dict[str, Any]: + + instances = utils.FluidstackClient().list_instances() + possible_names = [ + f'{cluster_name_on_cloud}-head', f'{cluster_name_on_cloud}-worker' + ] + + filtered_instances = {} + for instance in instances: + if (status_filters is not None and + instance['status'] not in status_filters): + continue + if instance.get('hostname') in possible_names: + filtered_instances[instance['id']] = instance + return filtered_instances + + +def _get_head_instance_id(instances: Dict[str, Any]) -> Optional[str]: + head_instance_id = None + for inst_id, inst in instances.items(): + if inst['hostname'].endswith('-head'): + head_instance_id = inst_id + break + return head_instance_id + + +def run_instances(region: str, cluster_name_on_cloud: str, + config: common.ProvisionConfig) -> common.ProvisionRecord: + """Runs instances for the given cluster.""" + + pending_status = [ + 'create', + 'requesting', + 'provisioning', + 'customizing', + 'starting', + 'stopping', + 'start', + 'stop', + 'reboot', + 'rebooting', + ] + + while True: + instances = _filter_instances(cluster_name_on_cloud, pending_status) + if not instances: + break + logger.info(f'Waiting for {len(instances)} instances to be ready.') + time.sleep(POLL_INTERVAL) + exist_instances = _filter_instances(cluster_name_on_cloud, ['running']) + head_instance_id = _get_head_instance_id(exist_instances) + + to_start_count = config.count - len(exist_instances) + if to_start_count < 0: + raise RuntimeError( + f'Cluster {cluster_name_on_cloud} already has ' + f'{len(exist_instances)} nodes, but {config.count} are required.') + if to_start_count == 0: + if head_instance_id is None: + raise RuntimeError( + f'Cluster {cluster_name_on_cloud} has no head node.') + logger.info(f'Cluster {cluster_name_on_cloud} already has ' + f'{len(exist_instances)} nodes, no need to start more.') + return common.ProvisionRecord(provider_name='fluidstack', + cluster_name=cluster_name_on_cloud, + region=region, + zone=None, + head_instance_id=head_instance_id, + resumed_instance_ids=[], + created_instance_ids=[]) + + created_instance_ids = [] + for _ in range(to_start_count): + node_type = 'head' if head_instance_id is None else 'worker' + try: + instance_ids = utils.FluidstackClient().create_instance( + hostname=f'{cluster_name_on_cloud}-{node_type}', + instance_type=config.node_config['InstanceType'], + ssh_pub_key=config.node_config['AuthorizedKey'], + region=region) + except Exception as e: # pylint: disable=broad-except + logger.warning(f'run_instances error: {e}') + raise + logger.info(f'Launched instance {instance_ids[0]}.') + created_instance_ids.append(instance_ids[0]) + if head_instance_id is None: + head_instance_id = instance_ids[0] + + # Wait for instances to be ready. 
+    while True:
+        instances = _filter_instances(cluster_name_on_cloud, ['running'])
+        ready_instance_cnt = len(instances)
+        logger.info('Waiting for instances to be ready: '
+                    f'({ready_instance_cnt}/{config.count}).')
+        if ready_instance_cnt == config.count:
+            break
+        failed_instances = _filter_instances(
+            cluster_name_on_cloud,
+            ['timeout error', 'failed to create', 'out of stock'])
+        if failed_instances:
+            logger.error(f'Failed to create {len(failed_instances)} '
+                         f'instances for cluster {cluster_name_on_cloud}')
+            raise RuntimeError(
+                f'Failed to create {len(failed_instances)} instances.')
+
+        time.sleep(POLL_INTERVAL)
+    assert head_instance_id is not None, 'head_instance_id should not be None'
+    return common.ProvisionRecord(provider_name='fluidstack',
+                                  cluster_name=cluster_name_on_cloud,
+                                  region=region,
+                                  zone=None,
+                                  head_instance_id=head_instance_id,
+                                  resumed_instance_ids=[],
+                                  created_instance_ids=created_instance_ids)
+
+
+def wait_instances(region: str, cluster_name_on_cloud: str,
+                   state: Optional[status_lib.ClusterStatus]) -> None:
+    del region, cluster_name_on_cloud, state
+
+
+def stop_instances(
+    cluster_name_on_cloud: str,
+    provider_config: Optional[Dict[str, Any]] = None,
+    worker_only: bool = False,
+) -> None:
+    raise NotImplementedError()
+
+
+def terminate_instances(
+    cluster_name_on_cloud: str,
+    provider_config: Optional[Dict[str, Any]] = None,
+    worker_only: bool = False,
+) -> None:
+    """See sky/provision/__init__.py"""
+    del provider_config  # unused
+    instances = _filter_instances(cluster_name_on_cloud, None)
+    for inst_id, inst in instances.items():
+        logger.debug(f'Terminating instance {inst_id}: {inst}')
+        if worker_only and inst['hostname'].endswith('-head'):
+            continue
+        try:
+            utils.FluidstackClient().delete(inst_id)
+        except Exception as e:  # pylint: disable=broad-except
+            if (isinstance(e, utils.FluidstackAPIError) and
+                    'Machine is already terminated' in str(e)):
+                logger.debug(f'Instance {inst_id} is already terminated.')
+                continue
+            with ux_utils.print_exception_no_traceback():
+                raise RuntimeError(
+                    f'Failed to terminate instance {inst_id}: '
+                    f'{common_utils.format_exception(e, use_bracket=False)}'
+                ) from e
+
+
+def get_cluster_info(
+        region: str,
+        cluster_name_on_cloud: str,
+        provider_config: Optional[Dict[str, Any]] = None) -> common.ClusterInfo:
+    del region, provider_config  # unused
+    running_instances = _filter_instances(cluster_name_on_cloud, ['running'])
+    instances: Dict[str, List[common.InstanceInfo]] = {}
+
+    subprocess_utils.run_in_parallel(get_internal_ip,
+                                     list(running_instances.values()))
+    head_instance_id = None
+    for instance_id, instance_info in running_instances.items():
+        instance_id = instance_info['id']
+        instances[instance_id] = [
+            common.InstanceInfo(
+                instance_id=instance_id,
+                internal_ip=instance_info['internal_ip'],
+                external_ip=instance_info['ip_address'],
+                ssh_port=instance_info['ssh_port'],
+                tags={},
+            )
+        ]
+        if instance_info['hostname'].endswith('-head'):
+            head_instance_id = instance_id
+
+    return common.ClusterInfo(instances=instances,
+                              head_instance_id=head_instance_id,
+                              custom_ray_options={'use_external_ip': True})
+
+
+def query_instances(
+    cluster_name_on_cloud: str,
+    provider_config: Optional[Dict[str, Any]] = None,
+    non_terminated_only: bool = True,
+) -> Dict[str, Optional[status_lib.ClusterStatus]]:
+    """See sky/provision/__init__.py"""
+    assert provider_config is not None, (cluster_name_on_cloud, provider_config)
+    
instances = _filter_instances(cluster_name_on_cloud, None) + status_map = { + 'provisioning': status_lib.ClusterStatus.INIT, + 'requesting': status_lib.ClusterStatus.INIT, + 'create': status_lib.ClusterStatus.INIT, + 'customizing': status_lib.ClusterStatus.INIT, + 'stopping': status_lib.ClusterStatus.STOPPED, + 'stop': status_lib.ClusterStatus.STOPPED, + 'start': status_lib.ClusterStatus.INIT, + 'reboot': status_lib.ClusterStatus.STOPPED, + 'rebooting': status_lib.ClusterStatus.STOPPED, + 'stopped': status_lib.ClusterStatus.STOPPED, + 'starting': status_lib.ClusterStatus.INIT, + 'running': status_lib.ClusterStatus.UP, + 'failed to create': status_lib.ClusterStatus.INIT, + 'timeout error': status_lib.ClusterStatus.INIT, + 'out of stock': status_lib.ClusterStatus.INIT, + 'terminated': None, + } + statuses: Dict[str, Optional[status_lib.ClusterStatus]] = {} + for inst_id, inst in instances.items(): + if inst['status'] not in status_map: + with ux_utils.print_exception_no_traceback(): + raise exceptions.ClusterStatusFetchingError( + f'Failed to parse status from Fluidstack: {inst["status"]}') + status = status_map.get(inst['status'], None) + if non_terminated_only and status is None: + continue + statuses[inst_id] = status + return statuses + + +def cleanup_ports( + cluster_name_on_cloud: str, + provider_config: Optional[Dict[str, Any]] = None, +) -> None: + del cluster_name_on_cloud, provider_config diff --git a/sky/provision/instance_setup.py b/sky/provision/instance_setup.py index 746b4a7a668..c0cb70dc54d 100644 --- a/sky/provision/instance_setup.py +++ b/sky/provision/instance_setup.py @@ -227,6 +227,7 @@ def start_ray_on_head_node(cluster_name: str, custom_resource: Optional[str], **ssh_credentials) assert cluster_info.head_instance_id is not None, (cluster_name, cluster_info) + # Log the head node's output to the provision.log log_path_abs = str(provision_logging.get_log_path()) ray_options = ( @@ -240,6 +241,8 @@ def start_ray_on_head_node(cluster_name: str, custom_resource: Optional[str], ray_options += f' --resources=\'{custom_resource}\'' if cluster_info.custom_ray_options: + if 'use_external_ip' in cluster_info.custom_ray_options: + cluster_info.custom_ray_options.pop('use_external_ip') for key, value in cluster_info.custom_ray_options.items(): ray_options += f' --{key}={value}' @@ -295,11 +298,19 @@ def start_ray_on_worker_nodes(cluster_name: str, no_restart: bool, head_instance = cluster_info.get_head_instance() assert head_instance is not None, cluster_info - head_private_ip = head_instance.internal_ip + use_external_ip = False + if cluster_info.custom_ray_options: + # Some cloud providers, e.g. fluidstack, cannot connect to the internal + # IP of the head node from the worker nodes. In this case, we need to + # use the external IP of the head node. + use_external_ip = cluster_info.custom_ray_options.pop( + 'use_external_ip', False) + head_ip = (head_instance.internal_ip + if not use_external_ip else head_instance.external_ip) + + ray_options = (f'--address={head_ip}:{constants.SKY_REMOTE_RAY_PORT} ' + f'--object-manager-port=8076') - ray_options = ( - f'--address={head_private_ip}:{constants.SKY_REMOTE_RAY_PORT} ' - f'--object-manager-port=8076') if custom_resource: ray_options += f' --resources=\'{custom_resource}\'' @@ -320,7 +331,7 @@ def start_ray_on_worker_nodes(cluster_name: str, no_restart: bool, # Instead, we check whether the raylet process is running on gcs address # that is connected to the head with the correct port. 
cmd = (f'RAY_PORT={ray_port}; ps aux | grep "ray/raylet/raylet" | ' - f'grep "gcs-address={head_private_ip}:${{RAY_PORT}}" || ' + f'grep "gcs-address={head_ip}:${{RAY_PORT}}" || ' f'{{ {cmd} }}') else: cmd = 'ray stop; ' + cmd diff --git a/sky/templates/fluidstack-ray.yml.j2 b/sky/templates/fluidstack-ray.yml.j2 new file mode 100644 index 00000000000..baeb114c1a6 --- /dev/null +++ b/sky/templates/fluidstack-ray.yml.j2 @@ -0,0 +1,78 @@ +cluster_name: {{cluster_name_on_cloud}} + +# The maximum number of workers nodes to launch in addition to the head node. +max_workers: {{num_nodes - 1}} +upscaling_speed: {{num_nodes - 1}} +idle_timeout_minutes: 60 + +provider: + type: external + module: sky.provision.fluidstack + region: {{region}} + disable_launch_config_check: true + + +auth: + ssh_user: {{fluidstack_username}} + ssh_private_key: {{ssh_private_key}} + +available_node_types: + ray_head_default: + resources: {} + node_config: + InstanceType: {{instance_type}} + AuthorizedKey: | + skypilot:ssh_public_key_content + + +head_node_type: ray_head_default + +# Format: `REMOTE_PATH : LOCAL_PATH` +file_mounts: { + "{{sky_ray_yaml_remote_path}}": "{{sky_ray_yaml_local_path}}", + "{{sky_remote_path}}/{{sky_wheel_hash}}": "{{sky_local_path}}", +{%- for remote_path, local_path in credentials.items() %} + "{{remote_path}}": "{{local_path}}", +{%- endfor %} +} + +rsync_exclude: [] + +initialization_commands: [] + +# List of shell commands to run to set up nodes. +# NOTE: these are very performance-sensitive. Each new item opens/closes an SSH +# connection, which is expensive. Try your best to co-locate commands into fewer +# items! +# +# Increment the following for catching performance bugs easier: +# current num items (num SSH connections): 1 +setup_commands: + # Disable `unattended-upgrades` to prevent apt-get from hanging. It should be called at the beginning before the process started to avoid being blocked. (This is a temporary fix.) + # Create ~/.ssh/config file in case the file does not exist in the image. + # Line 'rm ..': there is another installation of pip. + # Line 'sudo bash ..': set the ulimit as suggested by ray docs for performance. https://docs.ray.io/en/latest/cluster/vms/user-guides/large-cluster-best-practices.html#system-configuration + # Line 'sudo grep ..': set the number of threads per process to unlimited to avoid ray job submit stucking issue when the number of running ray jobs increase. 
+ # Line 'mkdir -p ..': disable host key check + # Line 'python3 -c ..': patch the buggy ray files and enable `-o allow_other` option for `goofys` + - sudo systemctl stop unattended-upgrades || true; + sudo systemctl disable unattended-upgrades || true; + sudo sed -i 's/Unattended-Upgrade "1"/Unattended-Upgrade "0"/g' /etc/apt/apt.conf.d/20auto-upgrades || true; + sudo kill -9 `sudo lsof /var/lib/dpkg/lock-frontend | awk '{print $2}' | tail -n 1` || true; + sudo pkill -9 apt-get; + sudo pkill -9 dpkg; + sudo dpkg --configure -a; + {{ cuda_installation_commands }} + mkdir -p ~/.ssh; touch ~/.ssh/config; + {{ conda_installation_commands }} + (type -a python | grep -q python3) || echo 'alias python=python3' >> ~/.bashrc; + (type -a pip | grep -q pip3) || echo 'alias pip=pip3' >> ~/.bashrc; + source ~/.bashrc; + (pip3 list | grep ray | grep {{ray_version}} 2>&1 > /dev/null || pip3 install -U ray[default]=={{ray_version}}) && mkdir -p ~/sky_workdir && mkdir -p ~/.sky/sky_app && touch ~/.sudo_as_admin_successful; + (pip3 list | grep "skypilot " && [ "$(cat {{sky_remote_path}}/current_sky_wheel_hash)" == "{{sky_wheel_hash}}" ]) || (pip3 uninstall skypilot -y; pip3 install "$(echo {{sky_remote_path}}/{{sky_wheel_hash}}/skypilot-{{sky_version}}*.whl)[aws, remote]" && echo "{{sky_wheel_hash}}" > {{sky_remote_path}}/current_sky_wheel_hash || exit 1); + sudo bash -c 'rm -rf /etc/security/limits.d; echo "* soft nofile 1048576" >> /etc/security/limits.conf; echo "* hard nofile 1048576" >> /etc/security/limits.conf'; + sudo grep -e '^DefaultTasksMax' /etc/systemd/system.conf || (sudo bash -c 'echo "DefaultTasksMax=infinity" >> /etc/systemd/system.conf'); sudo systemctl set-property user-$(id -u $(whoami)).slice TasksMax=infinity; sudo systemctl daemon-reload; + mkdir -p ~/.ssh; (grep -Pzo -q "Host \*\n StrictHostKeyChecking no" ~/.ssh/config) || printf "Host *\n StrictHostKeyChecking no\n" >> ~/.ssh/config; + python3 -c "from sky.skylet.ray_patches import patch; patch()" || exit 1; + [ -f /etc/fuse.conf ] && sudo sed -i 's/#user_allow_other/user_allow_other/g' /etc/fuse.conf || (sudo sh -c 'echo "user_allow_other" > /etc/fuse.conf'); + diff --git a/tests/conftest.py b/tests/conftest.py index e241d93dea8..04333395c8c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -22,7 +22,7 @@ # --managed-spot. all_clouds_in_smoke_tests = [ 'aws', 'gcp', 'azure', 'lambda', 'cloudflare', 'ibm', 'scp', 'oci', - 'kubernetes', 'vsphere', 'cudo' + 'kubernetes', 'vsphere', 'cudo', 'fluidstack' ] default_clouds_to_run = ['gcp', 'azure'] @@ -40,6 +40,7 @@ 'oci': 'oci', 'kubernetes': 'kubernetes', 'vsphere': 'vsphere', + 'fluidstack': 'fluidstack', 'cudo': 'cudo' } diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 486ff203171..16d5961fbf7 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -68,6 +68,7 @@ test_id = str(uuid.uuid4())[-2:] LAMBDA_TYPE = '--cloud lambda --gpus A10' +FLUIDSTACK_TYPE = '--cloud fluidstack --gpus RTXA4000' SCP_TYPE = '--cloud scp' SCP_GPU_V100 = '--gpus V100-32GB' @@ -118,6 +119,11 @@ def echo(self, message: str): print(message, file=sys.stderr, flush=True) +def _get_timeout(generic_cloud: str, default_timeout: int = Test.timeout): + timeouts = {'fluidstack': 60 * 60} # file_mounts + return timeouts.get(generic_cloud, default_timeout) + + def _get_cluster_name() -> str: """Returns a user-unique cluster name for each test_(). @@ -243,6 +249,7 @@ def get_gcp_region_for_quota_failover() -> Optional[str]: # ---------- Dry run: 2 Tasks in a chain. 
---------- +@pytest.mark.no_fluidstack #requires GCP and AWS set up def test_example_app(): test = Test( 'example_app', @@ -267,6 +274,7 @@ def test_minimal(generic_cloud: str): f'sky logs {name} 2 --status', # Ensure the job succeeded. ], f'sky down -y {name}', + _get_timeout(generic_cloud), ) run_one_test(test) @@ -666,6 +674,7 @@ def test_image_no_conda(): # ------------ Test stale job ------------ +@pytest.mark.no_fluidstack # FluidStack does not support stopping instances in SkyPilot implementation @pytest.mark.no_lambda_cloud # Lambda Cloud does not support stopping instances @pytest.mark.no_kubernetes # Kubernetes does not support stopping instances def test_stale_job(generic_cloud: str): @@ -749,6 +758,7 @@ def test_gcp_stale_job_manual_restart(): # ---------- Check Sky's environment variables; workdir. ---------- +@pytest.mark.no_fluidstack # Requires amazon S3 @pytest.mark.no_scp # SCP does not support num_nodes > 1 yet def test_env_check(generic_cloud: str): name = _get_cluster_name() @@ -784,7 +794,7 @@ def test_file_mounts(generic_cloud: str): 'using_file_mounts', test_commands, f'sky down -y {name}', - timeout=20 * 60, # 20 mins + _get_timeout(generic_cloud, 20 * 60), # 20 mins ) run_one_test(test) @@ -806,6 +816,7 @@ def test_scp_file_mounts(): run_one_test(test) +@pytest.mark.no_fluidstack # Requires GCP to be enabled def test_using_file_mounts_with_env_vars(generic_cloud: str): name = _get_cluster_name() test_commands = [ @@ -991,21 +1002,17 @@ def test_cli_logs(generic_cloud: str): # Kubernetes does not support multi-node num_nodes = 1 timestamp = time.time() - test = Test( - 'cli_logs', - [ - f'sky launch -y -c {name} --cloud {generic_cloud} --num-nodes {num_nodes} "echo {timestamp} 1"', - f'sky exec {name} "echo {timestamp} 2"', - f'sky exec {name} "echo {timestamp} 3"', - f'sky exec {name} "echo {timestamp} 4"', - f'sky logs {name} 2 --status', - f'sky logs {name} 3 4 --sync-down', - f'sky logs {name} * --sync-down', - f'sky logs {name} 1 | grep "{timestamp} 1"', - f'sky logs {name} | grep "{timestamp} 4"', - ], - f'sky down -y {name}', - ) + test = Test('cli_logs', [ + f'sky launch -y -c {name} --cloud {generic_cloud} --num-nodes {num_nodes} "echo {timestamp} 1"', + f'sky exec {name} "echo {timestamp} 2"', + f'sky exec {name} "echo {timestamp} 3"', + f'sky exec {name} "echo {timestamp} 4"', + f'sky logs {name} 2 --status', + f'sky logs {name} 3 4 --sync-down', + f'sky logs {name} * --sync-down', + f'sky logs {name} 1 | grep "{timestamp} 1"', + f'sky logs {name} | grep "{timestamp} 4"', + ], f'sky down -y {name}') run_one_test(test) @@ -1032,6 +1039,7 @@ def test_scp_logs(): # ---------- Job Queue. ---------- +@pytest.mark.no_fluidstack # FluidStack DC has low availability of T4 GPUs @pytest.mark.no_lambda_cloud # Lambda Cloud does not have T4 gpus @pytest.mark.no_ibm # IBM Cloud does not have T4 gpus. run test_ibm_job_queue instead @pytest.mark.no_scp # SCP does not have T4 gpus. Run test_scp_job_queue instead @@ -1063,6 +1071,7 @@ def test_job_queue(generic_cloud: str): # ---------- Job Queue with Docker. 
----------
+@pytest.mark.no_fluidstack  # FluidStack does not support docker for now
 @pytest.mark.no_lambda_cloud  # Doesn't support Lambda Cloud for now
 @pytest.mark.no_ibm  # Doesn't support IBM Cloud for now
 @pytest.mark.no_scp  # Doesn't support SCP for now
@@ -1165,6 +1174,7 @@ def test_scp_job_queue():
     run_one_test(test)


+@pytest.mark.no_fluidstack  # FluidStack DC has low availability of T4 GPUs
 @pytest.mark.no_lambda_cloud  # Lambda Cloud does not have T4 gpus
 @pytest.mark.no_ibm  # IBM Cloud does not have T4 gpus. Run test_ibm_job_queue_multinode instead
 @pytest.mark.no_scp  # SCP does not support num_nodes > 1 yet
@@ -1309,6 +1319,7 @@ def test_ibm_job_queue_multinode():


 # ---------- Docker with preinstalled package. ----------
+@pytest.mark.no_fluidstack  # Doesn't support Fluidstack for now
 @pytest.mark.no_lambda_cloud  # Doesn't support Lambda Cloud for now
 @pytest.mark.no_ibm  # Doesn't support IBM Cloud for now
 @pytest.mark.no_scp  # Doesn't support SCP for now
@@ -1330,6 +1341,7 @@ def test_docker_preinstalled_package(generic_cloud: str):


 # ---------- Submitting multiple tasks to the same cluster. ----------
+@pytest.mark.no_fluidstack  # FluidStack DC has low availability of T4 GPUs
 @pytest.mark.no_lambda_cloud  # Lambda Cloud does not have T4 gpus
 @pytest.mark.no_ibm  # IBM Cloud does not have T4 gpus
 @pytest.mark.no_scp  # SCP does not support num_nodes > 1 yet
@@ -1500,7 +1512,7 @@ def test_multi_hostname(generic_cloud: str):
             f'sky logs {name} 2 --status',  # Ensure the job succeeded.
         ],
         f'sky down -y {name}',
-        timeout=total_timeout_minutes * 60,
+        timeout=_get_timeout(generic_cloud, total_timeout_minutes * 60),
     )
     run_one_test(test)

@@ -1671,6 +1683,7 @@ def test_azure_start_stop():


 # ---------- Testing Autostopping ----------
+@pytest.mark.no_fluidstack  # FluidStack does not support stopping in SkyPilot implementation
 @pytest.mark.no_lambda_cloud  # Lambda Cloud does not support stopping instances
 @pytest.mark.no_ibm  # FIX(IBM) sporadically fails, as restarted workers stay uninitialized indefinitely
 @pytest.mark.no_scp  # SCP does not support num_nodes > 1 yet
@@ -1736,6 +1749,7 @@ def test_autostop(generic_cloud: str):


 # ---------- Testing Autodowning ----------
+@pytest.mark.no_fluidstack  # FluidStack does not support stopping in SkyPilot implementation
 @pytest.mark.no_scp  # SCP does not support num_nodes > 1 yet. Run test_scp_autodown instead.
 def test_autodown(generic_cloud: str):
     name = _get_cluster_name()
@@ -1911,6 +1925,7 @@ def test_cancel_ibm():


 # ---------- Testing use-spot option ----------
+@pytest.mark.no_fluidstack  # FluidStack does not support spot instances
 @pytest.mark.no_azure  # Azure does not support spot instances
 @pytest.mark.no_lambda_cloud  # Lambda Cloud does not support spot instances
 @pytest.mark.no_ibm  # IBM Cloud does not support spot instances
@@ -1962,6 +1977,7 @@ def test_stop_gcp_spot():


 # ---------- Testing managed spot ----------
+@pytest.mark.no_fluidstack  # FluidStack does not support spot instances
 @pytest.mark.no_azure  # Azure does not support spot instances
 @pytest.mark.no_lambda_cloud  # Lambda Cloud does not support spot instances
 @pytest.mark.no_ibm  # IBM Cloud does not support spot instances
@@ -1996,6 +2012,7 @@ def test_spot(generic_cloud: str):
     run_one_test(test)


+@pytest.mark.no_fluidstack  # Fluidstack does not support spot instances
 @pytest.mark.no_azure  # Azure does not support spot instances
 @pytest.mark.no_lambda_cloud  # Lambda Cloud does not support spot instances
 @pytest.mark.no_ibm  # IBM Cloud does not support spot instances
@@ -2036,6 +2053,7 @@ def test_spot_pipeline(generic_cloud: str):
     run_one_test(test)


+@pytest.mark.no_fluidstack  # Fluidstack does not support spot instances
 @pytest.mark.no_azure  # Azure does not support spot instances
 @pytest.mark.no_lambda_cloud  # Lambda Cloud does not support spot instances
 @pytest.mark.no_ibm  # IBM Cloud does not support spot instances
@@ -2060,6 +2078,7 @@ def test_spot_failed_setup(generic_cloud: str):
     run_one_test(test)


+@pytest.mark.no_fluidstack  # Fluidstack does not support spot instances
 @pytest.mark.no_azure  # Azure does not support spot instances
 @pytest.mark.no_lambda_cloud  # Lambda Cloud does not support spot instances
 @pytest.mark.no_ibm  # IBM Cloud does not support spot instances
@@ -2253,6 +2272,7 @@ def test_spot_pipeline_recovery_gcp():
     run_one_test(test)


+@pytest.mark.no_fluidstack  # Fluidstack does not support spot instances
 @pytest.mark.no_azure  # Azure does not support spot instances
 @pytest.mark.no_lambda_cloud  # Lambda Cloud does not support spot instances
 @pytest.mark.no_ibm  # IBM Cloud does not support spot instances
@@ -2475,6 +2495,7 @@ def test_spot_cancellation_gcp():


 # ---------- Testing storage for managed spot ----------
+@pytest.mark.no_fluidstack  # Fluidstack does not support spot instances
 @pytest.mark.no_azure  # Azure does not support spot instances
 @pytest.mark.no_lambda_cloud  # Lambda Cloud does not support spot instances
 @pytest.mark.no_ibm  # IBM Cloud does not support spot instances
@@ -2532,6 +2553,7 @@ def test_spot_tpu():


 # ---------- Testing env for spot ----------
+@pytest.mark.no_fluidstack  # Fluidstack does not support spot instances
 @pytest.mark.no_azure  # Azure does not support spot instances
 @pytest.mark.no_lambda_cloud  # Lambda Cloud does not support spot instances
 @pytest.mark.no_ibm  # IBM Cloud does not support spot instances
@@ -2563,11 +2585,13 @@ def test_inline_env(generic_cloud: str):
         'test-inline-env',
         [
             f'sky launch -c {name} -y --cloud {generic_cloud} --env TEST_ENV="hello world" -- "([[ ! -z \\"\$TEST_ENV\\" ]] && [[ ! -z \\"\$SKYPILOT_NODE_IPS\\" ]] && [[ ! -z \\"\$SKYPILOT_NODE_RANK\\" ]]) || exit 1"',
+            'sleep 20',
             f'sky logs {name} 1 --status',
             f'sky exec {name} --env TEST_ENV2="success" "([[ ! -z \\"\$TEST_ENV2\\" ]] && [[ ! -z \\"\$SKYPILOT_NODE_IPS\\" ]] && [[ ! -z \\"\$SKYPILOT_NODE_RANK\\" ]]) || exit 1"',
             f'sky logs {name} 2 --status',
         ],
         f'sky down -y {name}',
+        _get_timeout(generic_cloud),
     )
     run_one_test(test)

@@ -2585,6 +2609,7 @@ def test_inline_env_file(generic_cloud: str):
             f'sky logs {name} 2 --status',
         ],
         f'sky down -y {name}',
+        _get_timeout(generic_cloud),
     )
     run_one_test(test)

@@ -3622,6 +3647,7 @@ def tmp_public_storage_obj(self, request):
         # This does not require any deletion logic because it is a public bucket
         # and should not get added to global_user_state.

+    @pytest.mark.no_fluidstack
     @pytest.mark.parametrize('store_type', [
         storage_lib.StoreType.S3, storage_lib.StoreType.GCS,
         pytest.param(storage_lib.StoreType.IBM, marks=pytest.mark.ibm),
@@ -3645,6 +3671,7 @@ def test_new_bucket_creation_and_deletion(self, tmp_local_storage_obj,
         out = subprocess.check_output(['sky', 'storage', 'ls'])
         assert tmp_local_storage_obj.name not in out.decode('utf-8')

+    @pytest.mark.no_fluidstack
     @pytest.mark.xdist_group('multiple_bucket_deletion')
     @pytest.mark.parametrize('store_type', [
         storage_lib.StoreType.S3, storage_lib.StoreType.GCS,
@@ -3685,6 +3712,7 @@ def test_multiple_buckets_creation_and_deletion(
         ]
         assert all([item not in out for item in storage_obj_name])

+    @pytest.mark.no_fluidstack
     @pytest.mark.parametrize('store_type', [
         storage_lib.StoreType.S3, storage_lib.StoreType.GCS,
         pytest.param(storage_lib.StoreType.IBM, marks=pytest.mark.ibm),
@@ -3709,6 +3737,7 @@ def test_upload_source_with_spaces(self, store_type,
         ]
         assert all([item in out for item in storage_obj_names])

+    @pytest.mark.no_fluidstack
     @pytest.mark.parametrize('store_type', [
         storage_lib.StoreType.S3, storage_lib.StoreType.GCS,
         pytest.param(storage_lib.StoreType.IBM, marks=pytest.mark.ibm),
@@ -3738,6 +3767,7 @@ def test_bucket_external_deletion(self, tmp_scratch_storage_obj,
         out = subprocess.check_output(['sky', 'storage', 'ls'])
         assert tmp_scratch_storage_obj.name not in out.decode('utf-8')

+    @pytest.mark.no_fluidstack
     @pytest.mark.parametrize('store_type', [
         storage_lib.StoreType.S3, storage_lib.StoreType.GCS,
         pytest.param(storage_lib.StoreType.IBM, marks=pytest.mark.ibm),
@@ -3755,6 +3785,7 @@ def test_bucket_bulk_deletion(self, store_type, tmp_bulk_del_storage_obj):
         output = subprocess.check_output(['sky', 'storage', 'ls'])
         assert tmp_bulk_del_storage_obj.name not in output.decode('utf-8')

+    @pytest.mark.no_fluidstack
     @pytest.mark.parametrize(
         'tmp_public_storage_obj, store_type',
         [('s3://tcga-2-open', storage_lib.StoreType.S3),
@@ -3770,6 +3801,7 @@ def test_public_bucket(self, tmp_public_storage_obj, store_type):
         out = subprocess.check_output(['sky', 'storage', 'ls'])
         assert tmp_public_storage_obj.name not in out.decode('utf-8')

+    @pytest.mark.no_fluidstack
     @pytest.mark.parametrize('nonexist_bucket_url', [
         's3://{random_name}', 'gs://{random_name}',
         pytest.param('cos://us-east/{random_name}', marks=pytest.mark.ibm),
@@ -3831,6 +3863,7 @@ def test_nonexistent_bucket(self, nonexist_bucket_url):
             storage_obj = storage_lib.Storage(source=nonexist_bucket_url.format(
                 random_name=nonexist_bucket_name))

+    @pytest.mark.no_fluidstack
     @pytest.mark.parametrize('private_bucket', [
         f's3://imagenet', f'gs://imagenet',
         pytest.param('cos://us-east/bucket1', marks=pytest.mark.ibm)
@@ -3848,6 +3881,7 @@ def test_private_bucket(self, private_bucket):
                 name=private_bucket_name)):
             storage_obj = storage_lib.Storage(source=private_bucket)

+    @pytest.mark.no_fluidstack
     @pytest.mark.parametrize('ext_bucket_fixture, store_type',
                              [('tmp_awscli_bucket', storage_lib.StoreType.S3),
                               ('tmp_gsutil_bucket', storage_lib.StoreType.GCS),
@@ -3885,6 +3919,7 @@ def test_upload_to_existing_bucket(self, ext_bucket_fixture, request,
         out = subprocess.check_output(['sky', 'storage', 'ls'])
         assert storage_obj.name not in out.decode('utf-8')

+    @pytest.mark.no_fluidstack
     def test_copy_mount_existing_storage(self,
                                          tmp_copy_mnt_existing_storage_obj):
         # Creates a bucket with no source in MOUNT mode (empty bucket), and
@@ -3896,6 +3931,7 @@ def test_copy_mount_existing_storage(self,
         out = subprocess.check_output(['sky', 'storage', 'ls']).decode('utf-8')
         assert storage_name in out, f'Storage {storage_name} not found in sky storage ls.'

+    @pytest.mark.no_fluidstack
     @pytest.mark.parametrize('store_type', [
         storage_lib.StoreType.S3, storage_lib.StoreType.GCS,
         pytest.param(storage_lib.StoreType.IBM, marks=pytest.mark.ibm),
@@ -3922,6 +3958,7 @@ def test_list_source(self, tmp_local_list_storage_obj, store_type):
             'File not found in bucket - output was : {}'.format(out.decode
                                                                 ('utf-8'))

+    @pytest.mark.no_fluidstack
     @pytest.mark.parametrize('invalid_name_list, store_type',
                              [(AWS_INVALID_NAMES, storage_lib.StoreType.S3),
                               (GCS_INVALID_NAMES, storage_lib.StoreType.GCS),
@@ -3939,6 +3976,7 @@ def test_invalid_names(self, invalid_name_list, store_type):
                 storage_obj = storage_lib.Storage(name=name)
                 storage_obj.add_store(store_type)

+    @pytest.mark.no_fluidstack
     @pytest.mark.parametrize(
         'gitignore_structure, store_type',
         [(GITIGNORE_SYNC_TEST_DIR_STRUCTURE, storage_lib.StoreType.S3),
@@ -4036,6 +4074,7 @@ def test_load_dump_yaml_config_equivalent(self):


 # ---------- Testing Multiple Accelerators ----------
+@pytest.mark.no_fluidstack  # Fluidstack does not support K80 gpus for now
 def test_multiple_accelerators_ordered():
     name = _get_cluster_name()
     test = Test(
@@ -4050,6 +4089,7 @@ def test_multiple_accelerators_ordered():
     run_one_test(test)


+@pytest.mark.no_fluidstack  # Fluidstack has low availability for T4 GPUs
 def test_multiple_accelerators_ordered_with_default():
     name = _get_cluster_name()
     test = Test(
@@ -4064,6 +4104,7 @@ def test_multiple_accelerators_ordered_with_default():
     run_one_test(test)


+@pytest.mark.no_fluidstack  # Fluidstack has low availability for T4 GPUs
 def test_multiple_accelerators_unordered():
     name = _get_cluster_name()
     test = Test(
@@ -4077,6 +4118,7 @@ def test_multiple_accelerators_unordered():
     run_one_test(test)


+@pytest.mark.no_fluidstack  # Fluidstack has low availability for T4 GPUs
 def test_multiple_accelerators_unordered_with_default():
     name = _get_cluster_name()
     test = Test(
@@ -4091,6 +4133,7 @@ def test_multiple_accelerators_unordered_with_default():
     run_one_test(test)


+@pytest.mark.no_fluidstack  # Requires other clouds to be enabled
 def test_multiple_resources():
     name = _get_cluster_name()
     test = Test(
@@ -4105,6 +4148,7 @@ def test_multiple_resources():


 # ---------- Sky Benchmark ----------
+@pytest.mark.no_fluidstack  # Requires other clouds to be enabled
 @pytest.mark.no_kubernetes
 def test_sky_bench(generic_cloud: str):
     name = _get_cluster_name()
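
Note on the test infrastructure referenced in the hunks above: the tests rely on a `no_fluidstack` pytest marker (to deselect tests FluidStack cannot run) and a `_get_timeout` helper (to give slower clouds a larger time budget). Their definitions live elsewhere in the patch and in tests/conftest.py and are not shown in this excerpt. The sketch below illustrates how such pieces are commonly wired up; the constants, values, and hook body are illustrative assumptions, not the patch's actual implementation.

# Illustrative sketch only -- not the patch's actual implementation.
# `_get_timeout` and the `no_<cloud>` marker handling are defined elsewhere
# (tests/test_smoke.py and tests/conftest.py); values here are assumptions.
import pytest

_DEFAULT_TIMEOUT = 30 * 60  # assumed default per-test timeout, in seconds
_CLOUD_TIMEOUTS = {
    # Assumed larger budget: FluidStack VMs can be slower to provision.
    'fluidstack': 60 * 60,
}


def _get_timeout(generic_cloud: str,
                 override_timeout: int = _DEFAULT_TIMEOUT) -> int:
    """Return the timeout (seconds) to use for a test on the given cloud."""
    return _CLOUD_TIMEOUTS.get(generic_cloud.lower(), override_timeout)


# conftest.py-style hook: skip any test marked `no_<cloud>` when the suite is
# run against that cloud (e.g. `pytest tests/test_smoke.py --generic-cloud
# fluidstack`). Assumes `--generic-cloud` is registered via pytest_addoption.
def pytest_collection_modifyitems(config, items):
    cloud = config.getoption('--generic-cloud', default='gcp').lower()
    skip_marker = pytest.mark.skip(reason=f'Not supported on {cloud}.')
    for item in items:
        if item.get_closest_marker(f'no_{cloud}') is not None:
            item.add_marker(skip_marker)

Under these assumptions, `timeout=_get_timeout(generic_cloud, total_timeout_minutes * 60)` leaves existing clouds at their current budget while letting a slower cloud such as FluidStack map to a longer ceiling; this is one plausible behavior, not necessarily the one the patch implements.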