From fd977be6f13319601255aeab60faaa1d7b8ab57d Mon Sep 17 00:00:00 2001 From: Itay Neeman Date: Wed, 18 Nov 2015 13:15:31 -0800 Subject: [PATCH] Add KV Store support to the Python SDK This change adds KV Store support to the Python SDK, which includes two main pieces: 1. A couple of small fixes in binding.py, specifically around allowing us to make POST requests which are not encoded with application/x-www-form-urlencoded. 2. A set of classes in client.py to make it easy to access the KV Store API, e.g. for accessing configuration, data, etc. Finally, it also adds tests to verify these fixes, and an example to show how to use it. --- docs/client.rst | 14 ++- docs/index.rst | 6 ++ examples/kvstore.py | 75 ++++++++++++++ splunklib/binding.py | 8 +- splunklib/client.py | 199 ++++++++++++++++++++++++++++++++++++ tests/test_examples.py | 5 + tests/test_kvstore_batch.py | 81 +++++++++++++++ tests/test_kvstore_conf.py | 99 ++++++++++++++++++ tests/test_kvstore_data.py | 102 ++++++++++++++++++ 9 files changed, 587 insertions(+), 2 deletions(-) create mode 100644 examples/kvstore.py create mode 100755 tests/test_kvstore_batch.py create mode 100755 tests/test_kvstore_conf.py create mode 100755 tests/test_kvstore_data.py diff --git a/docs/client.rst b/docs/client.rst index 4233e4ecf..45cb6bdf4 100644 --- a/docs/client.rst +++ b/docs/client.rst @@ -68,6 +68,18 @@ splunklib.client :members: create, export, itemmeta, oneshot :inherited-members: +.. autoclass:: KVStoreCollection + :members: data, update_index, update_field + :inherited-members: + +.. autoclass:: KVStoreCollectionData + :members: query, query_by_id, insert, delete, delete_by_id, update, batch_save + :inherited-members: + +.. autoclass:: KVStoreCollections + :members: create + :inherited-members: + .. autoclass:: Loggers :members: itemmeta :inherited-members: @@ -110,7 +122,7 @@ splunklib.client :inherited-members: .. autoclass:: Service - :members: apps, confs, capabilities, event_types, fired_alerts, indexes, info, inputs, job, jobs, loggers, messages, modular_input_kinds, parse, restart, restart_required, roles, search, saved_searches, settings, splunk_version, storage_passwords, users + :members: apps, confs, capabilities, event_types, fired_alerts, indexes, info, inputs, job, jobs, kvstore, loggers, messages, modular_input_kinds, parse, restart, restart_required, roles, search, saved_searches, settings, splunk_version, storage_passwords, users :inherited-members: .. autoclass:: Settings diff --git a/docs/index.rst b/docs/index.rst index 75c9a2b4a..b04f51b1d 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -69,6 +69,12 @@ For more information, see the `Splunk Developer Portal 0: url = url + UrlEncoded('?' + _encode(**kwargs), skip_encode=True) diff --git a/splunklib/client.py b/splunklib/client.py index 0e5b287ba..2e44c835f 100644 --- a/splunklib/client.py +++ b/splunklib/client.py @@ -653,6 +653,14 @@ def splunk_version(self): self._splunk_version = tuple([int(p) for p in self.info['version'].split('.')]) return self._splunk_version + @property + def kvstore(self): + """Returns the collection of KV Store collections. + + :return: A :class:`KVStoreCollections` collection of :class:`KVStoreCollection` entities. + """ + return KVStoreCollections(self) + @property def users(self): """Returns the collection of users. @@ -3518,3 +3526,194 @@ def package(self): def updateInfo(self): """Returns any update information that is available for the app.""" return self._run_action("update") + +class KVStoreCollections(Collection): + def __init__(self, service): + Collection.__init__(self, service, 'storage/collections/config', item=KVStoreCollection) + + def create(self, name, indexes = {}, fields = {}, **kwargs): + """Creates a KV Store Collection. + + :param name: name of collection to create + :type name: ``string`` + :param indexes: dictionary of index definitions + :type indexes: ``dict`` + :param fields: dictionary of field definitions + :type fields: ``dict`` + :param kwargs: a dictionary of additional parameters specifying indexes and field definitions + :type kwargs: ``dict`` + + :return: Result of POST request + """ + for k, v in indexes.iteritems(): + if isinstance(v, dict): + v = json.dumps(v) + kwargs['index.' + k] = v + for k, v in fields.iteritems(): + kwargs['field.' + k] = v + return self.post(name=name, **kwargs) + +class KVStoreCollection(Entity): + @property + def data(self): + """Returns data object for this Collection. + + :rtype: :class:`KVStoreData` + """ + return KVStoreCollectionData(self) + + def update_index(self, name, value): + """Changes the definition of a KV Store index. + + :param name: name of index to change + :type name: ``string`` + :param value: new index definition + :type value: ``dict`` or ``string`` + + :return: Result of POST request + """ + kwargs = {} + kwargs['index.' + name] = value if isinstance(value, basestring) else json.dumps(value) + return self.post(**kwargs) + + def update_field(self, name, value): + """Changes the definition of a KV Store field. + + :param name: name of field to change + :type name: ``string`` + :param value: new field definition + :type value: ``string`` + + :return: Result of POST request + """ + kwargs = {} + kwargs['field.' + name] = value + return self.post(**kwargs) + +class KVStoreCollectionData(object): + """This class represents the data endpoint for a KVStoreCollection. + + Retrieve using :meth:`KVStoreCollection.data` + """ + JSON_HEADER = [('Content-Type', 'application/json')] + + def __init__(self, collection): + self.service = collection.service + self.collection = collection + self.owner, self.app, self.sharing = collection._proper_namespace() + self.path = 'storage/collections/data/' + UrlEncoded(self.collection.name) + '/' + + def _get(self, url, **kwargs): + return self.service.get(self.path + url, owner=self.owner, app=self.app, sharing=self.sharing, **kwargs) + + def _post(self, url, **kwargs): + return self.service.post(self.path + url, owner=self.owner, app=self.app, sharing=self.sharing, **kwargs) + + def _delete(self, url, **kwargs): + return self.service.delete(self.path + url, owner=self.owner, app=self.app, sharing=self.sharing, **kwargs) + + def query(self, **query): + """ + Gets the results of query, with optional parameters sort, limit, skip, and fields. + + :param query: Optional parameters. Valid options are sort, limit, skip, and fields + :type query: ``dict`` + + :return: Array of documents retrieved by query. + :rtype: ``array`` + """ + return json.loads(self._get('', **query).body.read()) + + def query_by_id(self, id): + """ + Returns object with _id = id. + + :param id: Value for ID. If not a string will be coerced to string. + :type id: ``string`` + + :return: Document with id + :rtype: ``dict`` + """ + return json.loads(self._get(UrlEncoded(str(id))).body.read()) + + def insert(self, data): + """ + Inserts item into this collection. An _id field will be generated if not assigned in the data. + + :param data: Document to insert + :type data: ``string`` + + :return: _id of inserted object + :rtype: ``dict`` + """ + return json.loads(self._post('', headers=KVStoreCollectionData.JSON_HEADER, body=data).body.read()) + + def delete(self, query=None): + """ + Deletes all data in collection if query is absent. Otherwise, deletes all data matched by query. + + :param query: Query to select documents to delete + :type query: ``string`` + + :return: Result of DELETE request + """ + return self._delete('', **({'query': query}) if query else {}) + + def delete_by_id(self, id): + """ + Deletes document that has _id = id. + + :param id: id of document to delete + :type id: ``string`` + + :return: Result of DELETE request + """ + return self._delete(UrlEncoded(str(id))) + + def update(self, id, data): + """ + Replaces document with _id = id with data. + + :param id: _id of document to update + :type id: ``string`` + :param data: the new document to insert + :type data: ``string`` + + :return: id of replaced document + :rtype: ``dict`` + """ + return json.loads(self._post(UrlEncoded(str(id)), headers=KVStoreCollectionData.JSON_HEADER, body=data).body.read()) + + def batch_find(self, *dbqueries): + """ + Returns array of results from queries dbqueries. + + :param dbqueries: Array of individual queries as dictionaries + :type dbqueries: ``array`` of ``dict`` + + :return: Results of each query + :rtype: ``array`` of ``array`` + """ + if len(dbqueries) < 1: + raise Exception('Must have at least one query.') + + data = json.dumps(dbqueries) + + return json.loads(self._post('batch_find', headers=KVStoreCollectionData.JSON_HEADER, body=data).body.read()) + + def batch_save(self, *documents): + """ + Inserts or updates every document specified in documents. + + :param documents: Array of documents to save as dictionaries + :type documents: ``array`` of ``dict`` + + :return: Results of update operation as overall stats + :rtype: ``dict`` + """ + if len(documents) < 1: + raise Exception('Must have at least one document.') + + data = json.dumps(documents) + + return json.loads(self._post('batch_save', headers=KVStoreCollectionData.JSON_HEADER, body=data).body.read()) diff --git a/tests/test_examples.py b/tests/test_examples.py index 70837da98..9470ec6d7 100755 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -182,6 +182,11 @@ def test_job(self): "job.py", "job.py list", "job.py list @0") + + def test_kvstore(self): + self.check_commands( + "kvstore.py --help", + "kvstore.py") def test_loggers(self): self.check_commands( diff --git a/tests/test_kvstore_batch.py b/tests/test_kvstore_batch.py new file mode 100755 index 000000000..cbc806f28 --- /dev/null +++ b/tests/test_kvstore_batch.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python +# +# Copyright 2011-2014 Splunk, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"): you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import testlib +try: + import unittest +except ImportError: + import unittest2 as unittest +import splunklib.client as client + +class KVStoreBatchTestCase(testlib.SDKTestCase): + def setUp(self): + super(KVStoreBatchTestCase, self).setUp() + self.service.namespace['owner'] = 'nobody' + self.service.namespace['app'] = 'search' + confs = self.service.kvstore + if ('test' in confs): + confs['test'].delete() + confs.create('test') + + self.col = confs['test'].data + + def test_insert_find_update_data(self): + data = map(lambda x: {'_key': str(x), 'data': '#' + str(x), 'num': x}, range(1000)) + self.col.batch_save(*data) + + testData = self.col.query(sort='num') + self.assertEqual(len(testData), 1000) + + for x in range(1000): + self.assertEqual(testData[x]['_key'], str(x)) + self.assertEqual(testData[x]['data'], '#' + str(x)) + self.assertEqual(testData[x]['num'], x) + + data = map(lambda x: {'_key': str(x), 'data': '#' + str(x + 1), 'num': x + 1}, range(1000)) + self.col.batch_save(*data) + + testData = self.col.query(sort='num') + self.assertEqual(len(testData), 1000) + + for x in range(1000): + self.assertEqual(testData[x]['_key'], str(x)) + self.assertEqual(testData[x]['data'], '#' + str(x + 1)) + self.assertEqual(testData[x]['num'], x + 1) + + query = map(lambda x: {"query": {"num": x + 1}}, range(100)) + testData = self.col.batch_find(*query) + + self.assertEqual(len(testData), 100) + testData.sort(key=lambda x: x[0]['num']) + + for x in range(100): + self.assertEqual(testData[x][0]['_key'], str(x)) + self.assertEqual(testData[x][0]['data'], '#' + str(x + 1)) + self.assertEqual(testData[x][0]['num'], x + 1) + + + def tearDown(self): + confs = self.service.kvstore + if ('test' in confs): + confs['test'].delete() + +if __name__ == "__main__": + try: + import unittest2 as unittest + except ImportError: + import unittest + unittest.main() \ No newline at end of file diff --git a/tests/test_kvstore_conf.py b/tests/test_kvstore_conf.py new file mode 100755 index 000000000..804b3262b --- /dev/null +++ b/tests/test_kvstore_conf.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python +# +# Copyright 2011-2014 Splunk, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"): you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import testlib +try: + import unittest +except ImportError: + import unittest2 as unittest +import splunklib.client as client + +class KVStoreConfTestCase(testlib.SDKTestCase): + def setUp(self): + super(KVStoreConfTestCase, self).setUp() + self.service.namespace['owner'] = 'nobody' + self.service.namespace['app'] = 'search' + self.confs = self.service.kvstore + if ('test' in self.confs): + self.confs['test'].delete() + + def test_owner_restriction(self): + self.service.namespace['owner'] = 'admin' + self.assertRaises(client.HTTPError, lambda: self.confs.list()) + self.service.namespace['owner'] = 'nobody' + + def test_create_delete_collection(self): + self.confs.create('test') + self.assertTrue('test' in self.confs) + self.confs['test'].delete() + self.assertTrue(not 'test' in self.confs) + + def test_update_collection(self): + self.confs.create('test') + self.confs['test'].post(**{'accelerated_fields.ind1': '{"a": 1}', 'field.a': 'number'}) + self.assertEqual(self.confs['test']['field.a'], 'number') + self.assertEqual(self.confs['test']['accelerated_fields.ind1'], '{"a": 1}') + self.confs['test'].delete() + + + def test_update_fields(self): + self.confs.create('test') + self.confs['test'].post(**{'field.a': 'number'}) + self.assertEqual(self.confs['test']['field.a'], 'number') + self.confs['test'].update_field('a', 'string') + self.assertEqual(self.confs['test']['field.a'], 'string') + self.confs['test'].delete() + + + def test_create_unique_collection(self): + self.confs.create('test') + self.assertTrue('test' in self.confs) + self.assertRaises(client.HTTPError, lambda: self.confs.create('test')) + self.confs['test'].delete() + + def test_overlapping_collections(self): + self.service.namespace['app'] = 'system' + self.confs.create('test') + self.service.namespace['app'] = 'search' + self.confs.create('test') + self.assertEqual(self.confs['test']['eai:appName'], 'search') + self.service.namespace['app'] = 'system' + self.assertEqual(self.confs['test']['eai:appName'], 'system') + self.service.namespace['app'] = 'search' + self.confs['test'].delete() + self.confs['test'].delete() + + """ + def test_create_accelerated_fields_fields(self): + self.confs.create('test', indexes={'foo': '{"foo": 1}', 'bar': {'bar': -1}}, **{'field.foo': 'string'}) + self.assertEqual(self.confs['test']['accelerated_fields.foo'], '{"foo": 1}') + self.assertEqual(self.confs['test']['field.foo'], 'string') + self.assertRaises(client.HTTPError, lambda: self.confs['test'].post(**{'accelerated_fields.foo': 'THIS IS INVALID'})) + self.assertEqual(self.confs['test']['accelerated_fields.foo'], '{"foo": 1}') + self.confs['test'].update_accelerated_fields('foo', '') + self.assertEqual(self.confs['test']['accelerated_fields.foo'], None) + """ + + def tearDown(self): + if ('test' in self.confs): + self.confs['test'].delete() + +if __name__ == "__main__": + try: + import unittest2 as unittest + except ImportError: + import unittest + unittest.main() \ No newline at end of file diff --git a/tests/test_kvstore_data.py b/tests/test_kvstore_data.py new file mode 100755 index 000000000..92fd54b24 --- /dev/null +++ b/tests/test_kvstore_data.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python +# +# Copyright 2011-2014 Splunk, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"): you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json +import testlib +try: + import unittest +except ImportError: + import unittest2 as unittest +import splunklib.client as client + +class KVStoreDataTestCase(testlib.SDKTestCase): + def setUp(self): + super(KVStoreDataTestCase, self).setUp() + self.service.namespace['owner'] = 'nobody' + self.service.namespace['app'] = 'search' + self.confs = self.service.kvstore + if ('test' in self.confs): + self.confs['test'].delete() + self.confs.create('test') + + self.col = self.confs['test'].data + + def test_insert_query_delete_data(self): + for x in range(50): + self.col.insert(json.dumps({'_key': str(x), 'data': '#' + str(x), 'num': x})) + self.assertEqual(len(self.col.query()), 50) + self.assertEqual(len(self.col.query(query='{"num": 10}')), 1) + self.assertEqual(self.col.query(query='{"num": 10}')[0]['data'], '#10') + self.col.delete(json.dumps({'num': {'$gt': 39}})) + self.assertEqual(len(self.col.query()), 40) + self.col.delete() + self.assertEqual(len(self.col.query()), 0) + + def test_update_delete_data(self): + for x in range(50): + self.col.insert(json.dumps({'_key': str(x), 'data': '#' + str(x), 'num': x})) + self.assertEqual(len(self.col.query()), 50) + self.assertEqual(self.col.query(query='{"num": 49}')[0]['data'], '#49') + self.col.update(str(49), json.dumps({'data': '#50', 'num': 50})) + self.assertEqual(len(self.col.query()), 50) + self.assertEqual(self.col.query(query='{"num": 50}')[0]['data'], '#50') + self.assertEqual(len(self.col.query(query='{"num": 49}')), 0) + self.col.delete_by_id(49) + self.assertEqual(len(self.col.query(query='{"num": 50}')), 0) + + def test_query_data(self): + if ('test1' in self.confs): + self.confs['test1'].delete() + self.confs.create('test1') + self.col = self.confs['test1'].data + for x in range(10): + self.col.insert(json.dumps({'_key': str(x), 'data': '#' + str(x), 'num': x})) + data = self.col.query(sort='data:-1', skip=9) + self.assertEqual(len(data), 1) + self.assertEqual(data[0]['data'], '#0') + data = self.col.query(sort='data:1') + self.assertEqual(data[0]['data'], '#0') + data = self.col.query(limit=2, skip=9) + self.assertEqual(len(data), 1) + + + def test_invalid_insert_update(self): + self.assertRaises(client.HTTPError, lambda: self.col.insert('NOT VALID DATA')) + id = self.col.insert(json.dumps({'foo': 'bar'}))['_key'] + self.assertRaises(client.HTTPError, lambda: self.col.update(id, 'NOT VALID DATA')) + self.assertEqual(self.col.query_by_id(id)['foo'], 'bar') + + def test_params_data_type_conversion(self): + self.confs['test'].post(**{'field.data': 'number', 'accelerated_fields.data': '{"data": -1}'}) + for x in range(50): + self.col.insert(json.dumps({'_key': str(x), 'data': str(x), 'ignore': x})) + data = self.col.query(sort='data:-1', limit=20, fields='data,_id:0', skip=10) + self.assertEqual(len(data), 20) + for x in range(20): + self.assertEqual(data[x]['data'], 39 - x) + self.assertTrue(not 'ignore' in data[x]) + self.assertTrue(not '_key' in data[x]) + + def tearDown(self): + if ('test' in self.confs): + self.confs['test'].delete() + +if __name__ == "__main__": + try: + import unittest2 as unittest + except ImportError: + import unittest + unittest.main() \ No newline at end of file