From e54dfba997932918b2137920ad81787c62521c1d Mon Sep 17 00:00:00 2001 From: "guangli.bao" Date: Thu, 27 Nov 2025 23:42:35 +0800 Subject: [PATCH 1/2] ut for audio and vision Signed-off-by: guangli.bao --- tests/unit/extras/__init__.py | 0 tests/unit/extras/test_audio.py | 212 ++++++++++++++++++++++ tests/unit/extras/test_vision.py | 291 +++++++++++++++++++++++++++++++ 3 files changed, 503 insertions(+) create mode 100644 tests/unit/extras/__init__.py create mode 100644 tests/unit/extras/test_audio.py create mode 100644 tests/unit/extras/test_vision.py diff --git a/tests/unit/extras/__init__.py b/tests/unit/extras/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/unit/extras/test_audio.py b/tests/unit/extras/test_audio.py new file mode 100644 index 000000000..47589c633 --- /dev/null +++ b/tests/unit/extras/test_audio.py @@ -0,0 +1,212 @@ +import tempfile +import wave +from pathlib import Path +from unittest.mock import MagicMock, patch + +import numpy as np +import pytest +import torch + +from guidellm.extras.audio import encode_audio + + +@pytest.fixture +def sample_audio_tensor(): + sample_rate = 16000 + t = torch.linspace(0, 1, sample_rate) + return 0.3 * torch.sin(2 * np.pi * 440 * t).unsqueeze(0) + + +@pytest.fixture +def sample_wav_file(sample_audio_tensor): + with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f: + f.write(b"fake_wav_content") + temp_path = Path(f.name) + yield temp_path + + if temp_path.exists(): + temp_path.unlink() + + +@pytest.fixture +def real_wav_file(): + with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f: + sample_rate = 16000 + duration = 1.0 + t = np.linspace(0, duration, int(sample_rate * duration)) + audio_data = (np.sin(2 * np.pi * 440 * t) * 32767).astype(np.int16) + + with wave.open(f.name, "wb") as wav_file: + wav_file.setnchannels(1) + wav_file.setsampwidth(2) + wav_file.setframerate(sample_rate) + wav_file.writeframes(audio_data.tobytes()) + + temp_path = Path(f.name) + + yield temp_path + + if temp_path.exists(): + temp_path.unlink() + + +def test_encode_audio_with_tensor_input(sample_audio_tensor): + result = encode_audio( + audio=sample_audio_tensor, + sample_rate=16000, + audio_format="mp3", + bitrate="64k", + b64encode=False, + ) + + assert result["type"] == "audio_file" + assert isinstance(result["audio"], bytes) + assert result["format"] == "mp3" + assert result["mimetype"] == "audio/mp3" + assert result["audio_samples"] == 16000 + assert result["audio_seconds"] == 1.0 + assert isinstance(result["audio_bytes"], int) + assert result["audio_bytes"] > 0 + + +def test_encode_audio_with_base64(sample_audio_tensor): + result = encode_audio(audio=sample_audio_tensor, sample_rate=16000, b64encode=True) + + assert result["type"] == "audio_base64" + assert isinstance(result["audio"], str) + import base64 + + try: + decoded = base64.b64decode(result["audio"]) + assert len(decoded) > 0 + except (base64.binascii.Error, ValueError) as e: + pytest.fail(f"Invalid base64 encoding: {e}") + + +def test_encode_audio_with_numpy_array(sample_audio_tensor): + numpy_audio = sample_audio_tensor.numpy() + + result = encode_audio(audio=numpy_audio, sample_rate=16000) + + assert result["type"] == "audio_file" + assert isinstance(result["audio"], bytes) + assert result["audio_bytes"] > 0 + + +def test_encode_audio_with_real_file_path(real_wav_file): + result = encode_audio(audio=real_wav_file, sample_rate=16000, max_duration=1.0) + + assert result["type"] == "audio_file" + assert isinstance(result["audio"], bytes) + assert result["format"] == "mp3" + assert result["mimetype"] == "audio/mp3" + assert result["file_name"] == Path(real_wav_file).name + assert result["audio_bytes"] > 0 + assert result["audio_seconds"] <= 1.0 + + +def test_encode_audio_with_dict_input_complete(): + audio_dict = {"data": torch.randn(1, 16000), "sample_rate": 16000} + + result = encode_audio(audio=audio_dict) + + assert result["type"] == "audio_file" + assert result["audio_bytes"] > 0 + assert result["audio_samples"] == 16000 + assert result["audio_seconds"] == 1.0 + + +@patch("httpx.get") +@patch("guidellm.extras.audio._encode_audio") +def test_encode_audio_with_url(mock_http_get, sample_audio_tensor): + # mock http get response + mock_response = MagicMock() + mock_response.content = b"fake_audio_content" + mock_response.raise_for_status = MagicMock() + mock_http_get.return_value = mock_response + + # mock decode - return sample audio tensor + with patch("guidellm.extras.audio._decode_audio") as mock_decoder: + mock_audio_result = MagicMock() + mock_audio_result.data = sample_audio_tensor + mock_audio_result.sample_rate = 16000 + mock_decoder.return_value = mock_audio_result + + result = encode_audio(audio="https://example.com/audio.wav", sample_rate=16000) + assert result["type"] == "audio_file" + + +def test_encode_audio_with_max_duration(sample_audio_tensor): + long_audio = torch.randn(1, 32000) + + result = encode_audio(audio=long_audio, sample_rate=16000, max_duration=1.0) + + assert result["audio_seconds"] == 1.0 + + +def test_encode_audio_different_formats(sample_audio_tensor): + formats = ["mp3", "wav", "flac"] + + for fmt in formats: + result = encode_audio( + audio=sample_audio_tensor, sample_rate=16000, audio_format=fmt + ) + + assert result["format"] == fmt + assert result["mimetype"] == f"audio/{fmt}" + assert result["audio_bytes"] > 0 + + +def test_encode_audio_resampling(sample_audio_tensor): + original_rate = 16000 + target_rate = 8000 + + result = encode_audio( + audio=sample_audio_tensor, + sample_rate=original_rate, + encode_sample_rate=target_rate, + ) + + assert "audio_samples" in result + + +def test_encode_audio_error_handling(): + with pytest.raises(ValueError): + encode_audio(audio=123) + + with pytest.raises(ValueError): + encode_audio(audio=torch.randn(1, 16000), sample_rate=None) + + with pytest.raises(ValueError): + encode_audio(audio="/nonexistent/path/audio.wav") + + +def test_audio_quality_preservation(sample_audio_tensor): + result = encode_audio( + audio=sample_audio_tensor, + sample_rate=16000, + audio_format="mp3", + bitrate="128k", + ) + + assert len(result["audio"]) > 1000 + + +def test_end_to_end_audio_processing(sample_audio_tensor): + original_samples = sample_audio_tensor.shape[1] + original_duration = original_samples / 16000 + + result = encode_audio( + audio=sample_audio_tensor, + sample_rate=16000, + audio_format="mp3", + bitrate="64k", + b64encode=True, + max_duration=0.5, + ) + + assert result["type"] == "audio_base64" + assert isinstance(result["audio"], str) + assert result["format"] == "mp3" + assert result["audio_samples"] == 16000 + assert result["audio_seconds"] <= original_duration diff --git a/tests/unit/extras/test_vision.py b/tests/unit/extras/test_vision.py new file mode 100644 index 000000000..65fe8e69d --- /dev/null +++ b/tests/unit/extras/test_vision.py @@ -0,0 +1,291 @@ +import base64 +import tempfile +from pathlib import Path +from unittest.mock import MagicMock, patch + +import numpy as np +import pytest +from PIL import Image + +from guidellm.extras.vision import ( + encode_image, + encode_video, + get_file_format, + resize_image, +) + + +@pytest.fixture +def sample_jpeg_file(): + # Create a valid JPEG image + rng = np.random.default_rng(42) + img_array = rng.integers(0, 255, (100, 100, 3), dtype=np.uint8) + img = Image.fromarray(img_array) + with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f: + img.save(f.name, format="JPEG", quality=95) + temp_path = Path(f.name) + + yield temp_path + # Clean up + if temp_path.exists(): + temp_path.unlink() + + +@pytest.fixture +def sample_image_array(sample_jpeg_file) -> np.ndarray: + img = Image.open(sample_jpeg_file) + return np.array(img) + + +@pytest.fixture +def sample_image_bytes(sample_jpeg_file) -> bytes: + with Path.open(sample_jpeg_file, "rb") as f: + return f.read() + + +# Fixture for common test video +@pytest.fixture +def sample_video_file(): + """Create a temporary video file for testing""" + with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as f: + f.write(b"sample video content for testing") + temp_path = Path(f.name) + + yield temp_path + + if temp_path.exists(): + temp_path.unlink() + + +def test_encode_image_base64(sample_image_bytes: bytes): + result = encode_image(sample_image_bytes, encode_type="base64") + assert result["type"] == "image_base64" + assert "image" in result + assert result["image_bytes"] > 0 + assert result["image_pixels"] > 0 + + +def test_encode_image_url(): + result = encode_image(image="https://example.com/vision.jpg", encode_type="url") + assert result["type"] == "image_url" + assert result["image"] == "https://example.com/vision.jpg" + + +def test_resize_image(sample_image_array: np.ndarray): + # Convert numpy array to PIL Image + pil_image = Image.fromarray(sample_image_array) + + original_height, original_width = sample_image_array.shape[:2] + new_width, new_height = 100, 100 + + resized_image = resize_image( + pil_image, # Pass PIL Image instead of numpy array + width=new_width, + height=new_height, + ) + assert isinstance(resized_image, Image.Image) + assert resized_image.size == (new_width, new_height) + + +def test_get_file_format(sample_jpeg_file): + file_format = get_file_format(sample_jpeg_file) + assert file_format == "jpg" + + +def test_encode_video_with_fixture(sample_video_file): + result = encode_video(video=sample_video_file, encode_type="base64") + + assert result["type"] == "video_base64" + assert result["video"].startswith("data:video/mp4;base64,") + assert result["video_bytes"] == 32 # Length of "sample video content for testing" + + +def test_encode_video_with_url_base64(): + """Test encoding a video URL with base64 encoding""" + test_url = "https://example.com/video.mp4" + mock_video_content = b"fake video content" + + with patch("httpx.get") as mock_get: + mock_response = MagicMock() + mock_response.content = mock_video_content + mock_response.raise_for_status = MagicMock() + mock_get.return_value = mock_response + + result = encode_video(video=test_url, encode_type="base64") + + mock_get.assert_called_once_with(test_url) + assert result["type"] == "video_base64" + assert result["video"].startswith("data:video/unknown;base64,") + assert result["video_bytes"] == len(mock_video_content) + assert result["video_frames"] is None + assert result["video_seconds"] is None + + +def test_encode_video_with_url_url_encoding(): + """Test encoding a video URL with url encoding""" + test_url = "https://example.com/video.mp4" + result = encode_video(video=test_url, encode_type="url") + + assert result["type"] == "video_url" + assert result["video"] == test_url + assert result["video_frames"] is None + assert result["video_seconds"] is None + assert result["video_bytes"] is None + + +def test_encode_video_with_base64_string(): + """Test encoding an already base64 encoded video string""" + test_video_bytes = b"fake video content" + base64_video = base64.b64encode(test_video_bytes).decode("utf-8") + data_url = f"data:video/mp4;base64,{base64_video}" + + result = encode_video(video=data_url, encode_type="base64") + + assert result["type"] == "video_base64" + assert result["video"] == data_url + assert result["video_bytes"] == len(base64_video) * 3 // 4 + assert result["video_frames"] is None + assert result["video_seconds"] is None + + +def test_encode_video_with_file_path(sample_video_file): + result = encode_video(video=sample_video_file, encode_type="base64") + + assert result["type"] == "video_base64" + assert result["video"].startswith("data:video/mp4;base64,") + assert result["video_bytes"] == len(b"sample video content for testing") + assert result["video_frames"] is None + assert result["video_seconds"] is None + + # Verify base64 encoding is correct + base64_part = result["video"].split(",", 1)[1] + decoded_bytes = base64.b64decode(base64_part) + assert decoded_bytes == b"sample video content for testing" + + +def test_encode_video_with_path_object(): + """Test encoding a video from Path object""" + with tempfile.NamedTemporaryFile(suffix=".avi", delete=False) as f: + try: + mp4_content = b"\x00\x00\x00\x18ftypmp42\x00\x00\x00\x00mp42isom" + f.write(mp4_content) + f.flush() + temp_path = Path(f.name) + + result = encode_video(video=temp_path, encode_type="base64") + + assert result["type"] == "video_base64" + assert result["video"].startswith("data:video/avi;base64,") + assert result["video_bytes"] == len(mp4_content) + assert result["video_frames"] is None + assert result["video_seconds"] is None + finally: + if temp_path.exists(): + temp_path.unlink() + + +def test_encode_video_with_raw_bytes(): + """Test encoding video from raw bytes""" + video_bytes = b"raw video bytes content" + + result = encode_video(video=video_bytes, encode_type="base64") + + assert result["type"] == "video_base64" + assert result["video"].startswith("data:video/unknown;base64,") + assert result["video_bytes"] == len(video_bytes) + assert result["video_frames"] is None + assert result["video_seconds"] is None + + # Verify base64 encoding + base64_part = result["video"].split(",", 1)[1] + decoded_bytes = base64.b64decode(base64_part) + assert decoded_bytes == video_bytes + + +def test_encode_video_url_with_http_error(): + """Test URL encoding when HTTP request fails""" + test_url = "https://example.com/video.mp4" + + with patch("httpx.get") as mock_get: + mock_response = MagicMock() + mock_response.raise_for_status.side_effect = Exception("HTTP Error") + mock_get.return_value = mock_response + + with pytest.raises(Exception, match="HTTP Error"): + encode_video(video=test_url, encode_type="base64") + + +def test_encode_video_with_none_encode_type(): + """Test encoding with None encode_type""" + video_bytes = b"test video" + + result = encode_video(video=video_bytes, encode_type=None) + + # Should default to base64 encoding + assert result["type"] == "video_base64" + assert result["video"].startswith("data:video/unknown;base64,") + + +def test_encode_video_with_unsupported_type(): + """Test encoding with unsupported video type""" + with pytest.raises(ValueError, match="Unsupported video type"): + encode_video(video=123, encode_type="base64") # int is not supported + + +def test_encode_video_file_not_found(): + """Test encoding with non-existent file path""" + non_existent_path = "/path/that/does/not/exist/video.mp4" + + with pytest.raises(FileNotFoundError): + encode_video(video=non_existent_path, encode_type="base64") + + +def test_encode_video_base64_correctness(): + """Test that base64 encoding is mathematically correct""" + # Use a known input to verify base64 encoding + test_bytes = b"Hello World" + expected_base64 = base64.b64encode(test_bytes).decode("utf-8") + + result = encode_video(video=test_bytes, encode_type="base64") + + base64_part = result["video"].split(",", 1)[1] + assert base64_part == expected_base64 + assert result["video_bytes"] == len(test_bytes) + + +def test_encode_video_data_url_format(): + """Test that data URL format is correct""" + video_bytes = b"test video data" + + result = encode_video(video=video_bytes, encode_type="base64") + + assert result["video"].startswith("data:video/unknown;base64,") + # Verify the format is exactly as expected + parts = result["video"].split(",", 1) + assert len(parts) == 2 + assert parts[0] == "data:video/unknown;base64" + assert base64.b64decode(parts[1]) == video_bytes + + +# Additional test for edge cases +def test_encode_video_empty_bytes(): + """Test encoding empty video bytes""" + result = encode_video(video=b"", encode_type="base64") + + assert result["type"] == "video_base64" + assert result["video"] == "data:video/unknown;base64," + assert result["video_bytes"] == 0 + + +def test_encode_video_large_content(): + """Test encoding with larger video content""" + large_content = b"x" * 1024 * 1024 # 1MB of data + + result = encode_video(video=large_content, encode_type="base64") + + assert result["type"] == "video_base64" + assert result["video_bytes"] == len(large_content) + # Verify we can decode it back + base64_part = result["video"].split(",", 1)[1] + decoded = base64.b64decode(base64_part) + assert decoded == large_content From 2600d0e334701355266ce34b78ce830ae819de15 Mon Sep 17 00:00:00 2001 From: "guangli.bao" Date: Wed, 3 Dec 2025 13:46:09 +0800 Subject: [PATCH 2/2] fix workflow failure Signed-off-by: guangli.bao --- .github/actions/run-tox/action.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/actions/run-tox/action.yml b/.github/actions/run-tox/action.yml index 132424b1e..5e6f16682 100644 --- a/.github/actions/run-tox/action.yml +++ b/.github/actions/run-tox/action.yml @@ -17,6 +17,10 @@ runs: uses: pdm-project/setup-pdm@v4 with: python-version: ${{ inputs.python-version }} + - name: Install system dependencies + run: | + sudo apt install -y ffmpeg + shell: bash - name: Install dependencies run: | pip install tox tox-pdm