From d3e901bbe5c49042555192e9b4f9005179c58963 Mon Sep 17 00:00:00 2001 From: Shrill Shrestha Date: Tue, 25 May 2021 22:18:22 -0500 Subject: [PATCH 1/5] Port test/test_io.py to pytest --- test/test_io.py | 495 ++++++++++++++++++++++++++---------------------- 1 file changed, 268 insertions(+), 227 deletions(-) diff --git a/test/test_io.py b/test/test_io.py index e86ea9e84fc..7d90f47aa26 100644 --- a/test/test_io.py +++ b/test/test_io.py @@ -1,3 +1,4 @@ +import pytest import os import contextlib import sys @@ -5,7 +6,6 @@ import torch import torchvision.io as io from torchvision import get_video_backend -import unittest import warnings from urllib.error import URLError @@ -64,234 +64,275 @@ def temp_video(num_frames, height, width, fps, lossless=False, video_codec=None, os.unlink(f.name) -@unittest.skipIf(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, - "video_reader backend not available") -@unittest.skipIf(av is None, "PyAV unavailable") -class TestIO(unittest.TestCase): - # compression adds artifacts, thus we add a tolerance of - # 6 in 0-255 range - TOLERANCE = 6 - - def test_write_read_video(self): - with temp_video(10, 300, 300, 5, lossless=True) as (f_name, data): - lv, _, info = io.read_video(f_name) - assert_equal(data, lv) - self.assertEqual(info["video_fps"], 5) - - @unittest.skipIf(not io._HAS_VIDEO_OPT, "video_reader backend is not chosen") - def test_probe_video_from_file(self): - with temp_video(10, 300, 300, 5) as (f_name, data): - video_info = io._probe_video_from_file(f_name) - self.assertAlmostEqual(video_info.video_duration, 2, delta=0.1) - self.assertAlmostEqual(video_info.video_fps, 5, delta=0.1) - - @unittest.skipIf(not io._HAS_VIDEO_OPT, "video_reader backend is not chosen") - def test_probe_video_from_memory(self): - with temp_video(10, 300, 300, 5) as (f_name, data): - with open(f_name, "rb") as fp: - filebuffer = fp.read() - video_info = io._probe_video_from_memory(filebuffer) - self.assertAlmostEqual(video_info.video_duration, 2, delta=0.1) - self.assertAlmostEqual(video_info.video_fps, 5, delta=0.1) - - def test_read_timestamps(self): - with temp_video(10, 300, 300, 5) as (f_name, data): - pts, _ = io.read_video_timestamps(f_name) - # note: not all formats/codecs provide accurate information for computing the - # timestamps. For the format that we use here, this information is available, - # so we use it as a baseline - container = av.open(f_name) - stream = container.streams[0] - pts_step = int(round(float(1 / (stream.average_rate * stream.time_base)))) - num_frames = int(round(float(stream.average_rate * stream.time_base * stream.duration))) - expected_pts = [i * pts_step for i in range(num_frames)] - - self.assertEqual(pts, expected_pts) - container.close() - - def test_read_partial_video(self): - with temp_video(10, 300, 300, 5, lossless=True) as (f_name, data): - pts, _ = io.read_video_timestamps(f_name) - for start in range(5): - for offset in range(1, 4): - lv, _, _ = io.read_video(f_name, pts[start], pts[start + offset - 1]) - s_data = data[start:(start + offset)] - self.assertEqual(len(lv), offset) - assert_equal(s_data, lv) - - if get_video_backend() == "pyav": - # for "video_reader" backend, we don't decode the closest early frame - # when the given start pts is not matching any frame pts - lv, _, _ = io.read_video(f_name, pts[4] + 1, pts[7]) - self.assertEqual(len(lv), 4) - assert_equal(data[4:8], lv) - - def test_read_partial_video_bframes(self): - # do not use lossless encoding, to test the presence of B-frames - options = {'bframes': '16', 'keyint': '10', 'min-keyint': '4'} - with temp_video(100, 300, 300, 5, options=options) as (f_name, data): - pts, _ = io.read_video_timestamps(f_name) - for start in range(0, 80, 20): - for offset in range(1, 4): - lv, _, _ = io.read_video(f_name, pts[start], pts[start + offset - 1]) - s_data = data[start:(start + offset)] - self.assertEqual(len(lv), offset) - assert_equal(s_data, lv, rtol=0.0, atol=self.TOLERANCE) +# compression adds artifacts, thus we add a tolerance of +# 6 in 0-255 range +TOLERANCE = 6 + +@pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, + reason="video_reader backend not available") +@pytest.mark.skipif(av is None, reason="PyAV unavailable") +def test_write_read_video(): + with temp_video(10, 300, 300, 5, lossless=True) as (f_name, data): + lv, _, info = io.read_video(f_name) + assert_equal(data, lv) + assert info["video_fps"] == 5 + +@pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, + reason="video_reader backend not available") +@pytest.mark.skipif(av is None, reason="PyAV unavailable") +@pytest.mark.skipif(not io._HAS_VIDEO_OPT, reason="video_reader backend is not chosen") +def test_probe_video_from_file(): + with temp_video(10, 300, 300, 5) as (f_name, data): + video_info = io._probe_video_from_file(f_name) + assert pytest.approx(video_info.video_duration, 0.1) == 2 + assert pytest.approx(video_info.video_fps, 0.1) == 5 + +@pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, + reason="video_reader backend not available") +@pytest.mark.skipif(av is None, reason="PyAV unavailable") +@pytest.mark.skipif(not io._HAS_VIDEO_OPT, reason="video_reader backend is not chosen") +def test_probe_video_from_memory(): + with temp_video(10, 300, 300, 5) as (f_name, data): + with open(f_name, "rb") as fp: + filebuffer = fp.read() + video_info = io._probe_video_from_memory(filebuffer) + assert pytest.approx(video_info.video_duration, 0.1) == 2 + assert pytest.approx(video_info.video_fps, 0.1) == 5 + +@pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, + reason="video_reader backend not available") +@pytest.mark.skipif(av is None, reason="PyAV unavailable") +def test_read_timestamps(): + with temp_video(10, 300, 300, 5) as (f_name, data): + pts, _ = io.read_video_timestamps(f_name) + # note: not all formats/codecs provide accurate information for computing the + # timestamps. For the format that we use here, this information is available, + # so we use it as a baseline + container = av.open(f_name) + stream = container.streams[0] + pts_step = int(round(float(1 / (stream.average_rate * stream.time_base)))) + num_frames = int(round(float(stream.average_rate * stream.time_base * stream.duration))) + expected_pts = [i * pts_step for i in range(num_frames)] + + assert pts == expected_pts + container.close() + +@pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, + reason="video_reader backend not available") +@pytest.mark.skipif(av is None, reason="PyAV unavailable") +def test_read_partial_video(): + with temp_video(10, 300, 300, 5, lossless=True) as (f_name, data): + pts, _ = io.read_video_timestamps(f_name) + for start in range(5): + for offset in range(1, 4): + lv, _, _ = io.read_video(f_name, pts[start], pts[start + offset - 1]) + s_data = data[start:(start + offset)] + assert len(lv) == offset + assert_equal(s_data, lv) + if get_video_backend() == "pyav": + # for "video_reader" backend, we don't decode the closest early frame + # when the given start pts is not matching any frame pts lv, _, _ = io.read_video(f_name, pts[4] + 1, pts[7]) - # TODO fix this - if get_video_backend() == 'pyav': - self.assertEqual(len(lv), 4) - assert_equal(data[4:8], lv, rtol=0.0, atol=self.TOLERANCE) - else: - self.assertEqual(len(lv), 3) - assert_equal(data[5:8], lv, rtol=0.0, atol=self.TOLERANCE) - - def test_read_packed_b_frames_divx_file(self): - name = "hmdb51_Turnk_r_Pippi_Michel_cartwheel_f_cm_np2_le_med_6.avi" - f_name = os.path.join(VIDEO_DIR, name) - pts, fps = io.read_video_timestamps(f_name) - - self.assertEqual(pts, sorted(pts)) - self.assertEqual(fps, 30) - - def test_read_timestamps_from_packet(self): - with temp_video(10, 300, 300, 5, video_codec='mpeg4') as (f_name, data): - pts, _ = io.read_video_timestamps(f_name) - # note: not all formats/codecs provide accurate information for computing the - # timestamps. For the format that we use here, this information is available, - # so we use it as a baseline - container = av.open(f_name) - stream = container.streams[0] - # make sure we went through the optimized codepath - self.assertIn(b'Lavc', stream.codec_context.extradata) - pts_step = int(round(float(1 / (stream.average_rate * stream.time_base)))) - num_frames = int(round(float(stream.average_rate * stream.time_base * stream.duration))) - expected_pts = [i * pts_step for i in range(num_frames)] - - self.assertEqual(pts, expected_pts) - container.close() - - def test_read_video_pts_unit_sec(self): - with temp_video(10, 300, 300, 5, lossless=True) as (f_name, data): - lv, _, info = io.read_video(f_name, pts_unit='sec') - - assert_equal(data, lv) - self.assertEqual(info["video_fps"], 5) - self.assertEqual(info, {"video_fps": 5}) - - def test_read_timestamps_pts_unit_sec(self): - with temp_video(10, 300, 300, 5) as (f_name, data): - pts, _ = io.read_video_timestamps(f_name, pts_unit='sec') - - container = av.open(f_name) - stream = container.streams[0] - pts_step = int(round(float(1 / (stream.average_rate * stream.time_base)))) - num_frames = int(round(float(stream.average_rate * stream.time_base * stream.duration))) - expected_pts = [i * pts_step * stream.time_base for i in range(num_frames)] - - self.assertEqual(pts, expected_pts) - container.close() - - def test_read_partial_video_pts_unit_sec(self): - with temp_video(10, 300, 300, 5, lossless=True) as (f_name, data): - pts, _ = io.read_video_timestamps(f_name, pts_unit='sec') - - for start in range(5): - for offset in range(1, 4): - lv, _, _ = io.read_video(f_name, pts[start], pts[start + offset - 1], pts_unit='sec') - s_data = data[start:(start + offset)] - self.assertEqual(len(lv), offset) - assert_equal(s_data, lv) - - container = av.open(f_name) - stream = container.streams[0] - lv, _, _ = io.read_video(f_name, - int(pts[4] * (1.0 / stream.time_base) + 1) * stream.time_base, pts[7], - pts_unit='sec') - if get_video_backend() == "pyav": - # for "video_reader" backend, we don't decode the closest early frame - # when the given start pts is not matching any frame pts - self.assertEqual(len(lv), 4) - assert_equal(data[4:8], lv) - container.close() - - def test_read_video_corrupted_file(self): - with tempfile.NamedTemporaryFile(suffix='.mp4') as f: - f.write(b'This is not an mpg4 file') - video, audio, info = io.read_video(f.name) - self.assertIsInstance(video, torch.Tensor) - self.assertIsInstance(audio, torch.Tensor) - self.assertEqual(video.numel(), 0) - self.assertEqual(audio.numel(), 0) - self.assertEqual(info, {}) - - def test_read_video_timestamps_corrupted_file(self): - with tempfile.NamedTemporaryFile(suffix='.mp4') as f: - f.write(b'This is not an mpg4 file') - video_pts, video_fps = io.read_video_timestamps(f.name) - self.assertEqual(video_pts, []) - self.assertIs(video_fps, None) - - @unittest.skip("Temporarily disabled due to new pyav") - def test_read_video_partially_corrupted_file(self): - with temp_video(5, 4, 4, 5, lossless=True) as (f_name, data): - with open(f_name, 'r+b') as f: - size = os.path.getsize(f_name) - bytes_to_overwrite = size // 10 - # seek to the middle of the file - f.seek(5 * bytes_to_overwrite) - # corrupt 10% of the file from the middle - f.write(b'\xff' * bytes_to_overwrite) - # this exercises the container.decode assertion check - video, audio, info = io.read_video(f.name, pts_unit='sec') - # check that size is not equal to 5, but 3 - # TODO fix this - if get_video_backend() == 'pyav': - self.assertEqual(len(video), 3) - else: - self.assertEqual(len(video), 4) - # but the valid decoded content is still correct - assert_equal(video[:3], data[:3]) - # and the last few frames are wrong - with self.assertRaises(AssertionError): - assert_equal(video, data) - - @unittest.skipIf(sys.platform == 'win32', 'temporarily disabled on Windows') - def test_write_video_with_audio(self): - f_name = os.path.join(VIDEO_DIR, "R6llTwEh07w.mp4") - video_tensor, audio_tensor, info = io.read_video(f_name, pts_unit="sec") - - with get_tmp_dir() as tmpdir: - out_f_name = os.path.join(tmpdir, "testing.mp4") - io.video.write_video( - out_f_name, - video_tensor, - round(info["video_fps"]), - video_codec="libx264rgb", - options={'crf': '0'}, - audio_array=audio_tensor, - audio_fps=info["audio_fps"], - audio_codec="aac", - ) - - out_video_tensor, out_audio_tensor, out_info = io.read_video( - out_f_name, pts_unit="sec" - ) - - self.assertEqual(info["video_fps"], out_info["video_fps"]) - assert_equal(video_tensor, out_video_tensor) - - audio_stream = av.open(f_name).streams.audio[0] - out_audio_stream = av.open(out_f_name).streams.audio[0] - - self.assertEqual(info["audio_fps"], out_info["audio_fps"]) - self.assertEqual(audio_stream.rate, out_audio_stream.rate) - self.assertAlmostEqual(audio_stream.frames, out_audio_stream.frames, delta=1) - self.assertEqual(audio_stream.frame_size, out_audio_stream.frame_size) - - # TODO add tests for audio + assert len(lv) == 4 + assert_equal(data[4:8], lv) + +@pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, + reason="video_reader backend not available") +@pytest.mark.skipif(av is None, reason="PyAV unavailable") +def test_read_partial_video_bframes(): + # do not use lossless encoding, to test the presence of B-frames + options = {'bframes': '16', 'keyint': '10', 'min-keyint': '4'} + with temp_video(100, 300, 300, 5, options=options) as (f_name, data): + pts, _ = io.read_video_timestamps(f_name) + for start in range(0, 80, 20): + for offset in range(1, 4): + lv, _, _ = io.read_video(f_name, pts[start], pts[start + offset - 1]) + s_data = data[start:(start + offset)] + assert len(lv) == offset + assert_equal(s_data, lv, rtol=0.0, atol=TOLERANCE) + + lv, _, _ = io.read_video(f_name, pts[4] + 1, pts[7]) + # TODO fix this + if get_video_backend() == 'pyav': + assert len(lv) == 4 + assert_equal(data[4:8], lv, rtol=0.0, atol=TOLERANCE) + else: + assert len(lv) == 3 + assert_equal(data[5:8], lv, rtol=0.0, atol=TOLERANCE) + +@pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, + reason="video_reader backend not available") +@pytest.mark.skipif(av is None, reason="PyAV unavailable") +def test_read_packed_b_frames_divx_file(): + name = "hmdb51_Turnk_r_Pippi_Michel_cartwheel_f_cm_np2_le_med_6.avi" + f_name = os.path.join(VIDEO_DIR, name) + pts, fps = io.read_video_timestamps(f_name) + + assert pts == sorted(pts) + assert fps == 30 + +@pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, + reason="video_reader backend not available") +@pytest.mark.skipif(av is None, reason="PyAV unavailable") +def test_read_timestamps_from_packet(): + with temp_video(10, 300, 300, 5, video_codec='mpeg4') as (f_name, data): + pts, _ = io.read_video_timestamps(f_name) + # note: not all formats/codecs provide accurate information for computing the + # timestamps. For the format that we use here, this information is available, + # so we use it as a baseline + container = av.open(f_name) + stream = container.streams[0] + # make sure we went through the optimized codepath + assert b'Lavc' in stream.codec_context.extradata + pts_step = int(round(float(1 / (stream.average_rate * stream.time_base)))) + num_frames = int(round(float(stream.average_rate * stream.time_base * stream.duration))) + expected_pts = [i * pts_step for i in range(num_frames)] + + assert pts == expected_pts + container.close() + +@pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, + reason="video_reader backend not available") +@pytest.mark.skipif(av is None, reason="PyAV unavailable") +def test_read_video_pts_unit_sec(): + with temp_video(10, 300, 300, 5, lossless=True) as (f_name, data): + lv, _, info = io.read_video(f_name, pts_unit='sec') + + assert_equal(data, lv) + assert info["video_fps"] == 5 + assert info == {"video_fps": 5} + +@pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, + reason="video_reader backend not available") +@pytest.mark.skipif(av is None, reason="PyAV unavailable") +def test_read_timestamps_pts_unit_sec(): + with temp_video(10, 300, 300, 5) as (f_name, data): + pts, _ = io.read_video_timestamps(f_name, pts_unit='sec') + + container = av.open(f_name) + stream = container.streams[0] + pts_step = int(round(float(1 / (stream.average_rate * stream.time_base)))) + num_frames = int(round(float(stream.average_rate * stream.time_base * stream.duration))) + expected_pts = [i * pts_step * stream.time_base for i in range(num_frames)] + + assert pts == expected_pts + container.close() + +@pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, + reason="video_reader backend not available") +@pytest.mark.skipif(av is None, reason="PyAV unavailable") +def test_read_partial_video_pts_unit_sec(): + with temp_video(10, 300, 300, 5, lossless=True) as (f_name, data): + pts, _ = io.read_video_timestamps(f_name, pts_unit='sec') + + for start in range(5): + for offset in range(1, 4): + lv, _, _ = io.read_video(f_name, pts[start], pts[start + offset - 1], pts_unit='sec') + s_data = data[start:(start + offset)] + assert len(lv) == offset + assert_equal(s_data, lv) + + container = av.open(f_name) + stream = container.streams[0] + lv, _, _ = io.read_video(f_name, + int(pts[4] * (1.0 / stream.time_base) + 1) * stream.time_base, pts[7], + pts_unit='sec') + if get_video_backend() == "pyav": + # for "video_reader" backend, we don't decode the closest early frame + # when the given start pts is not matching any frame pts + assert len(lv) == 4 + assert_equal(data[4:8], lv) + container.close() + +@pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, + reason="video_reader backend not available") +@pytest.mark.skipif(av is None, reason="PyAV unavailable") +def test_read_video_corrupted_file(): + with tempfile.NamedTemporaryFile(suffix='.mp4') as f: + f.write(b'This is not an mpg4 file') + video, audio, info = io.read_video(f.name) + assert isinstance(video, torch.Tensor) + assert isinstance(audio, torch.Tensor) + assert video.numel() == 0 + assert audio.numel() == 0 + assert info == {} + +@pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, + reason="video_reader backend not available") +@pytest.mark.skipif(av is None, reason="PyAV unavailable") +def test_read_video_timestamps_corrupted_file(): + with tempfile.NamedTemporaryFile(suffix='.mp4') as f: + f.write(b'This is not an mpg4 file') + video_pts, video_fps = io.read_video_timestamps(f.name) + assert video_pts == [] + assert video_fps is None + +@pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, + reason="video_reader backend not available") +@pytest.mark.skipif(av is None, reason="PyAV unavailable") +@pytest.mark.skip(reason="Temporarily disabled due to new pyav") +def test_read_video_partially_corrupted_file(): + with temp_video(5, 4, 4, 5, lossless=True) as (f_name, data): + with open(f_name, 'r+b') as f: + size = os.path.getsize(f_name) + bytes_to_overwrite = size // 10 + # seek to the middle of the file + f.seek(5 * bytes_to_overwrite) + # corrupt 10% of the file from the middle + f.write(b'\xff' * bytes_to_overwrite) + # this exercises the container.decode assertion check + video, audio, info = io.read_video(f.name, pts_unit='sec') + # check that size is not equal to 5, but 3 + # TODO fix this + if get_video_backend() == 'pyav': + assert len(video) == 3 + else: + assert len(video) == 4 + # but the valid decoded content is still correct + assert_equal(video[:3], data[:3]) + # and the last few frames are wrong + with pytest.raises(AssertionError): + assert_equal(video, data) + +@pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, + reason="video_reader backend not available") +@pytest.mark.skipif(av is None, reason="PyAV unavailable") +@pytest.mark.skipif(sys.platform == 'win32', reason='temporarily disabled on Windows') +def test_write_video_with_audio(): + f_name = os.path.join(VIDEO_DIR, "R6llTwEh07w.mp4") + video_tensor, audio_tensor, info = io.read_video(f_name, pts_unit="sec") + + with get_tmp_dir() as tmpdir: + out_f_name = os.path.join(tmpdir, "testing.mp4") + io.video.write_video( + out_f_name, + video_tensor, + round(info["video_fps"]), + video_codec="libx264rgb", + options={'crf': '0'}, + audio_array=audio_tensor, + audio_fps=info["audio_fps"], + audio_codec="aac", + ) + + out_video_tensor, out_audio_tensor, out_info = io.read_video( + out_f_name, pts_unit="sec" + ) + + assert info["video_fps"] == out_info["video_fps"] + assert_equal(video_tensor, out_video_tensor) + + audio_stream = av.open(f_name).streams.audio[0] + out_audio_stream = av.open(out_f_name).streams.audio[0] + + assert info["audio_fps"] == out_info["audio_fps"] + assert audio_stream.rate == out_audio_stream.rate + assert pytest.approx(audio_stream.frames, 1) == out_audio_stream.frames + assert audio_stream.frame_size == out_audio_stream.frame_size + +# TODO add tests for audio if __name__ == '__main__': - unittest.main() + pytest.main(__file__) From 29e3f346b3c01863107ae9b123f45eb1f9976afe Mon Sep 17 00:00:00 2001 From: Shrill Shrestha Date: Tue, 25 May 2021 22:29:37 -0500 Subject: [PATCH 2/5] Maintain code formatting --- test/test_io.py | 49 ++++++++++++++++++++++++++++++++----------------- 1 file changed, 32 insertions(+), 17 deletions(-) diff --git a/test/test_io.py b/test/test_io.py index 7d90f47aa26..e8835289ed1 100644 --- a/test/test_io.py +++ b/test/test_io.py @@ -68,8 +68,9 @@ def temp_video(num_frames, height, width, fps, lossless=False, video_codec=None, # 6 in 0-255 range TOLERANCE = 6 + @pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, - reason="video_reader backend not available") + reason="video_reader backend not available") @pytest.mark.skipif(av is None, reason="PyAV unavailable") def test_write_read_video(): with temp_video(10, 300, 300, 5, lossless=True) as (f_name, data): @@ -77,8 +78,9 @@ def test_write_read_video(): assert_equal(data, lv) assert info["video_fps"] == 5 + @pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, - reason="video_reader backend not available") + reason="video_reader backend not available") @pytest.mark.skipif(av is None, reason="PyAV unavailable") @pytest.mark.skipif(not io._HAS_VIDEO_OPT, reason="video_reader backend is not chosen") def test_probe_video_from_file(): @@ -87,8 +89,9 @@ def test_probe_video_from_file(): assert pytest.approx(video_info.video_duration, 0.1) == 2 assert pytest.approx(video_info.video_fps, 0.1) == 5 + @pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, - reason="video_reader backend not available") + reason="video_reader backend not available") @pytest.mark.skipif(av is None, reason="PyAV unavailable") @pytest.mark.skipif(not io._HAS_VIDEO_OPT, reason="video_reader backend is not chosen") def test_probe_video_from_memory(): @@ -99,8 +102,9 @@ def test_probe_video_from_memory(): assert pytest.approx(video_info.video_duration, 0.1) == 2 assert pytest.approx(video_info.video_fps, 0.1) == 5 + @pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, - reason="video_reader backend not available") + reason="video_reader backend not available") @pytest.mark.skipif(av is None, reason="PyAV unavailable") def test_read_timestamps(): with temp_video(10, 300, 300, 5) as (f_name, data): @@ -117,8 +121,9 @@ def test_read_timestamps(): assert pts == expected_pts container.close() + @pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, - reason="video_reader backend not available") + reason="video_reader backend not available") @pytest.mark.skipif(av is None, reason="PyAV unavailable") def test_read_partial_video(): with temp_video(10, 300, 300, 5, lossless=True) as (f_name, data): @@ -137,8 +142,9 @@ def test_read_partial_video(): assert len(lv) == 4 assert_equal(data[4:8], lv) + @pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, - reason="video_reader backend not available") + reason="video_reader backend not available") @pytest.mark.skipif(av is None, reason="PyAV unavailable") def test_read_partial_video_bframes(): # do not use lossless encoding, to test the presence of B-frames @@ -161,8 +167,9 @@ def test_read_partial_video_bframes(): assert len(lv) == 3 assert_equal(data[5:8], lv, rtol=0.0, atol=TOLERANCE) + @pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, - reason="video_reader backend not available") + reason="video_reader backend not available") @pytest.mark.skipif(av is None, reason="PyAV unavailable") def test_read_packed_b_frames_divx_file(): name = "hmdb51_Turnk_r_Pippi_Michel_cartwheel_f_cm_np2_le_med_6.avi" @@ -172,8 +179,9 @@ def test_read_packed_b_frames_divx_file(): assert pts == sorted(pts) assert fps == 30 + @pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, - reason="video_reader backend not available") + reason="video_reader backend not available") @pytest.mark.skipif(av is None, reason="PyAV unavailable") def test_read_timestamps_from_packet(): with temp_video(10, 300, 300, 5, video_codec='mpeg4') as (f_name, data): @@ -192,8 +200,9 @@ def test_read_timestamps_from_packet(): assert pts == expected_pts container.close() + @pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, - reason="video_reader backend not available") + reason="video_reader backend not available") @pytest.mark.skipif(av is None, reason="PyAV unavailable") def test_read_video_pts_unit_sec(): with temp_video(10, 300, 300, 5, lossless=True) as (f_name, data): @@ -203,8 +212,9 @@ def test_read_video_pts_unit_sec(): assert info["video_fps"] == 5 assert info == {"video_fps": 5} + @pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, - reason="video_reader backend not available") + reason="video_reader backend not available") @pytest.mark.skipif(av is None, reason="PyAV unavailable") def test_read_timestamps_pts_unit_sec(): with temp_video(10, 300, 300, 5) as (f_name, data): @@ -219,8 +229,9 @@ def test_read_timestamps_pts_unit_sec(): assert pts == expected_pts container.close() + @pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, - reason="video_reader backend not available") + reason="video_reader backend not available") @pytest.mark.skipif(av is None, reason="PyAV unavailable") def test_read_partial_video_pts_unit_sec(): with temp_video(10, 300, 300, 5, lossless=True) as (f_name, data): @@ -236,8 +247,8 @@ def test_read_partial_video_pts_unit_sec(): container = av.open(f_name) stream = container.streams[0] lv, _, _ = io.read_video(f_name, - int(pts[4] * (1.0 / stream.time_base) + 1) * stream.time_base, pts[7], - pts_unit='sec') + int(pts[4] * (1.0 / stream.time_base) + 1) * stream.time_base, pts[7], + pts_unit='sec') if get_video_backend() == "pyav": # for "video_reader" backend, we don't decode the closest early frame # when the given start pts is not matching any frame pts @@ -245,8 +256,9 @@ def test_read_partial_video_pts_unit_sec(): assert_equal(data[4:8], lv) container.close() + @pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, - reason="video_reader backend not available") + reason="video_reader backend not available") @pytest.mark.skipif(av is None, reason="PyAV unavailable") def test_read_video_corrupted_file(): with tempfile.NamedTemporaryFile(suffix='.mp4') as f: @@ -258,8 +270,9 @@ def test_read_video_corrupted_file(): assert audio.numel() == 0 assert info == {} + @pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, - reason="video_reader backend not available") + reason="video_reader backend not available") @pytest.mark.skipif(av is None, reason="PyAV unavailable") def test_read_video_timestamps_corrupted_file(): with tempfile.NamedTemporaryFile(suffix='.mp4') as f: @@ -268,8 +281,9 @@ def test_read_video_timestamps_corrupted_file(): assert video_pts == [] assert video_fps is None + @pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, - reason="video_reader backend not available") + reason="video_reader backend not available") @pytest.mark.skipif(av is None, reason="PyAV unavailable") @pytest.mark.skip(reason="Temporarily disabled due to new pyav") def test_read_video_partially_corrupted_file(): @@ -295,8 +309,9 @@ def test_read_video_partially_corrupted_file(): with pytest.raises(AssertionError): assert_equal(video, data) + @pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, - reason="video_reader backend not available") + reason="video_reader backend not available") @pytest.mark.skipif(av is None, reason="PyAV unavailable") @pytest.mark.skipif(sys.platform == 'win32', reason='temporarily disabled on Windows') def test_write_video_with_audio(): From 6551f318ba6113884c47fd017c3310f7846a8e15 Mon Sep 17 00:00:00 2001 From: Shrill Shrestha Date: Wed, 26 May 2021 11:34:49 -0500 Subject: [PATCH 3/5] Refactor test code - add skip decorators to TestVideo class - add absolute and relative tolorence to pytest.approx - add parametrization instead of nested loops - close container before assert statements --- test/test_io.py | 516 ++++++++++++++++++++++-------------------------- 1 file changed, 238 insertions(+), 278 deletions(-) diff --git a/test/test_io.py b/test/test_io.py index e8835289ed1..ad61a7a8eda 100644 --- a/test/test_io.py +++ b/test/test_io.py @@ -64,289 +64,249 @@ def temp_video(num_frames, height, width, fps, lossless=False, video_codec=None, os.unlink(f.name) -# compression adds artifacts, thus we add a tolerance of -# 6 in 0-255 range -TOLERANCE = 6 - - @pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, reason="video_reader backend not available") @pytest.mark.skipif(av is None, reason="PyAV unavailable") -def test_write_read_video(): - with temp_video(10, 300, 300, 5, lossless=True) as (f_name, data): - lv, _, info = io.read_video(f_name) - assert_equal(data, lv) - assert info["video_fps"] == 5 - +class TestVideo: + # compression adds artifacts, thus we add a tolerance of + # 6 in 0-255 range + TOLERANCE = 6 + + def test_write_read_video(self): + with temp_video(10, 300, 300, 5, lossless=True) as (f_name, data): + lv, _, info = io.read_video(f_name) + assert_equal(data, lv) + assert info["video_fps"] == 5 + + + @pytest.mark.skipif(not io._HAS_VIDEO_OPT, reason="video_reader backend is not chosen") + def test_probe_video_from_file(self): + with temp_video(10, 300, 300, 5) as (f_name, data): + video_info = io._probe_video_from_file(f_name) + assert pytest.approx(2, rel=0.0, abs=0.1) == video_info.video_duration + assert pytest.approx(5, rel=0.0, abs=0.1) == video_info.video_fps + + + @pytest.mark.skipif(not io._HAS_VIDEO_OPT, reason="video_reader backend is not chosen") + def test_probe_video_from_memory(self): + with temp_video(10, 300, 300, 5) as (f_name, data): + with open(f_name, "rb") as fp: + filebuffer = fp.read() + video_info = io._probe_video_from_memory(filebuffer) + assert pytest.approx(2, rel=0.0, abs=0.1) == video_info.video_duration + assert pytest.approx(5, rel=0.0, abs=0.1) == video_info.video_fps + + + def test_read_timestamps(self): + with temp_video(10, 300, 300, 5) as (f_name, data): + pts, _ = io.read_video_timestamps(f_name) + # note: not all formats/codecs provide accurate information for computing the + # timestamps. For the format that we use here, this information is available, + # so we use it as a baseline + container = av.open(f_name) + stream = container.streams[0] + pts_step = int(round(float(1 / (stream.average_rate * stream.time_base)))) + num_frames = int(round(float(stream.average_rate * stream.time_base * stream.duration))) + expected_pts = [i * pts_step for i in range(num_frames)] + + container.close() + assert pts == expected_pts + + + @pytest.mark.parametrize('start', range(5)) + @pytest.mark.parametrize('offset', range(1, 4)) + def test_read_partial_video(self, start, offset): + with temp_video(10, 300, 300, 5, lossless=True) as (f_name, data): + pts, _ = io.read_video_timestamps(f_name) + + lv, _, _ = io.read_video(f_name, pts[start], pts[start + offset - 1]) + s_data = data[start:(start + offset)] + assert len(lv) == offset + assert_equal(s_data, lv) + + if get_video_backend() == "pyav": + # for "video_reader" backend, we don't decode the closest early frame + # when the given start pts is not matching any frame pts + lv, _, _ = io.read_video(f_name, pts[4] + 1, pts[7]) + assert len(lv) == 4 + assert_equal(data[4:8], lv) + + + @pytest.mark.parametrize('start', range(0, 80, 20)) + @pytest.mark.parametrize('offset', range(1, 4)) + def test_read_partial_video_bframes(self, start, offset): + # do not use lossless encoding, to test the presence of B-frames + options = {'bframes': '16', 'keyint': '10', 'min-keyint': '4'} + with temp_video(100, 300, 300, 5, options=options) as (f_name, data): + pts, _ = io.read_video_timestamps(f_name) + + lv, _, _ = io.read_video(f_name, pts[start], pts[start + offset - 1]) + s_data = data[start:(start + offset)] + assert len(lv) == offset + assert_equal(s_data, lv, rtol=0.0, atol=self.TOLERANCE) -@pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, - reason="video_reader backend not available") -@pytest.mark.skipif(av is None, reason="PyAV unavailable") -@pytest.mark.skipif(not io._HAS_VIDEO_OPT, reason="video_reader backend is not chosen") -def test_probe_video_from_file(): - with temp_video(10, 300, 300, 5) as (f_name, data): - video_info = io._probe_video_from_file(f_name) - assert pytest.approx(video_info.video_duration, 0.1) == 2 - assert pytest.approx(video_info.video_fps, 0.1) == 5 - - -@pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, - reason="video_reader backend not available") -@pytest.mark.skipif(av is None, reason="PyAV unavailable") -@pytest.mark.skipif(not io._HAS_VIDEO_OPT, reason="video_reader backend is not chosen") -def test_probe_video_from_memory(): - with temp_video(10, 300, 300, 5) as (f_name, data): - with open(f_name, "rb") as fp: - filebuffer = fp.read() - video_info = io._probe_video_from_memory(filebuffer) - assert pytest.approx(video_info.video_duration, 0.1) == 2 - assert pytest.approx(video_info.video_fps, 0.1) == 5 - - -@pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, - reason="video_reader backend not available") -@pytest.mark.skipif(av is None, reason="PyAV unavailable") -def test_read_timestamps(): - with temp_video(10, 300, 300, 5) as (f_name, data): - pts, _ = io.read_video_timestamps(f_name) - # note: not all formats/codecs provide accurate information for computing the - # timestamps. For the format that we use here, this information is available, - # so we use it as a baseline - container = av.open(f_name) - stream = container.streams[0] - pts_step = int(round(float(1 / (stream.average_rate * stream.time_base)))) - num_frames = int(round(float(stream.average_rate * stream.time_base * stream.duration))) - expected_pts = [i * pts_step for i in range(num_frames)] - - assert pts == expected_pts - container.close() - - -@pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, - reason="video_reader backend not available") -@pytest.mark.skipif(av is None, reason="PyAV unavailable") -def test_read_partial_video(): - with temp_video(10, 300, 300, 5, lossless=True) as (f_name, data): - pts, _ = io.read_video_timestamps(f_name) - for start in range(5): - for offset in range(1, 4): - lv, _, _ = io.read_video(f_name, pts[start], pts[start + offset - 1]) - s_data = data[start:(start + offset)] - assert len(lv) == offset - assert_equal(s_data, lv) - - if get_video_backend() == "pyav": - # for "video_reader" backend, we don't decode the closest early frame - # when the given start pts is not matching any frame pts lv, _, _ = io.read_video(f_name, pts[4] + 1, pts[7]) - assert len(lv) == 4 - assert_equal(data[4:8], lv) - - -@pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, - reason="video_reader backend not available") -@pytest.mark.skipif(av is None, reason="PyAV unavailable") -def test_read_partial_video_bframes(): - # do not use lossless encoding, to test the presence of B-frames - options = {'bframes': '16', 'keyint': '10', 'min-keyint': '4'} - with temp_video(100, 300, 300, 5, options=options) as (f_name, data): - pts, _ = io.read_video_timestamps(f_name) - for start in range(0, 80, 20): - for offset in range(1, 4): - lv, _, _ = io.read_video(f_name, pts[start], pts[start + offset - 1]) - s_data = data[start:(start + offset)] - assert len(lv) == offset - assert_equal(s_data, lv, rtol=0.0, atol=TOLERANCE) - - lv, _, _ = io.read_video(f_name, pts[4] + 1, pts[7]) - # TODO fix this - if get_video_backend() == 'pyav': - assert len(lv) == 4 - assert_equal(data[4:8], lv, rtol=0.0, atol=TOLERANCE) - else: - assert len(lv) == 3 - assert_equal(data[5:8], lv, rtol=0.0, atol=TOLERANCE) - - -@pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, - reason="video_reader backend not available") -@pytest.mark.skipif(av is None, reason="PyAV unavailable") -def test_read_packed_b_frames_divx_file(): - name = "hmdb51_Turnk_r_Pippi_Michel_cartwheel_f_cm_np2_le_med_6.avi" - f_name = os.path.join(VIDEO_DIR, name) - pts, fps = io.read_video_timestamps(f_name) - - assert pts == sorted(pts) - assert fps == 30 - - -@pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, - reason="video_reader backend not available") -@pytest.mark.skipif(av is None, reason="PyAV unavailable") -def test_read_timestamps_from_packet(): - with temp_video(10, 300, 300, 5, video_codec='mpeg4') as (f_name, data): - pts, _ = io.read_video_timestamps(f_name) - # note: not all formats/codecs provide accurate information for computing the - # timestamps. For the format that we use here, this information is available, - # so we use it as a baseline - container = av.open(f_name) - stream = container.streams[0] - # make sure we went through the optimized codepath - assert b'Lavc' in stream.codec_context.extradata - pts_step = int(round(float(1 / (stream.average_rate * stream.time_base)))) - num_frames = int(round(float(stream.average_rate * stream.time_base * stream.duration))) - expected_pts = [i * pts_step for i in range(num_frames)] - - assert pts == expected_pts - container.close() - - -@pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, - reason="video_reader backend not available") -@pytest.mark.skipif(av is None, reason="PyAV unavailable") -def test_read_video_pts_unit_sec(): - with temp_video(10, 300, 300, 5, lossless=True) as (f_name, data): - lv, _, info = io.read_video(f_name, pts_unit='sec') - - assert_equal(data, lv) - assert info["video_fps"] == 5 - assert info == {"video_fps": 5} - - -@pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, - reason="video_reader backend not available") -@pytest.mark.skipif(av is None, reason="PyAV unavailable") -def test_read_timestamps_pts_unit_sec(): - with temp_video(10, 300, 300, 5) as (f_name, data): - pts, _ = io.read_video_timestamps(f_name, pts_unit='sec') - - container = av.open(f_name) - stream = container.streams[0] - pts_step = int(round(float(1 / (stream.average_rate * stream.time_base)))) - num_frames = int(round(float(stream.average_rate * stream.time_base * stream.duration))) - expected_pts = [i * pts_step * stream.time_base for i in range(num_frames)] - - assert pts == expected_pts - container.close() - - -@pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, - reason="video_reader backend not available") -@pytest.mark.skipif(av is None, reason="PyAV unavailable") -def test_read_partial_video_pts_unit_sec(): - with temp_video(10, 300, 300, 5, lossless=True) as (f_name, data): - pts, _ = io.read_video_timestamps(f_name, pts_unit='sec') - - for start in range(5): - for offset in range(1, 4): - lv, _, _ = io.read_video(f_name, pts[start], pts[start + offset - 1], pts_unit='sec') - s_data = data[start:(start + offset)] - assert len(lv) == offset - assert_equal(s_data, lv) - - container = av.open(f_name) - stream = container.streams[0] - lv, _, _ = io.read_video(f_name, - int(pts[4] * (1.0 / stream.time_base) + 1) * stream.time_base, pts[7], - pts_unit='sec') - if get_video_backend() == "pyav": - # for "video_reader" backend, we don't decode the closest early frame - # when the given start pts is not matching any frame pts - assert len(lv) == 4 - assert_equal(data[4:8], lv) - container.close() - - -@pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, - reason="video_reader backend not available") -@pytest.mark.skipif(av is None, reason="PyAV unavailable") -def test_read_video_corrupted_file(): - with tempfile.NamedTemporaryFile(suffix='.mp4') as f: - f.write(b'This is not an mpg4 file') - video, audio, info = io.read_video(f.name) - assert isinstance(video, torch.Tensor) - assert isinstance(audio, torch.Tensor) - assert video.numel() == 0 - assert audio.numel() == 0 - assert info == {} - - -@pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, - reason="video_reader backend not available") -@pytest.mark.skipif(av is None, reason="PyAV unavailable") -def test_read_video_timestamps_corrupted_file(): - with tempfile.NamedTemporaryFile(suffix='.mp4') as f: - f.write(b'This is not an mpg4 file') - video_pts, video_fps = io.read_video_timestamps(f.name) - assert video_pts == [] - assert video_fps is None - - -@pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, - reason="video_reader backend not available") -@pytest.mark.skipif(av is None, reason="PyAV unavailable") -@pytest.mark.skip(reason="Temporarily disabled due to new pyav") -def test_read_video_partially_corrupted_file(): - with temp_video(5, 4, 4, 5, lossless=True) as (f_name, data): - with open(f_name, 'r+b') as f: - size = os.path.getsize(f_name) - bytes_to_overwrite = size // 10 - # seek to the middle of the file - f.seek(5 * bytes_to_overwrite) - # corrupt 10% of the file from the middle - f.write(b'\xff' * bytes_to_overwrite) - # this exercises the container.decode assertion check - video, audio, info = io.read_video(f.name, pts_unit='sec') - # check that size is not equal to 5, but 3 - # TODO fix this - if get_video_backend() == 'pyav': - assert len(video) == 3 - else: - assert len(video) == 4 - # but the valid decoded content is still correct - assert_equal(video[:3], data[:3]) - # and the last few frames are wrong - with pytest.raises(AssertionError): - assert_equal(video, data) - - -@pytest.mark.skipif(get_video_backend() != "pyav" and not io._HAS_VIDEO_OPT, - reason="video_reader backend not available") -@pytest.mark.skipif(av is None, reason="PyAV unavailable") -@pytest.mark.skipif(sys.platform == 'win32', reason='temporarily disabled on Windows') -def test_write_video_with_audio(): - f_name = os.path.join(VIDEO_DIR, "R6llTwEh07w.mp4") - video_tensor, audio_tensor, info = io.read_video(f_name, pts_unit="sec") - - with get_tmp_dir() as tmpdir: - out_f_name = os.path.join(tmpdir, "testing.mp4") - io.video.write_video( - out_f_name, - video_tensor, - round(info["video_fps"]), - video_codec="libx264rgb", - options={'crf': '0'}, - audio_array=audio_tensor, - audio_fps=info["audio_fps"], - audio_codec="aac", - ) - - out_video_tensor, out_audio_tensor, out_info = io.read_video( - out_f_name, pts_unit="sec" - ) - - assert info["video_fps"] == out_info["video_fps"] - assert_equal(video_tensor, out_video_tensor) - - audio_stream = av.open(f_name).streams.audio[0] - out_audio_stream = av.open(out_f_name).streams.audio[0] - - assert info["audio_fps"] == out_info["audio_fps"] - assert audio_stream.rate == out_audio_stream.rate - assert pytest.approx(audio_stream.frames, 1) == out_audio_stream.frames - assert audio_stream.frame_size == out_audio_stream.frame_size - -# TODO add tests for audio + # TODO fix this + if get_video_backend() == 'pyav': + assert len(lv) == 4 + assert_equal(data[4:8], lv, rtol=0.0, atol=self.TOLERANCE) + else: + assert len(lv) == 3 + assert_equal(data[5:8], lv, rtol=0.0, atol=self.TOLERANCE) + + + def test_read_packed_b_frames_divx_file(self): + name = "hmdb51_Turnk_r_Pippi_Michel_cartwheel_f_cm_np2_le_med_6.avi" + f_name = os.path.join(VIDEO_DIR, name) + pts, fps = io.read_video_timestamps(f_name) + + assert pts == sorted(pts) + assert fps == 30 + + + def test_read_timestamps_from_packet(self): + with temp_video(10, 300, 300, 5, video_codec='mpeg4') as (f_name, data): + pts, _ = io.read_video_timestamps(f_name) + # note: not all formats/codecs provide accurate information for computing the + # timestamps. For the format that we use here, this information is available, + # so we use it as a baseline + container = av.open(f_name) + stream = container.streams[0] + # make sure we went through the optimized codepath + assert b'Lavc' in stream.codec_context.extradata + pts_step = int(round(float(1 / (stream.average_rate * stream.time_base)))) + num_frames = int(round(float(stream.average_rate * stream.time_base * stream.duration))) + expected_pts = [i * pts_step for i in range(num_frames)] + + container.close() + assert pts == expected_pts + + + def test_read_video_pts_unit_sec(self): + with temp_video(10, 300, 300, 5, lossless=True) as (f_name, data): + lv, _, info = io.read_video(f_name, pts_unit='sec') + + assert_equal(data, lv) + assert info["video_fps"] == 5 + assert info == {"video_fps": 5} + + + def test_read_timestamps_pts_unit_sec(self): + with temp_video(10, 300, 300, 5) as (f_name, data): + pts, _ = io.read_video_timestamps(f_name, pts_unit='sec') + + container = av.open(f_name) + stream = container.streams[0] + pts_step = int(round(float(1 / (stream.average_rate * stream.time_base)))) + num_frames = int(round(float(stream.average_rate * stream.time_base * stream.duration))) + expected_pts = [i * pts_step * stream.time_base for i in range(num_frames)] + + container.close() + assert pts == expected_pts + + + @pytest.mark.parametrize('start', range(5)) + @pytest.mark.parametrize('offset', range(1, 4)) + def test_read_partial_video_pts_unit_sec(self, start, offset): + with temp_video(10, 300, 300, 5, lossless=True) as (f_name, data): + pts, _ = io.read_video_timestamps(f_name, pts_unit='sec') + + lv, _, _ = io.read_video(f_name, pts[start], pts[start + offset - 1], pts_unit='sec') + s_data = data[start:(start + offset)] + assert len(lv) == offset + assert_equal(s_data, lv) + + container = av.open(f_name) + stream = container.streams[0] + lv, _, _ = io.read_video(f_name, + int(pts[4] * (1.0 / stream.time_base) + 1) * stream.time_base, pts[7], + pts_unit='sec') + container.close() + if get_video_backend() == "pyav": + # for "video_reader" backend, we don't decode the closest early frame + # when the given start pts is not matching any frame pts + assert len(lv) == 4 + assert_equal(data[4:8], lv) + + + def test_read_video_corrupted_file(self): + with tempfile.NamedTemporaryFile(suffix='.mp4') as f: + f.write(b'This is not an mpg4 file') + video, audio, info = io.read_video(f.name) + assert isinstance(video, torch.Tensor) + assert isinstance(audio, torch.Tensor) + assert video.numel() == 0 + assert audio.numel() == 0 + assert info == {} + + + def test_read_video_timestamps_corrupted_file(self): + with tempfile.NamedTemporaryFile(suffix='.mp4') as f: + f.write(b'This is not an mpg4 file') + video_pts, video_fps = io.read_video_timestamps(f.name) + assert video_pts == [] + assert video_fps is None + + + @pytest.mark.skip(reason="Temporarily disabled due to new pyav") + def test_read_video_partially_corrupted_file(self): + with temp_video(5, 4, 4, 5, lossless=True) as (f_name, data): + with open(f_name, 'r+b') as f: + size = os.path.getsize(f_name) + bytes_to_overwrite = size // 10 + # seek to the middle of the file + f.seek(5 * bytes_to_overwrite) + # corrupt 10% of the file from the middle + f.write(b'\xff' * bytes_to_overwrite) + # this exercises the container.decode assertion check + video, audio, info = io.read_video(f.name, pts_unit='sec') + # check that size is not equal to 5, but 3 + # TODO fix this + if get_video_backend() == 'pyav': + assert len(video) == 3 + else: + assert len(video) == 4 + # but the valid decoded content is still correct + assert_equal(video[:3], data[:3]) + # and the last few frames are wrong + with pytest.raises(AssertionError): + assert_equal(video, data) + + + @pytest.mark.skipif(sys.platform == 'win32', reason='temporarily disabled on Windows') + def test_write_video_with_audio(self): + f_name = os.path.join(VIDEO_DIR, "R6llTwEh07w.mp4") + video_tensor, audio_tensor, info = io.read_video(f_name, pts_unit="sec") + + with get_tmp_dir() as tmpdir: + out_f_name = os.path.join(tmpdir, "testing.mp4") + io.video.write_video( + out_f_name, + video_tensor, + round(info["video_fps"]), + video_codec="libx264rgb", + options={'crf': '0'}, + audio_array=audio_tensor, + audio_fps=info["audio_fps"], + audio_codec="aac", + ) + + out_video_tensor, out_audio_tensor, out_info = io.read_video( + out_f_name, pts_unit="sec" + ) + + assert info["video_fps"] == out_info["video_fps"] + assert_equal(video_tensor, out_video_tensor) + + audio_stream = av.open(f_name).streams.audio[0] + out_audio_stream = av.open(out_f_name).streams.audio[0] + + assert info["audio_fps"] == out_info["audio_fps"] + assert audio_stream.rate == out_audio_stream.rate + assert pytest.approx(out_audio_stream.frames, rel=0.0, abs=1) == audio_stream.frames + assert audio_stream.frame_size == out_audio_stream.frame_size + + # TODO add tests for audio if __name__ == '__main__': From 3ca61ef320dca081bce2c674b4550d8962999c3c Mon Sep 17 00:00:00 2001 From: Shrill Shrestha Date: Wed, 26 May 2021 11:47:42 -0500 Subject: [PATCH 4/5] Maintain code formatting --- test/test_io.py | 20 +++----------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/test/test_io.py b/test/test_io.py index ad61a7a8eda..fe39d2c4f64 100644 --- a/test/test_io.py +++ b/test/test_io.py @@ -78,7 +78,6 @@ def test_write_read_video(self): assert_equal(data, lv) assert info["video_fps"] == 5 - @pytest.mark.skipif(not io._HAS_VIDEO_OPT, reason="video_reader backend is not chosen") def test_probe_video_from_file(self): with temp_video(10, 300, 300, 5) as (f_name, data): @@ -86,7 +85,6 @@ def test_probe_video_from_file(self): assert pytest.approx(2, rel=0.0, abs=0.1) == video_info.video_duration assert pytest.approx(5, rel=0.0, abs=0.1) == video_info.video_fps - @pytest.mark.skipif(not io._HAS_VIDEO_OPT, reason="video_reader backend is not chosen") def test_probe_video_from_memory(self): with temp_video(10, 300, 300, 5) as (f_name, data): @@ -96,7 +94,6 @@ def test_probe_video_from_memory(self): assert pytest.approx(2, rel=0.0, abs=0.1) == video_info.video_duration assert pytest.approx(5, rel=0.0, abs=0.1) == video_info.video_fps - def test_read_timestamps(self): with temp_video(10, 300, 300, 5) as (f_name, data): pts, _ = io.read_video_timestamps(f_name) @@ -112,13 +109,12 @@ def test_read_timestamps(self): container.close() assert pts == expected_pts - @pytest.mark.parametrize('start', range(5)) @pytest.mark.parametrize('offset', range(1, 4)) def test_read_partial_video(self, start, offset): with temp_video(10, 300, 300, 5, lossless=True) as (f_name, data): pts, _ = io.read_video_timestamps(f_name) - + lv, _, _ = io.read_video(f_name, pts[start], pts[start + offset - 1]) s_data = data[start:(start + offset)] assert len(lv) == offset @@ -131,7 +127,6 @@ def test_read_partial_video(self, start, offset): assert len(lv) == 4 assert_equal(data[4:8], lv) - @pytest.mark.parametrize('start', range(0, 80, 20)) @pytest.mark.parametrize('offset', range(1, 4)) def test_read_partial_video_bframes(self, start, offset): @@ -154,7 +149,6 @@ def test_read_partial_video_bframes(self, start, offset): assert len(lv) == 3 assert_equal(data[5:8], lv, rtol=0.0, atol=self.TOLERANCE) - def test_read_packed_b_frames_divx_file(self): name = "hmdb51_Turnk_r_Pippi_Michel_cartwheel_f_cm_np2_le_med_6.avi" f_name = os.path.join(VIDEO_DIR, name) @@ -163,7 +157,6 @@ def test_read_packed_b_frames_divx_file(self): assert pts == sorted(pts) assert fps == 30 - def test_read_timestamps_from_packet(self): with temp_video(10, 300, 300, 5, video_codec='mpeg4') as (f_name, data): pts, _ = io.read_video_timestamps(f_name) @@ -181,7 +174,6 @@ def test_read_timestamps_from_packet(self): container.close() assert pts == expected_pts - def test_read_video_pts_unit_sec(self): with temp_video(10, 300, 300, 5, lossless=True) as (f_name, data): lv, _, info = io.read_video(f_name, pts_unit='sec') @@ -190,7 +182,6 @@ def test_read_video_pts_unit_sec(self): assert info["video_fps"] == 5 assert info == {"video_fps": 5} - def test_read_timestamps_pts_unit_sec(self): with temp_video(10, 300, 300, 5) as (f_name, data): pts, _ = io.read_video_timestamps(f_name, pts_unit='sec') @@ -204,7 +195,6 @@ def test_read_timestamps_pts_unit_sec(self): container.close() assert pts == expected_pts - @pytest.mark.parametrize('start', range(5)) @pytest.mark.parametrize('offset', range(1, 4)) def test_read_partial_video_pts_unit_sec(self, start, offset): @@ -219,8 +209,8 @@ def test_read_partial_video_pts_unit_sec(self, start, offset): container = av.open(f_name) stream = container.streams[0] lv, _, _ = io.read_video(f_name, - int(pts[4] * (1.0 / stream.time_base) + 1) * stream.time_base, pts[7], - pts_unit='sec') + int(pts[4] * (1.0 / stream.time_base) + 1) * stream.time_base, pts[7], + pts_unit='sec') container.close() if get_video_backend() == "pyav": # for "video_reader" backend, we don't decode the closest early frame @@ -228,7 +218,6 @@ def test_read_partial_video_pts_unit_sec(self, start, offset): assert len(lv) == 4 assert_equal(data[4:8], lv) - def test_read_video_corrupted_file(self): with tempfile.NamedTemporaryFile(suffix='.mp4') as f: f.write(b'This is not an mpg4 file') @@ -239,7 +228,6 @@ def test_read_video_corrupted_file(self): assert audio.numel() == 0 assert info == {} - def test_read_video_timestamps_corrupted_file(self): with tempfile.NamedTemporaryFile(suffix='.mp4') as f: f.write(b'This is not an mpg4 file') @@ -247,7 +235,6 @@ def test_read_video_timestamps_corrupted_file(self): assert video_pts == [] assert video_fps is None - @pytest.mark.skip(reason="Temporarily disabled due to new pyav") def test_read_video_partially_corrupted_file(self): with temp_video(5, 4, 4, 5, lossless=True) as (f_name, data): @@ -272,7 +259,6 @@ def test_read_video_partially_corrupted_file(self): with pytest.raises(AssertionError): assert_equal(video, data) - @pytest.mark.skipif(sys.platform == 'win32', reason='temporarily disabled on Windows') def test_write_video_with_audio(self): f_name = os.path.join(VIDEO_DIR, "R6llTwEh07w.mp4") From 71185ad128fe9f44d556fa6ef6ac49cd4ca7404c Mon Sep 17 00:00:00 2001 From: Shrill Shrestha Date: Thu, 27 May 2021 10:07:51 -0500 Subject: [PATCH 5/5] Refactor open/close to with..as --- test/test_io.py | 50 +++++++++++++++++++++++-------------------------- 1 file changed, 23 insertions(+), 27 deletions(-) diff --git a/test/test_io.py b/test/test_io.py index fe39d2c4f64..93164a9997f 100644 --- a/test/test_io.py +++ b/test/test_io.py @@ -100,13 +100,12 @@ def test_read_timestamps(self): # note: not all formats/codecs provide accurate information for computing the # timestamps. For the format that we use here, this information is available, # so we use it as a baseline - container = av.open(f_name) - stream = container.streams[0] - pts_step = int(round(float(1 / (stream.average_rate * stream.time_base)))) - num_frames = int(round(float(stream.average_rate * stream.time_base * stream.duration))) - expected_pts = [i * pts_step for i in range(num_frames)] + with av.open(f_name) as container: + stream = container.streams[0] + pts_step = int(round(float(1 / (stream.average_rate * stream.time_base)))) + num_frames = int(round(float(stream.average_rate * stream.time_base * stream.duration))) + expected_pts = [i * pts_step for i in range(num_frames)] - container.close() assert pts == expected_pts @pytest.mark.parametrize('start', range(5)) @@ -163,15 +162,14 @@ def test_read_timestamps_from_packet(self): # note: not all formats/codecs provide accurate information for computing the # timestamps. For the format that we use here, this information is available, # so we use it as a baseline - container = av.open(f_name) - stream = container.streams[0] - # make sure we went through the optimized codepath - assert b'Lavc' in stream.codec_context.extradata - pts_step = int(round(float(1 / (stream.average_rate * stream.time_base)))) - num_frames = int(round(float(stream.average_rate * stream.time_base * stream.duration))) - expected_pts = [i * pts_step for i in range(num_frames)] - - container.close() + with av.open(f_name) as container: + stream = container.streams[0] + # make sure we went through the optimized codepath + assert b'Lavc' in stream.codec_context.extradata + pts_step = int(round(float(1 / (stream.average_rate * stream.time_base)))) + num_frames = int(round(float(stream.average_rate * stream.time_base * stream.duration))) + expected_pts = [i * pts_step for i in range(num_frames)] + assert pts == expected_pts def test_read_video_pts_unit_sec(self): @@ -186,13 +184,12 @@ def test_read_timestamps_pts_unit_sec(self): with temp_video(10, 300, 300, 5) as (f_name, data): pts, _ = io.read_video_timestamps(f_name, pts_unit='sec') - container = av.open(f_name) - stream = container.streams[0] - pts_step = int(round(float(1 / (stream.average_rate * stream.time_base)))) - num_frames = int(round(float(stream.average_rate * stream.time_base * stream.duration))) - expected_pts = [i * pts_step * stream.time_base for i in range(num_frames)] + with av.open(f_name) as container: + stream = container.streams[0] + pts_step = int(round(float(1 / (stream.average_rate * stream.time_base)))) + num_frames = int(round(float(stream.average_rate * stream.time_base * stream.duration))) + expected_pts = [i * pts_step * stream.time_base for i in range(num_frames)] - container.close() assert pts == expected_pts @pytest.mark.parametrize('start', range(5)) @@ -206,12 +203,11 @@ def test_read_partial_video_pts_unit_sec(self, start, offset): assert len(lv) == offset assert_equal(s_data, lv) - container = av.open(f_name) - stream = container.streams[0] - lv, _, _ = io.read_video(f_name, - int(pts[4] * (1.0 / stream.time_base) + 1) * stream.time_base, pts[7], - pts_unit='sec') - container.close() + with av.open(f_name) as container: + stream = container.streams[0] + lv, _, _ = io.read_video(f_name, + int(pts[4] * (1.0 / stream.time_base) + 1) * stream.time_base, pts[7], + pts_unit='sec') if get_video_backend() == "pyav": # for "video_reader" backend, we don't decode the closest early frame # when the given start pts is not matching any frame pts