Skip to content

Commit

Permalink
Merge branch 'test-cluster-folder-methods' into use-cluster-folder-apis
Browse files Browse the repository at this point in the history
  • Loading branch information
jlewitt1 committed Aug 8, 2024
2 parents 5e8e668 + a573fd2 commit d13f270
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 195 deletions.
255 changes: 136 additions & 119 deletions runhouse/resources/folders/folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ def local_path(self):
else:
return None

@property
def _use_http_endpoint(self):
    """Report whether folder operations must go through the cluster's HTTP APIs.

    True only when ``self.system`` is a ``Cluster`` and this process is not
    running on that cluster.
    """
    target = self.system
    if not isinstance(target, Cluster):
        return False
    return not target.on_this_cluster()

def mv(self, system, path: Optional[str] = None) -> None:
    """Move the folder to a new filesystem or cluster.

    When the folder must be reached over HTTP (see ``_use_http_endpoint``) the
    move is delegated to ``self.system._mv`` and ``system`` is not consulted.
    Otherwise the folder is moved on the local filesystem and ``self.path`` /
    ``self.system`` are updated to the new location.

    Args:
        system: Filesystem to move the folder to. Local moves only support
            ``self.DEFAULT_FS``.
        path (Optional[str]): Destination path for the folder. Required.

    Raises:
        ValueError: If ``path`` is not provided.
        FileNotFoundError: If the local source path does not exist.
        FileExistsError: If the local destination path already exists.
        NotImplementedError: If a local move targets a system other than
            ``self.DEFAULT_FS``.
    """
    if path is None:
        raise ValueError("A destination path must be specified.")

    # The source is this folder's own path; an empty ``Path()`` would resolve
    # to the current working directory and move the wrong directory.
    src_path = Path(self.path)
    dest_path = Path(path)

    if self._use_http_endpoint:
        # NOTE(review): self.path is not updated after a remote move - confirm
        # whether callers expect the folder to track its new location.
        self.system._mv(path=src_path, dest_path=dest_path)
        return

    dest_path = dest_path.expanduser()
    src_path = src_path.expanduser()

    if not src_path.exists():
        raise FileNotFoundError(f"The source path {src_path} does not exist.")

    if system != self.DEFAULT_FS:
        # TODO [JL] support moving to other systems
        raise NotImplementedError(f"System {system} not supported for local mv")

    if dest_path.exists():
        raise FileExistsError(f"The destination path {dest_path} already exists.")

    # Create the destination's parent directory if it doesn't exist
    dest_path.parent.mkdir(parents=True, exist_ok=True)

    # Move the directory
    shutil.move(str(src_path), str(dest_path))

    # Update the path attribute
    self.path = str(dest_path)
    self.system = self.DEFAULT_FS

