Skip to content

Added delete_by_query functionality. Original patch by an2deg. #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 29, 2011
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 38 additions & 27 deletions pyelasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@
>>> conn.delete("test-index", "test-type", 2)
{'_type': 'test-type', '_id': '2', 'ok': True, '_index': 'test-index'}

>>> conn.delete_by_query("test-index, "test-type", {"query_string": {"query": "name:joe OR name:bill"}})
{'ok': True, '_indices': {'test-index': {'_shards': {'successful': 5, 'failed': 0, 'total': 5}}}}

Delete the index

>>> conn.delete_index("test-index")
Expand Down Expand Up @@ -107,33 +110,33 @@ def get_version():
except ImportError:
# For Python >= 2.6
import json

from httplib import HTTPConnection
from urlparse import urlsplit
from urllib import urlencode
import logging


class ElasticSearch(object):
"""
ElasticSearch connection object.
"""

def __init__(self, url):
self.url = url
self.scheme, netloc, path, query, fragment = urlsplit(url)
netloc = netloc.split(':')
self.host = netloc[0]
if len(netloc) == 1:
self.host, self.port = netloc[0], None
self.host, self.port = netloc[0], 9200
else:
self.host, self.port = netloc
self.conn = None

def _conn(self):
if not self.conn:
self.conn = HTTPConnection(self.host, int(self.port))
return self.conn

def _send_request(self, method, path, body="", querystring_args={}):
if querystring_args:
path = "?".join([path, urlencode(querystring_args)])
Expand All @@ -148,7 +151,7 @@ def _send_request(self, method, path, body="", querystring_args={}):
response = self._prep_response(response.read())
logging.debug("got response %s" % response)
return response

def _make_path(self, path_components):
"""
Smush together the path components. Empty components will be ignored.
Expand All @@ -158,19 +161,19 @@ def _make_path(self, path_components):
if not path.startswith('/'):
path = '/'+path
return path

def _prep_request(self, body):
"""
Encodes body as json.
"""
return json.dumps(body)

def _prep_response(self, response):
"""
Parses json to a native python object.
"""
return json.loads(response)

def _query_call(self, query_type, query, body=None, indexes=['_all'], doc_types=[], **query_params):
"""
This can be used for search and count calls.
Expand All @@ -182,9 +185,9 @@ def _query_call(self, query_type, query, body=None, indexes=['_all'], doc_types=
path = self._make_path([','.join(indexes), ','.join(doc_types),query_type])
response = self._send_request('GET', path, body, querystring_args)
return response

## REST API

def index(self, doc, index, doc_type, id=None, force_insert=False):
"""
Index a typed JSON document into a specific index and make it searchable.
Expand All @@ -193,65 +196,73 @@ def index(self, doc, index, doc_type, id=None, force_insert=False):
querystring_args = {'op_type':'create'}
else:
querystring_args = {}

if id is None:
request_method = 'POST'
else:
request_method = 'PUT'
path = self._make_path([index, doc_type, id])
response = self._send_request(request_method, path, doc, querystring_args)
return response

def delete(self, index, doc_type, id):
"""
Delete a typed JSON document from a specific index based on its id.
"""
path = self._make_path([index, doc_type, id])
response = self._send_request('DELETE', path)
return response


def delete_by_query(self, index, doc_type, query):
"""
Delete a typed JSON documents from a specific index based on query
"""
path = self._make_path([index, doc_type, '_query'])
response = self._send_request('DELETE', path, query)
return response

def get(self, index, doc_type, id):
"""
Get a typed JSON document from an index based on its id.
"""
path = self._make_path([index, doc_type, id])
response = self._send_request('GET', path)
return response

def search(self, query, body=None, indexes=['_all'], doc_types=[], **query_params):
"""
Execute a search query against one or more indices and get back search hits.
query must be a dictionary that will convert to Query DSL
TODO: better api to reflect that the query can be either 'query' or 'body' argument.
"""
return self._query_call("_search", query, body, indexes, doc_types, **query_params)

def count(self, query, body=None, indexes=['_all'], doc_types=[], **query_params):
"""
Execute a query against one or more indices and get hits count.
"""
return self._query_call("_count", query, body, indexes, doc_types, **query_params)

def put_mapping(self, doc_type, mapping, indexes=['_all']):
"""
Register specific mapping definition for a specific type against one or more indices.
"""
path = self._make_path([','.join(indexes), doc_type,"_mapping"])
response = self._send_request('PUT', path, mapping)
return response

def terms(self, fields, indexes=['_all'], **query_params):
"""
Extract terms and their document frequencies from one or more fields.
The fields argument must be a list or tuple of fields.
For valid query params see:
For valid query params see:
http://www.elasticsearch.com/docs/elasticsearch/rest_api/terms/
"""
path = self._make_path([','.join(indexes), "_terms"])
query_params['fields'] = ','.join(fields)
response = self._send_request('GET', path, querystring_args=query_params)
return response

def morelikethis(self, index, doc_type, id, fields, **query_params):
"""
Execute a "more like this" search query against one or more fields and get back search hits.
Expand All @@ -260,9 +271,9 @@ def morelikethis(self, index, doc_type, id, fields, **query_params):
query_params['fields'] = ','.join(fields)
response = self._send_request('GET', path, querystring_args=query_params)
return response

## Index Admin API

def status(self, indexes=['_all']):
"""
Retrieve the status of one or more indices
Expand All @@ -279,14 +290,14 @@ def create_index(self, index, settings=None):
"""
response = self._send_request('PUT', index, settings)
return response

def delete_index(self, index):
"""
Deletes an index.
"""
response = self._send_request('DELETE', index)
return response

def flush(self, indexes=['_all'], refresh=None):
"""
Flushes one or more indices (clear memory)
Expand All @@ -313,7 +324,7 @@ def gateway_snapshot(self, indexes=['_all']):
path = self._make_path([','.join(indexes), '_gateway', 'snapshot'])
response = self._send_request('POST', path)
return response


def optimize(self, indexes=['_all'], **args):
"""
Expand All @@ -322,9 +333,9 @@ def optimize(self, indexes=['_all'], **args):
path = self._make_path([','.join(indexes), '_optimize'])
response = self._send_request('POST', path, querystring_args=args)
return response

if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
logging.debug("testing")
import doctest
doctest.testmod()
doctest.testmod()
49 changes: 33 additions & 16 deletions tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
class ElasticSearchTestCase(unittest.TestCase):
def setUp(self):
self.conn = ElasticSearch('http://localhost:9200/')

def tearDown(self):
self.conn.delete_index("test-index")

Expand All @@ -16,7 +16,7 @@ def assertResultContains(self, result, expected):
self.assertEquals(value, result[key])

class IndexingTestCase(ElasticSearchTestCase):

def testIndexingWithID(self):
result = self.conn.index({"name":"Joe Tester"}, "test-index", "test-type", 1)
self.assertResultContains(result, {'_type': 'test-type', '_id': '1', 'ok': True, '_index': 'test-index'} )
Expand All @@ -26,46 +26,63 @@ def testIndexingWithoutID(self):
self.assertResultContains(result, {'_type': 'test-type', 'ok': True, '_index': 'test-index'} )
# should have an id of some value assigned.
self.assertTrue(result.has_key('_id') and result['_id'])

def testExplicitIndexCreate(self):
result = self.conn.create_index("test-index")
self.assertResultContains(result, {'acknowledged': True, 'ok': True})

def testDeleteByID(self):
self.conn.index({"name":"Joe Tester"}, "test-index", "test-type", 1)
self.conn.refresh(["test-index"])
result = self.conn.delete("test-index", "test-type", 1)
self.assertResultContains(result, {'_type': 'test-type', '_id': '1', 'ok': True, '_index': 'test-index'})


def testDeleteByQuery(self):
self.conn.index({"name":"Joe Tester"}, "test-index", "test-type", 1)
self.conn.index({"name":"Bill Baloney"}, "test-index", "test-type", 2)
self.conn.index({"name":"Horace Humdinger"}, "test-index", "test-type", 3)
self.conn.refresh(["test-index"])

self.conn.refresh(["test-index"])
result = self.conn.count("*:*", indexes=['test-index'])
self.assertResultContains(result, {'count': 3})

result = self.conn.delete_by_query("test-index", "test-type", {"query_string": {"query": "name:joe OR name:bill"}})
self.assertResultContains(result, {'ok': True})

self.conn.refresh(["test-index"])
result = self.conn.count("*:*", indexes=['test-index'])
self.assertResultContains(result, {'count': 1})

def testDeleteIndex(self):
self.conn.create_index("another-index")
result = self.conn.delete_index("another-index")
self.assertResultContains(result, {'acknowledged': True, 'ok': True})

def testCannotCreateExistingIndex(self):
self.conn.create_index("another-index")
result = self.conn.create_index("another-index")
self.conn.delete_index("another-index")
self.assertResultContains(result, {'error': '[another-index] Already exists'})

def testPutMapping(self):
result = self.conn.create_index("test-index")
result = self.conn.create_index("test-index")
result = self.conn.put_mapping("test-type", {"test-type" : {"properties" : {"name" : {"type" : "string", "store" : "yes"}}}}, indexes=["test-index"])
self.assertResultContains(result, {'acknowledged': True, 'ok': True})

def testIndexStatus(self):
self.conn.create_index("another-index")
result = self.conn.status(["another-index"])
self.conn.delete_index("another-index")
self.assertTrue(result.has_key('indices'))
self.assertResultContains(result, {'ok': True})

def testIndexFlush(self):
self.conn.create_index("another-index")
result = self.conn.flush(["another-index"])
self.conn.delete_index("another-index")
self.assertResultContains(result, {'ok': True})

def testIndexRefresh(self):
self.conn.create_index("another-index")
result = self.conn.refresh(["another-index"])
Expand All @@ -78,14 +95,14 @@ def testIndexOptimize(self):
self.conn.delete_index("another-index")
self.assertResultContains(result, {'ok': True})


class SearchTestCase(ElasticSearchTestCase):
def setUp(self):
super(SearchTestCase, self).setUp()
self.conn.index({"name":"Joe Tester"}, "test-index", "test-type", 1)
self.conn.index({"name":"Bill Baloney"}, "test-index", "test-type", 2)
self.conn.refresh(["test-index"])

def testGetByID(self):
result = self.conn.get("test-index", "test-type", 1)
self.assertResultContains(result, {'_type': 'test-type', '_id': '1', '_source': {'name': 'Joe Tester'}, '_index': 'test-index'})
Expand All @@ -101,20 +118,20 @@ def testSearchByField(self):
def testTermsByField(self):
result = self.conn.terms(['name'])
self.assertResultContains(result, {'docs': {'max_doc': 2, 'num_docs': 2, 'deleted_docs': 0}, 'fields': {'name': {'terms': [{'term': 'baloney', 'doc_freq': 1}, {'term': 'bill', 'doc_freq': 1}, {'term': 'joe', 'doc_freq': 1}, {'term': 'tester', 'doc_freq': 1}]}}})

def testTermsByIndex(self):
result = self.conn.terms(['name'], indexes=['test-index'])
self.assertResultContains(result, {'docs': {'max_doc': 2, 'num_docs': 2, 'deleted_docs': 0}, 'fields': {'name': {'terms': [{'term': 'baloney', 'doc_freq': 1}, {'term': 'bill', 'doc_freq': 1}, {'term': 'joe', 'doc_freq': 1}, {'term': 'tester', 'doc_freq': 1}]}}})

def testTermsMinFreq(self):
result = self.conn.terms(['name'], min_freq=2)
self.assertResultContains(result, {'docs': {'max_doc': 2, 'num_docs': 2, 'deleted_docs': 0}, 'fields': {'name': {'terms': []}}})

def testMLT(self):
self.conn.index({"name":"Joe Test"}, "test-index", "test-type", 3)
self.conn.refresh(["test-index"])
result = self.conn.morelikethis("test-index", "test-type", 1, ['name'], min_term_freq=1, min_doc_freq=1)
self.assertResultContains(result, {'hits': {'hits': [{'_type': 'test-type', '_id': '3', '_source': {'name': 'Joe Test'}, '_index': 'test-index'}], 'total': 1}})

if __name__ == "__main__":
unittest.main()
unittest.main()