Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename and cleanup follow-up for chunk synchronization #327

Merged
merged 1 commit into from
Jun 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
56 changes: 39 additions & 17 deletions splunklib/searchcommands/internals.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import os
import re
import sys
import warnings

from . import environment

Expand Down Expand Up @@ -505,8 +506,8 @@ def __init__(self, ofile, maxresultrows=None):

self._inspector = OrderedDict()
self._chunk_count = 0
self._record_count = 0
self._total_record_count = 0
self._pending_record_count = 0
self._committed_record_count = 0

@property
def is_flushed(self):
Expand All @@ -524,6 +525,30 @@ def ofile(self):
def ofile(self, value):
self._ofile = set_binary_mode(value)

@property
def pending_record_count(self):
return self._pending_record_count

@property
def _record_count(self):
warnings.warn(
"_record_count will be deprecated soon. Use pending_record_count instead.",
PendingDeprecationWarning
)
return self.pending_record_count

@property
def committed_record_count(self):
return self._committed_record_count

@property
def _total_record_count(self):
warnings.warn(
"_total_record_count will be deprecated soon. Use committed_record_count instead.",
PendingDeprecationWarning
)
return self.committed_record_count

def write(self, data):
bytes_type = bytes if sys.version_info >= (3, 0) else str
if not isinstance(data, bytes_type):
Expand Down Expand Up @@ -555,7 +580,7 @@ def _clear(self):
self._buffer.seek(0)
self._buffer.truncate()
self._inspector.clear()
self._record_count = 0
self._pending_record_count = 0

def _ensure_validity(self):
if self._finished is True:
Expand Down Expand Up @@ -650,9 +675,9 @@ def _write_record(self, record):
values += (repr(value), None)

self._writerow(values)
self._record_count += 1
self._pending_record_count += 1

if self._record_count >= self._maxresultrows:
if self.pending_record_count >= self._maxresultrows:
self.flush(partial=True)

try:
Expand Down Expand Up @@ -689,7 +714,7 @@ def flush(self, finished=None, partial=None):

RecordWriter.flush(self, finished, partial) # validates arguments and the state of this instance

if self._record_count > 0 or (self._chunk_count == 0 and 'messages' in self._inspector):
if self.pending_record_count > 0 or (self._chunk_count == 0 and 'messages' in self._inspector):

messages = self._inspector.get('messages')

Expand Down Expand Up @@ -727,9 +752,9 @@ def flush(self, finished=None, partial=None):
print(level, text, file=stderr)

self.write(self._buffer.getvalue())
self._clear()
self._chunk_count += 1
self._total_record_count += self._record_count
self._committed_record_count += self.pending_record_count
self._clear()

self._finished = finished is True

Expand Down Expand Up @@ -758,25 +783,22 @@ def flush(self, finished=None, partial=None):

def write_chunk(self, finished=None):
inspector = self._inspector
self._total_record_count += self._record_count
self._committed_record_count += self.pending_record_count
self._chunk_count += 1

# TODO: DVPL-6448: splunklib.searchcommands | Add support for partial: true when it is implemented in
# ChunkedExternProcessor (See SPL-103525)
#
# We will need to replace the following block of code with this block:
#
# metadata = [
# ('inspector', self._inspector if len(self._inspector) else None),
# ('finished', finished),
# ('partial', partial)]
# metadata = [item for item in (('inspector', inspector), ('finished', finished), ('partial', partial))]
#
# if partial is True:
# finished = False

if len(inspector) == 0:
inspector = None

#if partial is True:
# finished = False

metadata = [item for item in (('inspector', inspector), ('finished', finished))]
self._write_chunk(metadata, self._buffer.getvalue())
self._clear()
Expand All @@ -794,7 +816,7 @@ def write_metric(self, name, value):
self._inspector['metric.' + name] = value

def _clear(self):
RecordWriter._clear(self)
super(RecordWriterV2, self)._clear()
amysutedja marked this conversation as resolved.
Show resolved Hide resolved
self._fieldnames = None

def _write_chunk(self, metadata, body):
Expand Down
10 changes: 6 additions & 4 deletions splunklib/searchcommands/search_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,6 @@ def _process_protocol_v2(self, argv, ifile, ofile):
# noinspection PyBroadException
try:
debug('Executing under protocol_version=2')
#self._records = self._records_protocol_v2
self._metadata.action = 'execute'
self._execute(ifile, None)
except SystemExit:
Expand Down Expand Up @@ -951,9 +950,12 @@ def _execute_v2(self, ifile, process):

def _execute_chunk_v2(self, process, chunk):
metadata, body = chunk
if len(body) > 0:
records = self._read_csv_records(StringIO(body))
self._record_writer.write_records(process(records))

if len(body) <= 0:
return

records = self._read_csv_records(StringIO(body))
self._record_writer.write_records(process(records))


def _report_unexpected_error(self):
Expand Down
6 changes: 5 additions & 1 deletion tests/searchcommands/test_internals_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,10 @@ def test_record_writer_with_random_data(self, save_recording=False):

self.assertEqual(writer._chunk_count, 0)
self.assertEqual(writer._record_count, 31)
self.assertEqual(writer.pending_record_count, 31)
self.assertGreater(writer._buffer.tell(), 0)
self.assertEqual(writer._total_record_count, 0)
self.assertEqual(writer.committed_record_count, 0)
self.assertListEqual(writer._fieldnames, fieldnames)
self.assertListEqual(writer._inspector['messages'], messages)

Expand All @@ -242,16 +244,18 @@ def test_record_writer_with_random_data(self, save_recording=False):

self.assertEqual(writer._chunk_count, 1)
self.assertEqual(writer._record_count, 0)
self.assertEqual(writer.pending_record_count, 0)
self.assertEqual(writer._buffer.tell(), 0)
self.assertEqual(writer._buffer.getvalue(), '')
self.assertEqual(writer._total_record_count, 31)
self.assertEqual(writer.committed_record_count, 31)

self.assertRaises(AssertionError, writer.flush, finished=True, partial=True)
self.assertRaises(AssertionError, writer.flush, finished='non-boolean')
self.assertRaises(AssertionError, writer.flush, partial='non-boolean')
self.assertRaises(AssertionError, writer.flush)

# For SCPv2 we should follow the finish negotiation protocol.
# P2 [ ] TODO: For SCPv2 we should follow the finish negotiation protocol.
# self.assertRaises(RuntimeError, writer.write_record, {})

self.assertFalse(writer._ofile.closed)
Expand Down