Skip to content

Commit

Permalink
fix alias operation deadlock (#103) (#105)
Browse files Browse the repository at this point in the history
* fix alias operation deadlock (#103)

* cargo fmt (#103)
  • Loading branch information
generall committed Oct 6, 2021
1 parent a58b74e commit cca7839
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 33 deletions.
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions lib/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ authors = ["Andrey Vasnetsov <vasnetsov93@gmail.com>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dev-dependencies]
tempdir = "0.3.7"

[dependencies]

Expand All @@ -18,6 +20,7 @@ tokio = {version = "~1.7", features = ["rt-multi-thread"]}
serde = { version = "~1.0", features = ["derive"] }
schemars = "0.8.0"
itertools = "0.9"
async-trait = "0.1.51"


segment = {path = "../segment"}
Expand Down
40 changes: 40 additions & 0 deletions lib/storage/src/content_manager/collections_ops.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use crate::content_manager::errors::StorageError;
use async_trait::async_trait;
use collection::collection::Collection;
use std::collections::HashMap;
use std::sync::Arc;

pub type Collections = HashMap<String, Arc<Collection>>;

#[async_trait]
pub trait Checker {
async fn is_collection_exists(&self, collection_name: &str) -> bool;

async fn validate_collection_not_exists(
&self,
collection_name: &str,
) -> Result<(), StorageError> {
if self.is_collection_exists(collection_name).await {
return Err(StorageError::BadInput {
description: format!("Collection `{}` already exists!", collection_name),
});
}
Ok(())
}

async fn validate_collection_exists(&self, collection_name: &str) -> Result<(), StorageError> {
if !self.is_collection_exists(collection_name).await {
return Err(StorageError::NotFound {
description: format!("Collection `{}` doesn't exist!", collection_name),
});
}
Ok(())
}
}

#[async_trait]
impl Checker for Collections {
async fn is_collection_exists(&self, collection_name: &str) -> bool {
self.contains_key(collection_name)
}
}
1 change: 1 addition & 0 deletions lib/storage/src/content_manager/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod collections_ops;
pub mod errors;
pub mod storage_ops;
pub mod toc;
54 changes: 23 additions & 31 deletions lib/storage/src/content_manager/toc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use collection::operations::types::{
use collection::operations::CollectionUpdateOperations;
use segment::types::{PointIdType, ScoredPoint};

use crate::content_manager::collections_ops::{Checker, Collections};
use crate::content_manager::errors::StorageError;
use crate::content_manager::storage_ops::{AliasOperations, StorageOperations};
use crate::types::StorageConfig;
Expand All @@ -35,7 +36,7 @@ const SLED_CACHE_SIZE: u64 = 1 * 1024 * 1024; // 1 mb
const COLLECTIONS_DIR: &str = "collections";

pub struct TableOfContent {
collections: Arc<RwLock<HashMap<String, Arc<Collection>>>>,
collections: Arc<RwLock<Collections>>,
storage_config: StorageConfig,
search_runtime: Runtime,
alias_persistence: Db,
Expand Down Expand Up @@ -109,42 +110,21 @@ impl TableOfContent {
Ok(path)
}

async fn validate_collection_not_exists(
&self,
collection_name: &str,
) -> Result<(), StorageError> {
if self.is_collection_exists(collection_name).await {
return Err(StorageError::BadInput {
description: format!("Collection `{}` already exists!", collection_name),
});
}
Ok(())
}

async fn validate_collection_exists(&self, collection_name: &str) -> Result<(), StorageError> {
if !self.is_collection_exists(collection_name).await {
return Err(StorageError::NotFound {
description: format!("Collection `{}` doesn't exist!", collection_name),
});
}
Ok(())
}

async fn resolve_name(&self, collection_name: &str) -> Result<String, StorageError> {
let alias_collection_name = self.alias_persistence.get(collection_name.as_bytes())?;

let resolved_name = match alias_collection_name {
None => collection_name.to_string(),
Some(resolved_alias) => from_utf8(&resolved_alias).unwrap().to_string(),
};
self.validate_collection_exists(&resolved_name).await?;
self.collections
.read()
.await
.validate_collection_exists(&resolved_name)
.await?;
Ok(resolved_name)
}

pub async fn is_collection_exists(&self, collection_name: &str) -> bool {
self.collections.read().await.contains_key(collection_name)
}

pub async fn perform_collection_operation(
&self,
operation: StorageOperations,
Expand All @@ -158,7 +138,10 @@ impl TableOfContent {
wal_config: wal_config_diff,
optimizers_config: optimizers_config_diff,
} => {
self.validate_collection_not_exists(&collection_name)
self.collections
.read()
.await
.validate_collection_not_exists(&collection_name)
.await?;
let collection_path = self.create_collection_path(&collection_name)?;

Expand Down Expand Up @@ -190,6 +173,9 @@ impl TableOfContent {
)?;

let mut write_collections = self.collections.write().await;
write_collections
.validate_collection_not_exists(&collection_name)
.await?;
write_collections.insert(collection_name, Arc::new(collection));
Ok(true)
}
Expand Down Expand Up @@ -229,15 +215,21 @@ impl TableOfContent {
}
}
StorageOperations::ChangeAliases { actions } => {
let _collection_lock = self.collections.write().await; // Make alias change atomic
// Lock all collections for alias changes
// Prevent search on partially switched collections
let collection_lock = self.collections.write().await;
for action in actions {
match action {
AliasOperations::CreateAlias {
collection_name,
alias_name,
} => {
self.validate_collection_exists(&collection_name).await?;
self.validate_collection_not_exists(&alias_name).await?;
collection_lock
.validate_collection_exists(&collection_name)
.await?;
collection_lock
.validate_collection_not_exists(&alias_name)
.await?;

self.alias_persistence
.insert(alias_name.as_bytes(), collection_name.as_bytes())?;
Expand Down
86 changes: 86 additions & 0 deletions lib/storage/tests/alias_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use collection::collection_builder::optimizers_builder::OptimizersConfig;
use storage::content_manager::toc::TableOfContent;
use storage::types::{PerformanceConfig, StorageConfig};
use tempdir::TempDir;
use tokio::runtime::Runtime;

#[cfg(test)]
mod tests {
use super::*;
use segment::types::Distance;
use storage::content_manager::storage_ops::{AliasOperations, StorageOperations};

#[test]
fn test_alias_operation() {
let storage_dir = TempDir::new("storage").unwrap();

let config = StorageConfig {
storage_path: storage_dir.path().to_str().unwrap().to_string(),
optimizers: OptimizersConfig {
deleted_threshold: 0.5,
vacuum_min_vector_number: 100,
max_segment_number: 2,
memmap_threshold: 100,
indexing_threshold: 100,
payload_indexing_threshold: 100,
flush_interval_sec: 2,
},
wal: Default::default(),
performance: PerformanceConfig {
max_search_threads: 1,
},
hnsw_index: Default::default(),
};

let runtime = Runtime::new().unwrap();
let handle = runtime.handle().clone();

let toc = TableOfContent::new(&config, runtime);

handle
.block_on(
toc.perform_collection_operation(StorageOperations::CreateCollection {
name: "test".to_string(),
vector_size: 10,
distance: Distance::Cosine,
hnsw_config: None,
wal_config: None,
optimizers_config: None,
}),
)
.unwrap();

handle
.block_on(
toc.perform_collection_operation(StorageOperations::ChangeAliases {
actions: vec![AliasOperations::CreateAlias {
collection_name: "test".to_string(),
alias_name: "test_alias".to_string(),
}],
}),
)
.unwrap();

handle
.block_on(
toc.perform_collection_operation(StorageOperations::ChangeAliases {
actions: vec![
AliasOperations::CreateAlias {
collection_name: "test".to_string(),
alias_name: "test_alias2".to_string(),
},
AliasOperations::DeleteAlias {
alias_name: "test_alias".to_string(),
},
AliasOperations::RenameAlias {
old_alias_name: "test_alias2".to_string(),
new_alias_name: "test_alias3".to_string(),
},
],
}),
)
.unwrap();

handle.block_on(toc.get_collection(&"test_alias3")).unwrap();
}
}
24 changes: 24 additions & 0 deletions tests/basic_api_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,27 @@ curl -L -X POST "http://$QDRANT_HOST/collections/test_collection/points/scroll"
--fail -s \
-H 'Content-Type: application/json' \
--data-raw '{ "offset": 2, "limit": 2, "with_vector": true }' | jq

curl -L -X POST "http://$QDRANT_HOST/collections" \
--fail -s \
-H 'Content-Type: application/json' \
--data-raw '{
"change_aliases": {
"actions": [
{
"create_alias": {
"alias_name": "test_alias",
"collection_name": "test_collection"
}
}
]
}
}' | jq

curl -L -X POST "http://$QDRANT_HOST/collections/test_alias/points/search" \
-H 'Content-Type: application/json' \
--fail -s \
--data-raw '{
"vector": [0.2,0.1,0.9,0.7],
"top": 3
}' | jq

0 comments on commit cca7839

Please sign in to comment.