Skip to content

Commit

Permalink
aiff: Update in-memory offsets when modifying chunks
Browse files Browse the repository at this point in the history
This makes it possible to safely perform multiple chunk insert / delete / resize operations on loaded IFF files.
  • Loading branch information
phw committed Oct 23, 2019
1 parent ffcb595 commit cd90a5f
Show file tree
Hide file tree
Showing 2 changed files with 189 additions and 104 deletions.
249 changes: 168 additions & 81 deletions mutagen/aiff.py
Expand Up @@ -19,8 +19,14 @@

from mutagen.id3 import ID3
from mutagen.id3._util import ID3NoHeaderError, error as ID3Error
from mutagen._util import resize_bytes, delete_bytes, MutagenError, loadfile, \
convert_error
from mutagen._util import (
MutagenError,
convert_error,
delete_bytes,
insert_bytes,
loadfile,
resize_bytes,
)

__all__ = ["AIFF", "Open", "delete"]

Expand Down Expand Up @@ -74,69 +80,79 @@ class IFFChunk(object):
# Chunk headers are 8 bytes long (4 for ID and 4 for the size)
HEADER_SIZE = 8

def __init__(self, fileobj, parent_chunk=None):
self.__fileobj = fileobj
self.parent_chunk = parent_chunk
self.offset = fileobj.tell()
@classmethod
def parse(cls, fileobj, parent_chunk=None):
header = fileobj.read(cls.HEADER_SIZE)
if len(header) < cls.HEADER_SIZE:
raise InvalidChunk('Header size < %i' % cls.HEADER_SIZE)

header = fileobj.read(self.HEADER_SIZE)
if len(header) < self.HEADER_SIZE:
raise InvalidChunk()
id, data_size = struct.unpack('>4sI', header)
try:
id = id.decode('ascii').rstrip()
except UnicodeDecodeError as e:
raise InvalidChunk(e)

self.id, self.data_size = struct.unpack('>4si', header)
self.data_offset = fileobj.tell()
if not is_valid_chunk_id(id):
raise InvalidChunk('Invalid chunk ID %s' % id)

try:
self.id = self.id.decode('ascii')
except UnicodeDecodeError:
raise InvalidChunk()
return cls.get_class(id)(fileobj, id, data_size, parent_chunk)

if not is_valid_chunk_id(self.id):
raise InvalidChunk()
@classmethod
def get_class(cls, id):
if id == 'FORM':
return FormIFFChunk
else:
return cls

def __init__(self, fileobj, id, data_size, parent_chunk):
self._fileobj = fileobj
self.id = id
self.data_size = data_size
self.parent_chunk = parent_chunk
self.data_offset = fileobj.tell()
self.offset = self.data_offset - self.HEADER_SIZE
self._calculate_size()

def read(self):
"""Read the chunks data"""

self.__fileobj.seek(self.data_offset)
return self.__fileobj.read(self.data_size)
self._fileobj.seek(self.data_offset)
return self._fileobj.read(self.data_size)

def write(self, data):
"""Write the chunk data"""

if len(data) > self.data_size:
raise ValueError

self.__fileobj.seek(self.data_offset)
self.__fileobj.write(data)
self._fileobj.seek(self.data_offset)
self._fileobj.write(data)
# Write the padding bytes
padding = self.padding()
if padding:
self.__fileobj.seek(self.data_offset + self.data_size + 1)
self.__fileobj.write(b'\x00' * padding)
self._fileobj.seek(self.data_offset + self.data_size + 1)
self._fileobj.write(b'\x00' * padding)

def delete(self):
"""Removes the chunk from the file"""

delete_bytes(self.__fileobj, self.size, self.offset)
delete_bytes(self._fileobj, self.size, self.offset)
if self.parent_chunk is not None:
self.parent_chunk._update_size(
self.parent_chunk.data_size - self.size)
self.parent_chunk._remove_subchunk(self)

def _update_size(self, data_size):
def _update_size(self, size_diff, changed_subchunk=None):
"""Update the size of the chunk"""

