Skip to content

Commit

Permalink
Delete Points By Filter API #39 (#250)
Browse files Browse the repository at this point in the history
* Delete Points By Filter API #39

* make delete_by_filter part of existing delete query + fix merge issues #39

* apply fmt

Co-authored-by: Andrey Vasnetsov <andrey@vasnetsov.com>
  • Loading branch information
elbart and generall committed Jan 25, 2022
1 parent eed0f51 commit 559e7a8
Show file tree
Hide file tree
Showing 16 changed files with 372 additions and 73 deletions.
60 changes: 47 additions & 13 deletions docs/redoc/master/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,17 @@
},
"type": "object"
},
"FilterSelector": {
"properties": {
"filter": {
"$ref": "#/components/schemas/Filter"
}
},
"required": [
"filter"
],
"type": "object"
},
"GeoBoundingBox": {
"description": "Geo filter request\n\nMatches coordinates inside the rectangle, described by coordinates of top-left and bottom-right edges",
"properties": {
Expand Down Expand Up @@ -1247,6 +1258,22 @@
],
"description": "Payload interface structure which ensures that user is allowed to pass payload in both - array and single element forms.\n\nExample:\n\nBoth versions should work: ```json {..., \"payload\": {\"city\": {\"type\": \"keyword\", \"value\": [\"Berlin\", \"London\"] }}}, {..., \"payload\": {\"city\": {\"type\": \"keyword\", \"value\": \"Moscow\" }}}, ```"
},
"PointIdsList": {
"properties": {
"points": {
"items": {
"format": "uint64",
"minimum": 0,
"type": "integer"
},
"type": "array"
}
},
"required": [
"points"
],
"type": "object"
},
"PointInsertOperations": {
"anyOf": [
{
Expand Down Expand Up @@ -1297,6 +1324,19 @@
"delete_points"
],
"type": "object"
},
{
"additionalProperties": false,
"description": "Delete points by given filter criteria",
"properties": {
"delete_points_by_filter": {
"$ref": "#/components/schemas/Filter"
}
},
"required": [
"delete_points_by_filter"
],
"type": "object"
}
]
},
Expand Down Expand Up @@ -1383,20 +1423,14 @@
"type": "object"
},
"PointsSelector": {
"properties": {
"points": {
"items": {
"format": "uint64",
"minimum": 0,
"type": "integer"
},
"type": "array"
"anyOf": [
{
"$ref": "#/components/schemas/PointIdsList"
},
{
"$ref": "#/components/schemas/FilterSelector"
}
},
"required": [
"points"
],
"type": "object"
]
},
"Range": {
"description": "Range filter request",
Expand Down
60 changes: 47 additions & 13 deletions docs/redoc/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,17 @@
},
"type": "object"
},
"FilterSelector": {
"properties": {
"filter": {
"$ref": "#/components/schemas/Filter"
}
},
"required": [
"filter"
],
"type": "object"
},
"GeoBoundingBox": {
"description": "Geo filter request\n\nMatches coordinates inside the rectangle, described by coordinates of top-left and bottom-right edges",
"properties": {
Expand Down Expand Up @@ -1247,6 +1258,22 @@
],
"description": "Payload interface structure which ensures that user is allowed to pass payload in both - array and single element forms.\n\nExample:\n\nBoth versions should work: ```json {..., \"payload\": {\"city\": {\"type\": \"keyword\", \"value\": [\"Berlin\", \"London\"] }}}, {..., \"payload\": {\"city\": {\"type\": \"keyword\", \"value\": \"Moscow\" }}}, ```"
},
"PointIdsList": {
"properties": {
"points": {
"items": {
"format": "uint64",
"minimum": 0,
"type": "integer"
},
"type": "array"
}
},
"required": [
"points"
],
"type": "object"
},
"PointInsertOperations": {
"anyOf": [
{
Expand Down Expand Up @@ -1297,6 +1324,19 @@
"delete_points"
],
"type": "object"
},
{
"additionalProperties": false,
"description": "Delete points by given filter criteria",
"properties": {
"delete_points_by_filter": {
"$ref": "#/components/schemas/Filter"
}
},
"required": [
"delete_points_by_filter"
],
"type": "object"
}
]
},
Expand Down Expand Up @@ -1383,20 +1423,14 @@
"type": "object"
},
"PointsSelector": {
"properties": {
"points": {
"items": {
"format": "uint64",
"minimum": 0,
"type": "integer"
},
"type": "array"
"anyOf": [
{
"$ref": "#/components/schemas/PointIdsList"
},
{
"$ref": "#/components/schemas/FilterSelector"
}
},
"required": [
"points"
],
"type": "object"
]
},
"Range": {
"description": "Range filter request",
Expand Down
11 changes: 11 additions & 0 deletions lib/collection/src/collection_manager/holders/proxy_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,17 @@ impl SegmentEntry for ProxySegment {
fn check_error(&self) -> Option<SegmentFailedState> {
self.write_segment.get().read().check_error()
}

/// Deletes the points matching `filter` by forwarding the request to the
/// write segment and returning whatever count that segment reports.
///
/// NOTE(review): only `write_segment` is consulted here. If this proxy also
/// fronts another segment, matching points stored there are not affected by
/// this call — confirm that this is intended.
fn delete_filtered<'a>(
    &'a mut self,
    op_num: SeqNumberType,
    filter: &'a Filter,
) -> OperationResult<usize> {
    let write_segment = self.write_segment.get();
    // Presumably an exclusive (write) lock guard — confirm against the holder type.
    let mut guard = write_segment.write();
    guard.delete_filtered(op_num, filter)
}
}

#[cfg(test)]
Expand Down
19 changes: 18 additions & 1 deletion lib/collection/src/collection_manager/segments_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::collections::{HashMap, HashSet};
use parking_lot::{RwLock, RwLockWriteGuard};

use segment::types::{
PayloadInterface, PayloadKeyType, PayloadKeyTypeRef, PointIdType, SeqNumberType,
Filter, PayloadInterface, PayloadKeyType, PayloadKeyTypeRef, PointIdType, SeqNumberType,
VectorElementType,
};

Expand Down Expand Up @@ -266,6 +266,9 @@ pub(crate) fn process_point_operation(
let res = upsert_points(segments, op_num, &ids, &vectors, &payloads)?;
Ok(res)
}
PointOperations::DeletePointsByFilter(filter) => {
delete_points_by_filter(&segments.read(), op_num, &filter)
}
}
}

