Skip to content

Commit

Permalink
Add 'flatten' parameter to as_dict()
Browse files Browse the repository at this point in the history
  • Loading branch information
mathiascode committed May 29, 2024
1 parent c52b01b commit a5b92da
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 75 deletions.
2 changes: 1 addition & 1 deletion tinytag/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def _pop_switch(name: str) -> bool:


def _print_tag(tag: TinyTag, formatting: str, header_printed: bool = False) -> bool:
data = tag._as_dict()
data = tag.as_dict(flatten=True)
del data['images']
if formatting == 'json':
print(json.dumps(data))
Expand Down
56 changes: 34 additions & 22 deletions tinytag/tests/test_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,10 +560,12 @@
testfolder = os.path.join(os.path.dirname(__file__))


def compare_tag(results: dict[str, dict[str, Any]], expected: dict[str, dict[str, Any]],
def compare_tag(results: dict[str, Any],
expected: dict[str, Any],
file: str, prev_path: str | None = None) -> None:
def compare_values(path: str, result_val: int | float | str | dict[str, Any],
expected_val: int | float | str | dict[str, Any]) -> bool:
def compare_values(path: str,
result_val: str | int | float,
expected_val: str | int | float) -> bool:
# lets not copy *all* the lyrics inside the fixture
if (path == 'extra.lyrics'
and isinstance(expected_val, list) and isinstance(result_val, list)):
Expand All @@ -572,7 +574,7 @@ def compare_values(path: str, result_val: int | float | str | dict[str, Any],
return result_val == pytest.approx(expected_val)
return result_val == expected_val

def error_fmt(value: int | float | str | dict[str, Any]) -> str:
def error_fmt(value: str | int | float) -> str:
return f'{repr(value)} ({type(value)})'

assert isinstance(results, dict)
Expand All @@ -595,10 +597,9 @@ def error_fmt(value: int | float | str | dict[str, Any]) -> str:
def test_file_reading_tags_duration(testfile: str, expected: dict[str, dict[str, Any]]) -> None:
filename = os.path.join(testfolder, testfile)
tag = TinyTag.get(filename, tags=True, duration=True)
results = {
key: val for key, val in tag._as_dict().items()
if val is not None and key not in ('filename', 'images')
}
results = tag.as_dict(flatten=False)
for attr_name in ('filename', 'images'):
del results[attr_name]
compare_tag(results, expected, filename)
assert tag.images.any is None

Expand All @@ -608,10 +609,9 @@ def test_file_reading_tags(testfile: str, expected: dict[str, dict[str, Any]]) -
filename = os.path.join(testfolder, testfile)
excluded_attrs = {"bitdepth", "bitrate", "channels", "duration", "samplerate"}
tag = TinyTag.get(filename, tags=True, duration=False)
results = {
key: val for key, val in tag._as_dict().items()
if val is not None and key not in ('filename', 'images')
}
results = tag.as_dict(flatten=False)
for attr_name in ('filename', 'images'):
del results[attr_name]
expected = {
key: val for key, val in expected.items() if key not in excluded_attrs
}
Expand All @@ -624,14 +624,12 @@ def test_file_reading_duration(testfile: str, expected: dict[str, dict[str, Any]
filename = os.path.join(testfolder, testfile)
allowed_attrs = {"bitdepth", "bitrate", "channels", "duration", "filesize", "samplerate"}
tag = TinyTag.get(filename, tags=False, duration=True)
results = {
key: val for key, val in tag._as_dict().items()
if val is not None and key not in ('filename', 'images')
}
results = tag.as_dict(flatten=False)
for attr_name in ('filename', 'extra', 'images'):
del results[attr_name]
expected = {
key: val for key, val in expected.items() if key in allowed_attrs
}
expected["extra"] = {}
compare_tag(results, expected, filename)
assert tag.images.any is None

Expand Down Expand Up @@ -816,13 +814,27 @@ def test_to_str() -> None:
tag = TinyTag.get(os.path.join(testfolder, 'samples/id3v22-test.mp3'))
assert (
"'filesize': 5120, 'duration': 0.13836297152858082, 'channels': 2, 'bitrate': 160.0, "
"'bitdepth': None, 'samplerate': 44100, 'artist': 'Anais Mitchell', 'albumartist': None, "
"'composer': None, 'album': 'Hymns for the Exiled', 'disc': None, 'disc_total': None, "
"'title': 'cosmic american', 'track': 3, 'track_total': 11, 'genre': None, "
"'samplerate': 44100, 'artist': 'Anais Mitchell', "
"'album': 'Hymns for the Exiled', "
"'title': 'cosmic american', 'track': 3, 'track_total': 11, "
"'year': '2004', 'comment': 'Waterbug Records, www.anaismitchell.com', "
"'extra': {'encoded_by': ['iTunes v4.6'], 'itunnorm': [' 0000044E 00000061 00009B67 "
"000044C3 00022478 00022182 00007FCC 00007E5C 0002245E 0002214E'], 'itunes_cddb_1': "
"['9D09130B+174405+11+150+14097+27391+43983+65786+84877+99399+113226+132452+146426+"
"163829'], 'itunes_cddb_tracknumber': ['3']}, 'images': {'front_cover': [], "
"'back_cover': [], 'leaflet': [], 'media': [], 'other': [], 'extra': {}}"
"163829'], 'itunes_cddb_tracknumber': ['3']}, 'images': {'extra': {}}"
) in str(tag)


def test_to_str_flatten() -> None:
tag = TinyTag.get(os.path.join(testfolder, 'samples/id3v22-test.mp3'))
assert (
"'filesize': 5120, 'duration': 0.13836297152858082, 'channels': 2, 'bitrate': 160.0, "
"'samplerate': 44100, 'artist': ['Anais Mitchell'], "
"'album': ['Hymns for the Exiled'], "
"'title': ['cosmic american'], 'track': 3, 'track_total': 11, "
"'year': ['2004'], 'comment': ['Waterbug Records, www.anaismitchell.com'], "
"'encoded_by': ['iTunes v4.6'], 'itunnorm': [' 0000044E 00000061 00009B67 "
"000044C3 00022478 00022182 00007FCC 00007E5C 0002245E 0002214E'], 'itunes_cddb_1': "
"['9D09130B+174405+11+150+14097+27391+43983+65786+84877+99399+113226+132452+146426+"
"163829'], 'itunes_cddb_tracknumber': ['3'], 'images': {}"
) in str(tag.as_dict(flatten=True))
10 changes: 5 additions & 5 deletions tinytag/tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,36 +76,36 @@ def test_meta_data_output_default_json() -> None:
output = run_cli(mp3_with_image)
data = json.loads(output)
assert data
assert set(data.keys()) == tinytag_attributes
assert set(data.keys()).issubset(tinytag_attributes)


def test_meta_data_output_format_json() -> None:
output = run_cli('-f json ' + mp3_with_image)
data = json.loads(output)
assert data
assert set(data.keys()) == tinytag_attributes
assert set(data.keys()).issubset(tinytag_attributes)


def test_meta_data_output_format_csv() -> None:
output = run_cli('-f csv ' + mp3_with_image)
lines = [line for line in output.split(os.linesep) if line]
assert all(',' in line for line in lines)
attributes = set(line.split(',')[0] for line in lines)
assert set(attributes) == tinytag_attributes
assert set(attributes).issubset(tinytag_attributes)


def test_meta_data_output_format_tsv() -> None:
output = run_cli('-f tsv ' + mp3_with_image)
lines = [line for line in output.split(os.linesep) if line]
assert all('\t' in line for line in lines)
attributes = set(line.split('\t')[0] for line in lines)
assert set(attributes) == tinytag_attributes
assert set(attributes).issubset(tinytag_attributes)


def test_meta_data_output_format_tabularcsv() -> None:
output = run_cli('-f tabularcsv ' + mp3_with_image)
header, _line, _rest = output.split(os.linesep)
assert set(header.split(',')) == tinytag_attributes
assert set(header.split(',')).issubset(tinytag_attributes)


def test_fail_on_unsupported_file() -> None:
Expand Down
138 changes: 91 additions & 47 deletions tinytag/tinytag.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ def __init__(self) -> None:
self._load_image = False
self._tags_parsed = False

def __repr__(self) -> str:
return str(self.as_dict(flatten=False))

@classmethod
def get(cls,
filename: bytes | str | PathLike[Any] | None = None,
Expand Down Expand Up @@ -151,11 +154,34 @@ def is_supported(cls, filename: bytes | str | PathLike[Any]) -> bool:
"""Check if a specific file is supported based on its file extension."""
return cls._get_parser_for_filename(filename) is not None

def __repr__(self) -> str:
return str(self._as_dict())

def _as_dict(self) -> dict[str, Any]:
return {k: v for k, v in self.__dict__.items() if not k.startswith('_')}
def as_dict(self, flatten: bool = True) -> dict[
str,
str | int | float | list[str | TagImage] | dict[str, list[str | TagImage]]
]:
"""Return a dictionary representation of the tag."""
fields: dict[
str,
str | int | float | list[str | TagImage] | dict[str, list[str | TagImage]]
] = {}
for key, value in self.__dict__.items():
if key.startswith('_'):
continue
if flatten and key == 'extra':
for extra_key, extra_values in value.items():
if extra_key in fields:
fields[extra_key] += extra_values
else:
fields[extra_key] = extra_values
continue
if key == 'images':
value = value.as_dict(flatten)
if value is None:
continue
if flatten and key != 'filename' and isinstance(value, str):
fields[key] = [value]
else:
fields[key] = value
return fields

@classmethod
def _get_parser_for_filename(
Expand Down Expand Up @@ -266,19 +292,6 @@ def _set_field(self, fieldname: str, value: str | int | float) -> None:
print(f'Setting field "{fieldname}" to "{new_value!r}"')
self.__dict__[fieldname] = new_value

def _set_image_field(self, fieldname: str, value: TagImage) -> None:
write_dest = self.images.__dict__
if fieldname.startswith(self._EXTRA_PREFIX):
fieldname = fieldname[len(self._EXTRA_PREFIX):]
write_dest = self.images.extra
old_values = write_dest.get(fieldname)
values = [value]
if old_values is not None:
values = old_values + values
if DEBUG:
print(f'Setting image field "{fieldname}"')
write_dest[fieldname] = values

def _determine_duration(self, fh: BinaryIO) -> None:
raise NotImplementedError

Expand All @@ -287,20 +300,18 @@ def _parse_tag(self, fh: BinaryIO) -> None:

def _update(self, other: TinyTag) -> None:
# update the values of this tag with the values from another tag
excluded_attrs = {'extra', 'images'}
for standard_key, standard_value in other._as_dict().items():
if (standard_key not in excluded_attrs
and standard_value is not None):
self._set_field(standard_key, standard_value)
for extra_key, extra_values in other.extra.items():
for extra_value in extra_values:
self._set_field(self._EXTRA_PREFIX + extra_key, extra_value)
for image_key, images in other.images._as_dict().items():
for image in images:
self._set_image_field(image_key, image)
for image_extra_key, images_extra in other.images.extra.items():
for image_extra in images_extra:
self._set_image_field(self._EXTRA_PREFIX + image_extra_key, image_extra)
for key, value in other.as_dict(flatten=False).items():
if isinstance(value, dict):
if key != 'extra':
continue
for extra_key, extra_values in value.items():
for extra_value in extra_values:
if isinstance(extra_value, str):
self._set_field(self._EXTRA_PREFIX + extra_key, extra_value)
continue
if value is not None and not isinstance(value, list):
self._set_field(key, value)
self.images._update(other.images)

@staticmethod
def _bytes_to_int_le(b: bytes) -> int:
Expand Down Expand Up @@ -333,6 +344,8 @@ def audio_offset(self) -> None:

class TagImages:
"""A class containing images embedded in an audio file."""
_EXTRA_PREFIX = 'extra.'

def __init__(self) -> None:
self.front_cover: list[TagImage] = []
self.back_cover: list[TagImage] = []
Expand All @@ -341,27 +354,58 @@ def __init__(self) -> None:
self.other: list[TagImage] = []
self.extra: dict[str, list[TagImage]] = {}

def __repr__(self) -> str:
return str(self.as_dict(flatten=False))

@property
def any(self) -> TagImage | None:
"""Return a cover image.
If not present, fall back to any other available image.
"""
for image_list in self._as_dict().values():
for image_list in self.as_dict(flatten=True).values():
for image in image_list:
return image
for extra_image_list in self.extra.values():
for extra_image in extra_image_list:
return extra_image
return None

def __repr__(self) -> str:
return str(vars(self))
def as_dict(self, flatten: bool = True) -> dict[str, list[TagImage]]:
"""Return a dictionary representation of the tag images."""
images: dict[str, list[TagImage]] = {}
for key, value in self.__dict__.items():
if key.startswith('_'):
continue
if flatten and key == 'extra':
for extra_key, extra_values in value.items():
if extra_key in images:
images[extra_key] += extra_values
else:
images[extra_key] = extra_values
continue
if value or key == 'extra':
images[key] = value
return images

def _as_dict(self) -> dict[str, list[TagImage]]:
return {
k: v for k, v in self.__dict__.items()
if not k.startswith('_') and k != 'extra'
}
def _set_field(self, fieldname: str, value: TagImage) -> None:
write_dest = self.__dict__
if fieldname.startswith(self._EXTRA_PREFIX):
fieldname = fieldname[len(self._EXTRA_PREFIX):]
write_dest = self.extra
old_values = write_dest.get(fieldname)
values = [value]
if old_values is not None:
values = old_values + values
if DEBUG:
print(f'Setting image field "{fieldname}"')
write_dest[fieldname] = values

def _update(self, other: TagImages) -> None:
for key, value in other.as_dict(flatten=False).items():
if isinstance(value, dict):
for extra_key, extra_values in value.items():
for image_extra in extra_values:
self._set_field(self._EXTRA_PREFIX + extra_key, image_extra)
continue
for image in value:
self._set_field(key, image)


class TagImage:
Expand Down Expand Up @@ -655,7 +699,7 @@ def _traverse_atoms(self, fh: BinaryIO, path: dict[bytes, Any],
print(' ' * 4 * len(curr_path), 'FIELD: ', fieldname)
if fieldname.startswith('images.'):
if self._load_image:
self._set_image_field(fieldname[len('images.'):], value)
self.images._set_field(fieldname[len('images.'):], value)
elif fieldname:
self._set_field(fieldname, value)
# if no action was specified using dict or callable, jump over atom
Expand Down Expand Up @@ -1117,7 +1161,7 @@ def _parse_frame(self, fh: BinaryIO, id3version: int | None = None) -> int:
description = self._decode_string(content[desc_start_pos:desc_end_pos])
field_name, image = self._create_tag_image(
content[desc_end_pos:], pic_type, mime_type, description)
self._set_image_field(field_name, image)
self.images._set_field(field_name, image)
elif frame_id not in self._DISALLOWED_FRAME_IDS:
# unknown, try to add to extra dict
if self._parse_tags:
Expand Down Expand Up @@ -1328,7 +1372,7 @@ def _parse_vorbis_comment(self, fh: BinaryIO, contains_vendor: bool = True) -> N
if DEBUG:
print('Found Vorbis TagImage', key, value[:64])
fieldname, fieldvalue = _Flac._parse_image(io.BytesIO(base64.b64decode(value)))
self._set_image_field(fieldname, fieldvalue)
self.images._set_field(fieldname, fieldvalue)
else:
if DEBUG:
print('Found Vorbis Comment', key, value[:64])
Expand Down Expand Up @@ -1537,7 +1581,7 @@ def _parse_tag(self, fh: BinaryIO) -> None:
self._update(oggtag)
elif block_type == self.METADATA_PICTURE and self._load_image:
fieldname, value = self._parse_image(fh)
self._set_image_field(fieldname, value)
self.images._set_field(fieldname, value)
elif block_type >= 127:
break # invalid block type
else:
Expand Down

0 comments on commit a5b92da

Please sign in to comment.