self.__fileobj.seek(self.offset + 4)
self.__fileobj.write(pack('>I', data_size))
if self.parent_chunk is not None:
new_padding = data_size % 2
size_diff = (self.data_size + self.padding()) \
- (data_size + new_padding)
self.parent_chunk._update_size(
self.parent_chunk.data_size - size_diff)
self.data_size = data_size
old_size = self.size
self.data_size += size_diff
self._fileobj.seek(self.offset + 4)
self._fileobj.write(pack('>I', self.data_size))
self._calculate_size()
if self.parent_chunk is not None:
self.parent_chunk._update_size(self.size - old_size, self)
if changed_subchunk:
self._update_sibling_offsets(
changed_subchunk, old_size - self.size)

def _calculate_size(self):
self.size = self.HEADER_SIZE + self.data_size + self.padding()
Expand All @@ -146,9 +162,10 @@ def resize(self, new_data_size):
"""Resize the file and update the chunk sizes"""

padding = new_data_size % 2
resize_bytes(self.__fileobj, self.data_size + self.padding(),
resize_bytes(self._fileobj, self.data_size + self.padding(),
new_data_size + padding, self.data_offset)
self._update_size(new_data_size)
size_diff = new_data_size - self.data_size
self._update_size(size_diff)

def padding(self):
"""Returns the number of padding bytes (0 or 1).
Expand All @@ -158,77 +175,147 @@ def padding(self):
return self.data_size % 2


class FormIFFChunk(IFFChunk):
    """An IFF chunk that contains other chunks.

    For AIFF files this is the top-level 'FORM' chunk: its data starts
    with a 4 byte form type identifier (usually 'AIFF', stored in
    ``self.name``) followed by the subchunks.
    """

    # The data must hold at least the 4 byte form type identifier.
    MIN_DATA_SIZE = 4

    def __init__(self, fileobj, id, data_size, parent_chunk):
        """Parse the FORM-specific part of the chunk.

        Raises:
            InvalidChunk: if *id* is not 'FORM' or *data_size* is too
                small to hold the form type identifier.
            error: if the form type identifier is not ASCII.
        """
        if id != u'FORM':
            raise InvalidChunk('Expected FORM chunk, got %s' % id)

        IFFChunk.__init__(self, fileobj, id, data_size, parent_chunk)

        # FORM chunks always store an additional identifier as 4 bytes
        if data_size < self.MIN_DATA_SIZE:
            raise InvalidChunk('FORM data size < %i' % self.MIN_DATA_SIZE)

        # Read the FORM id (usually AIFF)
        try:
            self.name = fileobj.read(4).decode('ascii')
        except UnicodeDecodeError as e:
            raise error(e)

        # Subchunks are loaded lazily on first subchunks() call
        self.__subchunks = []

    def subchunks(self):
        """Returns a list of all subchunks.
        The list is lazily loaded on first access.
        """
        if not self.__subchunks:
            # Skip the 4 byte form type identifier at the start of the data
            next_offset = self.data_offset + 4
            while next_offset < self.offset + self.size:
                self._fileobj.seek(next_offset)
                try:
                    chunk = IFFChunk.parse(self._fileobj, self)
                except InvalidChunk:
                    # Stop at the first unparsable chunk (e.g. trailing
                    # garbage); everything read so far is kept.
                    break
                self.__subchunks.append(chunk)

                # Calculate the location of the next chunk
                next_offset = chunk.offset + chunk.size
        return self.__subchunks

    def insert_chunk(self, id_, data=None):
        """Insert a new chunk at the end of the FORM chunk.

        *data*, if given, is written as the chunk payload (with a pad
        byte when its length is odd).  Returns the new chunk object.

        Raises:
            KeyError: if *id_* is not a valid IFF chunk id.
        """
        assert isinstance(id_, text_type)

        if not is_valid_chunk_id(id_):
            raise KeyError("Invalid IFF key.")

        # The new chunk goes right after the last byte of this chunk.
        next_offset = self.offset + self.size
        size = self.HEADER_SIZE
        data_size = 0
        if data:
            data_size = len(data)
            # Chunks of odd data size carry a single pad byte.
            padding = data_size % 2
            size += data_size + padding
        insert_bytes(self._fileobj, size, next_offset)
        self._fileobj.seek(next_offset)
        self._fileobj.write(
            pack('>4si', id_.ljust(4).encode('ascii'), data_size))
        # Re-parse the header we just wrote so the chunk object gets
        # correct offsets and the right class.
        self._fileobj.seek(next_offset)
        chunk = IFFChunk.parse(self._fileobj, self)
        # Grow this FORM chunk's recorded size by the full chunk size.
        self._update_size(chunk.size)
        if data:
            chunk.write(data)
        self.subchunks().append(chunk)
        return chunk

    def _remove_subchunk(self, chunk):
        """Drop *chunk* from the in-memory subchunk list and shrink this
        chunk's size accordingly (fixing later siblings' offsets)."""
        assert chunk in self.__subchunks
        self._update_size(-chunk.size, chunk)
        self.__subchunks.remove(chunk)

    def _update_sibling_offsets(self, changed_subchunk, size_diff):
        """Update the offsets of subchunks after `changed_subchunk`.

        *size_diff* is the old size minus the new size of the changed
        subchunk, so later siblings move towards the file start when it
        shrank and away from it when it grew.
        """
        index = self.__subchunks.index(changed_subchunk)
        sibling_chunks = self.__subchunks[index + 1:len(self.__subchunks)]
        for sibling in sibling_chunks:
            sibling.offset -= size_diff
            sibling.data_offset -= size_diff


class IFFFile(object):
"""Representation of a IFF file"""

def __init__(self, fileobj):
self.__fileobj = fileobj
self.__chunks = {}

# AIFF Files always start with the FORM chunk which contains a 4 byte
# ID before the start of other chunks
fileobj.seek(0)
self.__chunks[u'FORM'] = IFFChunk(fileobj)

# Skip past the 4 byte FORM id
fileobj.seek(IFFChunk.HEADER_SIZE + 4)
self.root = IFFChunk.parse(fileobj)

# Where the next chunk can be located. We need to keep track of this
# since the size indicated in the FORM header may not match up with the
# offset determined from the size of the last chunk in the file
self.__next_offset = fileobj.tell()

# Load all of the chunks
while True:
try:
chunk = IFFChunk(fileobj, self[u'FORM'])
except InvalidChunk:
break
self.__chunks[chunk.id.strip()] = chunk

# Calculate the location of the next chunk,
# considering the pad byte
self.__next_offset = chunk.offset + chunk.size
fileobj.seek(self.__next_offset)
if self.root.id != u'FORM':
raise InvalidChunk("Root chunk must be a RIFF chunk, got %s"
% self.root.id)

def __contains__(self, id_):
"""Check if the IFF file contains a specific chunk"""

assert_valid_chunk_id(id_)

return id_ in self.__chunks
try:
self[id_]
return True
except KeyError:
return False

def __getitem__(self, id_):
"""Get a chunk from the IFF file"""

assert_valid_chunk_id(id_)

try:
return self.__chunks[id_]
except KeyError:
raise KeyError(
"%r has no %r chunk" % (self.__fileobj, id_))
if id_ == 'FORM': # For backwards compatibility
return self.root
found_chunk = None
for chunk in self.root.subchunks():
if chunk.id == id_:
found_chunk = chunk
break
else:
raise KeyError("No %r chunk found" % id_)
return found_chunk

def __delitem__(self, id_):
"""Remove a chunk from the IFF file"""

assert_valid_chunk_id(id_)
self.delete_chunk(id_)

self.__chunks.pop(id_).delete()

def insert_chunk(self, id_):
"""Insert a new chunk at the end of the IFF file"""
def delete_chunk(self, id_):
"""Remove a chunk from the RIFF file"""

assert_valid_chunk_id(id_)
self[id_].delete()

self.__fileobj.seek(self.__next_offset)
self.__fileobj.write(pack('>4si', id_.ljust(4).encode('ascii'), 0))
self.__fileobj.seek(self.__next_offset)
chunk = IFFChunk(self.__fileobj, self[u'FORM'])
self[u'FORM']._update_size(self[u'FORM'].data_size + chunk.size)
def insert_chunk(self, id_, data=None):
"""Insert a new chunk at the end of the IFF file"""

self.__chunks[id_] = chunk
self.__next_offset = chunk.offset + chunk.size
assert_valid_chunk_id(id_)
return self.root.insert_chunk(id_, data)


class AIFFInfo(StreamInfo):
Expand Down
44 changes: 21 additions & 23 deletions tests/test_aiff.py
Expand Up @@ -233,8 +233,8 @@ def test_FORM_chunk_resize(self):
self.iff_1_tmp[u'FORM'].resize(17000)
self.failUnlessEqual(
IFFFile(self.file_1_tmp)[u'FORM'].data_size, 17000)
self.iff_2_tmp[u'FORM'].resize(0)
self.failUnlessEqual(IFFFile(self.file_2_tmp)[u'FORM'].data_size, 0)
self.iff_2_tmp[u'FORM'].resize(4)
self.failUnlessEqual(IFFFile(self.file_2_tmp)[u'FORM'].data_size, 4)

def test_child_chunk_resize(self):
self.iff_1_tmp[u'ID3'].resize(128)
Expand Down Expand Up @@ -277,27 +277,25 @@ def test_insert_padded_chunks(self):
self.failUnlessEqual(0, self.iff_2_tmp[u'TST2'].padding())

def test_delete_padded_chunks(self):
self.iff_2_tmp.insert_chunk(u'TST1')
iff_file = self.iff_2_tmp
iff_file.insert_chunk(u'TST1')
# Resize to odd length, should insert 1 padding byte
self.iff_2_tmp[u'TST1'].resize(3)
iff_file[u'TST1'].resize(3)
# Insert another chunk after the first one
new_iff = IFFFile(self.file_2_tmp)
new_iff.insert_chunk(u'TST2')
new_iff[u'TST2'].resize(2)
new_iff = IFFFile(self.file_2_tmp)
self.failUnlessEqual(new_iff[u'FORM'].size, 16076)
self.failUnlessEqual(new_iff[u'FORM'].data_size, 16068)
self.failUnlessEqual(new_iff[u'TST1'].size, 12)
self.failUnlessEqual(new_iff[u'TST1'].data_size, 3)
self.failUnlessEqual(new_iff[u'TST1'].data_offset, 16062)
self.failUnlessEqual(new_iff[u'TST2'].size, 10)
self.failUnlessEqual(new_iff[u'TST2'].data_size, 2)
self.failUnlessEqual(new_iff[u'TST2'].data_offset, 16074)
iff_file.insert_chunk(u'TST2')
iff_file[u'TST2'].resize(2)
self.failUnlessEqual(iff_file[u'FORM'].size, 16076)
self.failUnlessEqual(iff_file[u'FORM'].data_size, 16068)
self.failUnlessEqual(iff_file[u'TST1'].size, 12)
self.failUnlessEqual(iff_file[u'TST1'].data_size, 3)
self.failUnlessEqual(iff_file[u'TST1'].data_offset, 16062)
self.failUnlessEqual(iff_file[u'TST2'].size, 10)
self.failUnlessEqual(iff_file[u'TST2'].data_size, 2)
self.failUnlessEqual(iff_file[u'TST2'].data_offset, 16074)
# Delete the odd chunk
del new_iff[u'TST1']
new_iff = IFFFile(self.file_2_tmp)
self.failUnlessEqual(new_iff[u'FORM'].size, 16064)
self.failUnlessEqual(new_iff[u'FORM'].data_size, 16056)
self.failUnlessEqual(new_iff[u'TST2'].size, 10)
self.failUnlessEqual(new_iff[u'TST2'].data_size, 2)
self.failUnlessEqual(new_iff[u'TST2'].data_offset, 16062)
iff_file.delete_chunk(u'TST1')
self.failUnlessEqual(iff_file[u'FORM'].size, 16064)
self.failUnlessEqual(iff_file[u'FORM'].data_size, 16056)
self.failUnlessEqual(iff_file[u'TST2'].size, 10)
self.failUnlessEqual(iff_file[u'TST2'].data_size, 2)
self.failUnlessEqual(iff_file[u'TST2'].data_offset, 16062)

0 comments on commit cd90a5f

Please sign in to comment.