Skip to content

Commit

Permalink
command util for local v cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
carolineechen committed Apr 4, 2024
1 parent 268b059 commit a7ed140
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 126 deletions.
8 changes: 8 additions & 0 deletions runhouse/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
LOGS_DIR = ".rh/logs"
RH_LOGFILE_PATH = Path.home() / LOGS_DIR

ENVS_DIR = "~/.rh/envs"

MAX_MESSAGE_LENGTH = 1 * 1024 * 1024 * 1024 # 1 GB

CLI_RESTART_CMD = "runhouse restart"
Expand Down Expand Up @@ -43,3 +45,9 @@
# We need to use this instead of ray stop to make sure we don't stop the SkyPilot ray server,
# which runs on other ports but is required to preserve autostop and correct cluster status.
RAY_KILL_CMD = 'pkill -f ".*ray.*' + str(DEFAULT_RAY_PORT) + '.*"'

CONDA_INSTALL_CMDS = [
"wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh -O ~/miniconda.sh",
"bash ~/miniconda.sh -b -p ~/miniconda",
"source $HOME/miniconda3/bin/activate",
]
125 changes: 75 additions & 50 deletions runhouse/resources/envs/conda_env.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import logging
import shlex
import subprocess
from pathlib import Path

from pathlib import Path
from typing import Dict, List, Optional, Union

import yaml

from runhouse.constants import ENVS_DIR
from runhouse.globals import obj_store

from runhouse.resources.packages import Package
from runhouse.utils import install_conda
from runhouse.utils import install_conda, run_setup_command

from .env import Env

Expand Down Expand Up @@ -62,50 +64,67 @@ def config(self, condensed=True):
def env_name(self):
return self.conda_yaml["name"]

def _create_conda_env(self, force=False):
path = "~/.rh/envs"
subprocess.run(f"mkdir -p {path}", shell=True)

local_env_exists = f"\n{self.env_name} " in subprocess.check_output(
shlex.split("conda info --envs"), shell=False
).decode("utf-8")
yaml_exists = (Path(path).expanduser() / f"{self.env_name}.yml").exists()

if force or not (yaml_exists and local_env_exists):
python_commands = "; ".join(
[
"import yaml",
"from pathlib import Path",
f"path = Path('{path}').expanduser()",
f"yaml.dump({self.conda_yaml}, open(path / '{self.env_name}.yml', 'w'))",
]
)
subprocess.run(f'python -c "{python_commands}"', shell=True)
def _create_conda_env(self, force=False, cluster=None):
yaml_path = Path(ENVS_DIR) / f"{self.env_name}.yml"

