Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
joowani committed Aug 22, 2016
1 parent 2166fec commit 7db42f6
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 50 deletions.
38 changes: 12 additions & 26 deletions arango/async.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,22 +151,18 @@ def status(self):
job is still in the queue), ``"done"`` (the job completed or raised
an exception)
:rtype: str
:raises arango.exceptions.AsyncJobInvalidError: if the async job is
not valid
:raises arango.exceptions.AsyncJobNotFoundError: if the async job
cannot be found in the server
:raises arango.exceptions.AsyncJobStatusError: if the status of the
async job cannot be retrieved from the server
"""
res = self._conn.get('/_api/job/{}'.format(self._id))
res = self._conn.get('/_api/job/{}'.format(self.id))
if res.status_code == 204:
return 'pending'
elif res.status_code in HTTP_OK:
return 'done'
elif res.status_code == 400:
raise AsyncJobInvalidError(res)
raise AsyncJobStatusError(res, 'Job {} invalid'.format(self.id))
elif res.status_code == 404:
raise AsyncJobNotFoundError(res)
raise AsyncJobStatusError(res, 'Job {} not found'.format(self.id))
else:
raise AsyncJobStatusError(res)

Expand All @@ -175,39 +171,29 @@ def result(self):
:returns: the result or the exception from the async job
:rtype: object
:raises arango.exceptions.AsyncJobInvalidError: if the async job is
not valid
:raises arango.exceptions.AsyncJobNotFoundError: if the async job
cannot be found in the server
:raises arango.exceptions.AsyncJobNotDoneError: if the async job is
still pending in the queue
:raises arango.exceptions.AsyncJobResultError: if the result of the
async job cannot be retrieved from the server
.. note::
An async job result will automatically be cleared from the server
once fetched and will *not* be available in subsequent calls.
"""
_id = self._id
res = self._conn.put('/_api/job/{}'.format(_id))
if (
res.status_code == 404 and
res.error_code == 404 and
res.error_message == 'not found'
):
raise AsyncJobNotFoundError(res, 'Job {} not found'.format(_id))
elif res.body is not None:
res = self._conn.put('/_api/job/{}'.format(self.id))
if 'X-Arango-Async-Id' in res.headers:
try:
result = self._handler(res)
except Exception as error:
return error
else:
return result
elif res.status_code == 204:
raise AsyncJobNotDoneError(res, 'Job {} pending'.format(_id))
raise AsyncJobResultError(res, 'Job {} incomplete'.format(self.id))
elif res.status_code == 400:
raise AsyncJobInvalidError(res, 'Job {} invalid'.format(_id))
raise AsyncJobResultError(res, 'Failed to query job {}'.format(_id))
raise AsyncJobResultError(res, 'Job {} invalid'.format(self.id))
elif res.status_code == 404:
raise AsyncJobResultError(res, 'Job {} not found'.format(self.id))
else:
raise AsyncJobResultError(res)

