Skip to content

Commit

Permalink
Changed audio reading to pysoundfile
Browse files Browse the repository at this point in the history
  • Loading branch information
mbsantiago committed Sep 9, 2023
1 parent 394acba commit 0c67aea
Show file tree
Hide file tree
Showing 6 changed files with 370 additions and 54 deletions.
80 changes: 55 additions & 25 deletions src/soundevent/audio/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@
"""

import os
from typing import Optional, Tuple
from typing import Dict, Optional, Tuple

import numpy as np
import soundfile as sf
import xarray as xr
from scipy.io import wavfile

from soundevent.audio.chunks import parse_into_chunks
from soundevent.audio.media_info import extract_media_info_from_chunks
from soundevent.audio.raw import RawData
from soundevent.data.clips import Clip
from soundevent.data.recordings import Recording

Expand All @@ -18,6 +21,17 @@
"load_recording",
]

# Lookup table from the (audio format code, bit depth) pair stored in a WAV
# fmt chunk to the subtype name passed to soundfile when the data chunk is
# decoded as headerless RAW audio. Pairs missing from this table are treated
# as unsupported by the loader.
PCM_SUBFORMATS_MAPPING: Dict[Tuple[int, int], str] = {
    # Format code 1: integer PCM (8-bit samples are unsigned).
    (1, 16): "PCM_16",
    (1, 24): "PCM_24",
    (1, 32): "PCM_32",
    (1, 8): "PCM_U8",
    # Format code 3: IEEE floating point.
    (3, 32): "FLOAT",
    (3, 64): "DOUBLE",
    # Format codes 6 and 7: 8-bit companded A-law and mu-law.
    (6, 8): "ALAW",
    (7, 8): "ULAW",
}


def load_audio(
path: os.PathLike,
Expand All @@ -42,30 +56,46 @@ def load_audio(
samplerate : int
The sample rate of the audio file in Hz.
"""
if offset == 0 and samples is None:
samplerate, data = wavfile.read(path, mmap=False)
else:
samplerate, mmap = wavfile.read(path, mmap=True)

if samples is None:
end_index = None
else:
end_index = offset + samples

data = mmap[offset:end_index]
# Add channel dimension if necessary
if data.ndim == 1:
data = data[:, None]

# Convert to float if necessary
if data.dtype == "int16":
data = data.astype("float32") / np.iinfo("int16").max
if data.dtype == "int32":
data = data.astype("float32") / np.iinfo("int32").max

