diff --git a/docs/source/sox_effects.rst b/docs/source/sox_effects.rst index 0aa34e7908..6eee11d8c7 100644 --- a/docs/source/sox_effects.rst +++ b/docs/source/sox_effects.rst @@ -5,10 +5,6 @@ torchaudio.sox_effects .. currentmodule:: torchaudio.sox_effects -.. warning:: - - The :py:class:`SoxEffect` and :py:class:`SoxEffectsChain` classes are deprecated. Please migrate to :func:`apply_effects_tensor` and :func:`apply_effects_file`. - Resource initialization / shutdown ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -35,18 +31,3 @@ Applying effects on file ------------------------ .. autofunction:: apply_effects_file - -Legacy -~~~~~~ - -SoxEffect ---------- - -.. autoclass:: SoxEffect - :members: - -SoxEffectsChain ---------------- - -.. autoclass:: SoxEffectsChain - :members: append_effect_to_chain, sox_build_flow_effects, clear_chain, set_input_file diff --git a/test/torchaudio_unittest/dataloader_test.py b/test/torchaudio_unittest/dataloader_test.py deleted file mode 100644 index 8f1d7e6fb3..0000000000 --- a/test/torchaudio_unittest/dataloader_test.py +++ /dev/null @@ -1,39 +0,0 @@ -import unittest - -import torchaudio -from torch.utils.data import Dataset, DataLoader - -from torchaudio_unittest import common_utils - - -class TORCHAUDIODS(Dataset): - def __init__(self): - sound_files = ["sinewave.wav", "steam-train-whistle-daniel_simon.mp3"] - self.data = [common_utils.get_asset_path(fn) for fn in sound_files] - self.si, self.ei = torchaudio.info(common_utils.get_asset_path("sinewave.wav")) - self.si.precision = 16 - self.E = torchaudio.sox_effects.SoxEffectsChain() - self.E.append_effect_to_chain("rate", [self.si.rate]) # resample to 16000hz - self.E.append_effect_to_chain("channels", [self.si.channels]) # mono signal - self.E.append_effect_to_chain("trim", [0, "16000s"]) # first 16000 samples of audio - - def __getitem__(self, index): - fn = self.data[index] - self.E.set_input_file(fn) - x, sr = self.E.sox_build_flow_effects() - return x - - def __len__(self): - return len(self.data) - - -class Test_DataLoader(common_utils.TorchaudioTestCase): - backend = 'sox' - - @common_utils.skipIfNoSoxBackend - def test_1(self): - expected_size = (2, 1, 16000) - ds = TORCHAUDIODS() - dl = DataLoader(ds, batch_size=2) - for x in dl: - self.assertTrue(x.size() == expected_size) diff --git a/test/torchaudio_unittest/sox_effect/sox_effects_chain_test.py b/test/torchaudio_unittest/sox_effect/sox_effects_chain_test.py deleted file mode 100644 index 765ab62094..0000000000 --- a/test/torchaudio_unittest/sox_effect/sox_effects_chain_test.py +++ /dev/null @@ -1,224 +0,0 @@ -import sys -import math -import unittest - -import torch -import torchaudio - -from .. import common_utils - - -@common_utils.skipIfNoSoxBackend -class Test_SoxEffectsChain(common_utils.TorchaudioTestCase): - backend = 'sox' - - test_filepath = common_utils.get_asset_path("steam-train-whistle-daniel_simon.wav") - - def test_single_channel(self): - fn_sine = common_utils.get_asset_path("sinewave.wav") - E = torchaudio.sox_effects.SoxEffectsChain() - E.set_input_file(fn_sine) - E.append_effect_to_chain("echos", [0.8, 0.7, 40, 0.25, 63, 0.3]) - x, sr = E.sox_build_flow_effects() - # check if effects worked - # print(x.size()) - - def test_rate_channels(self): - target_rate = 16000 - target_channels = 1 - E = torchaudio.sox_effects.SoxEffectsChain() - E.set_input_file(self.test_filepath) - E.append_effect_to_chain("rate", [target_rate]) - E.append_effect_to_chain("channels", [target_channels]) - x, sr = E.sox_build_flow_effects() - # check if effects worked - self.assertEqual(sr, target_rate) - self.assertEqual(x.size(0), target_channels) - - @unittest.skipIf(sys.platform == 'darwin', 'This test is known to fail on macOS') - def test_lowpass_speed(self): - speed = .8 - si, _ = torchaudio.info(self.test_filepath) - E = torchaudio.sox_effects.SoxEffectsChain() - E.set_input_file(self.test_filepath) - E.append_effect_to_chain("lowpass", 100) - E.append_effect_to_chain("speed", speed) - E.append_effect_to_chain("rate", si.rate) - x, sr = E.sox_build_flow_effects() - # check if effects worked, add small tolerance for rounding effects - self.assertEqual(x.size(1), int((si.length / si.channels) / speed), atol=1, rtol=1e-8) - - def test_ulaw_and_siginfo(self): - si_out = torchaudio.sox_signalinfo_t() - ei_out = torchaudio.sox_encodinginfo_t() - si_out.precision = 8 - ei_out.encoding = torchaudio.get_sox_encoding_t(9) - ei_out.bits_per_sample = 8 - si_in, ei_in = torchaudio.info(self.test_filepath) - si_out.rate = 44100 - si_out.channels = 2 - E = torchaudio.sox_effects.SoxEffectsChain(out_siginfo=si_out, out_encinfo=ei_out) - E.set_input_file(self.test_filepath) - x, sr = E.sox_build_flow_effects() - # Note: the output was encoded into ulaw because the - # number of unique values in the output is less than 256. - self.assertLess(x.unique().size(0), 2**8 + 1) - self.assertEqual(x.numel(), si_in.length) - - def test_band_chorus(self): - si_in, ei_in = torchaudio.info(self.test_filepath) - ei_in.encoding = torchaudio.get_sox_encoding_t(1) - E = torchaudio.sox_effects.SoxEffectsChain(out_encinfo=ei_in, out_siginfo=si_in) - E.set_input_file(self.test_filepath) - E.append_effect_to_chain("band", ["-n", "10k", "3.5k"]) - E.append_effect_to_chain("chorus", [.5, .7, 55, 0.4, .25, 2, '-s']) - E.append_effect_to_chain("rate", [si_in.rate]) - E.append_effect_to_chain("channels", [si_in.channels]) - x, sr = E.sox_build_flow_effects() - # The chorus effect will make the output file longer than the input - self.assertEqual(x.size(0), si_in.channels) - self.assertGreaterEqual(x.size(1) * x.size(0), si_in.length) - - def test_synth(self): - si_in, ei_in = torchaudio.info(self.test_filepath) - len_in_seconds = si_in.length / si_in.channels / si_in.rate - ei_in.encoding = torchaudio.get_sox_encoding_t(1) - E = torchaudio.sox_effects.SoxEffectsChain(out_encinfo=ei_in, out_siginfo=si_in) - E.set_input_file(self.test_filepath) - E.append_effect_to_chain("synth", [str(len_in_seconds), "pinknoise", "mix"]) - E.append_effect_to_chain("rate", [44100]) - E.append_effect_to_chain("channels", [2]) - x, sr = E.sox_build_flow_effects() - self.assertEqual(x.size(0), si_in.channels) - self.assertEqual(si_in.length, x.size(0) * x.size(1)) - - def test_gain(self): - E = torchaudio.sox_effects.SoxEffectsChain() - E.set_input_file(self.test_filepath) - E.append_effect_to_chain("gain", ["5"]) - x, sr = E.sox_build_flow_effects() - E.clear_chain() - self.assertTrue(x.abs().max().item(), 1.) - E.set_input_file(self.test_filepath) - E.append_effect_to_chain("gain", ["-e", "-5"]) - x, sr = E.sox_build_flow_effects() - E.clear_chain() - self.assertLess(x.abs().max().item(), 1.) - E.set_input_file(self.test_filepath) - E.append_effect_to_chain("gain", ["-b", "8"]) - x, sr = E.sox_build_flow_effects() - E.clear_chain() - self.assertTrue(x.abs().max().item(), 1.) - E.set_input_file(self.test_filepath) - E.append_effect_to_chain("gain", ["-n", "-10"]) - x, sr = E.sox_build_flow_effects() - E.clear_chain() - self.assertLess(x.abs().max().item(), 1.) - - def test_tempo_or_speed(self): - tempo = .8 - si, _ = torchaudio.info(self.test_filepath) - E = torchaudio.sox_effects.SoxEffectsChain() - E.set_input_file(self.test_filepath) - E.append_effect_to_chain("tempo", ["-s", tempo]) - x, sr = E.sox_build_flow_effects() - # check if effect worked - self.assertAlmostEqual(x.size(1), math.ceil((si.length / si.channels) / tempo), delta=1) - # tempo > 1 - E.clear_chain() - tempo = 1.2 - E.append_effect_to_chain("tempo", ["-s", tempo]) - x, sr = E.sox_build_flow_effects() - # check if effect worked - self.assertAlmostEqual(x.size(1), math.ceil((si.length / si.channels) / tempo), delta=1) - # tempo > 1 - E.clear_chain() - speed = 1.2 - E.append_effect_to_chain("speed", [speed]) - E.append_effect_to_chain("rate", [si.rate]) - x, sr = E.sox_build_flow_effects() - # check if effect worked - self.assertAlmostEqual(x.size(1), math.ceil((si.length / si.channels) / speed), delta=1) - # speed < 1 - E.clear_chain() - speed = 0.8 - E.append_effect_to_chain("speed", [speed]) - E.append_effect_to_chain("rate", [si.rate]) - x, sr = E.sox_build_flow_effects() - # check if effect worked - self.assertAlmostEqual(x.size(1), math.ceil((si.length / si.channels) / speed), delta=1) - - def test_trim(self): - x_orig, _ = torchaudio.load(self.test_filepath) - offset = "10000s" - offset_int = int(offset[:-1]) - num_frames = "20000s" - num_frames_int = int(num_frames[:-1]) - E = torchaudio.sox_effects.SoxEffectsChain() - E.set_input_file(self.test_filepath) - E.append_effect_to_chain("trim", [offset, num_frames]) - x, sr = E.sox_build_flow_effects() - # check if effect worked - self.assertTrue(x.allclose(x_orig[:, offset_int:(offset_int + num_frames_int)], rtol=1e-4, atol=1e-4)) - - def test_silence_contrast(self): - si, _ = torchaudio.info(self.test_filepath) - E = torchaudio.sox_effects.SoxEffectsChain() - E.set_input_file(self.test_filepath) - E.append_effect_to_chain("silence", [1, 100, 1]) - E.append_effect_to_chain("contrast", []) - x, sr = E.sox_build_flow_effects() - # check if effect worked - self.assertLess(x.numel(), si.length) - - def test_reverse(self): - x_orig, _ = torchaudio.load(self.test_filepath) - E = torchaudio.sox_effects.SoxEffectsChain() - E.set_input_file(self.test_filepath) - E.append_effect_to_chain("reverse", "") - x_rev, _ = E.sox_build_flow_effects() - # check if effect worked - rev_idx = torch.LongTensor(range(x_orig.size(1))[::-1]) - self.assertTrue(x_orig.allclose(x_rev[:, rev_idx], rtol=1e-5, atol=2e-5)) - - def test_compand_fade(self): - E = torchaudio.sox_effects.SoxEffectsChain() - E.set_input_file(self.test_filepath) - E.append_effect_to_chain("compand", ["0.3,1", "6:-70,-60,-20", "-5", "-90", "0.2"]) - E.append_effect_to_chain("fade", ["q", "0.25", "0", "0.33"]) - x, _ = E.sox_build_flow_effects() - # check if effect worked - # print(x.size()) - - def test_biquad_delay(self): - si, _ = torchaudio.info(self.test_filepath) - E = torchaudio.sox_effects.SoxEffectsChain() - E.set_input_file(self.test_filepath) - E.append_effect_to_chain("biquad", ["0.25136437", "0.50272873", "0.25136437", - "1.0", "-0.17123075", "0.17668821"]) - E.append_effect_to_chain("delay", ["15000s"]) - x, _ = E.sox_build_flow_effects() - # check if effect worked - self.assertTrue(x.size(1) == (si.length / si.channels) + 15000) - - def test_invalid_effect_name(self): - E = torchaudio.sox_effects.SoxEffectsChain() - E.set_input_file(self.test_filepath) - # there is no effect named "special" - with self.assertRaises(LookupError): - E.append_effect_to_chain("special", [""]) - - def test_unimplemented_effect(self): - E = torchaudio.sox_effects.SoxEffectsChain() - E.set_input_file(self.test_filepath) - # the sox spectrogram function is not implemented in torchaudio - with self.assertRaises(NotImplementedError): - E.append_effect_to_chain("spectrogram", [""]) - - def test_invalid_effect_options(self): - E = torchaudio.sox_effects.SoxEffectsChain() - E.set_input_file(self.test_filepath) - # first two options should be combined to "0.3,1" - E.append_effect_to_chain("compand", ["0.3", "1", "6:-70,-60,-20", "-5", "-90", "0.2"]) - with self.assertRaises(RuntimeError): - E.sox_build_flow_effects() diff --git a/torchaudio/__init__.py b/torchaudio/__init__.py index 389d91f635..550b6a2b7a 100644 --- a/torchaudio/__init__.py +++ b/torchaudio/__init__.py @@ -24,36 +24,8 @@ SignalInfo, EncodingInfo, ) -from torchaudio.sox_effects import ( - init_sox_effects as _init_sox_effects, - shutdown_sox_effects as _shutdown_sox_effects, -) try: from .version import __version__, git_version # noqa: F401 except ImportError: pass - - -@_mod_utils.deprecated( - "Please remove the function call to initialize_sox. " - "Resource initialization is now automatically handled.") -def initialize_sox(): - """Initialize sox effects. - - This function is deprecated. See :py:func:`torchaudio.sox_effects.init_sox_effects` - """ - _init_sox_effects() - - -@_mod_utils.deprecated( - "Please remove the function call to torchaudio.shutdown_sox. " - "Resource clean up is now automatically handled. " - "In the unlikely event that you need to manually shutdown sox, " - "please use torchaudio.sox_effects.shutdown_sox_effects.") -def shutdown_sox(): - """Shutdown sox effects. - - This function is deprecated. See :py:func:`torchaudio.sox_effects.shutdown_sox_effects` - """ - _shutdown_sox_effects() diff --git a/torchaudio/csrc/sox.cpp b/torchaudio/csrc/sox.cpp index 0f099946fd..3be7d01314 100644 --- a/torchaudio/csrc/sox.cpp +++ b/torchaudio/csrc/sox.cpp @@ -175,207 +175,10 @@ void write_audio_file( } } -int build_flow_effects(const std::string& file_name, - at::Tensor otensor, - bool ch_first, - sox_signalinfo_t* target_signal, - sox_encodinginfo_t* target_encoding, - const char* file_type, - std::vector pyeffs, - int max_num_eopts) { - - /* This function builds an effects flow and puts the results into a tensor. - It can also be used to re-encode audio using any of the available encoding - options in SoX including sample rate and channel re-encoding. */ - - // open input - sox_format_t* input = sox_open_read(file_name.c_str(), nullptr, nullptr, nullptr); - if (input == nullptr) { - throw std::runtime_error("Error opening audio file"); - } - - // only used if target signal or encoding are null - sox_signalinfo_t empty_signal; - sox_encodinginfo_t empty_encoding; - - // set signalinfo and encodinginfo if blank - if(target_signal == nullptr) { - target_signal = &empty_signal; - target_signal->rate = input->signal.rate; - target_signal->channels = input->signal.channels; - target_signal->length = SOX_UNSPEC; - target_signal->precision = input->signal.precision; -#if SOX_LIB_VERSION_CODE >= 918272 // >= 14.3.0 - target_signal->mult = nullptr; -#endif - } - if(target_encoding == nullptr) { - target_encoding = &empty_encoding; - target_encoding->encoding = SOX_ENCODING_SIGN2; // Sample format - target_encoding->bits_per_sample = input->signal.precision; // Bits per sample - target_encoding->compression = 0.0; // Compression factor - target_encoding->reverse_bytes = sox_option_default; // Should bytes be reversed - target_encoding->reverse_nibbles = sox_option_default; // Should nibbles be reversed - target_encoding->reverse_bits = sox_option_default; // Should bits be reversed (pairs of bits?) - target_encoding->opposite_endian = sox_false; // Reverse endianness - } - - // check for rate or channels effect and change the output signalinfo accordingly - for (SoxEffect se : pyeffs) { - if (se.ename == "rate") { - target_signal->rate = std::stod(se.eopts[0]); - } else if (se.ename == "channels") { - target_signal->channels = std::stoi(se.eopts[0]); - } - } - - // create interm_signal for effects, intermediate steps change this in-place - sox_signalinfo_t interm_signal = input->signal; - -#ifdef __APPLE__ - // According to Mozilla Deepspeech sox_open_memstream_write doesn't work - // with OSX - char tmp_name[] = "/tmp/fileXXXXXX"; - int tmp_fd = mkstemp(tmp_name); - close(tmp_fd); - sox_format_t* output = sox_open_write(tmp_name, target_signal, - target_encoding, "wav", nullptr, nullptr); -#else - // create buffer and buffer_size for output in memwrite - char* buffer; - size_t buffer_size; - // in-memory descriptor (this may not work for OSX) - sox_format_t* output = sox_open_memstream_write(&buffer, - &buffer_size, - target_signal, - target_encoding, - file_type, nullptr); -#endif - if (output == nullptr) { - throw std::runtime_error("Error opening output memstream/temporary file"); - } - // Setup the effects chain to decode/resample - sox_effects_chain_t* chain = - sox_create_effects_chain(&input->encoding, &output->encoding); - - sox_effect_t* e = sox_create_effect(sox_find_effect("input")); - char* io_args[1]; - io_args[0] = (char*)input; - sox_effect_options(e, 1, io_args); - sox_add_effect(chain, e, &interm_signal, &input->signal); - free(e); - - for(SoxEffect tae : pyeffs) { - if(tae.ename == "no_effects") break; - e = sox_create_effect(sox_find_effect(tae.ename.c_str())); - e->global_info->global_info->verbosity = 1; - if(tae.eopts[0] == "") { - sox_effect_options(e, 0, nullptr); - } else { - int num_opts = tae.eopts.size(); - char* sox_args[max_num_eopts]; - for(std::vector::size_type i = 0; i != tae.eopts.size(); i++) { - sox_args[i] = (char*) tae.eopts[i].c_str(); - } - if(sox_effect_options(e, num_opts, sox_args) != SOX_SUCCESS) { -#ifdef __APPLE__ - unlink(tmp_name); -#endif - throw std::runtime_error("invalid effect options, see SoX docs for details"); - } - } - sox_add_effect(chain, e, &interm_signal, &output->signal); - free(e); - } - - e = sox_create_effect(sox_find_effect("output")); - io_args[0] = (char*)output; - sox_effect_options(e, 1, io_args); - sox_add_effect(chain, e, &interm_signal, &output->signal); - free(e); - - // Finally run the effects chain - sox_flow_effects(chain, nullptr, nullptr); - sox_delete_effects_chain(chain); - - // Close sox handles, buffer does not get properly sized until these are closed - sox_close(output); - sox_close(input); - - int sr; - // Read the in-memory audio buffer or temp file that we just wrote. -#ifdef __APPLE__ - /* - Temporary filetype must have a valid header. Wav seems to work here while - raw does not. Certain effects like chorus caused strange behavior on the mac. - */ - // read_audio_file reads the temporary file and returns the sr and otensor - sr = read_audio_file(tmp_name, otensor, ch_first, 0, 0, - target_signal, target_encoding, "wav"); - // delete temporary audio file - unlink(tmp_name); -#else - // Resize output tensor to desired dimensions, different effects result in output->signal.length, - // interm_signal.length and buffer size being inconsistent with the result of the file output. - // We prioritize in the order: output->signal.length > interm_signal.length > buffer_size - // Could be related to: https://sourceforge.net/p/sox/bugs/314/ - int nc, ns; - if (output->signal.length == 0) { - // sometimes interm_signal length is extremely large, but the buffer_size - // is double the length of the output signal - if (interm_signal.length > (buffer_size * 10)) { - ns = buffer_size / 2; - } else { - ns = interm_signal.length; - } - nc = interm_signal.channels; - } else { - nc = output->signal.channels; - ns = output->signal.length; - } - otensor.resize_({ns/nc, nc}); - otensor = otensor.contiguous(); - - input = sox_open_mem_read(buffer, buffer_size, target_signal, target_encoding, file_type); - std::vector samples(buffer_size); - const int64_t samples_read = sox_read(input, samples.data(), buffer_size); - assert(samples_read != nc * ns && samples_read != 0); - AT_DISPATCH_ALL_TYPES(otensor.scalar_type(), "effects_buffer", [&] { - auto* data = otensor.data_ptr(); - std::copy(samples.begin(), samples.begin() + samples_read, data); - }); - // free buffer and close mem_read - sox_close(input); - free(buffer); - - if (ch_first) { - otensor.transpose_(1, 0); - } - sr = target_signal->rate; - -#endif - // return sample rate, output tensor modified in-place - return sr; -} } // namespace audio } // namespace torch PYBIND11_MODULE(_torchaudio, m) { - py::class_(m, "SoxEffect") - .def(py::init<>()) - .def("__repr__", [](const torch::audio::SoxEffect &self) { - std::stringstream ss; - std::string sep; - ss << "SoxEffect (" << self.ename << " ,["; - for(std::string s : self.eopts) { - ss << sep << "\"" << s << "\""; - sep = ", "; - } - ss << "])\n"; - return ss.str(); - }) - .def_readwrite("ename", &torch::audio::SoxEffect::ename) - .def_readwrite("eopts", &torch::audio::SoxEffect::eopts); py::class_(m, "sox_signalinfo_t") .def(py::init<>()) .def("__repr__", [](const sox_signalinfo_t &self) { @@ -468,8 +271,4 @@ PYBIND11_MODULE(_torchaudio, m) { "get_info", &torch::audio::get_info, "Gets information about an audio file"); - m.def( - "build_flow_effects", - &torch::audio::build_flow_effects, - "build effects and flow chain into tensors"); } diff --git a/torchaudio/csrc/sox.h b/torchaudio/csrc/sox.h index 8093f0732e..1344031330 100644 --- a/torchaudio/csrc/sox.h +++ b/torchaudio/csrc/sox.h @@ -44,26 +44,4 @@ void write_audio_file( /// error occurred during reading of the audio data. std::tuple get_info( const std::string& file_name); - -// Struct for build_flow_effects function -struct SoxEffect { - SoxEffect() : ename(""), eopts({""}) { } - std::string ename; - std::vector eopts; -}; - -/// Build a SoX chain, flow the effects, and capture the results in a tensor. -/// An audio file from the given `path` flows through an effects chain given -/// by a list of effects and effect options to an output buffer which is encoded -/// into memory to a target signal type and target signal encoding. The resulting -/// buffer is then placed into a tensor. This function returns the output tensor -/// and the sample rate of the output tensor. -int build_flow_effects(const std::string& file_name, - at::Tensor otensor, - bool ch_first, - sox_signalinfo_t* target_signal, - sox_encodinginfo_t* target_encoding, - const char* file_type, - std::vector pyeffs, - int max_num_eopts); }} // namespace torch::audio diff --git a/torchaudio/sox_effects/__init__.py b/torchaudio/sox_effects/__init__.py index d9650173c5..5baf406b10 100644 --- a/torchaudio/sox_effects/__init__.py +++ b/torchaudio/sox_effects/__init__.py @@ -5,8 +5,6 @@ effect_names, apply_effects_tensor, apply_effects_file, - SoxEffect, - SoxEffectsChain, ) diff --git a/torchaudio/sox_effects/sox_effects.py b/torchaudio/sox_effects/sox_effects.py index 9262786541..8b50d19aa7 100644 --- a/torchaudio/sox_effects/sox_effects.py +++ b/torchaudio/sox_effects/sox_effects.py @@ -1,19 +1,11 @@ -from typing import Any, Callable, List, Optional, Tuple, Union +from typing import List, Tuple import torch -from torch import Tensor -from torchaudio._internal import ( - module_utils as _mod_utils, - misc_ops as _misc_ops, -) +from torchaudio._internal import module_utils as _mod_utils from torchaudio.utils.sox_utils import list_effects -if _mod_utils.is_module_available('torchaudio._torchaudio'): - from torchaudio import _torchaudio - - @_mod_utils.requires_module('torchaudio._torchaudio') def init_sox_effects(): """Initialize resources required to use sox effects. @@ -257,189 +249,3 @@ def apply_effects_file( """ signal = torch.ops.torchaudio.sox_effects_apply_effects_file(path, effects, normalize, channels_first) return signal.get_tensor(), signal.get_sample_rate() - - -@_mod_utils.requires_module('torchaudio._torchaudio') -@_mod_utils.deprecated('Please migrate to `apply_effects_file` or `apply_effects_tensor`.') -def SoxEffect(): - r"""Create an object for passing sox effect information between python and c++ - - Warning: - This function is deprecated. - Please migrate to :func:`apply_effects_file` or :func:`apply_effects_tensor`. - - Returns: - SoxEffect: An object with the following attributes: ename (str) which is the - name of effect, and eopts (List[str]) which is a list of effect options. - """ - return _torchaudio.SoxEffect() - - -@_mod_utils.deprecated('Please migrate to `apply_effects_file` or `apply_effects_tensor`.') -class SoxEffectsChain(object): - r"""SoX effects chain class. - - Warning: - This class is deprecated. - Please migrate to :func:`apply_effects_file` or :func:`apply_effects_tensor`. - - Args: - normalization (bool, number, or callable, optional): - If boolean ``True``, then output is divided by ``1 << 31`` - (assumes signed 32-bit audio), and normalizes to ``[-1, 1]``. - If ``number``, then output is divided by that number. - If ``callable``, then the output is passed as a parameter to the given function, then - the output is divided by the result. (Default: ``True``) - channels_first (bool, optional): - Set channels first or length first in result. (Default: ``True``) - out_siginfo (sox_signalinfo_t, optional): - a sox_signalinfo_t type, which could be helpful if the audio type cannot be - automatically determined. (Default: ``None``) - out_encinfo (sox_encodinginfo_t, optional): - a sox_encodinginfo_t type, which could be set if the audio type cannot be - automatically determined. (Default: ``None``) - filetype (str, optional): - a filetype or extension to be set if sox cannot determine it automatically. - (Default: ``'raw'``) - - Returns: - Tuple[Tensor, int]: - An output Tensor of size ``[C x L]`` or ``[L x C]`` where L is the number - of audio frames and C is the number of channels. An integer which is the sample rate of the - audio (as listed in the metadata of the file) - - Example - >>> class MyDataset(Dataset): - ... def __init__(self, audiodir_path): - ... self.data = [ - ... os.path.join(audiodir_path, fn) - ... for fn in os.listdir(audiodir_path)] - ... self.E = torchaudio.sox_effects.SoxEffectsChain() - ... self.E.append_effect_to_chain("rate", [16000]) # resample to 16000hz - ... self.E.append_effect_to_chain("channels", ["1"]) # mono signal - ... def __getitem__(self, index): - ... fn = self.data[index] - ... self.E.set_input_file(fn) - ... x, sr = self.E.sox_build_flow_effects() - ... return x, sr - ... - ... def __len__(self): - ... return len(self.data) - ... - >>> ds = MyDataset(path_to_audio_files) - >>> for sig, sr in ds: - ... pass - """ - - EFFECTS_UNIMPLEMENTED = {"spectrogram", "splice", "noiseprof", "fir"} - - def __init__(self, - normalization: Union[bool, float, Callable] = True, - channels_first: bool = True, - out_siginfo: Any = None, - out_encinfo: Any = None, - filetype: str = "raw") -> None: - self.input_file: Optional[str] = None - self.chain: List[str] = [] - self.MAX_EFFECT_OPTS = 20 - self.out_siginfo = out_siginfo - self.out_encinfo = out_encinfo - self.filetype = filetype - self.normalization = normalization - self.channels_first = channels_first - - # Define in __init__ to avoid calling at import time - self.EFFECTS_AVAILABLE = set(effect_names()) - - def append_effect_to_chain(self, - ename: str, - eargs: Optional[Union[List[str], str]] = None) -> None: - r"""Append effect to a sox effects chain. - - Args: - ename (str): which is the name of effect - eargs (List[str] or str, optional): which is a list of effect options. (Default: ``None``) - """ - e = SoxEffect() - # check if we have a valid effect - ename = self._check_effect(ename) - if eargs is None or eargs == []: - eargs = [""] - elif not isinstance(eargs, list): - eargs = [eargs] - eargs = self._flatten(eargs) - if len(eargs) > self.MAX_EFFECT_OPTS: - raise RuntimeError("Number of effect options ({}) is greater than max " - "suggested number of options {}. Increase MAX_EFFECT_OPTS " - "or lower the number of effect options".format(len(eargs), self.MAX_EFFECT_OPTS)) - e.ename = ename - e.eopts = eargs - self.chain.append(e) - - @_mod_utils.requires_module('torchaudio._torchaudio') - def sox_build_flow_effects(self, - out: Optional[Tensor] = None) -> Tuple[Tensor, int]: - r"""Build effects chain and flow effects from input file to output tensor - - Args: - out (Tensor, optional): Where the output will be written to. (Default: ``None``) - - Returns: - Tuple[Tensor, int]: An output Tensor of size `[C x L]` or `[L x C]` where - L is the number of audio frames and C is the number of channels. - An integer which is the sample rate of the audio (as listed in the metadata of the file) - """ - # initialize output tensor - if out is not None: - _misc_ops.check_input(out) - else: - out = torch.FloatTensor() - if not len(self.chain): - e = SoxEffect() - e.ename = "no_effects" - e.eopts = [""] - self.chain.append(e) - - # print("effect options:", [x.eopts for x in self.chain]) - - sr = _torchaudio.build_flow_effects(self.input_file, - out, - self.channels_first, - self.out_siginfo, - self.out_encinfo, - self.filetype, - self.chain, - self.MAX_EFFECT_OPTS) - - _misc_ops.normalize_audio(out, self.normalization) - - return out, sr - - def clear_chain(self) -> None: - r"""Clear effects chain in python - """ - self.chain = [] - - def set_input_file(self, input_file: str) -> None: - r"""Set input file for input of chain - - Args: - input_file (str): The path to the input file. - """ - self.input_file = input_file - - def _check_effect(self, e: str) -> str: - if e.lower() in self.EFFECTS_UNIMPLEMENTED: - raise NotImplementedError("This effect ({}) is not implement in torchaudio".format(e)) - elif e.lower() not in self.EFFECTS_AVAILABLE: - raise LookupError("Effect name, {}, not valid".format(e.lower())) - return e.lower() - - # https://stackoverflow.com/questions/12472338/flattening-a-list-recursively - # convenience function to flatten list recursively - def _flatten(self, x: list) -> list: - if x == []: - return [] - if isinstance(x[0], list): - return self._flatten(x[:1]) + self._flatten(x[:1]) - return [str(a) for a in x[:1]] + self._flatten(x[1:])