def cancel(self, ignore_missing=False):
"""Cancel the async job if it is still pending.
Expand All @@ -232,7 +218,7 @@ def cancel(self, ignore_missing=False):
if res.status_code == 200:
return True
elif res.status_code == 400:
raise AsyncJobInvalidError(res, 'Job {} invalid'.format(_id))
raise AsyncJobInvalidError(res, 'Job {} is invalid'.format(_id))
elif res.status_code == 404:
if ignore_missing:
return False
Expand Down
4 changes: 2 additions & 2 deletions arango/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ def close(self, ignore_missing=True):
"""
if not self.id:
return False
res = self._conn.delete("/api/cursor/{}".format(self.id))
res = self._conn.delete("/_api/cursor/{}".format(self.id))
if res.status_code not in HTTP_OK:
if res.status_code == 404 and ignore_missing:
return False
raise CursorCloseError(res)
return True # pragma: no cover
return True
2 changes: 1 addition & 1 deletion docs/task.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ more information on the HTTP REST API for task management visit this
name='test_task',
command='''
var task = function(params){
var db =require('@arangodb');
var db = require('@arangodb');
db.print(params);
}
task(params);
Expand Down
99 changes: 80 additions & 19 deletions tests/test_async.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,39 @@
from __future__ import absolute_import, unicode_literals

from time import sleep

import pytest
from six import string_types as string

from arango import ArangoClient
from arango.aql import AQL
from arango.collections import Collection
from arango.exceptions import (
AQLQueryExecuteError,
AsyncJobNotFoundError
AsyncExecuteError,
AsyncJobCancelError,
AsyncJobClearError,
AsyncJobInvalidError,
AsyncJobNotFoundError,
AsyncJobNotDoneError,
AsyncJobResultError,
AsyncJobStatusError,
AQLQueryExecuteError
)
from arango.graph import Graph

from .utils import (
generate_db_name,
generate_col_name,
generate_graph_name
generate_col_name
)

arango_client = ArangoClient()
db_name = generate_db_name(arango_client)
db = arango_client.create_database(db_name)
col_name = generate_col_name(db)
col = db.create_collection(col_name)
graph_name = generate_graph_name(db)
graph = db.create_graph(graph_name)
vcol_name = generate_col_name(db)
graph.create_vertex_collection(vcol_name)
col.add_fulltext_index(fields=['val'])
bad_col_name = generate_col_name(db)
bad_col = db.collection(bad_col_name)


def teardown_module(*_):
Expand Down Expand Up @@ -54,19 +61,57 @@ def test_init():


@pytest.mark.order2
def test_async_inserts():
def test_async_execute_error():
bad_db = arango_client.db(
name=db_name,
username='root',
password='incorrect'
)
async = bad_db.async(return_result=False)
with pytest.raises(AsyncExecuteError):
async.collection(col_name).insert({'_key': '1', 'val': 1})
with pytest.raises(AsyncExecuteError):
async.collection(col_name).properties()
with pytest.raises(AsyncExecuteError):
async.aql.execute('FOR d IN {} RETURN d'.format(col_name))


@pytest.mark.order3
def test_async_inserts_without_result():
# Test precondition
assert len(col) == 0

# Test insert document asynchronously
async = db.async(return_result=True)
# Insert test documents asynchronously with return_result False
async = db.async(return_result=False)
job1 = async.collection(col_name).insert({'_key': '1', 'val': 1})
job2 = async.collection(col_name).insert({'_key': '2', 'val': 2})
job3 = async.collection(col_name).insert({'_key': '3', 'val': 3})
jobs = [job1, job2, job3]

# Ensure that no jobs were returned
for job in [job1, job2, job3]:
assert job is None

# Ensure that the asynchronously requests went through
sleep(0.5)
assert len(col) == 3
assert col['1']['val'] == 1
assert col['2']['val'] == 2
assert col['3']['val'] == 3


@pytest.mark.order4
def test_async_inserts_with_result():
# Test precondition
assert len(col) == 0

# Insert test documents asynchronously with return_result True
async_col = db.async(return_result=True).collection(col_name)
job1 = async_col.insert({'_key': '1', 'val': 1}, sync=True)
job2 = async_col.insert({'_key': '2', 'val': 2}, sync=True)
job3 = async_col.insert({'_key': '3', 'val': 3}, sync=True)

# Wait for the asynchronous jobs to finish
for job in jobs:
for job in [job1, job2, job3]:
assert 'ArangoDB asynchronous job {}'.format(job.id) in repr(job)
assert isinstance(job.id, string)
wait_on_job(job)
Expand All @@ -77,8 +122,13 @@ def test_async_inserts():
assert dict(job2.result())['_key'] == '2'
assert dict(job3.result())['_key'] == '3'

# Test cancelling invalid jobs
setattr(job1, '_id', '')
with pytest.raises(AsyncJobInvalidError):
job1.result()

@pytest.mark.order3

@pytest.mark.order5
def test_async_query():
# Set up test documents
async = db.async(return_result=True)
Expand Down Expand Up @@ -114,8 +164,20 @@ def test_async_query():
assert set(d['_key'] for d in job.result()) == {'1'}


@pytest.mark.order4
def test_async_clear_job():
@pytest.mark.order2
def test_async_get_status():
async_col = db.async(return_result=True).collection(col_name)
test_docs = [{'_key': str(i), 'val': str(i * 42)} for i in range(50000)]
job = async_col.insert_many(test_docs, sync=True)
assert job.status() == 'pending'
wait_on_job(job)

setattr(job, '_id', '')
job.status()


@pytest.mark.order6
def test_clear_async_job():
# Setup test asynchronous jobs
async = db.async(return_result=True)
job1 = async.collection(col_name).insert({'_key': '1', 'val': 1})
Expand Down Expand Up @@ -145,8 +207,8 @@ def test_async_clear_job():
assert job.clear(ignore_missing=True) is False


@pytest.mark.order5
def test_async_cancel_job():
@pytest.mark.order7
def test_cancel_async_job():
# Setup test asynchronous jobs
async = db.async(return_result=True)
job1 = async.collection(col_name).insert({'_key': '1', 'val': 1})
Expand All @@ -164,4 +226,3 @@ def test_async_cancel_job():
with pytest.raises(AsyncJobNotFoundError):
assert job.cancel(ignore_missing=False)
assert job.cancel(ignore_missing=True) is False

4 changes: 2 additions & 2 deletions tests/test_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def test_read_cursor_early_finish():
ttl=1000,
optimizer_rules=['+all']
)
assert cursor.close(ignore_missing=True) is False
assert cursor.close() is True
with pytest.raises(CursorCloseError):
cursor.close(ignore_missing=False)

Expand Down Expand Up @@ -249,7 +249,7 @@ def test_write_cursor_early_finish():
ttl=1000,
optimizer_rules=['+all']
)
assert cursor.close(ignore_missing=True) is False
assert cursor.close() is True
with pytest.raises(CursorCloseError):
cursor.close(ignore_missing=False)

Expand Down

0 comments on commit 7db42f6

Please sign in to comment.