if not local_env_exists:
subprocess.run(
f"conda env create -f {path}/{self.env_name}.yml", shell=True
env_exists = (
f"\n{self.env_name} "
in run_setup_command("conda info --envs", cluster=cluster)[1]
)
run_setup_command(f"mkdir -p {ENVS_DIR}", cluster=cluster)
yaml_exists = (
(Path(ENVS_DIR).expanduser() / f"{self.env_name}.yml").exists()
if not cluster
else run_setup_command(f"ls {yaml_path}", cluster=cluster)[0] == 0
)

if force or not (yaml_exists and env_exists):
# dump config into yaml file on cluster
if not cluster:
python_commands = "; ".join(
[
"import yaml",
"from pathlib import Path",
f"path = Path('{ENVS_DIR}').expanduser()",
f"yaml.dump({self.conda_yaml}, open(path / '{self.env_name}.yml', 'w'))",
]
)
if f"\n{self.env_name} " not in subprocess.check_output(
shlex.split("conda info --envs"), shell=False
).decode("utf-8"):
raise RuntimeError(
f"conda env {self.env_name} not created properly."
)

def install(self, force=False):
"""Locally install packages and run setup commands."""
if not ["python" in dep for dep in self.conda_yaml["dependencies"]]:
base_python_version = (
subprocess.check_output(shlex.split("python --version"), shell=False)
.decode("utf-8")
.split()[1]
subprocess.run(f'python -c "{python_commands}"', shell=True)
else:
contents = yaml.dump(self.conda_yaml)
run_setup_command(f"echo $'{contents}' > {yaml_path}", cluster=cluster)

# create conda env from yaml file
run_setup_command(f"conda env create -f {yaml_path}", cluster=cluster)

env_exists = (
f"\n{self.env_name} "
in run_setup_command("conda info --envs", cluster=cluster)[1]
)
if not env_exists:
raise RuntimeError(f"conda env {self.env_name} not created properly.")

def install(self, force=False, cluster=None):
"""Locally install packages and run setup commands.
Args:
force (bool, optional): Whether to force re-install env if it has already been installed.
(default: ``False``)
cluster (bool, optional): If None, installs env locally. Otherwise installs remotely
on the cluster using SSH. (default: ``None``)
"""
if not any(["python" in dep for dep in self.conda_yaml["dependencies"]]):
base_python_version = run_setup_command(
"python --version", cluster=cluster, stream_logs=False
)[1].split()[1]
self.conda_yaml["dependencies"].append(f"python=={base_python_version}")
install_conda()
local_env_exists = f"\n{self.env_name} " in subprocess.check_output(
shlex.split("conda info --envs"), shell=False
).decode("utf-8")
install_conda(cluster=cluster)
local_env_exists = (
f"\n{self.env_name} "
in run_setup_command(
"conda info --envs", cluster=cluster, stream_logs=False
)[1]
)

# Hash the config_for_rns to check if we need to create/install the conda env
env_config = self.config()
Expand All @@ -118,25 +137,31 @@ def install(self, force=False):
return
obj_store.installed_envs[install_hash] = self.name

self._create_conda_env()
self._create_conda_env(force=force, cluster=cluster)

if self.reqs:
for package in self.reqs:
if isinstance(package, str):
pkg = Package.from_string(package)
if pkg.install_method == "reqs" and cluster:
pkg = pkg.to(cluster)
elif hasattr(package, "_install"):
pkg = package
else:
raise ValueError(f"package {package} not recognized")

logger.debug(f"Installing package: {str(pkg)}")
pkg._install(self)
if not cluster:
pkg._install(self)
else:
install_cmd = f"{self._run_cmd} {pkg._install_cmd(cluster=cluster)}"
run_setup_command(install_cmd, cluster=cluster)

return (
self._run_command([f"{self.setup_cmds.join(' && ')}"])
if self.setup_cmds
else None
)
if self.setup_cmds:
setup_cmds = f"{self._run_cmd} {self.setup_cmds.join(' && ')}"
run_setup_command(setup_cmds, cluster=cluster)

return

@property
def _run_cmd(self):
Expand Down
12 changes: 8 additions & 4 deletions runhouse/resources/envs/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from runhouse.resources.packages import Package
from runhouse.resources.resource import Resource

from runhouse.utils import run_with_logs
from runhouse.utils import run_setup_command, run_with_logs

from .utils import _env_vars_from_file

Expand Down Expand Up @@ -137,7 +137,7 @@ def _secrets_to(self, system: Union[str, Cluster]):
new_secrets.append(secret.to(system=system, env=self))
return new_secrets

def install(self, force=False):
def install(self, force=False, cluster=None):
"""Locally install packages and run setup commands."""
# Hash the config_for_rns to check if we need to install
env_config = self.config()
Expand All @@ -153,16 +153,20 @@ def install(self, force=False):
for package in self.reqs:
if isinstance(package, str):
pkg = Package.from_string(package)
if cluster and pkg.install_method == "reqs":
pkg = pkg.to(cluster)
elif hasattr(package, "_install"):
pkg = package
else:
raise ValueError(f"package {package} not recognized")

logger.debug(f"Installing package: {str(pkg)}")
pkg._install(self)
pkg._install(self, cluster=cluster)

if self.setup_cmds:
for cmd in self.setup_cmds:
self._run_command(cmd)
cmd = f"{self._run_cmd} {cmd}" if self._run_cmd else cmd
run_setup_command(cmd, cluster=cluster)

def _run_command(self, command: str, **kwargs):
"""Run command locally inside the environment"""
Expand Down
4 changes: 2 additions & 2 deletions runhouse/resources/envs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,15 @@ def _get_conda_yaml(conda_env=None):
for dep in conda_yaml["dependencies"]
if isinstance(dep, Dict) and "pip" in dep
]:
conda_yaml["dependencies"].append({"pip": ["ray<=2.4.0,>=2.2.0"]})
conda_yaml["dependencies"].append({"pip": ["ray >= 2.2.0, <= 2.6.3, != 2.6.0"]})
else:
for dep in conda_yaml["dependencies"]:
if (
isinstance(dep, Dict)
and "pip" in dep
and not [pip for pip in dep["pip"] if "ray" in pip]
):
dep["pip"].append("ray<=2.4.0,>=2.2.0")
dep["pip"].append("ray >= 2.2.0, <= 2.6.3, != 2.6.0")
continue
return conda_yaml

Expand Down
5 changes: 3 additions & 2 deletions runhouse/resources/packages/git_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ def __str__(self):
return f"GitPackage: {self.name}"
return f"GitPackage: {self.git_url}@{self.revision}"

def _install(self, env: Union[str, "Env"] = None):
def _install(self, env: Union[str, "Env"] = None, cluster: "Cluster" = None):
# Clone down the repo
if not Path(self.install_target).exists():
logging.info(f"Cloning: git clone {self.git_url}")

subprocess.run(
["git", "clone", self.git_url],
cwd=Path(self.install_target).expanduser().parent,
Expand All @@ -74,7 +75,7 @@ def _install(self, env: Union[str, "Env"] = None):
check=True,
)
# Use super to install the package
super()._install(env)
super()._install(env, cluster=cluster)

@staticmethod
def from_config(config: dict, dryrun=False):
Expand Down
Loading

0 comments on commit a7ed140

Please sign in to comment.