Expand Down Expand Up @@ -299,3 +302,17 @@ pub(crate) fn process_field_index_operation(
}
}
}

/// Deletes points matching the given filter from the segments in `segments`.
///
/// `op_num` is passed unchanged to every segment's `delete_filtered` call.
///
/// Returns the sum of the per-segment deletion counts. The first error
/// reported by a segment (or by `apply_segments` itself) is propagated.
pub(crate) fn delete_points_by_filter(
    segments: &SegmentHolder,
    op_num: SeqNumberType,
    filter: &Filter,
) -> CollectionResult<usize> {
    let mut removed_total = 0;
    segments.apply_segments(|segment| {
        let removed_here = segment.delete_filtered(op_num, filter)?;
        removed_total += removed_here;
        // NOTE(review): `true` presumably tells `apply_segments` the segment
        // was processed/modified — confirm against its contract.
        Ok(true)
    })?;
    Ok(removed_total)
}
4 changes: 2 additions & 2 deletions lib/collection/src/operations/payload_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ use std::collections::HashMap;
pub struct SetPayload {
pub payload: HashMap<PayloadKeyType, PayloadInterface>,
/// Assigns payload to each point in this list
pub points: Vec<PointIdType>,
pub points: Vec<PointIdType>, // ToDo: replace with point selector
}

#[derive(Debug, Deserialize, Serialize, JsonSchema)]
pub struct DeletePayload {
pub keys: Vec<PayloadKeyType>,
/// Deletes values from each point in this list
pub points: Vec<PointIdType>,
pub points: Vec<PointIdType>, // ToDo: replace with point selector
}

/// Define operations description for point payloads manipulation
Expand Down
26 changes: 25 additions & 1 deletion lib/collection/src/operations/point_ops.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::operations::types::VectorType;
use schemars::JsonSchema;
use segment::types::{PayloadInterface, PayloadKeyType, PointIdType};
use segment::types::{Filter, PayloadInterface, PayloadKeyType, PointIdType};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

Expand Down Expand Up @@ -35,6 +35,28 @@ pub struct PointsList {
pub points: Vec<PointStruct>,
}

// Selector body that names points explicitly by their IDs.
// Used as the payload of `PointsSelector::PointIdsSelector`.
//
// Plain `//` comments are used on purpose: `///` docs on schema types appear
// to be emitted as `description` fields in the generated OpenAPI schema
// (TODO confirm), and this type currently has none there.
#[derive(Debug, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub struct PointIdsList {
// IDs of the points to select.
pub points: Vec<PointIdType>,
}

// Selector body that picks points through a filtering condition.
// Used as the payload of `PointsSelector::FilterSelector`.
//
// Plain `//` comments are used on purpose: `///` docs on schema types appear
// to be emitted as `description` fields in the generated OpenAPI schema
// (TODO confirm), and this type currently has none there.
#[derive(Debug, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub struct FilterSelector {
// Condition a point has to satisfy to be selected.
pub filter: Filter,
}

