Skip to content

Commit

Permalink
Replace extract_values with regex formatter
Browse files Browse the repository at this point in the history
  • Loading branch information
djhoese committed Sep 19, 2018
1 parent 34bb659 commit 04babb0
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 144 deletions.
173 changes: 103 additions & 70 deletions trollsift/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,73 +152,108 @@ def _extract_parsedef(fmt):
return parsedef, convdef


def _extract_values(parsedef, stri):
"""
Given a parse definition *parsedef* match and extract key value
pairs from input string *stri*.
"""
if len(parsedef) == 0:
return {}

match = parsedef.pop(0)
# we allow ourselves typechecking
# in case of this subroutine
if isinstance(match, (str, six.text_type)):
# match
if stri.find(match) == 0:
stri_next = stri[len(match):]
return _extract_values(parsedef, stri_next)
else:
raise ValueError
else:
key = list(match)[0]
fmt = match[key]
fmt_list = ["%f", "%a", "%A", "%b", "%B", "%z", "%Z",
"%p", "%c", "%x", "%X"]
if fmt is None or fmt.isalpha() or any([x in fmt for x in fmt_list]):
if len(parsedef) != 0:
next_match = parsedef[0]
# next match is string ...
if isinstance(next_match, (str, six.text_type)):
try:
count = fmt.count(next_match)
except AttributeError:
count = 0
pos = -1
for dummy in range(count + 1):
pos = stri.find(next_match, pos + 1)
value = stri[0:pos]
# next match is string key ...
else:
# pick out segment until string match,
# and parse in reverse,
rev_parsedef = []
x = ''
for x in parsedef:
if isinstance(x, (str, six.text_type)):
break
rev_parsedef.insert(0, x)
rev_parsedef = rev_parsedef + [match]
if isinstance(x, (str, six.text_type)):
rev_stri = stri[:stri.find(x)][::-1]
else:
rev_stri = stri[::-1]
# parse reversely and pick out value
value = _extract_values(rev_parsedef, rev_stri)[key][::-1]
else:
value = stri
stri_next = stri[len(value):]
keyvals = _extract_values(parsedef, stri_next)
keyvals[key] = value
return keyvals
# taken from https://docs.python.org/3/library/re.html#simulating-scanf
spec_regexes = {
'c': r'.',
'd': r'[-+]?\d',
'f': r'[-+]?(\d+(\.\d*)?|\.\d+)([eE][-+]?\d+)?',
'i': r'[-+]?(0[xX][\dA-Fa-f]+|0[0-7]*|\d+)',
'o': r'[-+]?[0-7]',
's': r'\S',
'x': r'[-+]?(0[xX])?[\dA-Fa-f]',
}
spec_regexes['e'] = spec_regexes['f']
spec_regexes['E'] = spec_regexes['f']
spec_regexes['g'] = spec_regexes['f']
spec_regexes['X'] = spec_regexes['x']
allow_multiple = ['c', 'd', 'o', 's', 'x', 'X']


class RegexFormatter(string.Formatter):

# special string to mark a parameter not being specified
UNPROVIDED_VALUE = '<trollsift unprovided value>'
ESCAPE_CHARACTERS = [x for x in string.punctuation if x not in '\\%']
ESCAPE_SETS = [(c, '\{}'.format(c)) for c in ESCAPE_CHARACTERS]

def _escape(self, s):
"""Escape bad characters for regular expressions.
Similar to `re.escape` but allows '%' to pass through.
"""
for ch, r_ch in self.ESCAPE_SETS:
s = s.replace(ch, r_ch)
return s

def parse(self, format_string):
parse_ret = super(RegexFormatter, self).parse(format_string)
for literal_text, field_name, format_spec, conversion in parse_ret:
# the parent class will call parse multiple times moving
# 'format_spec' to 'literal_text'. We only escape 'literal_text'
# so we don't escape things twice.
literal_text = self._escape(literal_text)
yield literal_text, field_name, format_spec, conversion