return data, samplerate
"""
if samples is None:
samples = -1

with open(path, "rb") as fp:
chunks = parse_into_chunks(fp)

# Extract the media information from the fmt chunk.
fmt = chunks.subchunks["fmt "]
media_info = extract_media_info_from_chunks(fp, fmt)

# Get the subformat for the soundfile library to
# read the audio data.
subformat = PCM_SUBFORMATS_MAPPING.get(
(media_info.audio_format, media_info.bit_depth)
)
if subformat is None:
raise ValueError(
f"Unsupported audio format: {media_info.audio_format} "
f"with bit depth {media_info.bit_depth}."
"Valid formats are: "
f"{PCM_SUBFORMATS_MAPPING.keys()}."
)

# Position the file pointer at the start of the data chunk.
data = chunks.subchunks["data"]
raw = RawData(fp, data)

return sf.read(
raw,
start=offset,
frames=samples,
dtype="float32",
always_2d=True,
format="RAW",
subtype=subformat,
samplerate=media_info.samplerate,
channels=media_info.channels,
)


def load_recording(recording: Recording) -> xr.DataArray:
Expand Down
120 changes: 92 additions & 28 deletions src/soundevent/audio/media_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
import hashlib
import os
from dataclasses import dataclass
from typing import Union
from typing import IO, Union

from soundevent.audio.chunks import parse_into_chunks
from soundevent.audio.chunks import Chunk, parse_into_chunks

__all__ = [
"MediaInfo",
Expand All @@ -16,6 +16,36 @@
PathLike = Union[os.PathLike, str]


@dataclass
class FormatInfo:
    """Fields decoded from the ``fmt `` chunk of a WAV file."""

    audio_format: int
    """Numeric code identifying how the waveform samples are encoded."""

    bit_depth: int
    """Number of bits used to store one sample."""

    samplerate: int
    """Sampling rate, in Hz."""

    channels: int
    """How many audio channels the file holds."""

    byte_rate: int
    """Bytes of audio data per second.

    Expected to satisfy
    ``byte_rate = samplerate * channels * bit_depth / 8``.
    """

    block_align: int
    """Bytes taken by one sample across all channels.

    Expected to satisfy ``block_align = channels * bit_depth / 8``.
    """


@dataclass
class MediaInfo:
"""Media information."""
Expand All @@ -39,6 +69,50 @@ class MediaInfo:
"""Number of channels."""


def extract_media_info_from_chunks(
    fp: IO[bytes],
    fmt_chunk: Chunk,
) -> FormatInfo:
    """Return the format information stored in the fmt chunk.

    Parameters
    ----------
    fp : IO[bytes]
        Binary file object of the WAV file. Its position is moved by
        this call.
    fmt_chunk : Chunk
        The ``fmt `` chunk of the file. Only its ``position`` attribute
        is used.

    Returns
    -------
    FormatInfo
        The decoded fields of the fmt chunk.

    Raises
    ------
    ValueError
        If the file ends before the 16 bytes of format fields could be
        read.

    Notes
    -----
    The structure of the format chunk is described in
    [WAV PCM soundfile format](http://soundfile.sapp.org/doc/WaveFormat/).
    """
    # The six fields decoded below occupy 2 + 2 + 4 + 4 + 2 + 2 bytes.
    fields_size = 16

    # Go to the start of the fmt chunk payload, skipping the 4-byte
    # chunk id and the 4-byte chunk size.
    fp.seek(fmt_chunk.position + 8)
    fields = fp.read(fields_size)

    # ``int.from_bytes(b"")`` is 0, so a truncated file would otherwise
    # be reported as a format with zero channels and zero sample rate.
    if len(fields) < fields_size:
        raise ValueError(
            "Truncated fmt chunk: expected "
            f"{fields_size} bytes but got {len(fields)}."
        )

    # All fields are stored as little-endian unsigned integers.
    audio_format = int.from_bytes(fields[0:2], "little")
    channels = int.from_bytes(fields[2:4], "little")
    samplerate = int.from_bytes(fields[4:8], "little")
    byte_rate = int.from_bytes(fields[8:12], "little")
    block_align = int.from_bytes(fields[12:14], "little")
    bit_depth = int.from_bytes(fields[14:16], "little")

    return FormatInfo(
        audio_format=audio_format,
        bit_depth=bit_depth,
        samplerate=samplerate,
        channels=channels,
        byte_rate=byte_rate,
        block_align=block_align,
    )


def get_media_info(path: PathLike) -> MediaInfo:
"""Return the media information from the WAV file.
Expand All @@ -65,37 +139,27 @@ def get_media_info(path: PathLike) -> MediaInfo:
with open(path, "rb") as wav:
chunk = parse_into_chunks(wav)

# Get info from the fmt chunk. The fmt chunk is the first
# subchunk of the root chunk.
fmt_chunk = chunk.subchunks["fmt "]

# Go to the start of the fmt chunk after the chunk id and
# chunk size.
wav.seek(fmt_chunk.position + 8)

audio_format = int.from_bytes(wav.read(2), "little")
channels = int.from_bytes(wav.read(2), "little")
samplerate = int.from_bytes(wav.read(4), "little")
wav.read(4) # Skip byte rate.
wav.read(2) # Skip block align.
bit_depth = int.from_bytes(wav.read(2), "little")
# Get info from the fmt chunk
fmt = chunk.subchunks["fmt "]
fmt_info = extract_media_info_from_chunks(wav, fmt)

# Get size of data chunk. Notice that the size of the data
# chunk is the size of the data subchunk divided by the number
# of channels and the bit depth.
data_chunk = chunk.subchunks["data"]
samples = 8 * data_chunk.size // (channels * bit_depth)

duration = samples / samplerate

return MediaInfo(
audio_format=audio_format,
bit_depth=audio_format,
samplerate_hz=samplerate,
channels=channels,
samples=samples,
duration_s=duration,
)
samples = (
8 * data_chunk.size // (fmt_info.channels * fmt_info.bit_depth)
)
duration = samples / fmt_info.samplerate

return MediaInfo(
audio_format=fmt_info.audio_format,
bit_depth=fmt_info.bit_depth,
samplerate_hz=fmt_info.samplerate,
duration_s=duration,
samples=samples,
channels=fmt_info.channels,
)


BUFFER_SIZE = 65536
Expand Down
136 changes: 136 additions & 0 deletions src/soundevent/audio/raw.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
"""Raw Audio module.
This module contains the RawData class which is a
file-like object that wraps the data buffer of a
WAV file and is meant to replicate the structure
of a RAW audio file.
A RAW audio file is a file that contains only the
contents of the data chunk of a WAV file without
any of the other chunks.
Handling RAW audio files is useful as WAV files
can come with various chunks that are not standard,
such as the Guano metadata chunk. These unexpected
chunks can sometimes cause problems when reading
the WAV file with other libraries and so it is
useful to be able to read only the data chunk of
a WAV file.
"""

import os
from io import BufferedIOBase, RawIOBase
from typing import Optional

from soundevent.audio.chunks import Chunk


class RawData(RawIOBase):
    """A read-only file-like object that wraps the data buffer of a WAV file.

    This file-like object only exposes the contents of the data chunk of
    a WAV file, without any of the other chunks. Every position it
    accepts or reports is relative to the first byte of the data chunk,
    and reads never go past the last byte of the data chunk.
    """

    chunk: "Chunk"
    """The chunk that is being read."""

    initial_position: int
    """Position of the first data byte in the underlying file.

    This is the chunk position plus the 8 bytes taken by the chunk id
    and the chunk size.
    """

    fp: BufferedIOBase
    """The file pointer to the WAV file."""

    size: int
    """The size of the data chunk in bytes."""

    def __init__(self, fp: BufferedIOBase, chunk: "Chunk"):
        """Initialize a new RawData object.

        Parameters
        ----------
        fp : BufferedIOBase
            Binary file object of the WAV file. It is shared with, not
            copied by, this object: it is moved to the start of the
            data and is closed when this object is closed.
        chunk : Chunk
            The data chunk. Its ``position`` and ``size`` attributes
            are used.
        """
        self.chunk = chunk
        self.fp = fp
        self.size = chunk.size

        # We add 8 to the position to account for the chunk id and
        # chunk size that precede the data itself.
        self.initial_position = chunk.position + 8

        # Position the file pointer at the start of the data chunk.
        self.fp.seek(self.initial_position)

        # Sanity check that the underlying file honoured the seek.
        assert self.fp.tell() == self.initial_position

    def close(self) -> None:
        """Close the underlying file."""
        self.fp.close()

    @property
    def closed(self) -> bool:
        """Return True if the underlying file is closed."""
        return self.fp.closed

    def fileno(self) -> int:
        """Return the file descriptor of the underlying file."""
        return self.fp.fileno()

    def flush(self) -> None:
        """Do nothing, as this object never writes."""

    def isatty(self) -> bool:
        """Return False, as this object is never a tty."""
        return False

    def readable(self) -> bool:
        """Return True, as this object can be read."""
        return True

    def seek(self, offset: int, whence: int = os.SEEK_SET, /) -> int:
        """Move the file pointer and return the new position.

        Parameters
        ----------
        offset : int
            Number of bytes to move relative to ``whence``.
        whence : int
            ``os.SEEK_SET`` (start of the data chunk), ``os.SEEK_CUR``
            (current position) or ``os.SEEK_END`` (end of the data
            chunk).

        Returns
        -------
        int
            The new position, relative to the start of the data chunk,
            so that it always agrees with ``tell``.

        Raises
        ------
        ValueError
            If ``whence`` is not supported or the resulting position
            lies before the start of the data chunk.
        """
        if whence == os.SEEK_SET:
            target = offset
        elif whence == os.SEEK_CUR:
            target = self.tell() + offset
        elif whence == os.SEEK_END:
            target = self.size + offset
        else:
            raise ValueError(f"Unsupported whence value: {whence}")

        # A negative position would land on the chunk header or on
        # whatever precedes it in the file, which is not audio data.
        if target < 0:
            raise ValueError(f"Negative seek position: {target}")

        self.fp.seek(self.initial_position + target, os.SEEK_SET)
        return target

    def seekable(self) -> bool:
        """Return True, as this object supports seeking."""
        return True

    def tell(self) -> int:
        """Return the position relative to the start of the data chunk."""
        return self.fp.tell() - self.initial_position

    def truncate(self, size: Optional[int] = None, /) -> int:
        """Refuse to truncate, as this object is read-only.

        Raises
        ------
        io.UnsupportedOperation
            Always. Forwarding the request would cut the underlying
            file at an offset meant for the data chunk only.
        """
        from io import UnsupportedOperation

        raise UnsupportedOperation("truncate")

    def writable(self) -> bool:
        """Return False, as this object cannot be written to."""
        return False

    def read(self, size: int = -1, /) -> bytes:
        """Read up to ``size`` bytes of the data chunk.

        Parameters
        ----------
        size : int
            Maximum number of bytes to read. ``None`` or a negative
            value reads everything left in the data chunk.

        Returns
        -------
        bytes
            The bytes read. Empty once the end of the data chunk has
            been reached.
        """
        remaining = max(self.size - self.tell(), 0)

        # Never hand out bytes of the chunks that follow the data.
        if size is None or size < 0 or size > remaining:
            size = remaining

        return self.fp.read(size)

    def readall(self, /) -> bytes:
        """Read all the bytes left in the data chunk."""
        return self.read(-1)

    def readinto(self, b, /):
        """Read bytes of the data chunk into a pre-allocated buffer.

        Parameters
        ----------
        b : bytes-like object
            Writable buffer to fill.

        Returns
        -------
        int
            The number of bytes stored in ``b``, which is at most the
            number of bytes left in the data chunk.
        """
        remaining = max(self.size - self.tell(), 0)

        # Expose the buffer as plain bytes so that it can be shortened
        # to what is left of the data chunk.
        view = memoryview(b).cast("B")
        if len(view) > remaining:
            view = view[:remaining]

        return self.fp.readinto(view)
Binary file added tests/test_audio/24bitdepth.wav
Binary file not shown.
Loading

0 comments on commit 0c67aea

Please sign in to comment.