def to(
self,
Expand Down Expand Up @@ -293,11 +298,10 @@ def _to_data_store(

def mkdir(self):
    """Create the folder in specified file system if it doesn't already exist."""
    if not self._use_http_endpoint:
        local_dir = Path(self.path).expanduser()
        if not local_dir.exists():
            local_dir.mkdir(parents=True, exist_ok=True)
        return

    # Remote folder: let the cluster create the directory.
    self.system._mkdir(self.path)

Expand Down Expand Up @@ -509,45 +513,52 @@ def ls(self, full_paths: bool = True, sort: bool = False) -> List:
sort (Optional[bool]): Whether to sort the folder contents by time modified.
Defaults to ``False``.
"""
path = Path(self.path)
if isinstance(self.system, Cluster):
return self.system._ls(path)

paths = [p for p in path.expanduser().iterdir()]
if self._use_http_endpoint:
self.system._ls(self.path)
else:
path = Path(self.path).expanduser()
paths = [p for p in path.iterdir()]

# Sort the paths by modification time if sort is True
if sort:
paths.sort(key=lambda p: p.stat().st_mtime, reverse=True)
# Sort the paths by modification time if sort is True
if sort:
paths.sort(key=lambda p: p.stat().st_mtime, reverse=True)

# Convert paths to strings and format them based on full_paths
if full_paths:
return [str(p.resolve()) for p in paths]
else:
return [p.name for p in paths]
# Convert paths to strings and format them based on full_paths
if full_paths:
return [str(p.resolve()) for p in paths]
else:
return [p.name for p in paths]

def resources(self, full_paths: bool = False):
    """List the resources in the folder.

    An entry of the folder counts as a resource when it contains a
    ``config.json`` file.

    Args:
        full_paths (bool): If ``True``, return each resource as
            ``<rns_address>/<name>``; otherwise return bare names.
            Defaults to ``False``.

    Returns:
        list: The resource names or addresses. An empty list is returned (and
        the error is logged) if listing fails for any reason.

    Example:
        >>> resources = my_folder.resources()
    """
    try:
        if self._use_http_endpoint:
            # Always list full paths here: the config.json probe below needs a
            # path that resolves on the cluster, not a bare entry name.
            entries = self.system._ls(path=self.path, full_paths=True)
            found = [
                entry
                for entry in entries
                if self.system._exists(path=f"{entry}/config.json")
            ]
        else:
            found = [
                entry for entry in self.ls() if (Path(entry) / "config.json").exists()
            ]

        # Same output shape for local and remote folders.
        names = [Path(entry).stem for entry in found]
        if full_paths:
            return [self.rns_address + "/" + name for name in names]
        return names

    except Exception as e:
        # Deliberate best-effort: a failed listing is reported as "no resources".
        logger.error(f"Error listing resources in folder: {e}")
        return []

@property
def rns_address(self):
Expand Down Expand Up @@ -674,7 +685,7 @@ def get(self, name, mode="rb", encoding=None):
Example:
>>> contents = my_folder.get(file_name)
"""
if isinstance(self.system, Cluster):
if self._use_http_endpoint:
return self.system._get(
path=Path(self.path) / name, mode=mode, encoding=encoding
)
def exists_in_system(self):
    """Whether the folder exists in the filesystem.

    Example:
        >>> exists_on_system = my_folder.exists_in_system()
    """
    if not self._use_http_endpoint:
        local_dir = Path(self.path).expanduser()
        return local_dir.exists() and local_dir.is_dir()

    # Remote folder: ask the cluster whether the path exists.
    return self.system._exists(self.path)

def rm(self, contents: list = None, recursive: bool = True):
    """Delete a folder from the file system. Optionally provide a list of folder contents to delete.

    Args:
        contents (Optional[list]): Names, relative to the folder, of the entries
            to delete. When empty or ``None`` the folder itself is targeted.
        recursive (bool): Whether directories may be removed together with
            their contents. Defaults to ``True``.

    Raises:
        ValueError: If a directory is encountered while ``recursive`` is ``False``.

    Example:
        >>> my_folder.rm()
    """
    if self._use_http_endpoint:
        self.system._rm(self.path, contents, recursive)
        return

    root = Path(self.path).expanduser()

    if contents:
        for entry in contents:
            target = root / entry
            if not target.exists():
                # Entries that are already gone are ignored.
                continue
            if target.is_file():
                target.unlink()
                continue
            if recursive and target.is_dir():
                shutil.rmtree(target)
                continue
            raise ValueError(
                f"Path {target} is a directory and recursive is set to False"
            )
        return

    if recursive:
        shutil.rmtree(root)
        return

    if not root.is_dir():
        root.unlink()
        return

    # Non-recursive delete of a directory: remove its files one by one and
    # stop at the first entry that is not a file.
    for child in root.iterdir():
        if not child.is_file():
            raise ValueError(
                f"Folder {child} found in {root}, recursive is set to False"
            )
        child.unlink()

def put(self, contents, overwrite=False, mode: str = "wb"):
    """Put given contents in folder.

    Args:
        contents: Either a dict mapping filenames to file-like objects, a
            ``Folder``, or a list of such items (each item is put in turn).
        overwrite (bool): Whether existing files with the same name may be
            overwritten. Defaults to ``False``.
        mode (str): Mode used to open the destination files. Defaults to ``"wb"``.

    Raises:
        TypeError: If ``contents`` is not a dict (or a ``Folder`` / list).
        FileExistsError: If a target file already exists and ``overwrite`` is ``False``.
        RuntimeError: If writing a file fails.

    Example:
        >>> my_folder.put(contents={"filename.txt": data})
    """
    # Handle lists of resources just for convenience
    if isinstance(contents, list):
        for resource in contents:
            # Forward ``mode`` too, so list items are written the way the caller asked.
            self.put(resource, overwrite=overwrite, mode=mode)
        return

    if self._use_http_endpoint:
        self.system._put(
            path=self.path, contents=contents, overwrite=overwrite, mode=mode
        )
        return

    self.mkdir()
    full_path = str(Path(self.path).expanduser())

    if isinstance(contents, Folder):
        if contents.path is None:  # Should only be the case when Folder is created
            contents.path = os.path.join(full_path, contents.name)
        return

    if not isinstance(contents, dict):
        raise TypeError(
            "`contents` argument must be a dict mapping filenames to file-like objects"
        )

    if overwrite is False:
        # Check if files exist and raise an error if they do
        existing_files = set(os.listdir(full_path))
        intersection = existing_files.intersection(set(contents.keys()))
        if intersection:
            raise FileExistsError(
                f"File(s) {intersection} already exist(s) at path: {full_path}. "
                f"Cannot save them with overwrite={overwrite}."
            )

    for filename, file_obj in contents.items():
        file_obj = self._serialize_file_obj(file_obj)
        file_path = Path(full_path) / filename
        if not overwrite and file_path.exists():
            raise FileExistsError(f"File {file_path} already exists.")

        try:
            with open(file_path, mode) as f:
                f.write(file_obj)
        except Exception as e:
            # Chain the cause so the original traceback is preserved.
            raise RuntimeError(
                f"Failed to write {filename} to {file_path}: {e}"
            ) from e

@staticmethod
def _serialize_file_obj(file_obj):
Expand Down
20 changes: 14 additions & 6 deletions runhouse/resources/hardware/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1787,14 +1787,14 @@ def _enable_or_update_status_check(
##############################################
# Folder Operations
##############################################
def _ls(self, path: Union[str, Path], full_paths: bool = True, sort: bool = False):
    """Run the ``ls`` folder operation for ``path`` through ``self.client``."""
    request = dict(operation="ls", path=path, full_paths=full_paths, sort=sort)
    return self.client.folder_operation(**request)

def _get(
self,
path: Path,
path: Union[str, Path],
mode: str = "rb",
encoding: str = None,
):
Expand All @@ -1807,7 +1807,7 @@ def _get(

def _put(
self,
path: Path,
path: Union[str, Path],
contents: Union[Dict[str, Any], Resource, List[Resource]],
mode: str = "wb",
overwrite: bool = False,
Expand All @@ -1822,15 +1822,23 @@ def _put(
serialization=serialization,
)

def _rm(
    self,
    path: Union[str, Path],
    contents: List[str] = None,
    recursive: bool = False,
):
    """Run the ``rm`` folder operation for ``path`` through ``self.client``."""
    request = dict(operation="rm", path=path, contents=contents, recursive=recursive)
    return self.client.folder_operation(**request)

def _mkdir(self, path: Union[str, Path]):
    """Run the ``mkdir`` folder operation for ``path`` through ``self.client``."""
    request = dict(operation="mkdir", path=path)
    return self.client.folder_operation(**request)

def _mv(self, path: Union[str, Path], dest_path: Union[str, Path]):
    """Run the ``mv`` folder operation from ``path`` to ``dest_path`` through ``self.client``."""
    request = dict(operation="mv", path=path, dest_path=dest_path)
    return self.client.folder_operation(**request)

def _exists(self, path: Union[str, Path]):
    """Run the ``exists`` folder operation for ``path`` through ``self.client``."""
    request = dict(operation="exists", path=path)
    return self.client.folder_operation(**request)
Loading

0 comments on commit d13f270

Please sign in to comment.