def get_value(self, key, args, kwargs):
try:
return super(RegexFormatter, self).get_value(key, args, kwargs)
except (IndexError, KeyError):
return key, self.UNPROVIDED_VALUE

def _regex_datetime(self, format_spec):
replace_str = format_spec
for fmt_key, fmt_val in DT_FMT.items():
if fmt_key == '%%':
# special case
replace_str.replace('%%', '%')
continue
count = fmt_val.count('?')
# either a series of numbers or letters/numbers
regex = r'\d{{{:d}}}'.format(count) if count else r'[^ \t\n\r\f\v\-_:]+'
replace_str = replace_str.replace(fmt_key, regex)
return replace_str

def regex_field(self, value, format_spec):
if value != self.UNPROVIDED_VALUE:
return super(RegexFormatter, self).format_field(value, format_spec)

# Replace format spec with glob patterns (*, ?, etc)
if not format_spec:
return r'.*'
if '%' in format_spec:
return self._regex_datetime(format_spec)
char_type = spec_regexes[format_spec[-1]]
num_match = re.search('[0-9]+', format_spec)
num = 0 if num_match is None else int(num_match.group(0))
has_multiple = format_spec[-1] in allow_multiple
if num == 0 and has_multiple:
# don't know the count
return r'{}*'.format(char_type)
elif num == 0:
# floats and other types can't have multiple
return char_type
elif format_spec[-1] in allow_multiple:
return r'{}{{{:d}}}'.format(char_type, num)
else:
# find number of chars
num = _get_number_from_fmt(fmt)
value = stri[0:num]
stri_next = stri[len(value):]
keyvals = _extract_values(parsedef, stri_next)
keyvals[key] = value
return keyvals
return r'{}'.format(char_type)

def format_field(self, value, format_spec):
if not isinstance(value, tuple) or value[1] != self.UNPROVIDED_VALUE:
return super(RegexFormatter, self).format_field(value, format_spec)
field_name, value = value
new_value = self.regex_field(value, format_spec)
return '(?P<{}>{})'.format(field_name, new_value)

def extract_values(self, fmt, stri):
regex = self.format(fmt)
match = re.match(regex, stri)
if match is None:
raise ValueError("String does not match pattern.")
return match.groupdict()


regex_formatter = RegexFormatter()


def _get_number_from_fmt(fmt):
Expand Down Expand Up @@ -271,9 +306,7 @@ def _convert(convdef, stri):


def _collect_keyvals_from_parsedef(parsedef):
'''Collect dict keys and values from parsedef.
'''

"""Collect dict keys and values from parsedef."""
keys, vals = [], []

