Skip to content

Commit

Permalink
feat: deregister table for MemoryCatalogManager (GreptimeTeam#620)
Browse files Browse the repository at this point in the history
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
  • Loading branch information
waynexia authored and paomian committed Oct 19, 2023
1 parent 83cb5cb commit bb0669d
Show file tree
Hide file tree
Showing 12 changed files with 162 additions and 47 deletions.
1 change: 1 addition & 0 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ wal_dir = '/tmp/greptimedb/wal'
rpc_runtime_size = 8
mysql_addr = '127.0.0.1:4406'
mysql_runtime_size = 4
enable_memory_catalog = false

[storage]
type = 'File'
Expand Down
1 change: 1 addition & 0 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ node_id = 0
mode = 'standalone'
http_addr = '127.0.0.1:4000'
wal_dir = '/tmp/greptimedb/wal/'
enable_memory_catalog = false

[storage]
type = 'File'
Expand Down
11 changes: 10 additions & 1 deletion src/catalog/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ pub enum Error {
source: BoxedError,
},

#[snafu(display("Operation {} not implemented yet", operation))]
Unimplemented {
operation: String,
backtrace: Backtrace,
},

#[snafu(display("Failed to open table, table info: {}, source: {}", table_info, source))]
OpenTable {
table_info: String,
Expand Down Expand Up @@ -216,11 +222,12 @@ impl ErrorExt for Error {
| Error::ValueDeserialize { .. }
| Error::Io { .. } => StatusCode::StorageUnavailable,

Error::RegisterTable { .. } => StatusCode::Internal,

Error::ReadSystemCatalog { source, .. } => source.status_code(),
Error::SystemCatalogTypeMismatch { source, .. } => source.status_code(),
Error::InvalidCatalogValue { source, .. } => source.status_code(),

Error::RegisterTable { .. } => StatusCode::Internal,
Error::TableExists { .. } => StatusCode::TableAlreadyExists,
Error::SchemaExists { .. } => StatusCode::InvalidArguments,

Expand All @@ -235,6 +242,8 @@ impl ErrorExt for Error {
Error::InvalidTableSchema { source, .. } => source.status_code(),
Error::InvalidTableInfoInCatalog { .. } => StatusCode::Unexpected,
Error::Internal { source, .. } => source.status_code(),

Error::Unimplemented { .. } => StatusCode::Unsupported,
}
}

Expand Down
23 changes: 18 additions & 5 deletions src/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,17 @@ pub trait CatalogManager: CatalogList {
/// Starts a catalog manager.
async fn start(&self) -> Result<()>;

/// Registers a table given given catalog/schema to catalog manager,
/// returns table registered.
async fn register_table(&self, request: RegisterTableRequest) -> Result<usize>;
/// Registers a table within given catalog/schema to catalog manager,
/// returns whether the table registered.
async fn register_table(&self, request: RegisterTableRequest) -> Result<bool>;

/// Register a schema with catalog name and schema name.
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<usize>;
/// Deregisters a table within given catalog/schema to catalog manager,
/// returns whether the table deregistered.
async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<bool>;

/// Register a schema with catalog name and schema name. Retuens whether the
/// schema registered.
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool>;

/// Register a system table, should be called before starting the manager.
async fn register_system_table(&self, request: RegisterSystemTableRequest)
Expand Down Expand Up @@ -123,6 +128,14 @@ pub struct RegisterTableRequest {
pub table: TableRef,
}

#[derive(Clone)]
pub struct DeregisterTableRequest {
pub catalog: String,
pub schema: String,
pub table_name: String,
pub table_id: TableId,
}

#[derive(Debug, Clone)]
pub struct RegisterSchemaRequest {
pub catalog: String,
Expand Down
21 changes: 14 additions & 7 deletions src/catalog/src/local/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use table::TableRef;
use crate::error::{
CatalogNotFoundSnafu, IllegalManagerStateSnafu, OpenTableSnafu, ReadSystemCatalogSnafu, Result,
SchemaExistsSnafu, SchemaNotFoundSnafu, SystemCatalogSnafu, SystemCatalogTypeMismatchSnafu,
TableExistsSnafu, TableNotFoundSnafu,
TableExistsSnafu, TableNotFoundSnafu, UnimplementedSnafu,
};
use crate::local::memory::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider};
use crate::system::{
Expand All @@ -46,8 +46,8 @@ use crate::system::{
use crate::tables::SystemCatalog;
use crate::{
format_full_table_name, handle_system_table_request, CatalogList, CatalogManager,
CatalogProvider, CatalogProviderRef, RegisterSchemaRequest, RegisterSystemTableRequest,
RegisterTableRequest, SchemaProvider, SchemaProviderRef,
CatalogProvider, CatalogProviderRef, DeregisterTableRequest, RegisterSchemaRequest,
RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider, SchemaProviderRef,
};

/// A `CatalogManager` consists of a system catalog and a bunch of user catalogs.
Expand Down Expand Up @@ -309,7 +309,7 @@ impl CatalogManager for LocalCatalogManager {
self.init().await
}

async fn register_table(&self, request: RegisterTableRequest) -> Result<usize> {
async fn register_table(&self, request: RegisterTableRequest) -> Result<bool> {
let started = self.init_lock.lock().await;

ensure!(
Expand Down Expand Up @@ -349,10 +349,17 @@ impl CatalogManager for LocalCatalogManager {
.await?;

schema.register_table(request.table_name, request.table)?;
Ok(1)
Ok(true)
}

async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<usize> {
async fn deregister_table(&self, _request: DeregisterTableRequest) -> Result<bool> {
UnimplementedSnafu {
operation: "deregister table",
}
.fail()
}

async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool> {
let started = self.init_lock.lock().await;
ensure!(
*started,
Expand All @@ -377,7 +384,7 @@ impl CatalogManager for LocalCatalogManager {
.register_schema(request.catalog, schema_name.clone())
.await?;
catalog.register_schema(request.schema, Arc::new(MemorySchemaProvider::new()))?;
Ok(1)
Ok(true)
}

async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> {
Expand Down
64 changes: 57 additions & 7 deletions src/catalog/src/local/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use table::TableRef;
use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu};
use crate::schema::SchemaProvider;
use crate::{
CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, RegisterSchemaRequest,
RegisterSystemTableRequest, RegisterTableRequest, SchemaProviderRef,
CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest,
RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, SchemaProviderRef,
};

/// Simple in-memory list of catalogs
Expand Down Expand Up @@ -69,7 +69,7 @@ impl CatalogManager for MemoryCatalogManager {
Ok(())
}

async fn register_table(&self, request: RegisterTableRequest) -> Result<usize> {
async fn register_table(&self, request: RegisterTableRequest) -> Result<bool> {
let catalogs = self.catalogs.write().unwrap();
let catalog = catalogs
.get(&request.catalog)
Expand All @@ -84,22 +84,41 @@ impl CatalogManager for MemoryCatalogManager {
})?;
schema
.register_table(request.table_name, request.table)
.map(|v| if v.is_some() { 0 } else { 1 })
.map(|v| v.is_none())
}

async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<usize> {
async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<bool> {
let catalogs = self.catalogs.write().unwrap();
let catalog = catalogs
.get(&request.catalog)
.context(CatalogNotFoundSnafu {
catalog_name: &request.catalog,
})?
.clone();
let schema = catalog
.schema(&request.schema)?
.with_context(|| SchemaNotFoundSnafu {
schema_info: format!("{}.{}", &request.catalog, &request.schema),
})?;
schema
.deregister_table(&request.table_name)
.map(|v| v.is_some())
}

async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool> {
let catalogs = self.catalogs.write().unwrap();
let catalog = catalogs
.get(&request.catalog)
.context(CatalogNotFoundSnafu {
catalog_name: &request.catalog,
})?;
catalog.register_schema(request.schema, Arc::new(MemorySchemaProvider::new()))?;
Ok(1)
Ok(true)
}

async fn register_system_table(&self, _request: RegisterSystemTableRequest) -> Result<()> {
unimplemented!()
// TODO(ruihang): support register system table request
Ok(())
}

fn schema(&self, catalog: &str, schema: &str) -> Result<Option<SchemaProviderRef>> {
Expand Down Expand Up @@ -340,4 +359,35 @@ mod tests {
.downcast_ref::<MemoryCatalogManager>()
.unwrap();
}

#[tokio::test]
pub async fn test_catalog_deregister_table() {
let catalog = MemoryCatalogManager::default();
let schema = catalog
.schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME)
.unwrap()
.unwrap();

let register_table_req = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "numbers".to_string(),
table_id: 2333,
table: Arc::new(NumbersTable::default()),
};
catalog.register_table(register_table_req).await.unwrap();
assert!(schema.table_exist("numbers").unwrap());

let deregister_table_req = DeregisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "numbers".to_string(),
table_id: 2333,
};
catalog
.deregister_table(deregister_table_req)
.await
.unwrap();
assert!(!schema.table_exist("numbers").unwrap());
}
}
21 changes: 14 additions & 7 deletions src/catalog/src/remote/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ use tokio::sync::Mutex;

use crate::error::{
CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, InvalidTableSchemaSnafu,
OpenTableSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu,
OpenTableSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu, UnimplementedSnafu,
};
use crate::remote::{Kv, KvBackendRef};
use crate::{
handle_system_table_request, CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef,
RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider,
SchemaProviderRef,
DeregisterTableRequest, RegisterSchemaRequest, RegisterSystemTableRequest,
RegisterTableRequest, SchemaProvider, SchemaProviderRef,
};

/// Catalog manager based on metasrv.
Expand Down Expand Up @@ -411,7 +411,7 @@ impl CatalogManager for RemoteCatalogManager {
Ok(())
}

async fn register_table(&self, request: RegisterTableRequest) -> Result<usize> {
async fn register_table(&self, request: RegisterTableRequest) -> Result<bool> {
let catalog_name = request.catalog;
let schema_name = request.schema;
let catalog_provider = self.catalog(&catalog_name)?.context(CatalogNotFoundSnafu {
Expand All @@ -430,18 +430,25 @@ impl CatalogManager for RemoteCatalogManager {
.fail();
}
schema_provider.register_table(request.table_name, request.table)?;
Ok(1)
Ok(true)
}

async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<usize> {
async fn deregister_table(&self, _request: DeregisterTableRequest) -> Result<bool> {
UnimplementedSnafu {
operation: "deregister table",
}
.fail()
}

async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool> {
let catalog_name = request.catalog;
let schema_name = request.schema;
let catalog_provider = self.catalog(&catalog_name)?.context(CatalogNotFoundSnafu {
catalog_name: &catalog_name,
})?;
let schema_provider = self.new_schema_provider(&catalog_name, &schema_name);
catalog_provider.register_schema(schema_name, schema_provider)?;
Ok(1)
Ok(true)
}

async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> {
Expand Down
4 changes: 2 additions & 2 deletions src/catalog/tests/remote_catalog_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ mod tests {
table_id,
table,
};
assert_eq!(1, catalog_manager.register_table(reg_req).await.unwrap());
assert!(catalog_manager.register_table(reg_req).await.unwrap());
assert_eq!(
HashSet::from([table_name, "numbers".to_string()]),
default_schema
Expand Down Expand Up @@ -287,7 +287,7 @@ mod tests {
.register_schema(schema_name.clone(), schema.clone())
.expect("Register schema should not fail");
assert!(prev.is_none());
assert_eq!(1, catalog_manager.register_table(reg_req).await.unwrap());
assert!(catalog_manager.register_table(reg_req).await.unwrap());

assert_eq!(
HashSet::from([schema_name.clone()]),
Expand Down
10 changes: 9 additions & 1 deletion src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub struct StandaloneOptions {
pub mode: Mode,
pub wal_dir: String,
pub storage: ObjectStoreConfig,
pub enable_memory_catalog: bool,
}

impl Default for StandaloneOptions {
Expand All @@ -86,6 +87,7 @@ impl Default for StandaloneOptions {
mode: Mode::Standalone,
wal_dir: "/tmp/greptimedb/wal".to_string(),
storage: ObjectStoreConfig::default(),
enable_memory_catalog: false,
}
}
}
Expand All @@ -110,6 +112,7 @@ impl StandaloneOptions {
DatanodeOptions {
wal_dir: self.wal_dir,
storage: self.storage,
enable_memory_catalog: self.enable_memory_catalog,
..Default::default()
}
}
Expand All @@ -131,18 +134,22 @@ struct StartCommand {
influxdb_enable: bool,
#[clap(short, long)]
config_file: Option<String>,
#[clap(short = 'm', long = "memory-catalog")]
enable_memory_catalog: bool,
}

impl StartCommand {
async fn run(self) -> Result<()> {
let enable_memory_catalog = self.enable_memory_catalog;
let config_file = self.config_file.clone();
let fe_opts = FrontendOptions::try_from(self)?;
let dn_opts: DatanodeOptions = {
let opts: StandaloneOptions = if let Some(path) = config_file {
let mut opts: StandaloneOptions = if let Some(path) = config_file {
toml_loader::from_file!(&path)?
} else {
StandaloneOptions::default()
};
opts.enable_memory_catalog = enable_memory_catalog;
opts.datanode_options()
};

Expand Down Expand Up @@ -264,6 +271,7 @@ mod tests {
std::env::current_dir().unwrap().as_path().to_str().unwrap()
)),
influxdb_enable: false,
enable_memory_catalog: false,
};

let fe_opts = FrontendOptions::try_from(cmd).unwrap();
Expand Down
2 changes: 2 additions & 0 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub struct DatanodeOptions {
pub meta_client_opts: Option<MetaClientOpts>,
pub wal_dir: String,
pub storage: ObjectStoreConfig,
pub enable_memory_catalog: bool,
pub mode: Mode,
}

Expand All @@ -61,6 +62,7 @@ impl Default for DatanodeOptions {
meta_client_opts: None,
wal_dir: "/tmp/greptimedb/wal".to_string(),
storage: ObjectStoreConfig::default(),
enable_memory_catalog: false,
mode: Mode::Standalone,
}
}
Expand Down
Loading

0 comments on commit bb0669d

Please sign in to comment.