Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add getters and setters for Task priority in AlchemiscaleClient #213

Merged
merged 21 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions alchemiscale/interface/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,50 @@ def cancel_tasks(
return [str(sk) if sk is not None else None for sk in canceled_sks]


@router.post("/bulk/tasks/priority/get")
def tasks_priority_get(
    *,
    tasks: List[ScopedKey] = Body(embed=True),
    n4js: Neo4jStore = Depends(get_n4js_depends),
    token: TokenData = Depends(get_token_data_depends),
) -> List[Union[int, None]]:
    """Look up the priority of each requested Task.

    The scope of every Task is checked against the caller's token with
    ``validate_scopes``. A Task whose check raises ``HTTPException`` is
    replaced by ``None`` before the lookup, so the list handed to the state
    store keeps one slot per requested Task, in request order.

    Returns
    -------
    priorities
        The list returned by ``n4js.get_task_priority``; an entry may be
        ``None`` rather than an integer, hence the optional element type.
    """
    valid_tasks = []
    for task_sk in tasks:
        try:
            validate_scopes(task_sk.scope, token)
        except HTTPException:
            # keep the slot so positions still line up with the request
            valid_tasks.append(None)
        else:
            valid_tasks.append(task_sk)

    return n4js.get_task_priority(valid_tasks)


@router.post("/bulk/tasks/priority/set")
def tasks_priority_set(
    *,
    tasks: List[ScopedKey] = Body(embed=True),
    priority: int = Body(embed=True),
    n4js: Neo4jStore = Depends(get_n4js_depends),
    token: TokenData = Depends(get_token_data_depends),
) -> List[Union[str, None]]:
    """Set one priority value on every requested Task.

    Tasks whose scope check raises ``HTTPException`` are passed to the state
    store as ``None``. A ``ValueError`` raised by the store is reported to
    the caller as an HTTP 400 response carrying the error text.

    Returns
    -------
    updated
        The string form of each entry returned by ``n4js.set_task_priority``,
        with ``None`` entries passed through unchanged.
    """

    def _in_scope_or_none(task_sk):
        # a Task outside the token's scopes keeps its slot as None
        try:
            validate_scopes(task_sk.scope, token)
        except HTTPException:
            return None
        return task_sk

    permitted = [_in_scope_or_none(task_sk) for task_sk in tasks]

    try:
        updated = n4js.set_task_priority(permitted, priority)
    except ValueError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))

    return [None if sk is None else str(sk) for sk in updated]


