Skip to content

Commit

Permalink
Add presorted ifexists/ifnotexist processing.
Browse files Browse the repository at this point in the history
  • Loading branch information
CraigMiloRogers committed Oct 20, 2020
1 parent 98293c8 commit 99260cc
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 2 deletions.
6 changes: 6 additions & 0 deletions kgtk/cli/ifexists.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ def h(msg: str)->str:
help="Preserve record order when cacheing the input file. (default=%(default)s).",
type=optional_bool, nargs='?', const=True, default=False)

parser.add_argument( "--presorted", dest="presorted", help="When True, assume that the input and filter files are both presorted. Use a merge-style algorithm that does not require caching either file. (default=%(default)s).",
type=optional_bool, nargs='?', const=True, default=False)

parser.add_argument( "--field-separator", dest="field_separator",
help=h("Separator for multifield keys (default=%(default)s)"),
default=KgtkIfExists.FIELD_SEPARATOR_DEFAULT)
Expand All @@ -91,6 +94,7 @@ def run(input_file: KGTKFiles,

cache_input: bool = False,
preserve_order: bool = False,
presorted: bool = False,

field_separator: typing.Optional[str] = None,

Expand Down Expand Up @@ -143,6 +147,7 @@ def run(input_file: KGTKFiles,
print("--filter-keys=%s" % " ".join(filter_keys), file=error_file)
print("--cache-input=%s" % str(cache_input), file=error_file)
print("--preserve-order=%s" % str(preserve_order), file=error_file)
print("--presortedr=%s" % str(presorted), file=error_file)
print("--field-separator=%s" % repr(field_separator), file=error_file)
input_reader_options.show(out=error_file, who="input")
filter_reader_options.show(out=error_file, who="filter")
Expand All @@ -160,6 +165,7 @@ def run(input_file: KGTKFiles,
invert=False,
cache_input=cache_input,
preserve_order=preserve_order,
presorted=presorted,
field_separator=field_separator,
input_reader_options=input_reader_options,
filter_reader_options=filter_reader_options,
Expand Down
6 changes: 6 additions & 0 deletions kgtk/cli/ifnotexists.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ def h(msg: str)->str:
help="Preserve record order when cacheing the input file. (default=%(default)s).",
type=optional_bool, nargs='?', const=True, default=False)

parser.add_argument( "--presorted", dest="presorted", help="When True, assume that the input and filter files are both presorted. Use a merge-style algorithm that does not require caching either file. (default=%(default)s).",
type=optional_bool, nargs='?', const=True, default=False)

parser.add_argument( "--field-separator", dest="field_separator",
help=h("Separator for multifield keys (default=%(default)s)"),
default=KgtkIfExists.FIELD_SEPARATOR_DEFAULT)
Expand All @@ -91,6 +94,7 @@ def run(input_file: KGTKFiles,

cache_input: bool = False,
preserve_order: bool = False,
presorted: bool = False,

field_separator: typing.Optional[str] = None,

Expand Down Expand Up @@ -143,6 +147,7 @@ def run(input_file: KGTKFiles,
print("--filter-keys=%s" % " ".join(filter_keys), file=error_file)
print("--cache-input=%s" % str(cache_input), file=error_file)
print("--preserve-order=%s" % str(preserve_order), file=error_file)
print("--presortedr=%s" % str(presorted), file=error_file)
print("--field-separator='%s'" % repr(field_separator), file=error_file)
input_reader_options.show(out=error_file, who="input")
filter_reader_options.show(out=error_file, who="filter")
Expand All @@ -160,6 +165,7 @@ def run(input_file: KGTKFiles,
invert=True,
cache_input=cache_input,
preserve_order=preserve_order,
presorted=presorted,
field_separator=field_separator,
input_reader_options=input_reader_options,
filter_reader_options=filter_reader_options,
Expand Down
111 changes: 109 additions & 2 deletions kgtk/iff/kgtkifexists.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
Note: By default, this implementation builds im-memory sets of all the key
values in the second file (the filter file). Optionally, it will cache the
first file (the input file) instead.
first file (the input file) instead. If both input files are presorted,
neither file will be cached.
Note: By default, input records are passed in order to the output file. When
the input file is cached, the output records are order by key value (alpha
Expand Down Expand Up @@ -55,6 +56,7 @@ class KgtkIfExists(KgtkFormat):
invert: bool = attr.ib(validator=attr.validators.instance_of(bool), default=False)
cache_input: bool = attr.ib(validator=attr.validators.instance_of(bool), default=False)
preserve_order: bool = attr.ib(validator=attr.validators.instance_of(bool), default=False)
presorted: bool = attr.ib(validator=attr.validators.instance_of(bool), default=False)

# TODO: find working validators
# value_options: typing.Optional[KgtkValueOptions] = attr.ib(attr.validators.optional(attr.validators.instance_of(KgtkValueOptions)), default=None)
Expand Down Expand Up @@ -200,6 +202,98 @@ def process_cacheing_filter(self,
file=self.error_file, flush=True)


def process_presorted_files(self,
input_kr: KgtkReader,
filter_kr: KgtkReader,
input_key_columns: typing.List[int],
filter_key_columns: typing.List[int],
ew: typing.Optional[KgtkWriter],
rew: typing.Optional[KgtkWriter],
):
if self.verbose:
print("Processing presorted files.", file=self.error_file, flush=True)

input_line_count: int = 0
accept_line_count: int = 0
reject_line_count: int = 0

filter_row: typing.Optional[typing.List[str]] = None
filter_key: typing.Optional[str] = None
filter_done: bool = False

# TODO: join these two code paths using xor?
row: typing.List[str]
input_key: str
for row in input_kr:
# We have read another row from the input file.
input_line_count += 1
if filter_done:
if self.very_verbose:
print("Draining [%s]" % ", ".join([item for item in row]), file=self.error_file, flush=True)
# The filter file has run out of rows.
if self.invert:
accept_line_count += 1
if ew is not None:
ew.write(row)
else:
reject_line_count += 1
if rew is not None:
rew.write(row)
continue


input_key = self.build_key(row, input_key_columns)
if filter_key is None or input_key > filter_key:
# Either we have not yet read a filter row, or the input
# row is beyond the current fiklter row in sorted order.
# Read more filter rows.
for filter_row in filter_kr:
filter_key = self.build_key(filter_row, filter_key_columns)
if input_key <= filter_key:
# Either we have a match, or the filter row is now beyond
# the input row in sorted order.
break
if self.very_verbose:
print("Skipping filter row [%s]" ", ".join([item for intem in filter_row]), file=self.error_file, flush=True)
else:
# The filter file has run out of filter rows.
if self.very_verbose:
print("Out of filter rows", file=self.error_file, flush=True)
filter_key = None
filter_done = True

if filter_key is None or input_key < filter_key:
# Either the filter file has run out of filter rows, or the filter row is
# beyond the input row in sorted order.
if self.very_verbose:
print("Skip this input row: [%s]" % ", ".join([item for item in row]), file=self.error_file, flush=True)
if self.invert:
accept_line_count += 1
if ew is not None:
ew.write(row)
else:
reject_line_count += 1
if rew is not None:
rew.write(row)

else: # input_key == filter_key
# If we get her, the input row has a matching filter row.
if self.very_verbose:
print("Keep this input row: [%s]" % ", ".join([item for item in row]), file=self.error_file, flush=True)
if self.invert:
reject_line_count += 1
if rew is not None:
rew.write(row)
else:
accept_line_count += 1
if ew is not None:
ew.write(row)

if self.verbose:
print("Read %d records, accepted %d records, rejected %d records." % (input_line_count, accept_line_count, reject_line_count),
file=self.error_file, flush=True)


def process_cacheing_input(self,
input_kr: KgtkReader,
filter_kr: KgtkReader,
Expand Down Expand Up @@ -414,7 +508,15 @@ def process(self):
verbose=self.verbose,
very_verbose=self.very_verbose)

if self.cache_input:
if self.presorted:
self.process_presorted_files(input_kr=input_kr,
filter_kr=filter_kr,
input_key_columns=input_key_columns,
filter_key_columns=filter_key_columns,
ew=ew,
rew=rew)

elif self.cache_input:
if self.preserve_order:
self.process_cacheing_input_preserving_order(input_kr=input_kr,
filter_kr=filter_kr,
Expand Down Expand Up @@ -466,6 +568,9 @@ def main():
parser.add_argument( "--preserve-order", dest="preserve_order", help="Preserve record order when cacheing the input file. (default=%(default)s).",
type=optional_bool, nargs='?', const=True, default=False)

parser.add_argument( "--presorted", dest="presorted", help="When True, assume that the input and filter files are both presorted. Use a merge-style algorithm that does not require caching either file. (default=%(default)s).",
type=optional_bool, nargs='?', const=True, default=False)

parser.add_argument( "--input-keys", dest="input_keys", help="The key columns in the input file (default=None).", nargs='*')
parser.add_argument( "--filter-keys", dest="filter_keys", help="The key columns in the filter file (default=None).", nargs='*')

Expand All @@ -492,6 +597,7 @@ def main():
print("--invert=%s" % str(args.invert), file=error_file)
print("--cache-input=%s" % str(args.cache_input), file=error_file)
print("--preserve-order=%s" % str(args.preserve_order), file=error_file)
print("--presorted=%s" % str(args.presorted), file=error_file)
if args.input_keys is not None:
print("--input-keys %s" % " ".join(args.input_keys), file=error_file)
if args.filter_keys is not None:
Expand All @@ -510,6 +616,7 @@ def main():
invert=args.invert,
cache_input=args.cache_input,
preserve_order=args.preserve_order,
presorted=args.presorted,
input_reader_options=input_reader_options,
filter_reader_options=filter_reader_options,
value_options=value_options,
Expand Down

0 comments on commit 99260cc

Please sign in to comment.