Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 116 additions & 109 deletions pandas/tests/io/test_compression.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import gzip
import io
import os
from pathlib import Path
import subprocess
import sys
import tarfile
Expand Down Expand Up @@ -31,16 +30,16 @@
],
)
@pytest.mark.parametrize("method", ["to_pickle", "to_json", "to_csv"])
def test_compression_size(obj, method, compression_only):
def test_compression_size(obj, method, compression_only, temp_file):
if compression_only == "tar":
compression_only = {"method": "tar", "mode": "w:gz"}

with tm.ensure_clean() as path:
getattr(obj, method)(path, compression=compression_only)
compressed_size = os.path.getsize(path)
getattr(obj, method)(path, compression=None)
uncompressed_size = os.path.getsize(path)
assert uncompressed_size > compressed_size
path = temp_file
getattr(obj, method)(path, compression=compression_only)
compressed_size = os.path.getsize(path)
getattr(obj, method)(path, compression=None)
uncompressed_size = os.path.getsize(path)
assert uncompressed_size > compressed_size


@pytest.mark.parametrize(
Expand All @@ -54,22 +53,25 @@ def test_compression_size(obj, method, compression_only):
],
)
@pytest.mark.parametrize("method", ["to_csv", "to_json"])
def test_compression_size_fh(obj, method, compression_only):
with tm.ensure_clean() as path:
with icom.get_handle(
path,
"w:gz" if compression_only == "tar" else "w",
compression=compression_only,
) as handles:
getattr(obj, method)(handles.handle)
assert not handles.handle.closed
compressed_size = os.path.getsize(path)
with tm.ensure_clean() as path:
with icom.get_handle(path, "w", compression=None) as handles:
getattr(obj, method)(handles.handle)
assert not handles.handle.closed
uncompressed_size = os.path.getsize(path)
assert uncompressed_size > compressed_size
def test_compression_size_fh(obj, method, compression_only, temp_file):
path = temp_file
with icom.get_handle(
path,
"w:gz" if compression_only == "tar" else "w",
compression=compression_only,
) as handles:
getattr(obj, method)(handles.handle)
assert not handles.handle.closed
compressed_size = os.path.getsize(path)

# Create a new temporary file for uncompressed comparison
path2 = temp_file.parent / f"{temp_file.stem}_uncompressed{temp_file.suffix}"
path2.touch()
with icom.get_handle(path2, "w", compression=None) as handles:
getattr(obj, method)(handles.handle)
assert not handles.handle.closed
uncompressed_size = os.path.getsize(path2)
assert uncompressed_size > compressed_size


@pytest.mark.parametrize(
Expand All @@ -81,14 +83,19 @@ def test_compression_size_fh(obj, method, compression_only):
],
)
def test_dataframe_compression_defaults_to_infer(
write_method, write_kwargs, read_method, compression_only, compression_to_extension
write_method,
write_kwargs,
read_method,
compression_only,
compression_to_extension,
temp_file,
):
# GH22004
input = pd.DataFrame([[1.0, 0, -4], [3.4, 5, 2]], columns=["X", "Y", "Z"])
extension = compression_to_extension[compression_only]
with tm.ensure_clean("compressed" + extension) as path:
getattr(input, write_method)(path, **write_kwargs)
output = read_method(path, compression=compression_only)
path = temp_file.parent / f"compressed{extension}"
getattr(input, write_method)(path, **write_kwargs)
output = read_method(path, compression=compression_only)
tm.assert_frame_equal(output, input)


Expand All @@ -107,37 +114,38 @@ def test_series_compression_defaults_to_infer(
read_kwargs,
compression_only,
compression_to_extension,
temp_file,
):
# GH22004
input = pd.Series([0, 5, -2, 10], name="X")
extension = compression_to_extension[compression_only]
with tm.ensure_clean("compressed" + extension) as path:
getattr(input, write_method)(path, **write_kwargs)
if "squeeze" in read_kwargs:
kwargs = read_kwargs.copy()
del kwargs["squeeze"]
output = read_method(path, compression=compression_only, **kwargs).squeeze(
"columns"
)
else:
output = read_method(path, compression=compression_only, **read_kwargs)
path = temp_file.parent / f"compressed{extension}"
getattr(input, write_method)(path, **write_kwargs)
if "squeeze" in read_kwargs:
kwargs = read_kwargs.copy()
del kwargs["squeeze"]
output = read_method(path, compression=compression_only, **kwargs).squeeze(
"columns"
)
else:
output = read_method(path, compression=compression_only, **read_kwargs)
tm.assert_series_equal(output, input, check_names=False)


