Skip to content

Commit

Permalink
code cleanup - group header and trailer and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
matijakolaric committed May 11, 2020
1 parent 527e800 commit cab6632
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 72 deletions.
145 changes: 86 additions & 59 deletions music_metadata/edi/file.py
Expand Up @@ -11,80 +11,105 @@

from .records import *
from .transactions import EdiTransaction
import warnings

RE_GROUPS = re.compile(
r'(?P<lines>^GRH(?P<gtype>.{3})(?P<sequence>\d{5}).*?^GRT(\3).*?$)',
r'(?P<header>^GRH(?P<gtype>.{3})(?P<sequence>\d{5}).*?$)'
r'(?P<lines>.*?)(?P<trailer>^^GRT(\3).*?$)',
re.M | re.S)
RE_TRL = re.compile(r'^TRL.*?$', re.M | re.S)
RE_GRT = re.compile(r'^GRT.*?$', re.M | re.S)


class EdiGroup(object):
"""Parent class for all EDI Group types.
It is NOT an abstract class, separation down to transaction level is
independant of the type of EDI.
"""

header_class = EdiGRH
trailer_class = EdiGRT
transaction_classes = [EdiTransaction]

def __init__(self, gtype, lines=None, sequence=None, *args, **kwargs):
def __init__(self, gtype, header_line=None, trailer_line=None,
transaction_lines=None,
sequence=None, *args, **kwargs):
self.type = gtype
self.errors = []
self.valid = True
self.sequence = sequence
self.valid = True
self.errors = []
self.transaction_count = 0
self.record_count = 2
if lines:
self.lines = lines
else:
self.lines = ''
self.get_header()
self.get_trailer()
self.header_line = header_line
self.trailer_line = trailer_line
self.transaction_lines = transaction_lines
self._header = None
self._trailer = None
self._file = None

def __str__(self):
return self.type

def file(self, f=None):
if f:
self._file = ref(f)()
return self._file

def get_file(self):
return None
warnings.warn('Use EdiGroup.file() instead', DeprecationWarning)
return self.file()

def get_header(self):
if hasattr(self, '_header'):
return self._header
if self.lines:
self._header = self.header_class(self.lines.split('\n')[0])
if self._header.group_code != self.sequence:
e = FileError(
f'Wrong group ID: {self._header.group_code} instead of '
f'{self.sequence}')
self.errors.append(e)
self._header.error('group_code', e)
file = self.get_file()
if file:
file.valid = False
self.valid &= self._header.valid
def header(self, header_line=None, header=None):
if header:
self.header_line = ''
self._header = header
self.header_line = header.to_edi()
elif header_line:
self.header_line = header_line
self._header = self.header_class(self.header_line)
elif self._header:
return self._header
return None
elif self.header_line:
self._header = self.header_class(self.header_line)
else:
return None
self.valid &= self._header.valid
return self._header

def get_trailer(self):
if hasattr(self, '_trailer'):
return self._trailer
if self.lines:
grts = list(re.findall(RE_GRT, self.lines))
if len(grts) == 0:
raise FileError('Group trailer record (GRT) not found')
if len(grts) > 1:
raise FileError('Multiple group trailer records (GRT) found')
grt = grts[0]
self._trailer = self.trailer_class(grt)
self.valid &= self._trailer.valid
if self.sequence != self._trailer.group_code:
e = FileError(
f'Wrong group ID: {self._trailer.group_code} instead of'
f'{self.sequence}')
self.errors.append(e)
self._trailer.error('group_code', e)
file = self.get_file()
if file:
file.valid = False
def get_header(self):
warnings.warn('Use EdiGroup.header() instead', DeprecationWarning)
return self.header()

def trailer(self, trailer_line=None, trailer=None):
if trailer:
self.trailer_line = ''
self._trailer = trailer
elif trailer_line:
self.trailer_line = trailer_line
self._trailer = self.trailer_class(self.trailer_line)
elif self._trailer:
return self._trailer
return None
elif self.trailer_line:
self._trailer = self.trailer_class(self.trailer_line)
else:
return None
self.valid &= self._trailer.valid
if self.sequence != self._trailer.group_code:
e = FileError(
f'Wrong group ID: {self._trailer.group_code} instead of'
f'{self.sequence}')
self.errors.append(e)
self._trailer.error('group_code', e)
file = self.file()
if file:
file.valid = False
return self._trailer

def get_trailer(self):
warnings.warn('Use EdiGroup.trailer() instead', DeprecationWarning)
return self.trailer()