@router.post("/bulk/tasks/status/get")
def tasks_status_get(
*,
Expand Down
132 changes: 110 additions & 22 deletions alchemiscale/interface/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"""

import asyncio
from typing import Union, List, Dict, Optional, Tuple
from typing import Union, List, Dict, Optional, Tuple, Any
import json
from itertools import chain
from collections import Counter
Expand Down Expand Up @@ -724,12 +724,36 @@ def cancel_tasks(

return [ScopedKey.from_str(i) if i is not None else None for i in canceled_sks]

def _set_task_status(
    self, task: ScopedKey, status: TaskStatusEnum
) -> Optional[ScopedKey]:
    """Post ``status.value`` to the status resource of a single `Task`.

    Returns the response parsed with ``ScopedKey.from_str``, or ``None``
    when the response is ``None``.
    """
    response = self._post_resource(f"/tasks/{task}/status", status.value)
    if response is None:
        return None
    return ScopedKey.from_str(response)
def _task_attribute_getter(
    self, tasks: List[ScopedKey], getter_function, batch_size
) -> List[Any]:
    """Fetch one attribute for many Tasks using concurrent, batched requests.

    Parameters
    ----------
    tasks
        The Tasks to query; entries given as strings are converted with
        ``ScopedKey.from_str``.
    getter_function
        Coroutine function awaited once per batch produced by
        ``self._batched(tasks, batch_size)``; each call must return an
        iterable of values.
    batch_size
        Passed to ``self._batched`` to control how many Tasks go into a
        single request.

    Returns
    -------
    values
        The per-batch results chained into a single list, in batch order.
    """
    tasks = [
        ScopedKey.from_str(task) if isinstance(task, str) else task
        for task in tasks
    ]

    @use_session
    async def async_request(self):
        values = await asyncio.gather(
            *[
                getter_function(task_batch)
                for task_batch in self._batched(tasks, batch_size)
            ]
        )

        return list(chain.from_iterable(values))

    coro = async_request(self)

    # Detect a running loop explicitly instead of catching RuntimeError from
    # `asyncio.run`: a RuntimeError raised inside the request itself must
    # propagate as-is rather than trigger a second run of a spent coroutine.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # no event loop is running in this thread
        return asyncio.run(coro)

    # we use nest_asyncio to support environments where an event loop
    # is already running, such as in a Jupyter notebook
    import nest_asyncio

    nest_asyncio.apply()
    return asyncio.run(coro)
ianmkenney marked this conversation as resolved.
Show resolved Hide resolved

async def _set_task_status(
self, tasks: List[ScopedKey], status: TaskStatusEnum
Expand All @@ -744,12 +768,6 @@ async def _set_task_status(
for task_sk in tasks_updated
]

async def _get_task_status(self, tasks: List[ScopedKey]) -> List[TaskStatusEnum]:
    """Request the statuses of a batch of Tasks in one bulk call.

    Each Task is serialized with ``dict()`` and sent under the ``tasks``
    key; the value returned by ``_post_resource_async`` is handed back
    unchanged.
    """
    payload = {"tasks": [task.dict() for task in tasks]}
    return await self._post_resource_async(f"/bulk/tasks/status/get", data=payload)

def set_tasks_status(
self,
tasks: List[ScopedKey],
Expand Down Expand Up @@ -807,6 +825,12 @@ async def async_request(self):
nest_asyncio.apply()
return asyncio.run(coro)

async def _get_task_status(self, tasks: List[ScopedKey]) -> List[TaskStatusEnum]:
    """Request the statuses of a batch of Tasks in one bulk call.

    Each Task is serialized with ``dict()`` and sent under the ``tasks``
    key; the value returned by ``_post_resource_async`` is handed back
    unchanged.
    """
    payload = {"tasks": [task.dict() for task in tasks]}
    return await self._post_resource_async(f"/bulk/tasks/status/get", data=payload)

def get_tasks_status(
self, tasks: List[ScopedKey], batch_size: int = 1000
) -> List[str]:
Expand All @@ -827,40 +851,104 @@ def get_tasks_status(
given Task doesn't exist, ``None`` will be returned in its place.

"""
return self._task_attribute_getter(tasks, self._get_task_status, batch_size)

async def _set_task_priority(
    self, tasks: List[ScopedKey], priority: int
) -> List[Optional[ScopedKey]]:
    """Send one bulk request that sets ``priority`` on a batch of Tasks.

    Every non-``None`` entry of the response is parsed with
    ``ScopedKey.from_str``; ``None`` entries are kept as ``None``.
    """
    payload = {"tasks": [task.dict() for task in tasks], "priority": priority}
    response = await self._post_resource_async(
        f"/bulk/tasks/priority/set", data=payload
    )
    return [None if sk is None else ScopedKey.from_str(sk) for sk in response]

def set_tasks_priority(
self,
tasks: List[ScopedKey],
priority: int,
batch_size: int = 1000,
) -> List[Optional[ScopedKey]]:
"""Set the priority of multiple Tasks.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a note here on what kind of integers are allowed for priority, and what these values mean? That is, priority 1 is the highest priority, 2 the next highest, and so on, with the lowest priority allowed being 2**64 / 2 (max value of a Java long, since this gets stored in Neo4j).

Copy link
Member

@dotsdl dotsdl Dec 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wrong about this; max value of a java long is 2**64/2 - 1, or 2**63 - 1. Fixing this myself now.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also adding guardrail in Neo4jStore.

Parameters
----------
tasks
The Tasks to set the priority of.
priority
The priority to set for the Task.
batch_size
The number of Tasks to include in a single request; use to tune
method call speed when requesting many priorities at once.

Returns
-------
updated
The ScopedKeys of the Tasks that were updated, in the same order
as given in `tasks`. If a given Task doesn't exist, ``None`` will
be returned in its place.
"""
tasks = [
ScopedKey.from_str(task) if isinstance(task, str) else task
for task in tasks
]

@use_session
async def async_request(self):
statuses = await asyncio.gather(
scoped_keys = await asyncio.gather(
*[
self._get_task_status(task_batch)
self._set_task_priority(task_batch, priority)
for task_batch in self._batched(tasks, batch_size)
]
)

return list(chain.from_iterable(statuses))
return list(chain.from_iterable(scoped_keys))

coro = async_request(self)

try:
return asyncio.run(async_request(self))
return asyncio.run(coro)
except RuntimeError:
# we use nest_asyncio to support environments where an event loop
# is already running, such as in a Jupyter notebook
import nest_asyncio

nest_asyncio.apply()
return asyncio.run(async_request(self))
return asyncio.run(coro)

async def _get_task_priority(
    self, tasks: List[ScopedKey]
) -> List[Optional[int]]:
    """Get the priority for many Tasks in a single bulk request.

    Each Task is serialized with ``dict()`` and sent under the ``tasks``
    key. The value returned by ``_post_resource_async`` is handed back
    unchanged; entries may be ``None`` rather than an integer, which is
    why the element type is optional.
    """
    data = dict(tasks=[t.dict() for t in tasks])
    priorities = await self._post_resource_async(
        "/bulk/tasks/priority/get", data=data
    )
    return priorities

def get_tasks_priority(
    self,
    tasks: List[ScopedKey],
    batch_size: int = 1000,
) -> List[Optional[int]]:
    """Get the priority of multiple Tasks.

    Parameters
    ----------
    tasks
        The Tasks to get the priority of.
    batch_size
        The number of Tasks to include in a single request; use to tune
        method call speed when requesting many priorities at once.

    Returns
    -------
    priorities
        The priority of each Task in the same order as given in `tasks`. If a
        given Task doesn't exist, ``None`` will be returned in its place.

    """
    return self._task_attribute_getter(tasks, self._get_task_priority, batch_size)

### results

Expand Down
50 changes: 43 additions & 7 deletions alchemiscale/storage/statestore.py
Original file line number Diff line number Diff line change
Expand Up @@ -1710,14 +1710,50 @@ def set_tasks(
transformation, Transformation, scope
)

def set_task_priority(self, task: ScopedKey, priority: int):
q = f"""
MATCH (t:Task {{_scoped_key: "{task}"}})
SET t.priority = {priority}
RETURN t
"""
def set_task_priority(self, task: Union[ScopedKey, List[ScopedKey]], priority: int):
if not priority >= 0:
raise ValueError("priority cannot be negative")

if isinstance(task, ScopedKey):
task = [task]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to simplify a bit and make it consistent with other methods, let's axe this method's ability to take a single task, since it avoids the asymmetry of input sometimes being a non-iterable but output always being an iterable.

We generally try to take this pattern at this lowest layer of the stack; it doesn't need to be user-friendly, but we aim for a fairly logical consistency where possible.

This will simplify the tests for these methods as well, reducing the input space.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also add -> List[Optional[ScopedKey]] as the output?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And rename task to tasks. 😁

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also add a docstring?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to simplify a bit and make it consistent with other methods, let's axe this method's ability to take a single task, since it avoids the asymmetry of input sometimes being a non-iterable but output always being an iterable.

We generally try to take this pattern at this lowest layer of the stack; it doesn't need to be user-friendly, but we aim for a fairly logical consistency where possible.

This will simplify the tests for these methods as well, reducing the input space.

Agreed. This will require changing compute tests using the single task inputs. I was trying to preserve previous functionality. Might be worthwhile doing a general consistency sweep before the next release.


with self.transaction() as tx:
tx.run(q)
q = """
WITH $scoped_keys AS batch
UNWIND batch AS scoped_key

OPTIONAL MATCH (t:Task {_scoped_key: scoped_key})
SET t.priority = $priority

RETURN scoped_key, t
"""
res = tx.run(q, scoped_keys=[str(t) for t in task], priority=priority)

task_results = []
for record in res:
task_i = record["t"]
scoped_key = record["scoped_key"]

# catch missing tasks
if task_i is None:
task_results.append(None)
else:
task_results.append(ScopedKey.from_str(scoped_key))
return task_results

def get_task_priority(self, tasks: List[ScopedKey]) -> List[Optional[int]]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a docstring here as well!

with self.transaction() as tx:
q = """
WITH $scoped_keys AS batch
UNWIND batch AS scoped_key
OPTIONAL MATCH (t:Task)
WHERE t._scoped_key = scoped_key
RETURN t.priority as priority
"""
res = tx.run(q, scoped_keys=[str(t) for t in tasks])
priorities = [rec["priority"] for rec in res]

return priorities

def delete_task(
self,
Expand Down
87 changes: 83 additions & 4 deletions alchemiscale/tests/integration/interface/client/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -869,11 +869,90 @@ def test_get_tasks_status(
statuses = user_client.get_tasks_status(all_tasks)
assert all([s == status.value for s in statuses])

def test_get_tasks_priority(
    self,
    scope_test,
    n4js_preloaded,
    network_tyk2,
    user_client: client.AlchemiscaleClient,
    uvicorn_server,
):
    """Tasks created through the client report a priority of 10."""
    an = network_tyk2
    transformation = list(an.edges)[0]
    transformation_sk = user_client.get_scoped_key(transformation, scope_test)

    all_tasks = user_client.create_tasks(transformation_sk, count=5)
    priorities = user_client.get_tasks_priority(all_tasks)

    # compare against the full expected list so that an empty or short
    # result fails instead of passing vacuously through `all([])`
    assert priorities == [10] * len(all_tasks)

@pytest.mark.parametrize(
    "priority, should_raise",
    [
        (1, False),
        (-1, True),
    ],
)
def test_set_tasks_priority(
    self,
    scope_test,
    n4js_preloaded,
    network_tyk2,
    user_client: client.AlchemiscaleClient,
    uvicorn_server,
    priority,
    should_raise,
):
    """Setting a priority either takes effect or is rejected with a 400."""
    an = network_tyk2
    transformation = list(an.edges)[0]

    # the returned network key is not needed by the rest of the test
    user_client.get_scoped_key(an, scope_test)
    transformation_sk = user_client.get_scoped_key(transformation, scope_test)

    all_tasks = user_client.create_tasks(transformation_sk, count=5)
    baseline = user_client.get_tasks_priority(all_tasks)

    # baseline, we need to confirm that values are different after set call
    assert all(p == 10 for p in baseline)

    if should_raise:
        expected_error = "Status Code 400 : Bad Request : priority cannot be negative"
        with pytest.raises(AlchemiscaleClientError, match=expected_error):
            user_client.set_tasks_priority(all_tasks, priority)
        return

    user_client.set_tasks_priority(all_tasks, priority)
    updated = user_client.get_tasks_priority(all_tasks)
    assert all(p == priority for p in updated)

def test_set_tasks_priority_missing_tasks(
    self,
    scope_test,
    n4js_preloaded,
    network_tyk2,
    user_client: client.AlchemiscaleClient,
    uvicorn_server,
):
    """Nonexistent Tasks yield ``None`` while existing Tasks are updated."""
    an = network_tyk2
    transformation = list(an.edges)[0]

    # the returned network key is not needed by the rest of the test
    user_client.get_scoped_key(an, scope_test)
    transformation_sk = user_client.get_scoped_key(transformation, scope_test)

    all_tasks = user_client.create_tasks(transformation_sk, count=5)

    fake_tasks = [
        ScopedKey.from_str(t)
        for t in [
            "Task-FAKE1-test_org-test_campaign-test_project",
            "Task-FAKE2-test_org-test_campaign-test_project",
        ]
    ]

    updated_tasks = user_client.set_tasks_priority(fake_tasks + all_tasks, 10)
    assert updated_tasks[0:2] == [None, None]

    # asserting on a bare list only checks that it is non-empty; check the
    # length and every element explicitly instead
    assert len(updated_tasks[2:]) == len(all_tasks)
    assert all(t is not None for t in updated_tasks[2:])

### results

Expand Down
Loading
Loading