[WIP] don't sync .rh directory onto clusters

jlewitt1 committed Mar 27, 2024
1 parent 7145afb commit d39ba16

Showing 19 changed files with 183 additions and 103 deletions.
10 changes: 5 additions & 5 deletions docs/api/python/cluster.rst
@@ -229,11 +229,11 @@ be started on the cluster on port :code:`32300`.
Server Authentication
---------------------

If desired, Runhouse provides out-of-the-box authentication via users' Runhouse token (generated when
:ref:`logging in <Login/Logout>`) and set locally at: :code:`~/.rh/config.yaml`). This is crucial if the cluster
has ports open to the public internet, as would usually be the case when using the ``tls`` connection type. You may
also set up your own authentication manually inside of your own code, but you should likely still enable Runhouse
authentication to ensure that even your non-user-facing endpoints into the server are secured.
If desired, Runhouse provides out-of-the-box authentication via users' Runhouse cluster token (generated when
:ref:`logging in <Login/Logout>`). This is crucial if the cluster has ports open to the public internet, as would
usually be the case when using the ``tls`` connection type. You may also set up your own authentication manually
inside of your own code, but you should likely still enable Runhouse authentication to ensure that even your
non-user-facing endpoints into the server are secured.

When :ref:`initializing a cluster <Cluster Factory Method>`, you can set the :code:`den_auth` parameter to :code:`True`
to enable token authentication. Calls to the cluster server can then be made using an auth header with the
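
As a minimal sketch of the setup described above (the cluster name, instance type, ``/check`` endpoint, and the ``rh.configs`` accessor are illustrative assumptions; ``den_auth=True`` is the flag the docs describe):

    import requests
    import runhouse as rh

    # Placeholder cluster; den_auth=True enables token authentication on the
    # cluster's API server.
    cluster = rh.ondemand_cluster(
        name="rh-a10x",
        instance_type="A10G:1",
        server_connection_type="tls",
        den_auth=True,
    ).up_if_not()

    # Calls to the server then carry an auth header. cluster_request_headers
    # (added in this commit) holds the hashed cluster token rather than the
    # raw Den token.
    requests.get(
        f"https://{cluster.address}/check",
        headers=rh.configs.cluster_request_headers,
        verify=False,  # illustrative only: skip verification for self-signed certs
    )
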
5 changes: 2 additions & 3 deletions docs/tutorials/api-resources.rst
@@ -26,9 +26,8 @@ Runhouse RNS
Setting Config Options
----------------------

Runhouse stores user configs both locally in ``~/.rh/config.yaml`` and
remotely in the Runhouse database, letting you preserve your same config
across environments.
Runhouse stores user configs locally in ``~/.rh/config.yaml``, letting you preserve your same
config across environments.

Some configs to consider setting:

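
For example, a few values can be set through the Python API (a sketch; ``rh.configs.set`` and the keys shown are assumptions about the config helper this tutorial refers to):

    import runhouse as rh

    # Each value is persisted to the local ~/.rh/config.yaml.
    rh.configs.set("default_provider", "aws")
    rh.configs.set("default_autostop", 30)
    rh.configs.set("use_spot", False)
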
4 changes: 1 addition & 3 deletions docs/tutorials/api-secrets.rst
@@ -720,9 +720,7 @@ Login and Logout

The login flow gives you the option to upload locally detected builtin
provider secrets, or load down saved-down Vault secrets into your local
environment. If loading down new secrets, the location (file or env var)
of the new secrets will be logged in your runhouse config yaml at
``~/.rh/config.yaml`` as well. There are some useful APIs as well for
environment. There are some useful APIs as well for
seeing which secrets you have locally configured or stored in Vault.

.. code:: ipython3
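
The code cell above is elided in this hunk; purely as an illustration, that flow might look roughly like the following (the ``rh.login`` keyword arguments and ``Secret`` class methods are assumptions, not taken from this commit):

    import runhouse as rh

    # Log in, choosing whether to upload locally detected provider secrets or
    # to download saved Vault secrets into the local environment.
    rh.login(download_secrets=True, upload_secrets=False, interactive=False)

    # Inspect which secrets are configured locally vs. stored in Vault.
    local_secrets = rh.Secret.local_secrets()
    vault_secrets = rh.Secret.vault_secrets()
    print(list(local_secrets), list(vault_secrets))
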
48 changes: 30 additions & 18 deletions runhouse/resources/hardware/cluster.py
@@ -1,7 +1,6 @@
import contextlib
import copy
import importlib
import json
import logging
import re
import subprocess
@@ -12,6 +11,8 @@
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

import yaml

from runhouse.rns.utils.api import ResourceAccess, ResourceVisibility
from runhouse.servers.http.certs import TLSCertConfig

@@ -23,15 +24,14 @@
from runhouse.constants import (
CLI_RESTART_CMD,
CLI_STOP_CMD,
CLUSTER_CONFIG_PATH,
DEFAULT_HTTP_PORT,
DEFAULT_HTTPS_PORT,
DEFAULT_RAY_PORT,
DEFAULT_SERVER_PORT,
LOCALHOST,
RESERVED_SYSTEM_NAMES,
)
from runhouse.globals import obj_store, rns_client
from runhouse.globals import configs, obj_store, rns_client
from runhouse.resources.envs.utils import _get_env_from
from runhouse.resources.hardware.utils import _current_cluster, ServerConnectionType
from runhouse.resources.resource import Resource
@@ -111,18 +111,6 @@ def creds_values(self) -> Dict:

return self._creds.values

def save_config_to_cluster(self, node: str = None):
config = self.config(condensed=False)
config.pop("creds")
json_config = f"{json.dumps(config)}"

self.run(
[
f"mkdir -p ~/.rh; touch {CLUSTER_CONFIG_PATH}; echo '{json_config}' > {CLUSTER_CONFIG_PATH}"
],
node=node or "all",
)

def save(
self,
name: str = None,
@@ -309,6 +297,27 @@ def keep_warm(self):
)
return self

def _save_cluster_token(self):
"""Save user data (including cluster token) to the cluster."""
import shlex

cluster_token = rns_client.base_cluster_token
username = rns_client.username

path_to_file = Path(configs.CLUSTER_TOKEN_PATH)

user_data = {
username: {"rns_address": self.rns_address, "token": cluster_token}
}

yaml_data = yaml.dump(user_data, default_flow_style=False, allow_unicode=True)
token_cmd = f"echo {shlex.quote(yaml_data)} >> {path_to_file}"
self.run([token_cmd])

logger.debug(
f"Saved data to cluster owners file on server in path: {configs.CLUSTER_TOKEN_PATH}"
)

def _sync_runhouse_to_cluster(self, _install_url=None, env=None):
if self.on_this_cluster():
return
@@ -674,6 +683,7 @@ def _start_ray_workers(self, ray_port):
def restart_server(
self,
_rh_install_url: str = None,
_set_owner: bool = False,
resync_rh: bool = True,
restart_ray: bool = False,
env: Union[str, "Env"] = None,
@@ -692,6 +702,11 @@ def restart_server(
"""
logger.info(f"Restarting Runhouse API server on {self.name}.")

if _set_owner and rns_client.token and self.rns_address:
# If a Runhouse token is saved locally and rns address exists for the cluster, we can write down the
# user's hashed cluster token to the "cluster_owners" file on the cluster
self._save_cluster_token()

if resync_rh:
self._sync_runhouse_to_cluster(_install_url=_rh_install_url)
logger.debug("Finished syncing Runhouse to cluster.")
@@ -733,9 +748,6 @@ def restart_server(
)
cluster_cert_path = f"{base_caddy_dir}/{self.cert_config.CERT_NAME}"

# Update the cluster config on the cluster
self.save_config_to_cluster()

cmd = (
CLI_RESTART_CMD
+ (" --restart-ray" if restart_ray else "")
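
For reference, a standalone sketch of what ``_save_cluster_token`` above appends to the cluster's owners file (the username, rns address, and token value are hypothetical; the path comes from ``configs.CLUSTER_TOKEN_PATH``):

    import shlex

    import yaml

    # Mirrors the user_data dict built in _save_cluster_token.
    user_data = {
        "userA": {
            "rns_address": "/userA/rh-a10x",
            "token": "<hashed cluster token>",
        }
    }
    yaml_data = yaml.dump(user_data, default_flow_style=False, allow_unicode=True)

    # The same shell command the method runs on the cluster's nodes:
    print(f"echo {shlex.quote(yaml_data)} >> ~/.rh/cluster_owners.yaml")
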
9 changes: 2 additions & 7 deletions runhouse/resources/hardware/on_demand_cluster.py
@@ -403,12 +403,7 @@ def up(self):
use_spot=self.use_spot,
)
)
if Path("~/.rh").expanduser().exists():
task.set_file_mounts(
{
"~/.rh": "~/.rh",
}
)

sky.launch(
task,
cluster_name=self.name,
@@ -419,7 +414,7 @@
raise ValueError(f"Cluster provider {self.provider} not supported.")

self._update_from_sky_status()
self.restart_server()
self.restart_server(_set_owner=True)

return self

10 changes: 1 addition & 9 deletions runhouse/resources/hardware/sagemaker/sagemaker_cluster.py
@@ -401,6 +401,7 @@ def set_connection_defaults(self):
def restart_server(
self,
_rh_install_url: str = None,
_set_owner: bool = False,
resync_rh: bool = True,
restart_ray: bool = True,
env: Union[str, "Env"] = None,
@@ -1303,15 +1304,6 @@ def _sync_runhouse_to_cluster(self, node: str = None, _install_url=None, env=None):
if not self.client:
self.connect_server_client()

# Sync the local ~/.rh directory to the cluster
self._rsync(
source=str(Path("~/.rh").expanduser()),
dest="~/.rh",
up=True,
contents=True,
)
logger.info("Synced ~/.rh folder to the cluster")

local_rh_package_path = Path(importlib.util.find_spec("runhouse").origin).parent
# local_rh_package_path = Path(pkgutil.get_loader("runhouse").path).parent

49 changes: 49 additions & 0 deletions runhouse/rns/defaults.py
@@ -1,4 +1,5 @@
import copy
import hashlib
import json
import logging
import os
@@ -20,6 +21,8 @@ class Defaults:
USER_ENDPOINT = "user/"
GROUP_ENDPOINT = "group/"
CONFIG_PATH = Path("~/.rh/config.yaml").expanduser()
CLUSTER_TOKEN_PATH = "~/.rh/cluster_owners.yaml"

# TODO [DG] default sub-dicts for various resources (e.g. defaults.get('cluster').get('resource_type'))
BASE_DEFAULTS = {
"default_folder": "~",
@@ -62,6 +65,10 @@ def token(self):
def token(self, value):
self._token = value

@property
def cluster_token(self):
return self._get_or_create_cluster_token()

@property
def username(self):
if self._simulate_logged_out:
@@ -84,6 +91,9 @@ def username(self, value):

@property
def default_folder(self):
if os.environ.get("RH_DEFAULT_FOLDER"):
self._default_folder = os.environ.get("RH_DEFAULT_FOLDER")

if self._simulate_logged_out:
return self.BASE_DEFAULTS["default_folder"]

@@ -130,6 +140,12 @@ def request_headers(self) -> dict:
"""Base request headers used to make requests to Runhouse Den."""
return {"Authorization": f"Bearer {self.token}"} if self.token else {}

@property
def cluster_request_headers(self) -> dict:
"""Base request headers used to make requests to a cluster."""
cluster_token = self.cluster_token
return {"Authorization": f"Bearer {cluster_token}"} if cluster_token else {}

def upload_defaults(
self,
defaults: Optional[Dict] = None,
@@ -267,3 +283,36 @@ def data_collection_enabled(self) -> bool:
return False

return True

def load_cluster_token_from_file(self, username: str):
path_to_file = Path(self.CLUSTER_TOKEN_PATH).expanduser()
if not path_to_file.exists():
# File will only exist if loading on a cluster and if `_set_owner=True`, rns address for cluster exists,
# and user has Den token saved locally
return

with open(path_to_file, "r") as f:
data = yaml.safe_load(f)
saved_cluster_token = data.get(username, {}).get("token")
return saved_cluster_token

def _get_or_create_cluster_token(
self, den_token: str = None, resource_address: str = None, username: str = None
):
if den_token and resource_address and username:
# If specific values are passed in, use those to build the token
return self._build_token_hash(den_token, resource_address, username)

den_token = self.token
username = self.username
if den_token is None or username is None:
return None

# Return the user's self-owned cluster token
return self._build_token_hash(den_token, username, username)

@staticmethod
def _build_token_hash(den_token: str, resource_address: str, username: str):
hash_input = (den_token + resource_address).encode("utf-8")
hash_hex = hashlib.sha256(hash_input).hexdigest()
return f"{hash_hex}+{resource_address}+{username}"
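
A worked example of the token derivation added here (all values are hypothetical):

    import hashlib

    den_token = "abc123"        # the user's raw Den token, never sent to the cluster
    resource_address = "userA"  # top-level directory of the resource
    username = "userA"

    # Mirrors Defaults._build_token_hash above.
    hash_hex = hashlib.sha256((den_token + resource_address).encode("utf-8")).hexdigest()
    cluster_token = f"{hash_hex}+{resource_address}+{username}"
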
33 changes: 19 additions & 14 deletions runhouse/rns/rns_client.py
@@ -1,4 +1,3 @@
import hashlib
import importlib
import json
import logging
@@ -147,6 +146,10 @@ def current_folder(self, value):
def token(self):
return self._configs.token

@property
def base_cluster_token(self):
return self._configs.cluster_token

@property
def username(self):
return self._configs.get("username", None)
@@ -199,12 +202,9 @@ def request_headers(
) -> Union[dict, None]:
"""Returns the authentication headers to use for requests made to Den or to a cluster.
If the request is being made to Den, we simply construct the request headers with the user's existing
Runhouse token.
If the request is being made to (or from) a cluster, we generate a new unique token to prevent exposing the
user's original Runhouse token on the cluster. This new token is based on the user's existing Den token and
the Den address of the resource (or cluster API) they are attempting to access.
We generate a new unique token to prevent exposing the user's original Runhouse token on the cluster.
This new token is based on the user's existing Den token and the Den address of the resource (or cluster API)
they are attempting to access.
For example, if userA tries to access a function on a cluster that was shared with them by userB, we generate a
new token containing userA's Den token and top level directory associated with the
@@ -235,8 +235,9 @@ def request_headers(
return None

if headers is None:
# Use the default headers (i.e. the user's original Den token)
headers: dict = self._configs.request_headers
# Use the default cluster headers, which includes the hash of the user's den token
# We can use this to authenticate requests to a cluster + Den
headers: dict = self._configs.cluster_request_headers

if not headers:
# TODO: allow this? means we failed to load token from configs
@@ -262,18 +263,22 @@ def request_headers(
"Failed to extract token from request auth header. Expected in format: Bearer <token>"
)

hashed_token = self.cluster_token(den_token, resource_address)
hashed_token = self.cluster_token_from_resource_address(
den_token, resource_address
)

return {"Authorization": f"Bearer {hashed_token}"}

def cluster_token(self, den_token: str, resource_address: str):
def cluster_token_from_resource_address(
self, den_token: str, resource_address: str
):
if "/" in resource_address:
# If provided as a full rns address, extract the top level directory
resource_address = self.base_folder(resource_address)

hash_input = (den_token + resource_address).encode("utf-8")
hash_hex = hashlib.sha256(hash_input).hexdigest()
return f"{hash_hex}+{resource_address}+{self._configs.username}"
return self._configs._get_or_create_cluster_token(
den_token, resource_address, username=self._configs.username
)

def resource_request_payload(self, payload) -> dict:
payload = remove_null_values_from_dict(payload)
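
Roughly, the header flow now amounts to the following (a sketch; the top-level-directory extraction only approximates ``rns_client.base_folder``, which is not part of this diff):

    import hashlib

    def approx_cluster_headers(den_token: str, rns_address: str, username: str) -> dict:
        # Approximation of base_folder: keep only the top-level directory.
        resource_address = rns_address.strip("/").split("/")[0]
        hash_hex = hashlib.sha256((den_token + resource_address).encode("utf-8")).hexdigest()
        return {"Authorization": f"Bearer {hash_hex}+{resource_address}+{username}"}

    # userA calling a function shared by userB gets a token scoped to "userB",
    # so userA's raw Den token is never exposed on the cluster.
    headers = approx_cluster_headers("abc123", "/userB/shared_func", "userA")
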
5 changes: 4 additions & 1 deletion runhouse/servers/cluster_servlet.py
@@ -50,7 +50,10 @@ def resource_access_level(self, token: str, resource_uri: str) -> Union[str, None]:
# they have access to everything
if configs.token and (
configs.token == token
or rns_client.cluster_token(configs.token, resource_uri) == token
or rns_client.cluster_token_from_resource_address(
configs.token, resource_uri
)
== token
):
return ResourceAccess.WRITE
return self._auth_cache.lookup_access_level(token, resource_uri)