def get_transaction_class(self):
for transaction_class in self.transaction_classes:
Expand All @@ -95,28 +120,28 @@ def get_transaction_class(self):
def get_transactions(self):
sequence = 0
pattern = re.compile(
r'(^{0}.*?(?=^GRT|^{0}))'.format(self.type), re.S | re.M)
r'(^{0}.*?)(?=^{0}|\Z)'.format(self.type), re.S | re.M)
try:
for transaction_lines in re.findall(pattern, self.lines):
for transaction_lines in re.findall(pattern, self.transaction_lines):
transaction_class = self.get_transaction_class()
transaction = transaction_class(
self.type, transaction_lines, sequence)
for error in transaction.errors:
if isinstance(error, FileError):
self.valid = False
self.errors.append(error)
self.get_file().valid = False
self.get_file().file_errors.append(error)
self.file().valid = False
self.file().file_errors.append(error)
break
yield transaction
sequence += 1
self.transaction_count += 1
self.record_count += len(transaction.records)
else:
trailer = self.get_trailer()
trailer = self.trailer()
if self.transaction_count != trailer.transaction_count:
self.valid = False
self.get_file().valid = False
self.file().valid = False
e = FileError(
f'Wrong transaction count in GRT: '
f'{trailer.transaction_count}, counted '
Expand All @@ -125,7 +150,7 @@ def get_transactions(self):
trailer.error('transaction_count', e)
if self.record_count != trailer.record_count:
self.valid = False
self.get_file().valid = False
self.file().valid = False
e = FileError(
f'Wrong record count in GRT: '
f'{trailer.record_count}, counted '
Expand Down Expand Up @@ -210,14 +235,16 @@ def get_groups(self):
for r in re.finditer(RE_GROUPS, self.read()):
expected_sequence += 1
d = r.groupdict()
group = self.group_class(d['gtype'], d['lines'], expected_sequence)
group = self.group_class(
d['gtype'], d['header'], d['trailer'], d['lines'],
expected_sequence)
self.group_count += 1
self.valid &= group.valid
trailer = group.get_trailer()
trailer = group.trailer()
record_count += trailer.record_count
transaction_count += trailer.transaction_count
self.valid &= group.valid
group.get_file = ref(self)
group.file(self)
yield group
else:
if expected_sequence == 0:
Expand Down
34 changes: 22 additions & 12 deletions music_metadata/edi/tests/__init__.py
Expand Up @@ -201,7 +201,10 @@ def test_cwr21_processing(self):
with open(CWR2_PATH, 'rb') as f:
e = EdiFile(f)
self.assertEqual(str(e), CWR2_PATH)
self.assertEqual(e.get_header().record_type, 'HDR')
header = e.get_header()
self.assertEqual(header.record_type, 'HDR')
self.assertEqual(header.get_transmission_dict(), {})
self.assertEqual(header.get_submitter_dict(), {})
for group in e.get_groups():
self.assertEqual(str(group), 'NWR')

Expand Down Expand Up @@ -236,30 +239,37 @@ def test_cwr30_processing(self):
self.assertEqual(str(e), CWR3_PATH)
self.assertEqual(e.get_header().record_type, 'HDR')
for group in e.get_groups():
self.assertEqual(group.get_header().transaction_type, 'ISR')
self.assertEqual(group.get_file(), e)
self.assertEqual(group.header().transaction_type, 'ISR')
self.assertEqual(group.file(), e)
for transaction in group.get_transactions():
for record in transaction.records:
record.to_html()
self.assertTrue(group.valid)
self.assertTrue(e.valid)
del group._header
group.sequence += 2
header = group.get_header()
self.assertFalse(header.valid)
self.assertFalse(group.valid)
self.assertFalse(e.valid)
del group._trailer
trailer = group.get_trailer()
header = group.header()
header.group_code += 1
group.header(header=header)
group.header(header.to_edi())
trailer = group.trailer()
trailer.group_code += 1
trailer.transaction_count += 1
group.trailer(trailer=trailer)
self.assertFalse(trailer.valid)
group.trailer(trailer.to_edi())
group.list_transactions()
self.assertFalse(group.valid)
# self.assertFalse(e.valid)
# del group._trailer
# trailer = group.get_trailer()
# self.assertFalse(trailer.valid)

def test_new_file(self):
"""
Test EDI generation.
"""
EdiFile()
group = EdiGroup(gtype='WRK')
self.assertIsNone(group.get_file())
self.assertIsNone(group.file())
t = EdiTransaction(gtype='WRK')
self.assertEqual(
t.to_dict(), {'error': 'Not implemented for this file type.'})
Expand Down
2 changes: 1 addition & 1 deletion music_metadata/edi/transactions.py
Expand Up @@ -63,7 +63,7 @@ def split_into_records(self):
self.valid &= record.valid
for error in record.errors.values():
if isinstance(error, FileError):
self.errors.append(error)
self.error(error, record)
yield record

def to_dict(self, verbosity=1):
Expand Down

0 comments on commit cab6632

Please sign in to comment.