def test_compression_warning(compression_only):
def test_compression_warning(compression_only, temp_file):
# Assert that passing a file object to to_csv while explicitly specifying a
# compression protocol triggers a RuntimeWarning, as per GH21227.
df = pd.DataFrame(
100 * [[0.123456, 0.234567, 0.567567], [12.32112, 123123.2, 321321.2]],
columns=["X", "Y", "Z"],
)
with tm.ensure_clean() as path:
with icom.get_handle(path, "w", compression=compression_only) as handles:
with tm.assert_produces_warning(RuntimeWarning, match="has no effect"):
df.to_csv(handles.handle, compression=compression_only)
path = temp_file
with icom.get_handle(path, "w", compression=compression_only) as handles:
with tm.assert_produces_warning(RuntimeWarning, match="has no effect"):
df.to_csv(handles.handle, compression=compression_only)


def test_compression_binary(compression_only):
def test_compression_binary(compression_only, temp_file):
"""
Binary file handles support compression.

Expand All @@ -150,13 +158,13 @@ def test_compression_binary(compression_only):
)

# with a file
with tm.ensure_clean() as path:
with open(path, mode="wb") as file:
df.to_csv(file, mode="wb", compression=compression_only)
file.seek(0) # file shouldn't be closed
tm.assert_frame_equal(
df, pd.read_csv(path, index_col=0, compression=compression_only)
)
path = temp_file
with open(path, mode="wb") as file:
df.to_csv(file, mode="wb", compression=compression_only)
file.seek(0) # file shouldn't be closed
tm.assert_frame_equal(
df, pd.read_csv(path, index_col=0, compression=compression_only)
)

# with BytesIO
file = io.BytesIO()
Expand All @@ -167,7 +175,7 @@ def test_compression_binary(compression_only):
)


def test_gzip_reproducibility_file_name():
def test_gzip_reproducibility_file_name(temp_file):
"""
Gzip should create reproducible archives with mtime.

Expand All @@ -183,13 +191,12 @@ def test_gzip_reproducibility_file_name():
compression_options = {"method": "gzip", "mtime": 1}

# test for filename
with tm.ensure_clean() as path:
path = Path(path)
df.to_csv(path, compression=compression_options)
time.sleep(0.1)
output = path.read_bytes()
df.to_csv(path, compression=compression_options)
assert output == path.read_bytes()
path = temp_file
df.to_csv(path, compression=compression_options)
time.sleep(0.1)
output = path.read_bytes()
df.to_csv(path, compression=compression_options)
assert output == path.read_bytes()


def test_gzip_reproducibility_file_object():
Expand Down Expand Up @@ -259,14 +266,14 @@ def test_with_missing_lzma_runtime():
],
)
@pytest.mark.parametrize("method", ["to_pickle", "to_json", "to_csv"])
def test_gzip_compression_level(obj, method):
def test_gzip_compression_level(obj, method, temp_file):
# GH33196
with tm.ensure_clean() as path:
getattr(obj, method)(path, compression="gzip")
compressed_size_default = os.path.getsize(path)
getattr(obj, method)(path, compression={"method": "gzip", "compresslevel": 1})
compressed_size_fast = os.path.getsize(path)
assert compressed_size_default < compressed_size_fast
path = temp_file
getattr(obj, method)(path, compression="gzip")
compressed_size_default = os.path.getsize(path)
getattr(obj, method)(path, compression={"method": "gzip", "compresslevel": 1})
compressed_size_fast = os.path.getsize(path)
assert compressed_size_default < compressed_size_fast


