From e7401689fff48994c0a4b5e1f27de53ad1549130 Mon Sep 17 00:00:00 2001 From: Amy Sutedja Date: Fri, 5 Jun 2020 14:12:46 -0700 Subject: [PATCH] Rename and cleanup follow-up for chunk synchronization --- splunklib/searchcommands/internals.py | 56 +++++++++++++++------- splunklib/searchcommands/search_command.py | 10 ++-- tests/searchcommands/test_internals_v2.py | 6 ++- 3 files changed, 50 insertions(+), 22 deletions(-) diff --git a/splunklib/searchcommands/internals.py b/splunklib/searchcommands/internals.py index d9fdf9e7a..41caceb32 100644 --- a/splunklib/searchcommands/internals.py +++ b/splunklib/searchcommands/internals.py @@ -35,6 +35,7 @@ import os import re import sys +import warnings from . import environment @@ -505,8 +506,8 @@ def __init__(self, ofile, maxresultrows=None): self._inspector = OrderedDict() self._chunk_count = 0 - self._record_count = 0 - self._total_record_count = 0 + self._pending_record_count = 0 + self._committed_record_count = 0 @property def is_flushed(self): @@ -524,6 +525,30 @@ def ofile(self): def ofile(self, value): self._ofile = set_binary_mode(value) + @property + def pending_record_count(self): + return self._pending_record_count + + @property + def _record_count(self): + warnings.warn( + "_record_count will be deprecated soon. Use pending_record_count instead.", + PendingDeprecationWarning + ) + return self.pending_record_count + + @property + def committed_record_count(self): + return self._committed_record_count + + @property + def _total_record_count(self): + warnings.warn( + "_total_record_count will be deprecated soon. Use committed_record_count instead.", + PendingDeprecationWarning + ) + return self.committed_record_count + def write(self, data): bytes_type = bytes if sys.version_info >= (3, 0) else str if not isinstance(data, bytes_type): @@ -555,7 +580,7 @@ def _clear(self): self._buffer.seek(0) self._buffer.truncate() self._inspector.clear() - self._record_count = 0 + self._pending_record_count = 0 def _ensure_validity(self): if self._finished is True: @@ -650,9 +675,9 @@ def _write_record(self, record): values += (repr(value), None) self._writerow(values) - self._record_count += 1 + self._pending_record_count += 1 - if self._record_count >= self._maxresultrows: + if self.pending_record_count >= self._maxresultrows: self.flush(partial=True) try: @@ -689,7 +714,7 @@ def flush(self, finished=None, partial=None): RecordWriter.flush(self, finished, partial) # validates arguments and the state of this instance - if self._record_count > 0 or (self._chunk_count == 0 and 'messages' in self._inspector): + if self.pending_record_count > 0 or (self._chunk_count == 0 and 'messages' in self._inspector): messages = self._inspector.get('messages') @@ -727,9 +752,9 @@ def flush(self, finished=None, partial=None): print(level, text, file=stderr) self.write(self._buffer.getvalue()) - self._clear() self._chunk_count += 1 - self._total_record_count += self._record_count + self._committed_record_count += self.pending_record_count + self._clear() self._finished = finished is True @@ -758,7 +783,7 @@ def flush(self, finished=None, partial=None): def write_chunk(self, finished=None): inspector = self._inspector - self._total_record_count += self._record_count + self._committed_record_count += self.pending_record_count self._chunk_count += 1 # TODO: DVPL-6448: splunklib.searchcommands | Add support for partial: true when it is implemented in @@ -766,17 +791,14 @@ def write_chunk(self, finished=None): # # We will need to replace the following block of code with this block: # - # metadata = [ - # ('inspector', self._inspector if len(self._inspector) else None), - # ('finished', finished), - # ('partial', partial)] + # metadata = [item for item in (('inspector', inspector), ('finished', finished), ('partial', partial))] + # + # if partial is True: + # finished = False if len(inspector) == 0: inspector = None - #if partial is True: - # finished = False - metadata = [item for item in (('inspector', inspector), ('finished', finished))] self._write_chunk(metadata, self._buffer.getvalue()) self._clear() @@ -794,7 +816,7 @@ def write_metric(self, name, value): self._inspector['metric.' + name] = value def _clear(self): - RecordWriter._clear(self) + super(RecordWriterV2, self)._clear() self._fieldnames = None def _write_chunk(self, metadata, body): diff --git a/splunklib/searchcommands/search_command.py b/splunklib/searchcommands/search_command.py index ef94f5a09..a093b7d96 100644 --- a/splunklib/searchcommands/search_command.py +++ b/splunklib/searchcommands/search_command.py @@ -776,7 +776,6 @@ def _process_protocol_v2(self, argv, ifile, ofile): # noinspection PyBroadException try: debug('Executing under protocol_version=2') - #self._records = self._records_protocol_v2 self._metadata.action = 'execute' self._execute(ifile, None) except SystemExit: @@ -951,9 +950,12 @@ def _execute_v2(self, ifile, process): def _execute_chunk_v2(self, process, chunk): metadata, body = chunk - if len(body) > 0: - records = self._read_csv_records(StringIO(body)) - self._record_writer.write_records(process(records)) + + if len(body) <= 0: + return + + records = self._read_csv_records(StringIO(body)) + self._record_writer.write_records(process(records)) def _report_unexpected_error(self): diff --git a/tests/searchcommands/test_internals_v2.py b/tests/searchcommands/test_internals_v2.py index 0d0f05e3c..bdef65c4a 100755 --- a/tests/searchcommands/test_internals_v2.py +++ b/tests/searchcommands/test_internals_v2.py @@ -229,8 +229,10 @@ def test_record_writer_with_random_data(self, save_recording=False): self.assertEqual(writer._chunk_count, 0) self.assertEqual(writer._record_count, 31) + self.assertEqual(writer.pending_record_count, 31) self.assertGreater(writer._buffer.tell(), 0) self.assertEqual(writer._total_record_count, 0) + self.assertEqual(writer.committed_record_count, 0) self.assertListEqual(writer._fieldnames, fieldnames) self.assertListEqual(writer._inspector['messages'], messages) @@ -242,16 +244,18 @@ def test_record_writer_with_random_data(self, save_recording=False): self.assertEqual(writer._chunk_count, 1) self.assertEqual(writer._record_count, 0) + self.assertEqual(writer.pending_record_count, 0) self.assertEqual(writer._buffer.tell(), 0) self.assertEqual(writer._buffer.getvalue(), '') self.assertEqual(writer._total_record_count, 31) + self.assertEqual(writer.committed_record_count, 31) self.assertRaises(AssertionError, writer.flush, finished=True, partial=True) self.assertRaises(AssertionError, writer.flush, finished='non-boolean') self.assertRaises(AssertionError, writer.flush, partial='non-boolean') self.assertRaises(AssertionError, writer.flush) - # For SCPv2 we should follow the finish negotiation protocol. + # P2 [ ] TODO: For SCPv2 we should follow the finish negotiation protocol. # self.assertRaises(RuntimeError, writer.write_record, {}) self.assertFalse(writer._ofile.closed)