Skip to content

Commit

Permalink
feat(Datasets): delete records by query (#1721)
Browse files Browse the repository at this point in the history
Closes #1714

(cherry picked from commit be4e92c)

fix: delete records feature (#1737)

Some problems detected:

Change from httpx.delete to httpx.request with method="DELETE", since the delete method does not accept body param (data or json)
Fix response data when no records match for deletion
  • Loading branch information
frascuchon committed Oct 5, 2022
1 parent de5eba5 commit bc9685d
Show file tree
Hide file tree
Showing 13 changed files with 409 additions and 10 deletions.
2 changes: 1 addition & 1 deletion docs/reference/python/python_client.rst
Expand Up @@ -15,7 +15,7 @@ Methods
-------

.. automodule:: rubrix
:members: init, log, load, copy, delete, set_workspace, get_workspace
:members: init, log, load, copy, delete, set_workspace, get_workspace, delete_records

.. _python ref records:

Expand Down
2 changes: 2 additions & 0 deletions src/rubrix/__init__.py
Expand Up @@ -32,6 +32,7 @@
from rubrix.client.api import (
copy,
delete,
delete_records,
get_workspace,
init,
load,
Expand Down Expand Up @@ -68,6 +69,7 @@
"get_workspace",
"init",
"load",
"delete_records",
"log",
"log_async",
"set_workspace",
Expand Down
49 changes: 48 additions & 1 deletion src/rubrix/client/api.py
Expand Up @@ -20,7 +20,7 @@
from asyncio import Future
from functools import wraps
from inspect import signature
from typing import Any, Callable, Dict, Iterable, List, Optional, Union
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union

from tqdm.auto import tqdm

Expand Down Expand Up @@ -401,6 +401,48 @@ async def log_async(
# Creating a composite BulkResponse with the total processed and failed
return BulkResponse(dataset=name, processed=processed, failed=failed)

def delete_records(
self,
name: str,
query: Optional[str] = None,
ids: Optional[List[Union[str, int]]] = None,
discard_only: bool = False,
discard_when_forbidden: bool = True,
) -> Tuple[int, int]:
"""Delete records from a Rubrix dataset.
Args:
name: The dataset name.
query: An ElasticSearch query with the `query string syntax
<https://rubrix.readthedocs.io/en/stable/guides/queries.html>`_
ids: If provided, deletes dataset records with given ids.
discard_only: If `True`, matched records won't be deleted. Instead, they will be marked as `Discarded`
discard_when_forbidden: Only super-user or dataset creator can delete records from a dataset.
So, running "hard" deletion for other users will raise an `ForbiddenApiError` error.
If this parameter is `True`, the client API will automatically try to mark as ``Discarded``
records instead. Default, `True`
Returns:
The total of matched records and real number of processed errors. These numbers could not
be the same if some data conflicts are found during operations (some matched records change during
deletion).
Examples:
>>> ## Delete by id
>>> import rubrix as rb
>>> rb.delete_records(name="example-dataset", ids=[1,3,5])
>>> ## Discard records by query
>>> import rubrix as rb
>>> rb.delete_records(name="example-dataset", query="metadata.code=33", discard_only=True)
"""
return self.datasets.delete_records(
name=name,
query=query,
ids=ids,
mark_as_discarded=discard_only,
discard_when_forbidden=discard_when_forbidden,
)

def load(
self,
name: str,
Expand Down Expand Up @@ -649,5 +691,10 @@ def load(*args, **kwargs):
return active_api().load(*args, **kwargs)


@api_wrapper(Api.delete_records)
def delete_records(*args, **kwargs):
return active_api().delete_records(*args, **kwargs)


class InputValueError(RubrixClientError):
pass
61 changes: 58 additions & 3 deletions src/rubrix/client/apis/datasets.py
@@ -1,11 +1,16 @@
import warnings
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, Optional, Set, Union
from typing import Any, Dict, List, Optional, Set, Tuple, Union

from pydantic import BaseModel, Field

from rubrix.client.apis import AbstractApi, api_compatibility
from rubrix.client.sdk.commons.errors import AlreadyExistsApiError, NotFoundApiError
from rubrix.client.sdk.commons.errors import (
AlreadyExistsApiError,
ForbiddenApiError,
NotFoundApiError,
)
from rubrix.client.sdk.datasets.api import get_dataset
from rubrix.client.sdk.datasets.models import TaskType

Expand Down Expand Up @@ -126,8 +131,58 @@ def configure(self, name: str, settings: Settings):
ds = self.find_by_name(name)
self.__save_settings__(dataset=ds, settings=settings)

def __save_settings__(self, dataset: _DatasetApiModel, settings: Settings):
def delete_records(
self,
name: str,
query: Optional[str] = None,
ids: Optional[List[Union[str, int]]] = None,
mark_as_discarded: bool = False,
discard_when_forbidden: bool = True,
) -> Tuple[int, int]:
"""
Tries to delete records in a dataset for a given query/ids list.
Args:
name: The dataset name
query: The query matching records
ids: A list of records ids. If provided, the query param will be ignored
mark_as_discarded: If `True`, the matched records will be marked as `Discarded` instead
of delete them
discard_when_forbidden: Only super-user or dataset creator can delete records from a dataset.
So, running "hard" deletion for other users will raise an `ForbiddenApiError` error.
If this parameter is `True`, the client API will automatically try to mark as ``Discarded``
records instead.
Returns:
The total of matched records and real number of processed errors. These numbers could not
be the same if some data conflicts are found during operations (some matched records change during
deletion).
"""
with api_compatibility(self, min_version="0.18"):
try:
response = self.__client__.delete(
path=f"{self._API_PREFIX}/{name}/data?mark_as_discarded={mark_as_discarded}",
json={"ids": ids} if ids else {"query_text": query},
)
return response["matched"], response["processed"]
except ForbiddenApiError as faer:
if discard_when_forbidden:
warnings.warn(
message=f"{faer}. Records will be discarded instead",
category=UserWarning,
)
return self.delete_records(
name=name,
query=query,
ids=ids,
mark_as_discarded=True,
discard_when_forbidden=False, # Next time will raise the error
)
else:
raise faer

def __save_settings__(self, dataset: _DatasetApiModel, settings: Settings):
if __TASK_TO_SETTINGS__.get(dataset.task) != type(settings):
raise ValueError(
f"The provided settings type {type(settings)} cannot be applied to dataset."
Expand Down
12 changes: 12 additions & 0 deletions src/rubrix/client/sdk/client.py
Expand Up @@ -120,6 +120,18 @@ def put(self, path: str, *args, **kwargs):
)
return build_raw_response(response).parsed

@with_httpx_error_handler
def delete(self, path: str, *args, **kwargs):
path = self._normalize_path(path)
response = self.__httpx__.request(
method="DELETE",
url=path,
headers=self.get_headers(),
*args,
**kwargs,
)
return build_raw_response(response).parsed

@with_httpx_error_handler
def stream(self, path: str, *args, **kwargs):
return self.__httpx__.stream(
Expand Down
61 changes: 61 additions & 0 deletions src/rubrix/server/apis/v0/handlers/records_deletion.py
@@ -0,0 +1,61 @@
from typing import Optional, Union

from fastapi import APIRouter, Depends, Query, Security
from pydantic import BaseModel

from rubrix.client.sdk.token_classification.models import TokenClassificationQuery
from rubrix.server.apis.v0.models.commons.params import CommonTaskHandlerDependencies
from rubrix.server.apis.v0.models.text2text import Text2TextQuery
from rubrix.server.apis.v0.models.text_classification import TextClassificationQuery
from rubrix.server.security import auth
from rubrix.server.security.model import User
from rubrix.server.services.datasets import DatasetsService
from rubrix.server.services.storage.service import RecordsStorageService


def configure_router(router: APIRouter):
QueryType = Union[TextClassificationQuery, TokenClassificationQuery, Text2TextQuery]

class DeleteRecordsResponse(BaseModel):
matched: int
processed: int

@router.delete(
"/{name}/data",
operation_id="delete_dataset_records",
response_model=DeleteRecordsResponse,
)
async def delete_dataset_records(
name: str,
query: Optional[QueryType] = None,
mark_as_discarded: bool = Query(
default=False,
title="If True, matched records won't be deleted."
" Instead of that, the record status will be changed to `Discarded`",
),
request_deps: CommonTaskHandlerDependencies = Depends(),
service: DatasetsService = Depends(DatasetsService.get_instance),
storage: RecordsStorageService = Depends(RecordsStorageService.get_instance),
current_user: User = Security(auth.get_user, scopes=[]),
):
found = service.find_by_name(
user=current_user,
name=name,
workspace=request_deps.workspace,
)

result = await storage.delete_records(
user=current_user,
dataset=found,
query=query,
mark_as_discarded=mark_as_discarded,
)

return DeleteRecordsResponse(
matched=result.processed,
processed=result.deleted or result.discarded,
)


router = APIRouter(tags=["datasets"], prefix="/datasets")
configure_router(router)
23 changes: 22 additions & 1 deletion src/rubrix/server/daos/records.py
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.

import datetime
from typing import Any, Dict, Iterable, List, Optional, Type
from typing import Any, Dict, Iterable, List, Optional, Tuple, Type

from fastapi import Depends

Expand Down Expand Up @@ -200,3 +200,24 @@ def get_dataset_schema(self, dataset: DatasetDB) -> Dict[str, Any]:
"""Return inner elasticsearch index configuration"""
schema = self._es.get_mappings(id=dataset.id)
return schema

async def delete_records_by_query(
self,
dataset: DatasetDB,
query: Optional[BackendRecordsQuery] = None,
) -> Tuple[int, int]:
total, deleted = await self._es.delete_records_by_query(
id=dataset.id, query=query
)
return total, deleted

async def update_records_by_query(
self,
dataset: DatasetDB,
query: Optional[BackendRecordsQuery] = None,
**content,
) -> Tuple[int, int]:
total, updated = await self._es.update_records_content(
id=dataset.id, content=content, query=query
)
return total, updated
9 changes: 8 additions & 1 deletion src/rubrix/server/routes.py
Expand Up @@ -23,6 +23,7 @@
from rubrix.server.apis.v0.handlers import datasets as datasets
from rubrix.server.apis.v0.handlers import info as info
from rubrix.server.apis.v0.handlers import metrics as tasks
from rubrix.server.apis.v0.handlers import records_deletion as records_deletion
from rubrix.server.apis.v0.handlers import users as users
from rubrix.server.errors.base_errors import __ALL__

Expand All @@ -33,5 +34,11 @@

dependencies = []

for router in [users.router, datasets.router, info.router, tasks.router]:
for router in [
users.router,
datasets.router,
info.router,
tasks.router,
records_deletion.router,
]:
api_router.include_router(router, dependencies=dependencies)
45 changes: 44 additions & 1 deletion src/rubrix/server/services/storage/service.py
@@ -1,14 +1,26 @@
from typing import List, Type
import dataclasses
from typing import List, Optional, Type

from fastapi import Depends

from rubrix.server.commons import telemetry
from rubrix.server.commons.config import TasksFactory
from rubrix.server.commons.models import TaskStatus
from rubrix.server.daos.records import DatasetRecordsDAO
from rubrix.server.errors import ForbiddenOperationError
from rubrix.server.security.model import User
from rubrix.server.services.datasets import ServiceDataset
from rubrix.server.services.search.model import ServiceBaseRecordsQuery
from rubrix.server.services.tasks.commons import ServiceRecord


@dataclasses.dataclass
class DeleteRecordsOut:
processed: int = 0
discarded: int = 0
deleted: int = 0


class RecordsStorageService:

_INSTANCE: "RecordsStorageService" = None
Expand Down Expand Up @@ -44,3 +56,34 @@ async def store_records(
records=records,
record_class=record_type,
)

async def delete_records(
self,
user: User,
dataset: ServiceDataset,
query: Optional[ServiceBaseRecordsQuery] = None,
mark_as_discarded: bool = False,
) -> DeleteRecordsOut:
processed, discarded, deleted = None, None, None
if mark_as_discarded:
processed, discarded = await self.__dao__.update_records_by_query(
dataset,
query=query,
status=TaskStatus.discarded,
)
else:
if not user.is_superuser() and user.username != dataset.created_by:
raise ForbiddenOperationError(
f"You don't have the necessary permissions to delete records on this dataset. "
"Only dataset creators or administrators can delete datasets"
)

processed, deleted = await self.__dao__.delete_records_by_query(
dataset, query=query
)

return DeleteRecordsOut(
processed=processed or 0,
discarded=discarded or 0,
deleted=deleted or 0,
)
Empty file.

0 comments on commit bc9685d

Please sign in to comment.