From 4f4e609cf97ed9326ab6306eb02998dbc95366c6 Mon Sep 17 00:00:00 2001 From: David Poncelow Date: Mon, 5 Oct 2020 15:33:32 -0700 Subject: [PATCH 1/5] initial end-to-end tests for streaming, reporting, generating --- splunklib/searchcommands/search_command.py | 3 +- tests/searchcommands/chunked_data_stream.py | 93 +++++++++++++++++++ .../searchcommands/test_generator_command.py | 41 ++++++++ .../searchcommands/test_reporting_command.py | 34 +++++++ .../searchcommands/test_streaming_command.py | 29 ++++++ 5 files changed, 199 insertions(+), 1 deletion(-) create mode 100644 tests/searchcommands/chunked_data_stream.py create mode 100644 tests/searchcommands/test_generator_command.py create mode 100644 tests/searchcommands/test_reporting_command.py create mode 100644 tests/searchcommands/test_streaming_command.py diff --git a/splunklib/searchcommands/search_command.py b/splunklib/searchcommands/search_command.py index 8ec82bba0..faed2eaff 100644 --- a/splunklib/searchcommands/search_command.py +++ b/splunklib/searchcommands/search_command.py @@ -851,7 +851,8 @@ def _execute(self, ifile, process): @staticmethod def _as_binary_stream(ifile): - if six.PY2: + naught = ifile.read(0) + if isinstance(naught, bytes): return ifile try: diff --git a/tests/searchcommands/chunked_data_stream.py b/tests/searchcommands/chunked_data_stream.py new file mode 100644 index 000000000..d0e216aa9 --- /dev/null +++ b/tests/searchcommands/chunked_data_stream.py @@ -0,0 +1,93 @@ +import collections +import csv +import io +import json + +import splunklib.searchcommands.internals +from splunklib import six + + +class Chunk(object): + def __init__(self, version, meta, data): + self.version = six.ensure_str(version) + self.meta = json.loads(meta) + self.data = csv.DictReader(io.StringIO(data.decode("utf-8")), dialect=splunklib.searchcommands.internals.CsvDialect) + + +class ChunkedDataStreamIter(collections.Iterator): + def __init__(self, chunk_stream): + self.chunk_stream = chunk_stream + + def __next__(self): + return self.next() + + def next(self): + try: + return self.chunk_stream.read_chunk() + except EOFError: + raise StopIteration + + +class ChunkedDataStream(collections.Iterable): + def __iter__(self): + return ChunkedDataStreamIter(self) + + def __init__(self, stream): + empty = stream.read(0) + assert isinstance(empty, bytes) + self.stream = stream + + def read_chunk(self): + header = self.stream.readline() + if len(header) == 0: + raise EOFError + + while len(header) > 0 and header.strip() == b'': + header = self.stream.readline() # Skip empty lines + version, meta, data = header.rstrip().split(b',') + metabytes = self.stream.read(int(meta)) + databytes = self.stream.read(int(data)) + print("sent") + return Chunk(version, metabytes, databytes) + + +def build_chunk(kv, data=None): + metadata = six.ensure_binary(json.dumps(kv), 'utf-8') + data_output = _build_data_csv(data) + return b"chunked 1.0,%d,%d\n%s%s" % (len(metadata), len(data_output), metadata, data_output) + + +def build_empty_searchinfo(): + return { + 'earliest_time': 0, + 'latest_time': 0, + 'search': "", + 'dispatch_dir': "", + 'sid': "", + 'args': [], + 'splunk_version': "42.3.4", + } + + +def build_getinfo_chunk(): + return build_chunk({'action': 'getinfo', 'preview': False, 'searchinfo': build_empty_searchinfo()}) + + +def build_data_chunk(data, finished=True): + return build_chunk({'action': 'execute', 'finished': finished}, data) + + +def _build_data_csv(data): + if data is None: + return b'' + if isinstance(data, bytes): + return data + csvout = io.StringIO() + headers = set() + for datum in data: + headers.update(datum.keys()) + writer = csv.DictWriter(csvout, headers, dialect=splunklib.searchcommands.internals.CsvDialect) + writer.writeheader() + for datum in data: + writer.writerow(datum) + return csvout.getvalue().encode("utf-8") diff --git a/tests/searchcommands/test_generator_command.py b/tests/searchcommands/test_generator_command.py new file mode 100644 index 000000000..7e675a70f --- /dev/null +++ b/tests/searchcommands/test_generator_command.py @@ -0,0 +1,41 @@ +import io +import time + +from . import chunked_data_stream as chunky + +from splunklib.searchcommands import Configuration, GeneratingCommand + + +def test_simple_generator(): + @Configuration() + class GeneratorTest(GeneratingCommand): + def generate(self): + for num in range(1, 10): + yield {'_time': time.time(), 'event_index': num} + generator = GeneratorTest() + in_stream = io.BytesIO() + in_stream.write(chunky.build_getinfo_chunk()) + in_stream.write(chunky.build_chunk({'action': 'execute'})) + in_stream.seek(0) + out_stream = io.BytesIO() + generator._process_protocol_v2([], in_stream, out_stream) + out_stream.seek(0) + + ds = chunky.ChunkedDataStream(out_stream) + is_first_chunk = True + finished_seen = False + expected = set(map(lambda i: str(i), range(1, 10))) + seen = set() + for chunk in ds: + if is_first_chunk: + assert chunk.meta["generating"] is True + assert chunk.meta["type"] == "stateful" + is_first_chunk = False + finished_seen = chunk.meta.get("finished", False) + for row in chunk.data: + seen.add(row["event_index"]) + assert expected.issubset(seen) + assert finished_seen + + + diff --git a/tests/searchcommands/test_reporting_command.py b/tests/searchcommands/test_reporting_command.py new file mode 100644 index 000000000..e5add818c --- /dev/null +++ b/tests/searchcommands/test_reporting_command.py @@ -0,0 +1,34 @@ +import io + +import splunklib.searchcommands as searchcommands +from . import chunked_data_stream as chunky + + +def test_simple_reporting_command(): + @searchcommands.Configuration() + class TestReportingCommand(searchcommands.ReportingCommand): + def reduce(self, records): + value = 0 + for record in records: + value += int(record["value"]) + yield {'sum': value} + + cmd = TestReportingCommand() + ifile = io.BytesIO() + data = list() + for i in range(0, 10): + data.append({"value": str(i)}) + ifile.write(chunky.build_getinfo_chunk()) + ifile.write(chunky.build_data_chunk(data)) + ifile.seek(0) + ofile = io.BytesIO() + cmd._process_protocol_v2([], ifile, ofile) + ofile.seek(0) + chunk_stream = chunky.ChunkedDataStream(ofile) + getinfo_response = chunk_stream.read_chunk() + assert getinfo_response.meta['type'] == 'reporting' + data_chunk = chunk_stream.read_chunk() + assert data_chunk.meta['finished'] is True # Should only be one row + data = list(data_chunk.data) + assert len(data) == 1 + assert int(data[0]['sum']) == sum(range(0, 10)) diff --git a/tests/searchcommands/test_streaming_command.py b/tests/searchcommands/test_streaming_command.py new file mode 100644 index 000000000..dcc00b53e --- /dev/null +++ b/tests/searchcommands/test_streaming_command.py @@ -0,0 +1,29 @@ +import io + +from . import chunked_data_stream as chunky +from splunklib.searchcommands import StreamingCommand, Configuration + + +def test_simple_streaming_command(): + @Configuration() + class TestStreamingCommand(StreamingCommand): + + def stream(self, records): + for record in records: + record["out_index"] = record["in_index"] + yield record + + cmd = TestStreamingCommand() + ifile = io.BytesIO() + ifile.write(chunky.build_getinfo_chunk()) + data = list() + for i in range(0, 10): + data.append({"in_index": str(i)}) + ifile.write(chunky.build_data_chunk(data, finished=True)) + ifile.seek(0) + ofile = io.BytesIO() + cmd._process_protocol_v2([], ifile, ofile) + ofile.seek(0) + output = chunky.ChunkedDataStream(ofile) + getinfo_response = output.read_chunk() + assert getinfo_response.meta["type"] == "streaming" From 6f63ec178e42d1a0c89a9ac0463dd9a57d033290 Mon Sep 17 00:00:00 2001 From: David Poncelow Date: Wed, 7 Oct 2020 14:58:10 -0700 Subject: [PATCH 2/5] add workaround for python 2.7 --- tests/searchcommands/chunked_data_stream.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/tests/searchcommands/chunked_data_stream.py b/tests/searchcommands/chunked_data_stream.py index d0e216aa9..73fea73a9 100644 --- a/tests/searchcommands/chunked_data_stream.py +++ b/tests/searchcommands/chunked_data_stream.py @@ -82,12 +82,22 @@ def _build_data_csv(data): return b'' if isinstance(data, bytes): return data - csvout = io.StringIO() + if six.PY2: + csvout = io.BytesIO() + else: + csvout = io.StringIO() + headers = set() for datum in data: - headers.update(datum.keys()) + if six.PY2: + headers.update(datum.keys()) + else: + headers.update(datum.keys()) writer = csv.DictWriter(csvout, headers, dialect=splunklib.searchcommands.internals.CsvDialect) writer.writeheader() for datum in data: - writer.writerow(datum) - return csvout.getvalue().encode("utf-8") + if six.PY2: + writer.writerow(datum) + else: + writer.writerow(datum) + return six.ensure_binary(csvout.getvalue()) From 348c80b4d9b2ae099143705ab1d666621e943aa3 Mon Sep 17 00:00:00 2001 From: David Poncelow Date: Mon, 19 Oct 2020 13:53:22 -0700 Subject: [PATCH 3/5] address code review comments --- tests/searchcommands/chunked_data_stream.py | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/tests/searchcommands/chunked_data_stream.py b/tests/searchcommands/chunked_data_stream.py index 73fea73a9..0bd0ff8a2 100644 --- a/tests/searchcommands/chunked_data_stream.py +++ b/tests/searchcommands/chunked_data_stream.py @@ -39,11 +39,12 @@ def __init__(self, stream): def read_chunk(self): header = self.stream.readline() - if len(header) == 0: - raise EOFError while len(header) > 0 and header.strip() == b'': header = self.stream.readline() # Skip empty lines + + if len(header) == 0: + raise EOFError version, meta, data = header.rstrip().split(b',') metabytes = self.stream.read(int(meta)) databytes = self.stream.read(int(data)) @@ -82,22 +83,13 @@ def _build_data_csv(data): return b'' if isinstance(data, bytes): return data - if six.PY2: - csvout = io.BytesIO() - else: - csvout = io.StringIO() + csvout = splunklib.six.StringIO() headers = set() for datum in data: - if six.PY2: - headers.update(datum.keys()) - else: - headers.update(datum.keys()) + headers.update(datum.keys()) writer = csv.DictWriter(csvout, headers, dialect=splunklib.searchcommands.internals.CsvDialect) writer.writeheader() for datum in data: - if six.PY2: - writer.writerow(datum) - else: - writer.writerow(datum) + writer.writerow(datum) return six.ensure_binary(csvout.getvalue()) From efaa7e151a3288931f1d794c4cd40acbcbe118e8 Mon Sep 17 00:00:00 2001 From: David Poncelow Date: Wed, 4 Nov 2020 16:03:18 -0800 Subject: [PATCH 4/5] lint changes and formatting --- tests/searchcommands/chunked_data_stream.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/tests/searchcommands/chunked_data_stream.py b/tests/searchcommands/chunked_data_stream.py index 0bd0ff8a2..2e2726fb2 100644 --- a/tests/searchcommands/chunked_data_stream.py +++ b/tests/searchcommands/chunked_data_stream.py @@ -11,7 +11,9 @@ class Chunk(object): def __init__(self, version, meta, data): self.version = six.ensure_str(version) self.meta = json.loads(meta) - self.data = csv.DictReader(io.StringIO(data.decode("utf-8")), dialect=splunklib.searchcommands.internals.CsvDialect) + dialect = splunklib.searchcommands.internals.CsvDialect + self.data = csv.DictReader(io.StringIO(data.decode("utf-8")), + dialect=dialect) class ChunkedDataStreamIter(collections.Iterator): @@ -40,20 +42,19 @@ def __init__(self, stream): def read_chunk(self): header = self.stream.readline() - while len(header) > 0 and header.strip() == b'': + while header > 0 and header.strip() == b'': header = self.stream.readline() # Skip empty lines - if len(header) == 0: + if not header == 0: raise EOFError version, meta, data = header.rstrip().split(b',') metabytes = self.stream.read(int(meta)) databytes = self.stream.read(int(data)) - print("sent") return Chunk(version, metabytes, databytes) -def build_chunk(kv, data=None): - metadata = six.ensure_binary(json.dumps(kv), 'utf-8') +def build_chunk(keyval, data=None): + metadata = six.ensure_binary(json.dumps(keyval), 'utf-8') data_output = _build_data_csv(data) return b"chunked 1.0,%d,%d\n%s%s" % (len(metadata), len(data_output), metadata, data_output) @@ -71,7 +72,10 @@ def build_empty_searchinfo(): def build_getinfo_chunk(): - return build_chunk({'action': 'getinfo', 'preview': False, 'searchinfo': build_empty_searchinfo()}) + return build_chunk({ + 'action': 'getinfo', + 'preview': False, + 'searchinfo': build_empty_searchinfo()}) def build_data_chunk(data, finished=True): @@ -88,7 +92,8 @@ def _build_data_csv(data): headers = set() for datum in data: headers.update(datum.keys()) - writer = csv.DictWriter(csvout, headers, dialect=splunklib.searchcommands.internals.CsvDialect) + writer = csv.DictWriter(csvout, headers, + dialect=splunklib.searchcommands.internals.CsvDialect) writer.writeheader() for datum in data: writer.writerow(datum) From 139d90743ca4e40a46be86adb035b4e79a52c10f Mon Sep 17 00:00:00 2001 From: David Poncelow Date: Thu, 3 Dec 2020 15:49:20 -0800 Subject: [PATCH 5/5] fix tests broken by taking lint advice --- tests/searchcommands/chunked_data_stream.py | 6 +++--- tests/searchcommands/test_generator_command.py | 3 +++ 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/searchcommands/chunked_data_stream.py b/tests/searchcommands/chunked_data_stream.py index 2e2726fb2..ae5363eff 100644 --- a/tests/searchcommands/chunked_data_stream.py +++ b/tests/searchcommands/chunked_data_stream.py @@ -42,11 +42,11 @@ def __init__(self, stream): def read_chunk(self): header = self.stream.readline() - while header > 0 and header.strip() == b'': + while len(header) > 0 and header.strip() == b'': header = self.stream.readline() # Skip empty lines - - if not header == 0: + if len(header) == 0: raise EOFError + version, meta, data = header.rstrip().split(b',') metabytes = self.stream.read(int(meta)) databytes = self.stream.read(int(data)) diff --git a/tests/searchcommands/test_generator_command.py b/tests/searchcommands/test_generator_command.py index 7e675a70f..4af61a5d2 100644 --- a/tests/searchcommands/test_generator_command.py +++ b/tests/searchcommands/test_generator_command.py @@ -34,6 +34,9 @@ def generate(self): finished_seen = chunk.meta.get("finished", False) for row in chunk.data: seen.add(row["event_index"]) + print(out_stream.getvalue()) + print(expected) + print(seen) assert expected.issubset(seen) assert finished_seen