for itm in parsedef:
Expand All @@ -290,7 +323,7 @@ def parse(fmt, stri):
'''

parsedef, convdef = _extract_parsedef(fmt)
keyvals = _extract_values(parsedef, stri)
keyvals = regex_formatter.extract_values(fmt, stri)
for key in convdef.keys():
keyvals[key] = _convert(convdef[key], keyvals[key])

Expand Down
110 changes: 36 additions & 74 deletions trollsift/tests/unittests/test_parser.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import unittest
import datetime as dt

from trollsift.parser import _extract_parsedef, _extract_values
from trollsift.parser import _extract_parsedef, regex_formatter
from trollsift.parser import _convert, _collect_keyvals_from_parsedef
from trollsift.parser import parse, globify, validate, is_one2one

Expand All @@ -28,122 +28,73 @@ def test_extract_parsedef(self):
'_', {'orbit': '05d'}, '.l1b'])

def test_extract_values(self):
# Run
parsedef = ['/somedir/', {'directory': None}, '/hrpt_',
{'platform': '4s'}, {'platnum': '2s'},
'_', {'time': '%Y%m%d_%H%M'}, '_',
{'orbit': 'd'}, '.l1b']
result = _extract_values(parsedef, self.string)
# Assert
fmt = "/somedir/{directory}/hrpt_{platform:4s}{platnum:2s}_{time:%Y%m%d_%H%M}_{orbit:d}.l1b"
result = regex_formatter.extract_values(fmt, self.string)
self.assertDictEqual(result, {'directory': 'otherdir',
'platform': 'noaa', 'platnum': '16',
'time': '20140210_1004', 'orbit': '69022'})

def test_extract_values_end(self):
# Run
parsedef = ['/somedir/', {'directory': None}, '/hrpt_',
{'platform': '4s'}, {'platnum': '2s'},
'_', {'time': '%Y%m%d_%H%M'}, '_',
{'orbit': 'd'}]
result = _extract_values(parsedef, self.string3)
# Assert
fmt = "/somedir/{directory}/hrpt_{platform:4s}{platnum:2s}_{time:%Y%m%d_%H%M}_{orbit:d}"
result = regex_formatter.extract_values(fmt, self.string3)
self.assertDictEqual(result, {'directory': 'otherdir',
'platform': 'noaa', 'platnum': '16',
'time': '20140210_1004', 'orbit': '69022'})

def test_extract_values_beginning(self):
# Run
parsedef = [{'directory': None}, '/hrpt_',
{'platform': '4s'}, {'platnum': '2s'},
'_', {'time': '%Y%m%d_%H%M'}, '_',
{'orbit': 'd'}]
result = _extract_values(parsedef, self.string4)
# Assert
self.assertDictEqual(result, {'directory': '/somedir/otherdir',
'platform': 'noaa', 'platnum': '16',
'time': '20140210_1004', 'orbit': '69022'})

def test_extract_values_beginning(self):
# Run
parsedef = [{'directory': None}, '/hrpt_',
{'platform': '4s'}, {'platnum': '2s'},
'_', {'time': '%Y%m%d_%H%M'}, '_',
{'orbit': 'd'}]
result = _extract_values(parsedef, self.string4)
# Assert
fmt = "{directory}/hrpt_{platform:4s}{platnum:2s}_{time:%Y%m%d_%H%M}_{orbit:d}"
result = regex_formatter.extract_values(fmt, self.string4)
self.assertDictEqual(result, {'directory': '/somedir/otherdir',
'platform': 'noaa', 'platnum': '16',
'time': '20140210_1004', 'orbit': '69022'})

def test_extract_values_s4spair(self):
# Run
parsedef = [{'directory': None}, '/hrpt_',
{'platform': '4s'}, {'platnum': 's'},
'_', {'time': '%Y%m%d_%H%M'}, '_',
{'orbit': 'd'}]
result = _extract_values(parsedef, self.string4)
# Assert
fmt = "{directory}/hrpt_{platform:4s}{platnum:s}_{time:%Y%m%d_%H%M}_{orbit:d}"
result = regex_formatter.extract_values(fmt, self.string4)
self.assertDictEqual(result, {'directory': '/somedir/otherdir',
'platform': 'noaa', 'platnum': '16',
'time': '20140210_1004', 'orbit': '69022'})

def test_extract_values_ss2pair(self):
# Run
parsedef = [{'directory': None}, '/hrpt_',
{'platform': 's'}, {'platnum': 's2'},
'_', {'time': '%Y%m%d_%H%M'}, '_',
{'orbit': 'd'}]
result = _extract_values(parsedef, self.string4)
# Assert
fmt = "{directory}/hrpt_{platform:s}{platnum:2s}_{time:%Y%m%d_%H%M}_{orbit:d}"
result = regex_formatter.extract_values(fmt, self.string4)
self.assertDictEqual(result, {'directory': '/somedir/otherdir',
'platform': 'noaa', 'platnum': '16',
'time': '20140210_1004', 'orbit': '69022'})

def test_extract_values_ss2pair_end(self):
# Run
parsedef = [{'directory': None}, '/hrpt_',
{'platform': 's'}, {'platnum': 's2'}]
result = _extract_values(parsedef, "/somedir/otherdir/hrpt_noaa16")
# Assert
fmt = "{directory}/hrpt_{platform:s}{platnum:2s}"
result = regex_formatter.extract_values(fmt, "/somedir/otherdir/hrpt_noaa16")
self.assertDictEqual(result, {'directory': '/somedir/otherdir',
'platform': 'noaa', 'platnum': '16'})

def test_extract_values_sdatetimepair_end(self):
# Run
parsedef = [{'directory': None}, '/hrpt_',
{'platform': 's'}, {'date': '%Y%m%d'}]
result = _extract_values(
parsedef, "/somedir/otherdir/hrpt_noaa20140212")
# Assert
fmt = "{directory}/hrpt_{platform:s}{date:%Y%m%d}"
result = regex_formatter.extract_values(fmt, "/somedir/otherdir/hrpt_noaa20140212")
self.assertDictEqual(result, {'directory': '/somedir/otherdir',
'platform': 'noaa', 'date': '20140212'})

def test_extract_values_everything(self):
# Run
parsedef = [{'everything': None}]
result = _extract_values(parsedef, self.string)
# Assert
fmt = "{everything}"
result = regex_formatter.extract_values(fmt, self.string)
self.assertDictEqual(
result, {'everything': '/somedir/otherdir/hrpt_noaa16_20140210_1004_69022.l1b'})

def test_extract_values_padding2(self):
# Run
parsedef = ['/somedir/', {'directory': None}, '/hrpt_',
{'platform': '4s'}, {'platnum': '2s'},
'_', {'time': '%Y%m%d_%H%M'}, '_',
{'orbit': '0>5d'}, '.l1b']
result = _extract_values(parsedef, self.string2)
fmt = "/somedir/{directory}/hrpt_{platform:4s}{platnum:2s}_{time:%Y%m%d_%H%M}_{orbit:0>5d}.l1b"
# parsedef = ['/somedir/', {'directory': None}, '/hrpt_',
# {'platform': '4s'}, {'platnum': '2s'},
# '_', {'time': '%Y%m%d_%H%M'}, '_',
# {'orbit': '0>5d'}, '.l1b']
result = regex_formatter.extract_values(fmt, self.string2)
# Assert
self.assertDictEqual(result, {'directory': 'otherdir',
'platform': 'noaa', 'platnum': '16',
'time': '20140210_1004', 'orbit': '00022'})

def test_extract_values_fails(self):
# Run
parsedef = ['/somedir/', {'directory': None}, '/hrpt_',
{'platform': '4s'}, {'platnum': '2s'},
'_', {'time': '%Y%m%d_%H%M'}, '_', {'orbit': '4d'}, '.l1b']
self.assertRaises(ValueError, _extract_values, parsedef, self.string)
fmt = '/somedir/{directory}/hrpt_{platform:4s}{platnum:2s}_{time:%Y%m%d_%H%M}_{orbit:4d}.l1b'
self.assertRaises(ValueError, regex_formatter.extract_values, fmt, self.string)

def test_convert_digits(self):
self.assertEqual(_convert('d', '69022'), 69022)
Expand All @@ -164,6 +115,17 @@ def test_parse(self):
'time': dt.datetime(2014, 2, 12, 14, 12),
'orbit': 12345})

def test_parse_wildcards(self):
# Run
result = parse(
"hrpt_{platform}{platnum:2s}_{time:%Y%m%d_%H%M}_{orbit:05d}{ext}",
"hrpt_noaa19_20140212_1412_12345.l1b")
# Assert
self.assertDictEqual(result, {'platform': 'noaa', 'platnum': '19',
'time': dt.datetime(2014, 2, 12, 14, 12),
'orbit': 12345,
'ext': '.l1b'})

def test_parse_align(self):
filepattern="H-000-{hrit_format:4s}__-{platform_name:4s}________-{channel_name:_<9s}-{segment:_<9s}-{start_time:%Y%m%d%H%M}-__"
result = parse(filepattern, "H-000-MSG3__-MSG3________-IR_039___-000007___-201506051700-__")
Expand Down

0 comments on commit 04babb0

Please sign in to comment.