Skip to content

Commit

Permalink
runtime env, base env, etc
Browse files Browse the repository at this point in the history
  • Loading branch information
carolineechen committed Apr 12, 2024
1 parent a7a046e commit b5b5e3f
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 27 deletions.
3 changes: 0 additions & 3 deletions runhouse/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,12 +299,9 @@ def _start_server(
domain=None,
certs_address=None,
use_local_telemetry=False,
<<<<<<< HEAD
api_server_url=None,
=======
default_env=None,
conda_env=None,
>>>>>>> adaa6415 (add default env)
):
############################################
# Build CLI commands to start the server
Expand Down
16 changes: 6 additions & 10 deletions runhouse/resources/envs/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@
from typing import Dict, List, Optional, Union

from runhouse.globals import obj_store

from runhouse.resources.envs.utils import run_setup_command, run_with_logs
from runhouse.resources.envs.utils import (
_process_env_vars,
run_setup_command,
run_with_logs,
)
from runhouse.resources.folders import Folder
from runhouse.resources.hardware import _get_cluster_from, Cluster
from runhouse.resources.packages import Package
from runhouse.resources.resource import Resource

from .utils import _env_vars_from_file


logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -208,11 +208,7 @@ def to(
if new_env.name == Env.DEFAULT_NAME:
new_env.name = system.default_env.name
key = system.put_resource(new_env)
env_vars = (
_env_vars_from_file(self.env_vars)
if isinstance(self.env_vars, str)
else self.env_vars
)
env_vars = _process_env_vars(self.env_vars)
if env_vars:
system.call(key, "_set_env_vars", env_vars)
system.call(key, "install", force=force_install)
Expand Down
7 changes: 7 additions & 0 deletions runhouse/resources/envs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ def _process_reqs(reqs):
return preprocessed_reqs


def _process_env_vars(env_vars):
processed_vars = (
_env_vars_from_file(env_vars) if isinstance(env_vars, str) else env_vars
)
return processed_vars


def _get_env_from(env):
if isinstance(env, Resource):
return env
Expand Down
25 changes: 19 additions & 6 deletions runhouse/resources/hardware/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,22 +118,28 @@ def creds_values(self) -> Dict:

@property
def default_env(self):
from runhouse.resources.envs import env
from runhouse.resources.envs import Env

return self._default_env if self._default_env else env()
return (
self._default_env
if self._default_env
else Env(name=Env.DEFAULT_NAME, working_dir="./")
)

@default_env.setter
def default_env(self, env):
self._default_env = _get_env_from(env)
if self.is_up():
self.check_server()
if not self.get(self._default_env.name):
self._sync_default_env_to_cluster()
self.put_resource(self._default_env)
self._default_env.to(self)
self.save_config_to_cluster()

def save_config_to_cluster(self, node: str = None):
logger.info(
"The cluster default env has been updated. "
"Run `cluster.restart_server()` to restart the Runhouse server on the new default env."
)

def save_config_to_cluster(self, node: str = None):
config = self.config(condensed=False)
config.pop("creds")
json_config = f"{json.dumps(config)}"
Expand Down Expand Up @@ -342,6 +348,8 @@ def keep_warm(self):
return self

def _sync_default_env_to_cluster(self):
"""Install and set up the default env requirements on the cluster. This does not put the env resource
on the cluster or initialize the servlet. It also does not set any env vars."""
if not self._default_env:
return

Expand Down Expand Up @@ -827,7 +835,12 @@ def restart_server(
self._start_ray_workers(DEFAULT_RAY_PORT, env=self.default_env)

if default_env:
from runhouse.resources.envs.utils import _process_env_vars

self.put_resource(default_env)
env_vars = _process_env_vars(default_env.env_vars)
if env_vars:
self.call(default_env.name, "_set_env_vars", env_vars)

return status_codes

Expand Down
10 changes: 7 additions & 3 deletions runhouse/resources/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def __init__(
self._env = (
self._system.default_env
if self._system
else Env(name=Env.DEFAULT_NAME)
else Env(name=Env.DEFAULT_NAME, working_dir="./")
)
# If we're creating pointers, we're also local to the class definition and package, so it should be
# set as the workdir (we can do this in a fancier way later)
Expand Down Expand Up @@ -428,7 +428,9 @@ def to(
_get_cluster_from(system, dryrun=self.dryrun) if system else self.system
)
if not env:
if not self.env or (self.env and self.env.name == Env.DEFAULT_NAME):
if not self.env or (
self.env == Env(name=Env.DEFAULT_NAME, working_dir="./")
):
env = system.default_env if system else self.env
else:
env = self.env
Expand Down Expand Up @@ -871,7 +873,9 @@ def resolved_state(self):
def _save_sub_resources(self):
if isinstance(self.system, Resource) and self.system.name:
self.system.save()
if isinstance(self.env, Resource) and self.env.name != Env.DEFAULT_NAME:
if isinstance(self.env, Resource) and self.env != Env(
name=Env.DEFAULT_NAME, working_dir="./"
):
self.env.save()

def rename(self, name: str):
Expand Down
4 changes: 3 additions & 1 deletion runhouse/servers/http/http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ async def _add_username_to_span(request: Request, call_next):

if from_test:
await obj_store.ainitialize(
default_env, setup_ray=RaySetupOption.TEST_PROCESS
default_env,
setup_ray=RaySetupOption.TEST_PROCESS,
runtime_env=runtime_env,
)

# TODO disabling due to latency, figure out what to do with this
Expand Down
9 changes: 5 additions & 4 deletions runhouse/servers/obj_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def __init__(self, *args):
super().__init__("No local object store exists; cannot perform operation.")


def get_cluster_servlet(create_if_not_exists: bool = False):
def get_cluster_servlet(create_if_not_exists: bool = False, runtime_env: Dict = {}):
from runhouse.servers.cluster_servlet import ClusterServlet

if not ray.is_initialized():
Expand All @@ -72,6 +72,7 @@ def get_cluster_servlet(create_if_not_exists: bool = False):
namespace="runhouse",
max_concurrency=1000,
resources={f"node:{current_ip}": 0.001},
runtime_env=runtime_env or None,
)
.remote()
)
Expand Down Expand Up @@ -152,6 +153,7 @@ def __init__(self):
async def ainitialize(
self,
servlet_name: Optional[str] = None,
runtime_env: Optional[Dict] = {},
has_local_storage: bool = False,
setup_ray: RaySetupOption = RaySetupOption.GET_OR_FAIL,
ray_address: str = "auto",
Expand Down Expand Up @@ -199,7 +201,8 @@ async def ainitialize(
setup_cluster_servlet != ClusterServletSetupOption.GET_OR_FAIL
)
self.cluster_servlet = get_cluster_servlet(
create_if_not_exists=create_if_not_exists
create_if_not_exists=create_if_not_exists,
runtime_env=runtime_env,
)
if self.cluster_servlet is None:
# TODO: logger.<method> is not printing correctly here when doing `runhouse start`.
Expand All @@ -208,8 +211,6 @@ async def ainitialize(
"Warning, cluster servlet is not initialized. Object Store operations will not work."
)

# TODO -- set the servlet name here to be the default env on the cluster

# There are 3 operating modes of the KV store:
# servlet_name is set, has_local_storage is True: This is an EnvServlet with a local KV store.
# servlet_name is set, has_local_storage is False: This is an ObjStore class that is not an EnvServlet,
Expand Down

0 comments on commit b5b5e3f

Please sign in to comment.