Skip to content

Commit

Permalink
Merge pull request #15 from sertit/fix/extract_file_zip_without_direc…
Browse files Browse the repository at this point in the history
…tory

Fix/extract file zip without directory
  • Loading branch information
remi-braun committed Apr 10, 2024
2 parents 3aca8e5 + 6526932 commit 0248105
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 87 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## 1.36.2 (2024-mm-dd)

- FIX: Fix `files.extract_file` when there are only files in the root of the zip archive ([#14](https://github.com/sertit/sertit-utils/pull/14))
- FIX: FIX metadata handling with `rasters_rio.read` when reading with indexes

## 1.36.1 (2024-03-26)
Expand Down
21 changes: 13 additions & 8 deletions CI/SCRIPTS/test_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,21 @@ def test_archive():
# Archives
zip_file = files_path().joinpath("test_zip.zip")
zip2_file = files_path().joinpath("test_zip.zip") # For overwrite
zip_without_directory = files_path().joinpath("test_zip_without_directory.zip")
tar_file = files_path().joinpath("test_tar.tar")
tar_gz_file = files_path().joinpath("test_targz.tar.gz")

# Core dir
core_dir = files_path().joinpath("core")
folder = core_dir
archives = [zip_file, tar_file, tar_gz_file, folder, zip2_file]
archives = [
zip_file,
tar_file,
tar_gz_file,
folder,
zip2_file,
zip_without_directory,
]

# Extract
extracted_dirs = files.extract_files(archives, tmp_dir, overwrite=True)
Expand All @@ -59,12 +67,9 @@ def test_archive():
folder_path=core_dir, archive_path=archive_base, fmt=fmt
)
out = files.extract_file(archive_fn, tmp_dir)
if fmt == "zip":
ci.assert_dir_equal(core_dir, out)
else:
# For tar and tar.gz, an additional folder is created because these formats dont have any file tree
out_dir = path.listdir_abspath(out)[0]
ci.assert_dir_equal(core_dir, out_dir)
# an additional folder is created
out_dir = path.listdir_abspath(out)[0]
ci.assert_dir_equal(core_dir, out_dir)

# Remove out directory in order to avoid any interferences
files.remove(out)
Expand All @@ -79,7 +84,7 @@ def test_archive():

# Extract
unzip_out = os.path.join(tmp_dir, "out")
files.extract_file(zip_out, unzip_out)
unzip_out = files.extract_file(zip_out, unzip_out)

# Test
unzip_dirs = path.listdir_abspath(unzip_out)
Expand Down
122 changes: 43 additions & 79 deletions sertit/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from lxml import etree, html
from tqdm import tqdm

from sertit import AnyPath, logs, misc, path
from sertit import AnyPath, logs, path
from sertit.logs import SU_NAME
from sertit.types import AnyPathStrType, AnyPathType

Expand Down Expand Up @@ -169,24 +169,24 @@ def extract_file(
file_path: AnyPathStrType,
output: AnyPathStrType,
overwrite: bool = False,
) -> Union[list, AnyPathType]:
) -> AnyPathType:
"""
Extract an archived file (zip or others). Overwrites if specified.
For zipfiles, in case of multiple folders archived, pay attention that what is returned is the first folder.
If the archive don't contain a root directory with the name of the archive without the extension, create it
Args:
file_path (str): Archive file path
output (str): Output where to put the extracted file
overwrite (bool): Overwrite found extracted files
output (str): Output where to put the extracted directory
overwrite (bool): Overwrite found extracted directory
Returns:
Union[list, AnyPathType]: Extracted file paths (as str if only one)
AnyPathType: Extracted directory paths
Example:
>>> file_path = 'D:/path/to/zip.zip'
>>> output = 'D:/path/to/output'
>>> extract_file(file_path, output, overwrite=True)
D:/path/to/output.zip'
D:/path/to/output/zip'
"""
# Convert to path
file_path = AnyPath(file_path)
Expand All @@ -196,86 +196,50 @@ def extract_file(
if file_path.is_dir():
return file_path

# Manage archive type
if file_path.suffix == ".zip":
# Manage the case with several directories inside one zipfile
arch = zipfile.ZipFile(file_path, "r")
# Beware with .SEN3 and .SAFE extensions
archive_output = output.joinpath(path.get_filename(file_path))

# zipfile.namelist returns the relative path of the file names in the archive
# if a file is in the root of the archive, there is no "/", so it should not be included
extr_names = list({p.split("/")[0] for p in arch.namelist() if "/" in p})
elif file_path.suffix == ".tar" or file_path.suffixes == [".tar", ".gz"]:
# Tar files have no subdirectories, so create one
extr_names = [path.get_filename(file_path)]
arch = tarfile.open(file_path, "r")
else:
raise TypeError(
f"Only .zip, .tar and .tar.gz files can be extracted, not {file_path}"
# In case not overwrite and the extracted directory already exists
if not overwrite and archive_output.exists():
LOGGER.debug(
"Already existing extracted %s. It won't be overwritten.",
archive_output,
)
return archive_output

# Get extracted list
extr_dirs = [output.joinpath(extr_name) for extr_name in extr_names]

# Loop over basedirs from inside the archive
for extr_dir in extr_dirs:
extr_name = extr_dir.name
# Manage overwriting
if extr_dir.is_dir():
if overwrite:
LOGGER.debug(
"Already existing extracted %s. It will be overwritten as asked.",
extr_names,
)
remove(extr_dir)
else:
LOGGER.debug(
"Already existing extracted %s. It won't be overwritten.",
extr_names,
)
def extract_sub_dir(arch, filename_list):
top_level_files = list({item.split("/")[0] for item in filename_list})

# When the only root directory in the archive has the right name, we don't have to create it
if len(top_level_files) == 1 and archive_output.name == path.get_filename(
top_level_files[0]
):
arch.extractall(archive_output.parent)
archive_output.parent.joinpath(top_level_files[0]).rename(archive_output)
else:
LOGGER.info("Extracting %s", extr_names)
# Inside docker, extracting files is really slow -> copy the archive in a tmp directory
with tempfile.TemporaryDirectory() as tmp_dir:
if misc.in_docker():
# Create a tmp directory
copy(file_path, tmp_dir)
file_path = os.path.join(tmp_dir, os.path.basename(file_path))
tmp_extr_output = tmp_dir

# Recreate dir with tmp output
tmp_extr_dir = os.path.join(tmp_extr_output, extr_name)
else:
tmp_extr_output = output
tmp_extr_dir = extr_dir

if str(file_path).endswith(".zip"):
members = [name for name in arch.namelist() if extr_name in name]
else:
members = arch.getmembers() # Always extract all files for TAR data

# Tar files do not contain a file tree
tmp_extr_output = tmp_extr_dir

# Extract product
try:
os.makedirs(tmp_extr_dir, exist_ok=True)
arch.extractall(path=tmp_extr_output, members=members)
except tarfile.ReadError as ex:
raise TypeError(f"Impossible to extract {file_path}") from ex

# Copy back if we are running inside docker and clean tmp dir
if misc.in_docker():
copy(tmp_extr_dir, extr_dir)
arch.extractall(archive_output)

# Close archive
arch.close()
# Manage archive type
if file_path.suffix == ".zip":
with zipfile.ZipFile(file_path, "r") as zip_file:
extract_sub_dir(zip_file, zip_file.namelist())
elif file_path.suffix == ".tar" or file_path.suffixes == [".tar", ".gz"]:
with tarfile.open(file_path, "r") as tar_file:
extract_sub_dir(tar_file, tar_file.getnames())
elif file_path.suffix == ".7z":
try:
import py7zr

# Return str for compatibility reasons
if len(extr_dirs) == 1:
extr_dirs = extr_dirs[0]
with py7zr.SevenZipFile(file_path, "r") as z7_file:
extract_sub_dir(z7_file, z7_file.getnames())
except ModuleNotFoundError:
raise TypeError("Please install 'py7zr' to extract .7z files")
else:
raise TypeError(
f"Only .zip, .tar, .tar.gz and .7z files can be extracted, not {file_path}"
)

return extr_dirs
return archive_output


def extract_files(
Expand Down

0 comments on commit 0248105

Please sign in to comment.