Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SDK Support for splunkd search API changes #468

Merged
merged 15 commits into from
Jun 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ password=changed!
# Access scheme (default: https)
scheme=https
# Your version of Splunk (default: 6.2)
version=8.0
version=9.0
# Bearer token for authentication
#bearerToken="<Bearer-token>"
# Session key for authentication
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
- ubuntu-latest
python: [ 2.7, 3.7 ]
splunk-version:
- "8.0"
- "8.2"
- "latest"
fail-fast: false

Expand Down
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ If you're seeing some unexpected behavior with this project, please create an [i
1. Version of this project you're using (ex: 1.5.0)
2. Platform version (ex: Windows Server 2012 R2)
3. Framework version (ex: Python 3.7)
4. Splunk Enterprise version (ex: 8.0)
4. Splunk Enterprise version (ex: 9.0)
5. Other relevant information (ex: local/remote environment, Splunk network configuration, standalone or distributed deployment, are load balancers used)

Alternatively, if you have a Splunk question please ask on [Splunk Answers](https://community.splunk.com/t5/Splunk-Development/ct-p/developer-tools).
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ Install the sources you cloned from GitHub:
You'll need `docker` and `docker-compose` to get up and running using this method.

```
make up SPLUNK_VERSION=8.0
make up SPLUNK_VERSION=9.0
make wait_up
make test
make down
Expand Down Expand Up @@ -107,7 +107,7 @@ here is an example of .env file:
# Access scheme (default: https)
scheme=https
# Your version of Splunk Enterprise
version=8.0
version=9.0
# Bearer token for authentication
#bearerToken=<Bearer-token>
# Session key for authentication
Expand Down
93 changes: 86 additions & 7 deletions splunklib/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import datetime
import json
import logging
import re
import socket
from datetime import datetime, timedelta
from time import sleep
Expand Down Expand Up @@ -99,6 +100,7 @@
PATH_INDEXES = "data/indexes/"
PATH_INPUTS = "data/inputs/"
PATH_JOBS = "search/jobs/"
PATH_JOBS_V2 = "search/v2/jobs/"
PATH_LOGGER = "/services/server/logger/"
PATH_MESSAGES = "messages/"
PATH_MODULAR_INPUTS = "data/modular-inputs"
Expand Down Expand Up @@ -570,6 +572,8 @@ def parse(self, query, **kwargs):
:type kwargs: ``dict``
:return: A semantic map of the parsed search query.
"""
if self.splunk_version >= (9,):
return self.post("search/v2/parser", q=query, **kwargs)
return self.get("search/parser", q=query, **kwargs)

def restart(self, timeout=None):
Expand Down Expand Up @@ -741,6 +745,25 @@ def __init__(self, service, path):
self.service = service
self.path = path

def get_api_version(self, path):
"""Return the API version of the service used in the provided path.

Args:
path (str): A fully-qualified endpoint path (for example, "/services/search/jobs").

Returns:
int: Version of the API (for example, 1)
"""
# Default to v1 if undefined in the path
# For example, "/services/search/jobs" is using API v1
api_version = 1

versionSearch = re.search('(?:servicesNS\/[^/]+\/[^/]+|services)\/[^/]+\/v(\d+)\/', path)
if versionSearch:
api_version = int(versionSearch.group(1))

return api_version

def get(self, path_segment="", owner=None, app=None, sharing=None, **query):
"""Performs a GET operation on the path segment relative to this endpoint.

Expand Down Expand Up @@ -803,6 +826,22 @@ def get(self, path_segment="", owner=None, app=None, sharing=None, **query):
app=app, sharing=sharing)
# ^-- This was "%s%s" % (self.path, path_segment).
# That doesn't work, because self.path may be UrlEncoded.

# Get the API version from the path
api_version = self.get_api_version(path)

# Search API v2+ fallback to v1:
# - In v2+, /results_preview, /events and /results do not support search params.
# - Fallback from v2+ to v1 if Splunk Version is < 9.
# if api_version >= 2 and ('search' in query and path.endswith(tuple(["results_preview", "events", "results"])) or self.service.splunk_version < (9,)):
# path = path.replace(PATH_JOBS_V2, PATH_JOBS)

if api_version == 1:
if isinstance(path, UrlEncoded):
path = UrlEncoded(path.replace(PATH_JOBS_V2, PATH_JOBS), skip_encode=True)
else:
path = path.replace(PATH_JOBS_V2, PATH_JOBS)

return self.service.get(path,
owner=owner, app=app, sharing=sharing,
**query)
Expand Down Expand Up @@ -855,13 +894,29 @@ def post(self, path_segment="", owner=None, app=None, sharing=None, **query):
apps.get('nonexistant/path') # raises HTTPError
s.logout()
apps.get() # raises AuthenticationError
"""
"""
if path_segment.startswith('/'):
path = path_segment
else:
if not self.path.endswith('/') and path_segment != "":
self.path = self.path + '/'
path = self.service._abspath(self.path + path_segment, owner=owner, app=app, sharing=sharing)

# Get the API version from the path
api_version = self.get_api_version(path)

# Search API v2+ fallback to v1:
# - In v2+, /results_preview, /events and /results do not support search params.
# - Fallback from v2+ to v1 if Splunk Version is < 9.
# if api_version >= 2 and ('search' in query and path.endswith(tuple(["results_preview", "events", "results"])) or self.service.splunk_version < (9,)):
# path = path.replace(PATH_JOBS_V2, PATH_JOBS)

if api_version == 1:
if isinstance(path, UrlEncoded):
path = UrlEncoded(path.replace(PATH_JOBS_V2, PATH_JOBS), skip_encode=True)
else:
path = path.replace(PATH_JOBS_V2, PATH_JOBS)

return self.service.post(path, owner=owner, app=app, sharing=sharing, **query)


Expand Down Expand Up @@ -2664,7 +2719,14 @@ def oneshot(self, path, **kwargs):
class Job(Entity):
"""This class represents a search job."""
def __init__(self, service, sid, **kwargs):
path = PATH_JOBS + sid
# Default to v2 in Splunk Version 9+
path = "{path}{sid}"
# Formatting path based on the Splunk Version
if service.splunk_version < (9,):
path = path.format(path=PATH_JOBS, sid=sid)
else:
path = path.format(path=PATH_JOBS_V2, sid=sid)

Entity.__init__(self, service, path, skip_refresh=True, **kwargs)
self.sid = sid

Expand Down Expand Up @@ -2718,7 +2780,11 @@ def events(self, **kwargs):
:return: The ``InputStream`` IO handle to this job's events.
"""
kwargs['segmentation'] = kwargs.get('segmentation', 'none')
return self.get("events", **kwargs).body

# Search API v1(GET) and v2(POST)
if self.service.splunk_version < (9,):
return self.get("events", **kwargs).body
return self.post("events", **kwargs).body

def finalize(self):
"""Stops the job and provides intermediate results for retrieval.
Expand Down Expand Up @@ -2806,7 +2872,11 @@ def results(self, **query_params):
:return: The ``InputStream`` IO handle to this job's results.
"""
query_params['segmentation'] = query_params.get('segmentation', 'none')
return self.get("results", **query_params).body

# Search API v1(GET) and v2(POST)
if self.service.splunk_version < (9,):
return self.get("results", **query_params).body
return self.post("results", **query_params).body

def preview(self, **query_params):
"""Returns a streaming handle to this job's preview search results.
Expand Down Expand Up @@ -2847,7 +2917,11 @@ def preview(self, **query_params):
:return: The ``InputStream`` IO handle to this job's preview results.
"""
query_params['segmentation'] = query_params.get('segmentation', 'none')
return self.get("results_preview", **query_params).body

# Search API v1(GET) and v2(POST)
if self.service.splunk_version < (9,):
return self.get("results_preview", **query_params).body
return self.post("results_preview", **query_params).body

def searchlog(self, **kwargs):
"""Returns a streaming handle to this job's search log.
Expand Down Expand Up @@ -2936,7 +3010,12 @@ class Jobs(Collection):
"""This class represents a collection of search jobs. Retrieve this
collection using :meth:`Service.jobs`."""
def __init__(self, service):
Collection.__init__(self, service, PATH_JOBS, item=Job)
# Splunk 9 introduces the v2 endpoint
if service.splunk_version >= (9,):
path = PATH_JOBS_V2
else:
path = PATH_JOBS
Collection.__init__(self, service, path, item=Job)
# The count value to say list all the contents of this
# Collection is 0, not -1 as it is on most.
self.null_count = 0
Expand Down Expand Up @@ -3774,4 +3853,4 @@ def batch_save(self, *documents):

data = json.dumps(documents)

return json.loads(self._post('batch_save', headers=KVStoreCollectionData.JSON_HEADER, body=data).body.read().decode('utf-8'))
return json.loads(self._post('batch_save', headers=KVStoreCollectionData.JSON_HEADER, body=data).body.read().decode('utf-8'))
12 changes: 0 additions & 12 deletions splunkrc.spec

This file was deleted.

23 changes: 23 additions & 0 deletions tests/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,29 @@ def test_search_invalid_query_as_json(self):
except Exception as e:
self.fail("Got some unexpected error. %s" % e.message)

def test_v1_job_fallback(self):
self.assertEventuallyTrue(self.job.is_done)
self.assertLessEqual(int(self.job['eventCount']), 3)

preview_stream = self.job.preview(output_mode='json', search='| head 1')
preview_r = results.JSONResultsReader(preview_stream)
self.assertFalse(preview_r.is_preview)

events_stream = self.job.events(output_mode='json', search='| head 1')
events_r = results.JSONResultsReader(events_stream)

results_stream = self.job.results(output_mode='json', search='| head 1')
results_r = results.JSONResultsReader(results_stream)

n_events = len([x for x in events_r if isinstance(x, dict)])
n_preview = len([x for x in preview_r if isinstance(x, dict)])
n_results = len([x for x in results_r if isinstance(x, dict)])

# Fallback test for Splunk Version 9+
if self.service.splunk_version[0] >= 9:
self.assertGreaterEqual(9, self.service.splunk_version[0])
self.assertEqual(n_events, n_preview, n_results)


class TestResultsReader(unittest.TestCase):
def test_results_reader(self):
Expand Down
5 changes: 5 additions & 0 deletions tests/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ def test_parse(self):
# objectified form of the results, but for now there's
# nothing to test but a good response code.
response = self.service.parse('search * abc="def" | dedup abc')

# Splunk Version 9+ using API v2: search/v2/parser
if self.service.splunk_version[0] >= 9:
self.assertGreaterEqual(9, self.service.splunk_version[0])

self.assertEqual(response.status, 200)

def test_parse_fail(self):
Expand Down