// Two accepted ways of selecting points: an explicit ID list or a filter.
//
// The enum is `untagged`, so no variant name appears on the wire; the shape of
// the body (`points` vs. `filter` key) decides which variant is produced.
// NOTE(review): serde documents that untagged variants are tried in
// declaration order, so a body carrying both keys would presumably resolve to
// `PointIdsSelector` — confirm if that case matters.
#[derive(Debug, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
#[serde(untagged)]
pub enum PointsSelector {
/// Select points by list of IDs
PointIdsSelector(PointIdsList),
/// Select points by filtering condition
FilterSelector(FilterSelector),
}

#[derive(Debug, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
#[serde(untagged)]
Expand All @@ -52,6 +74,8 @@ pub enum PointOperations {
UpsertPoints(PointInsertOperations),
/// Delete point if exists
DeletePoints { ids: Vec<PointIdType> },
/// Delete points by given filter criteria
DeletePointsByFilter(Filter),
}

impl From<Batch> for PointInsertOperations {
Expand Down
78 changes: 76 additions & 2 deletions lib/collection/tests/collection_test.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use tempdir::TempDir;
Expand All @@ -10,7 +10,8 @@ use collection::operations::point_ops::{Batch, PointOperations, PointStruct};
use collection::operations::types::{RecommendRequest, ScrollRequest, SearchRequest, UpdateStatus};
use collection::operations::CollectionUpdateOperations;
use segment::types::{
PayloadInterface, PayloadKeyType, PayloadVariant, WithPayload, WithPayloadInterface,
Condition, HasIdCondition, PayloadInterface, PayloadKeyType, PayloadVariant, PointIdType,
WithPayload, WithPayloadInterface,
};

use crate::common::simple_collection_fixture;
Expand Down Expand Up @@ -331,3 +332,76 @@ async fn test_read_api() {
assert_eq!(result.next_page_offset, Some(2));
assert_eq!(result.points.len(), 2);
}

/// Inserts five points, deletes two of them through a `HasId` filter and
/// verifies via scroll that exactly the other three remain.
#[tokio::test]
async fn test_collection_delete_points_by_filter() {
    let collection_dir = TempDir::new("collection").unwrap();
    let collection = simple_collection_fixture(collection_dir.path()).await;

    // Seed the collection with points 0..=4 (no payloads).
    let seed_batch = Batch {
        ids: vec![0, 1, 2, 3, 4],
        vectors: vec![
            vec![1.0, 0.0, 1.0, 1.0],
            vec![1.0, 0.0, 1.0, 0.0],
            vec![1.0, 1.0, 1.0, 1.0],
            vec![1.0, 1.0, 0.0, 1.0],
            vec![1.0, 0.0, 0.0, 0.0],
        ],
        payloads: None,
    };
    let insert_points = CollectionUpdateOperations::PointOperation(seed_batch.into());

    let insert_status = collection
        .update(insert_points, true)
        .await
        .unwrap_or_else(|err| panic!("operation failed: {:?}", err))
        .status;
    assert_eq!(insert_status, UpdateStatus::Completed);

    // Build a filter that matches exactly the points with id 0 and 3.
    let doomed_ids: HashSet<PointIdType> = vec![0, 3].into_iter().collect();
    let has_doomed_id = Condition::HasId(HasIdCondition::from(doomed_ids));
    let delete_filter = segment::types::Filter {
        should: None,
        must: Some(vec![has_doomed_id]),
        must_not: None,
    };

    let delete_points = CollectionUpdateOperations::PointOperation(
        PointOperations::DeletePointsByFilter(delete_filter),
    );

    let delete_status = collection
        .update(delete_points, true)
        .await
        .unwrap_or_else(|err| panic!("operation failed: {:?}", err))
        .status;
    assert_eq!(delete_status, UpdateStatus::Completed);

    // Read back everything that is left (limit is larger than the data set).
    let scroll_everything = ScrollRequest {
        offset: Some(0),
        limit: Some(10),
        filter: None,
        with_payload: Some(WithPayloadInterface::Bool(false)),
        with_vector: None,
    };
    let segment_searcher = SimpleCollectionSearcher::new();
    let result = collection
        .scroll_by(scroll_everything, &segment_searcher)
        .await
        .unwrap();

    // Only 3 of the 5 points may remain, and they must be the ones that were
    // not targeted by the filter, in the order the scroll returns them.
    assert_eq!(result.points.len(), 3);
    let surviving_ids: Vec<_> = result.points.iter().map(|point| point.id).collect();
    assert_eq!(surviving_ids, vec![1, 2, 4]);
}

0 comments on commit 559e7a8

Please sign in to comment.