diff --git a/music_metadata/edi/fields.py b/music_metadata/edi/fields.py index 35a8bd6..0ade863 100644 --- a/music_metadata/edi/fields.py +++ b/music_metadata/edi/fields.py @@ -24,6 +24,7 @@ def __init__(self, size, mandatory=False, *args, **kwargs): def __set_name__(self, owner, name): self._name = name + self._record = owner def __get__(self, instance, owner=None): if instance: @@ -49,8 +50,10 @@ def verbose(self, value): """Return verbose (human-readable) value""" return value - def to_html(self, value, label='', error=None): + def to_html(self, value, label=None, error=None): """Create HTML representation for EDI, used in syntax highlighting""" + if label is None: + label = self._name if self: edi_value = self.to_edi(value) verbose_value = self.verbose(value) @@ -73,6 +76,8 @@ def to_html(self, value, label='', error=None): def to_dict(self, record, label=None, verbosity=1): """Create the dictionary with the value and additional data.""" + if label is None: + label = self._name value = getattr(record, label or self._name) valid = record.valid or label not in record.errors if value is None and valid and verbosity <= 1: @@ -105,8 +110,10 @@ def __set__(self, instance, value): try: value = int(value) except ValueError: + super().__set__(instance, value) raise RecordError(f'Value "{value}" is not numeric') if not 0 <= value < 10 ** self._size: + super().__set__(instance, value) raise FieldError( f'Not between 0 "{value}" and "{10 ** self._size - 1}"') super().__set__(instance, value) @@ -126,8 +133,9 @@ def __init__(self, size, constant=None, *args, **kwargs): if len(constant) == size: self._constant = constant else: + length = len(constant) raise AttributeError( - f'Value "{value}" is not {size} characters long.') + f'Value "{ length }" is not { size } characters long.') else: self._constant = ' ' * size super().__init__(size, *args, **kwargs) @@ -155,8 +163,8 @@ def __set__(self, instance, value): value = value.strip() if value and value not in self._choices.keys(): super().__set__(instance, value) - raise FieldError('Value must be one of ' - f'''\"{'", "'.join(self._edi_keys)}\"''') + keys = ', '.join(self._edi_keys) + raise FieldError(f'Value must be one of "{ keys }"') super().__set__(instance, value) def verbose(self, value): @@ -188,13 +196,14 @@ def __set__(self, instance, value): instance.__dict__[self._name] = None return value = dict( - (('Y', True), ('N', False), ('U', None), (' ', None))).get(value, - value) + (('Y', True), ('N', False), ('U', None), (' ', None)) + ).get(value, value) super().__set__(instance, value) def to_edi(self, value): - value = dict(((True, 'Y'), (False, 'N'), - (None, 'U' if self._mandatory else ' '))).get(value, ' ') + value = dict(( + (True, 'Y'), (False, 'N'), (None, 'U' if self._mandatory else ' ') + )).get(value, ' ') return value @@ -210,10 +219,13 @@ def __init__(self, *args, **kwargs): self._edi_keys = ('Y', 'N') def __set__(self, instance, value): - value = dict((('Y', True), ('N', False), (' ', None))).get(value, - value) + value = dict(( + ('Y', True), ('N', False), (' ', None) + )).get(value, value) super().__set__(instance, value) def to_edi(self, value): - value = dict(((True, 'Y'), (False, 'N'), (None, ' '))).get(value, ' ') + value = dict(( + (True, 'Y'), (False, 'N'), (None, ' ') + )).get(value, ' ') return value diff --git a/music_metadata/edi/file.py b/music_metadata/edi/file.py index 44055b1..0fb3482 100644 --- a/music_metadata/edi/file.py +++ b/music_metadata/edi/file.py @@ -55,6 +55,7 @@ def get_header(self): f'{self.sequence}') self.errors.append(e) self._header.error('group_code', e) + self.get_file().valid = False self.valid &= self._header.valid return self._header return None @@ -77,6 +78,7 @@ def get_trailer(self): f'{self.sequence}') self.errors.append(e) self._trailer.error('group_code', e) + self.get_file().valid = False return self._trailer return None @@ -88,13 +90,13 @@ def get_transaction_class(self): def get_transactions(self): sequence = 0 - pattern = re.compile(r'(^{0}.*?(?=^GRT|^{0}))'.format(self.type), - re.S | re.M) + pattern = re.compile( + r'(^{0}.*?(?=^GRT|^{0}))'.format(self.type), re.S | re.M) try: for transaction_lines in re.findall(pattern, self.lines): transaction_class = self.get_transaction_class() - transaction = transaction_class(self.type, transaction_lines, - sequence) + transaction = transaction_class( + self.type, transaction_lines, sequence) for error in transaction.errors: if isinstance(error, FileError): self.valid = False @@ -111,17 +113,19 @@ def get_transactions(self): if self.transaction_count != trailer.transaction_count: self.valid = False self.get_file().valid = False - e = FileError(f'Wrong transaction count in GRT: ' - f'{trailer.transaction_count}, counted ' - f'{self.transaction_count}') + e = FileError( + f'Wrong transaction count in GRT: ' + f'{trailer.transaction_count}, counted ' + f'{self.transaction_count}') self.errors.append(e) trailer.error('transaction_count', e) if self.record_count != trailer.record_count: self.valid = False self.get_file().valid = False - e = FileError(f'Wrong record count in GRT: ' - f'{trailer.record_count}, counted ' - f'{self.record_count}') + e = FileError( + f'Wrong record count in GRT: ' + f'{trailer.record_count}, counted ' + f'{self.record_count}') self.errors.append(e) trailer.error('record_count', e) except FileError as e: @@ -221,27 +225,30 @@ def get_groups(self): if expected_sequence != trailer.group_count: self.valid = False - e = FileError('Wrong group count in TRL: ' - f'{trailer.group_count}, ' - f'counted {expected_sequence}') + e = FileError( + 'Wrong group count in TRL: ' + f'{trailer.group_count}, ' + f'counted {expected_sequence}') self.file_errors.append(e) trailer.error('group_count', e) self.transaction_count = transaction_count if transaction_count != trailer.transaction_count: self.valid = False - e = FileError('Wrong transaction count in TRL: ' - f'{trailer.transaction_count}, ' - f'GRTs say {transaction_count}') + e = FileError( + 'Wrong transaction count in TRL: ' + f'{trailer.transaction_count}, ' + f'GRTs say {transaction_count}') self.file_errors.append(e) trailer.error('transaction_count', e) self.record_count = record_count if record_count != trailer.record_count: self.valid = False - e = FileError('Wrong record count in TRL: ' - f'{trailer.record_count}, ' - f'GRTs say {record_count}') + e = FileError( + 'Wrong record count in TRL: ' + f'{trailer.record_count}, ' + f'GRTs say {record_count}') self.file_errors.append(e) trailer.error('record_count', e) @@ -250,6 +257,3 @@ def list_groups(self): def get_encoding_from_header(self): return 'latin1' - - def to_dict(self): - raise Exception('CWR not loaded!') diff --git a/music_metadata/edi/records.py b/music_metadata/edi/records.py index 911ad66..bd33c2d 100644 --- a/music_metadata/edi/records.py +++ b/music_metadata/edi/records.py @@ -44,6 +44,11 @@ def __init__(self, line=None, sequence=None): else: raise FileError(f'Record too short: {line}') + def __setattr__(self, key, value): + super().__setattr__(key, value) + if key == 'record_type' and self.type is None: + self.type = key + def to_edi(self): output = '' for label, field in self._fields.items(): @@ -60,7 +65,8 @@ def __str__(self): def warning(self, field, error): """Add an error, do not invalidate.""" if field and field not in self.labels: - raise AttributeError(f'No such field {field} in {self.labels}') + labels = ', '.join(self.labels) + raise AttributeError(f'No such field { field } in { labels }') self.errors[field] = error def error(self, field, error): @@ -86,12 +92,12 @@ def split_into_fields(self): pos += field._size value = self.line[start:end] if start < actual_length < end: - self.warning(label, FieldError('Field truncated')) + self.warning(label, FieldWarning('Field truncated')) elif end > actual_length: if field._mandatory: self.error(label, RecordError('Mandatory field missing')) else: - self.warning(label, FieldError( + self.warning(label, FieldWarning( 'Field missing at the end of the line.')) try: setattr(self, label, value) @@ -117,19 +123,18 @@ def labels(self): return self.get_fields().keys() def to_html(self): - classes = f'record {self.type.lower()}' + classes = f'record { self.type.lower() }' if not self.valid: classes += ' invalid' - output = f'' + output = f'' for field in self.fields: value = getattr(self, field._name) - s = field.to_html(value, f'{field._name}', - self.errors.get(field._name)) + s = field.to_html( + value, f'{ field._name }', self.errors.get(field._name)) output += s else: output += EdiField.to_html( - None, self.rest, label='', - error=self.errors.get(None)) + None, self.rest, label='', error=self.errors.get(None)) output += '' return output @@ -147,15 +152,17 @@ def validate_sequences(self, transaction_sequence, record_sequence): """This is really a file-level validation.""" if self.transaction_sequence_number != transaction_sequence: - e = FileError(f'Wrong transaction sequence ' - f'{self.transaction_sequence_number}, should be ' - f'{transaction_sequence}') + e = FileError( + f'Wrong transaction sequence ' + f'{ self.transaction_sequence_number }, should be ' + f'{ transaction_sequence }') self.error('transaction_sequence_number', e) if self.record_sequence_number != record_sequence: - e = FileError(f'Wrong transaction sequence ' - f'{self.record_sequence_number}, should be ' - f'{record_sequence}') + e = FileError( + f'Wrong transaction sequence ' + f'{ self.record_sequence_number }, should be ' + f'{ record_sequence }') self.error('record_sequence_number', e) def validate(self): @@ -168,9 +175,9 @@ def to_dict(self, verbosity=1): # first three fields can be skipped if label in [ - 'record_type', - 'transaction_sequence_number', - 'record_sequence_number' + 'record_type', + 'transaction_sequence_number', + 'record_sequence_number' ] and label not in self.errors.keys(): continue diff --git a/music_metadata/edi/tests/__init__.py b/music_metadata/edi/tests/__init__.py index 506779e..c322104 100644 --- a/music_metadata/edi/tests/__init__.py +++ b/music_metadata/edi/tests/__init__.py @@ -2,6 +2,7 @@ import unittest from music_metadata.edi.file import EdiFile, EdiGroup +from music_metadata.edi.fields import * from music_metadata.edi.records import * from music_metadata.edi.transactions import EdiTransaction @@ -12,6 +13,134 @@ class TestEdi(unittest.TestCase): + def test_edifield(self): + self.assertIsInstance(EdiRecord.record_type, EdiField) + record = EdiRecord() + field = list(record.fields)[0] + self.assertEqual(record.to_edi(), ' ') + self.assertIsNone(field.to_dict(record, verbosity=0)) + record.record_type = 'HDR' + self.assertEqual(record.to_edi(), 'HDR') + self.assertEqual( + record.to_html(), + 'HDR') + self.assertIn('error', record.to_dict()) + self.assertIsInstance(field.to_dict(record), dict) + record.error('record_type', FieldError('testing')) + self.assertEqual( + record.to_html(), + 'HDR') + self.assertIn('error', field.to_dict(record, verbosity=0)) + self.assertEqual( + field.to_html('ABC'), + 'ABC') + + def test_edinumericfield(self): + record = EdiTransactionRecord() + record.transaction_sequence_number = '12345678' + with self.assertRaises(RecordError): + record.transaction_sequence_number = 'AAA' + with self.assertRaises(FieldError): + record.transaction_sequence_number = '123456789' + + def test_other_fields(self): + + with self.assertRaises(AttributeError): + class Record(EdiTransactionRecord): + bad_constant = EdiConstantField(size=5, constant='123') + + class Record(EdiTransactionRecord): + filler = EdiConstantField(size=5) + constant = EdiConstantField(size=5, constant='12345') + bool = EdiBooleanField(mandatory=True) + flag = EdiFlagField(mandatory=True) + choice = EdiListField(size=2, choices=( + ('A', 'Aa'), ('B', 'Bb'), ('AB', 'AaBb'))) + + record = Record() + record.record_type='ABC' + record.bool = 'Y' + record.choice = 'A ' + record.flag = 'U' + + self.assertEqual(record.to_edi(), 'ABC0000000000000000 YUA ') + self.assertEqual( + record.to_html(), + 'ABC0000000000000000 ' + ' YUA ') + + to_dict = record.to_dict(verbosity=1) + should_return = OrderedDict([ + ('bool', OrderedDict([ + ('valid', True), ('value', True), ('verbose_value', 'Yes')])), + ('choice', OrderedDict([ + ('valid', True), ('value', 'A'), ('verbose_value', 'Aa')]))]) + self.assertEqual(to_dict, should_return) + + record.get_fields()['flag']._mandatory = False + record.flag = 'N' + + to_dict = record.to_dict(verbosity=0) + should_return = OrderedDict([ + ('bool', True), ('flag', False), ('choice', 'A')]) + self.assertEqual(to_dict, should_return) + + # Test the exceptions, they must still change the values + with self.assertRaises(FieldWarning): + record.constant = '01234' + self.assertEqual(record.constant, '01234') + with self.assertRaises(FieldError): + record.choice = 'C' + self.assertEqual(record.constant, '01234') + + def test_edirecord(self): + with self.assertRaises(FileError): + record = EdiRecord('AA') + + class Record(EdiRecord): + fil = EdiConstantField(size=1) + num = EdiNumericField(size=4, mandatory=True) + txt = EdiField(size=4, mandatory=False) + + record = Record('HDR 001') + self.assertTrue(record.valid) + self.assertIn('num', record.errors) + self.assertIn('txt', record.errors) + self.assertEqual(str(record), 'HDR 0001 ') + + # FieldWarning + record = Record('HDRX001 ') + self.assertTrue(record.valid) + self.assertIn('fil', record.errors) + self.assertEqual(str(record), 'HDRX0001 ') + + record = Record('HDR ') + self.assertFalse(record.valid) + self.assertIn('num', record.errors) + self.assertEqual(str(record), 'HDR 0000 ') + + with self.assertRaises(AttributeError): + record.warning('rest', FieldWarning('test')) + def transaction_0(self, transaction): self.assertEqual(str(transaction), 'NWR00000000') self.assertFalse(transaction.valid) @@ -107,17 +236,33 @@ def test_cwr30_processing(self): self.assertEqual(str(e), CWR3_PATH) self.assertEqual(e.get_header().record_type, 'HDR') for group in e.get_groups(): + self.assertEqual(group.get_header().transaction_type, 'ISR') + self.assertEqual(group.get_file(), e) for transaction in group.get_transactions(): for record in transaction.records: record.to_html() + self.assertTrue(group.valid) + self.assertTrue(e.valid) + del group._header + group.sequence += 2 + header = group.get_header() + self.assertFalse(header.valid) + self.assertFalse(group.valid) + self.assertFalse(e.valid) + del group._trailer + trailer = group.get_trailer() + self.assertFalse(trailer.valid) def test_new_file(self): """ Test EDI generation. """ EdiFile() - EdiGroup(gtype='WRK') + group = EdiGroup(gtype='WRK') + self.assertIsNone(group.get_file()) t = EdiTransaction(gtype='WRK') + self.assertEqual( + t.to_dict(), {'error': 'Not implemented for this file type.'}) r = EdiTransactionRecord() r.record_type = t.type with self.assertRaises(ValueError): diff --git a/music_metadata/edi/transactions.py b/music_metadata/edi/transactions.py index 4215620..4589630 100644 --- a/music_metadata/edi/transactions.py +++ b/music_metadata/edi/transactions.py @@ -24,7 +24,7 @@ def __init__(self, gtype, lines=None, sequence=None, *args, **kwargs): self.lines = lines self.records = list(self.split_into_records()) else: - self.lines = [] + self.lines = '' self.records = [] def __str__(self): @@ -38,8 +38,8 @@ def split_into_records(self): for expected_r_sequence, line in enumerate( self.lines.strip('\n').split('\n')): try: - record = self.get_record_class(line[0:3])( - line, expected_r_sequence) + Record = self.get_record_class(line[0:3]) + record = Record(line, expected_r_sequence) except (RecordError, FileError) as e: record = EdiRecord(line, expected_r_sequence) record.error(None, e) @@ -56,4 +56,4 @@ def split_into_records(self): yield record def to_dict(self, verbosity=1): - return {'error': 'Not implemented for this file type.'} \ No newline at end of file + return {'error': 'Not implemented for this file type.'}