@pytest.mark.parametrize(
Expand All @@ -280,15 +287,15 @@ def test_gzip_compression_level(obj, method):
],
)
@pytest.mark.parametrize("method", ["to_pickle", "to_json", "to_csv"])
def test_xz_compression_level_read(obj, method):
with tm.ensure_clean() as path:
getattr(obj, method)(path, compression="xz")
compressed_size_default = os.path.getsize(path)
getattr(obj, method)(path, compression={"method": "xz", "preset": 1})
compressed_size_fast = os.path.getsize(path)
assert compressed_size_default < compressed_size_fast
if method == "to_csv":
pd.read_csv(path, compression="xz")
def test_xz_compression_level_read(obj, method, temp_file):
path = temp_file
getattr(obj, method)(path, compression="xz")
compressed_size_default = os.path.getsize(path)
getattr(obj, method)(path, compression={"method": "xz", "preset": 1})
compressed_size_fast = os.path.getsize(path)
assert compressed_size_default < compressed_size_fast
if method == "to_csv":
pd.read_csv(path, compression="xz")


@pytest.mark.parametrize(
Expand All @@ -302,13 +309,13 @@ def test_xz_compression_level_read(obj, method):
],
)
@pytest.mark.parametrize("method", ["to_pickle", "to_json", "to_csv"])
def test_bzip_compression_level(obj, method):
def test_bzip_compression_level(obj, method, temp_file):
"""GH33196 bzip needs file size > 100k to show a size difference between
compression levels, so here we just check if the call works when
compression is passed as a dict.
"""
with tm.ensure_clean() as path:
getattr(obj, method)(path, compression={"method": "bz2", "compresslevel": 1})
path = temp_file
getattr(obj, method)(path, compression={"method": "bz2", "compresslevel": 1})


@pytest.mark.parametrize(
Expand All @@ -318,21 +325,21 @@ def test_bzip_compression_level(obj, method):
(".tar", tarfile.TarFile),
],
)
def test_empty_archive_zip(suffix, archive):
with tm.ensure_clean(filename=suffix) as path:
with archive(path, "w"):
pass
with pytest.raises(ValueError, match="Zero files found"):
pd.read_csv(path)
def test_empty_archive_zip(suffix, archive, temp_file):
path = temp_file.parent / f"archive{suffix}"
with archive(path, "w"):
pass
with pytest.raises(ValueError, match="Zero files found"):
pd.read_csv(path)


def test_ambiguous_archive_zip():
with tm.ensure_clean(filename=".zip") as path:
with zipfile.ZipFile(path, "w") as file:
file.writestr("a.csv", "foo,bar")
file.writestr("b.csv", "foo,bar")
with pytest.raises(ValueError, match="Multiple files found in ZIP file"):
pd.read_csv(path)
def test_ambiguous_archive_zip(temp_file):
path = temp_file.parent / "archive.zip"
with zipfile.ZipFile(path, "w") as file:
file.writestr("a.csv", "foo,bar")
file.writestr("b.csv", "foo,bar")
with pytest.raises(ValueError, match="Multiple files found in ZIP file"):
pd.read_csv(path)


def test_ambiguous_archive_tar(tmp_path):
Expand All @@ -352,24 +359,24 @@ def test_ambiguous_archive_tar(tmp_path):
pd.read_csv(tarpath)


def test_tar_gz_to_different_filename():
with tm.ensure_clean(filename=".foo") as file:
pd.DataFrame(
[["1", "2"]],
columns=["foo", "bar"],
).to_csv(file, compression={"method": "tar", "mode": "w:gz"}, index=False)
with gzip.open(file) as uncompressed:
with tarfile.TarFile(fileobj=uncompressed) as archive:
members = archive.getmembers()
assert len(members) == 1
content = archive.extractfile(members[0]).read().decode("utf8")

if is_platform_windows():
expected = "foo,bar\r\n1,2\r\n"
else:
expected = "foo,bar\n1,2\n"

assert content == expected
def test_tar_gz_to_different_filename(temp_file):
file = temp_file.parent / "archive.foo"
pd.DataFrame(
[["1", "2"]],
columns=["foo", "bar"],
).to_csv(file, compression={"method": "tar", "mode": "w:gz"}, index=False)
with gzip.open(file) as uncompressed:
with tarfile.TarFile(fileobj=uncompressed) as archive:
members = archive.getmembers()
assert len(members) == 1
content = archive.extractfile(members[0]).read().decode("utf8")

if is_platform_windows():
expected = "foo,bar\r\n1,2\r\n"
else:
expected = "foo,bar\n1,2\n"

assert content == expected


def test_tar_no_error_on_close():
Expand Down
Loading
Loading