From 3b243897d70811f6e487bc3451825987713bd946 Mon Sep 17 00:00:00 2001 From: Vladislav Bogomolov Date: Mon, 24 Nov 2025 10:09:04 +0000 Subject: [PATCH 1/3] stabilising --- crates/catalog/hms/src/utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/catalog/hms/src/utils.rs b/crates/catalog/hms/src/utils.rs index 4bb18a0ba9..e9df1be735 100644 --- a/crates/catalog/hms/src/utils.rs +++ b/crates/catalog/hms/src/utils.rs @@ -374,7 +374,7 @@ pub(crate) fn create_lock_request( component: vec![component], txnid: None, user: FastStr::from(whoami::username()), - hostname: FastStr::from(whoami::fallible::hostname()), + hostname: FastStr::from(whoami::fallible::hostname().unwrap()), agent_info: None, } } From 47187d9728ec56ffddac9f7c0f8836dd21d52737 Mon Sep 17 00:00:00 2001 From: Vladislav Bogomolov Date: Mon, 24 Nov 2025 23:26:46 +0000 Subject: [PATCH 2/3] fix issues with hms lock flow --- crates/catalog/hms/src/catalog.rs | 19 ++++++++++++------- crates/catalog/hms/src/utils.rs | 2 +- crates/catalog/hms/tests/hms_catalog_test.rs | 2 +- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index af48bd8330..512301d9cc 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -54,7 +54,10 @@ pub const HMS_CATALOG_PROP_WAREHOUSE: &str = "warehouse"; pub const HMS_HIVE_LOCKS_DISABLED: &str = "hive_locks_disabled"; /// HMS Environment Context +/// These constants are reserved for future use with optimistic locking +#[allow(dead_code)] const HMS_EXPECTED_PARAMETER_KEY: &str = "expected_parameter_key"; +#[allow(dead_code)] const HMS_EXPECTED_PARAMETER_VALUE: &str = "expected_parameter_value"; /// Builder for [`RestCatalog`]. @@ -616,7 +619,7 @@ impl Catalog for HmsCatalog { let tbl_name = ident.name.clone(); // if HMS_HIVE_LOCKS_DISABLED is set - if let Some(tt) = &self.config.props.get(HMS_HIVE_LOCKS_DISABLED) { + if let Some(_tt) = &self.config.props.get(HMS_HIVE_LOCKS_DISABLED) { // Do alter table with env context Err(Error::new(ErrorKind::Unexpected, "Optimistic locks are not supported yet")) } else { @@ -656,21 +659,23 @@ impl Catalog for HmsCatalog { .metadata() .write_to( staged_table.file_io(), - staged_table.metadata().metadata_location(), + staged_table.metadata_location_result()?, ) .await?; - let new_hive_table = update_hive_table_from_table(hive_table, &staged_table)?; + let new_hive_table = update_hive_table_from_table(&hive_table, &staged_table)?; - let updated = self.client.0.alter_table( + self.client.0.alter_table( db_name.clone().into(), tbl_name.clone().into(), new_hive_table, ).await - .map(from_thrift_exception) - .map_err(from_thrift_error)??; + .map_err(from_thrift_error)?; // unlock the table after alter table - &self.client.0.unlock(lock.lockid).await.map(from_thrift_error)?; + self.client.0.unlock(hive_metastore::UnlockRequest { lockid: lock.lockid }) + .await + .map(from_thrift_exception) + .map_err(from_thrift_error)??; Ok(staged_table) } } diff --git a/crates/catalog/hms/src/utils.rs b/crates/catalog/hms/src/utils.rs index e9df1be735..9b3d71bf7b 100644 --- a/crates/catalog/hms/src/utils.rs +++ b/crates/catalog/hms/src/utils.rs @@ -187,7 +187,7 @@ pub(crate) fn update_hive_table_from_table( if k == METADATA_LOCATION || k == TABLE_TYPE || k == EXTERNAL { continue; } - params.insert(k.as_str().clone().into(), v.as_str().clone().into()); + params.insert(FastStr::from_string(k.to_string()), FastStr::from_string(v.to_string())); } params.insert(FastStr::from(EXTERNAL), FastStr::from("TRUE")); diff --git a/crates/catalog/hms/tests/hms_catalog_test.rs b/crates/catalog/hms/tests/hms_catalog_test.rs index 2748bcc496..926c0663ee 100644 --- a/crates/catalog/hms/tests/hms_catalog_test.rs +++ b/crates/catalog/hms/tests/hms_catalog_test.rs @@ -34,7 +34,7 @@ use iceberg_test_utils::{normalize_test_name, set_up}; use port_scanner::scan_port_addr; use tokio::time::sleep; use tracing::info; -use iceberg::transaction::Transaction; +use iceberg::transaction::{Transaction, ApplyTransactionAction}; const HMS_CATALOG_PORT: u16 = 9083; const MINIO_PORT: u16 = 9000; From 7ab5e6d6792afdf31dcb2c1b3acd3015ad49b8f8 Mon Sep 17 00:00:00 2001 From: Vladislav Bogomolov Date: Tue, 25 Nov 2025 23:18:05 +0000 Subject: [PATCH 3/3] refactoring for optimistic locks --- crates/catalog/hms/src/catalog.rs | 208 +++++++++++++++---- crates/catalog/hms/src/utils.rs | 8 +- crates/catalog/hms/tests/hms_catalog_test.rs | 78 ++++++- 3 files changed, 235 insertions(+), 59 deletions(-) diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index 512301d9cc..6056cc7dad 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -54,13 +54,10 @@ pub const HMS_CATALOG_PROP_WAREHOUSE: &str = "warehouse"; pub const HMS_HIVE_LOCKS_DISABLED: &str = "hive_locks_disabled"; /// HMS Environment Context -/// These constants are reserved for future use with optimistic locking -#[allow(dead_code)] const HMS_EXPECTED_PARAMETER_KEY: &str = "expected_parameter_key"; -#[allow(dead_code)] const HMS_EXPECTED_PARAMETER_VALUE: &str = "expected_parameter_value"; -/// Builder for [`RestCatalog`]. +/// Builder for [`HmsCatalog`]. #[derive(Debug)] pub struct HmsCatalogBuilder(HmsCatalogConfig); @@ -177,6 +174,43 @@ impl Debug for HmsCatalog { } } +/// RAII guard for HMS table locks. Automatically releases the lock when dropped. +struct HmsLockGuard { + client: ThriftHiveMetastoreClient, + lockid: i64, +} + +impl HmsLockGuard { + async fn acquire( + client: &ThriftHiveMetastoreClient, + db_name: &str, + tbl_name: &str, + ) -> Result { + let lock = client + .lock(create_lock_request(db_name, tbl_name)) + .await + .map(from_thrift_exception) + .map_err(from_thrift_error)??; + + Ok(Self { + client: client.clone(), + lockid: lock.lockid, + }) + } +} + +impl Drop for HmsLockGuard { + fn drop(&mut self) { + let client = self.client.clone(); + let lockid = self.lockid; + tokio::spawn(async move { + let _ = client + .unlock(hive_metastore::UnlockRequest { lockid }) + .await; + }); + } +} + impl HmsCatalog { /// Create a new hms catalog. fn new(config: HmsCatalogConfig) -> Result { @@ -218,6 +252,64 @@ impl HmsCatalog { pub fn file_io(&self) -> FileIO { self.file_io.clone() } + + /// Applies a commit to a table and prepares the update for HMS. + /// # Returns + /// A tuple of (staged_table, new_hive_table) ready for HMS alter_table operation + async fn apply_and_prepare_update( + &self, + commit: TableCommit, + db_name: &str, + tbl_name: &str, + hive_table: &hive_metastore::Table, + ) -> Result<(Table, hive_metastore::Table)> { + let metadata_location = get_metadata_location(&hive_table.parameters)?; + + let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?; + + let cur_table = Table::builder() + .file_io(self.file_io()) + .metadata_location(metadata_location) + .metadata(metadata) + .identifier(TableIdent::new( + NamespaceIdent::new(db_name.to_string()), + tbl_name.to_string(), + )) + .build()?; + + let staged_table = commit.apply(cur_table)?; + staged_table + .metadata() + .write_to( + staged_table.file_io(), + staged_table.metadata_location_result()?, + ) + .await?; + + let new_hive_table = update_hive_table_from_table(hive_table, &staged_table)?; + + Ok((staged_table, new_hive_table)) + } + + /// Builds an EnvironmentContext for optimistic locking with HMS. + /// + /// The context includes the expected metadata_location, which HMS will use + /// to validate that the table hasn't been modified concurrently. + fn build_environment_context(metadata_location: &str) -> hive_metastore::EnvironmentContext { + let mut env_context_properties = pilota::AHashMap::new(); + env_context_properties.insert( + HMS_EXPECTED_PARAMETER_KEY.into(), + "metadata_location".into(), + ); + env_context_properties.insert( + HMS_EXPECTED_PARAMETER_VALUE.into(), + pilota::FastStr::from_string(metadata_location.to_string()), + ); + + hive_metastore::EnvironmentContext { + properties: Some(env_context_properties), + } + } } #[async_trait] @@ -613,25 +705,43 @@ impl Catalog for HmsCatalog { )) } + /// Updates an existing table by applying a commit operation. + /// + /// This method supports two update strategies depending on the catalog configuration: + /// + /// **Optimistic Locking** (when `hive_locks_disabled` is set): + /// - Retrieves the current table state from HMS without acquiring locks + /// - Constructs an `EnvironmentContext` with the expected metadata location + /// - Uses `alter_table_with_environment_context` to perform an atomic + /// compare-and-swap operation. + /// - HMS will reject the update if the metadata location has changed, + /// indicating a concurrent modification + /// + /// **Traditional Locking** (default): + /// - Acquires an exclusive HMS lock on the table before making changes + /// - Retrieves the current table state + /// - Applies the commit and writes new metadata + /// - Updates the table in HMS using `alter_table` + /// - Releases the lock after the operation completes + /// + /// # Returns + /// A `Result` wrapping the updated `Table` object with new metadata. + /// + /// # Errors + /// This function may return an error in several scenarios: + /// - Failure to validate the namespace or table identifier + /// - Inability to acquire a lock (traditional locking mode) + /// - Failure to retrieve the table from HMS + /// - Errors reading or writing table metadata + /// - HMS rejects the update due to concurrent modification (optimistic locking) + /// - Errors from the underlying Thrift communication with HMS async fn update_table(&self, commit: TableCommit) -> Result { let ident = commit.identifier().clone(); let db_name = validate_namespace(ident.namespace())?; let tbl_name = ident.name.clone(); - // if HMS_HIVE_LOCKS_DISABLED is set - if let Some(_tt) = &self.config.props.get(HMS_HIVE_LOCKS_DISABLED) { - // Do alter table with env context - Err(Error::new(ErrorKind::Unexpected, "Optimistic locks are not supported yet")) - } else { - // start with trying to acquire a lock - let lock = &self - .client - .0 - .lock(create_lock_request(&db_name, &tbl_name)) - .await - .map(from_thrift_exception) - .map_err(from_thrift_error)??; - + if self.config.props.contains_key(HMS_HIVE_LOCKS_DISABLED) { + // Optimistic locking path: read first, then validate with EnvironmentContext let hive_table = self .client .0 @@ -641,42 +751,48 @@ impl Catalog for HmsCatalog { .map_err(from_thrift_error)??; let metadata_location = get_metadata_location(&hive_table.parameters)?; + let env_context = Self::build_environment_context(&metadata_location); - let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?; + let (staged_table, new_hive_table) = + self.apply_and_prepare_update(commit, &db_name, &tbl_name, &hive_table) + .await?; - let cur_table = Table::builder() - .file_io(self.file_io()) - .metadata_location(metadata_location) - .metadata(metadata) - .identifier(TableIdent::new( - NamespaceIdent::new(db_name.clone()), - tbl_name.clone(), - )) - .build()?; - - let staged_table = commit.apply(cur_table)?; - staged_table - .metadata() - .write_to( - staged_table.file_io(), - staged_table.metadata_location_result()?, + self.client + .0 + .alter_table_with_environment_context( + db_name.into(), + tbl_name.into(), + new_hive_table, + env_context, ) - .await?; - let new_hive_table = update_hive_table_from_table(&hive_table, &staged_table)?; - - self.client.0.alter_table( - db_name.clone().into(), - tbl_name.clone().into(), - new_hive_table, - ).await - .map_err(from_thrift_error)?; + .await + .map_err(from_thrift_error)?; - // unlock the table after alter table - self.client.0.unlock(hive_metastore::UnlockRequest { lockid: lock.lockid }) + Ok(staged_table) + } else { + // Traditional locking path: acquire lock first, then read + let _guard = HmsLockGuard::acquire(&self.client.0, &db_name, &tbl_name).await?; + + let hive_table = self + .client + .0 + .get_table(db_name.clone().into(), tbl_name.clone().into()) .await .map(from_thrift_exception) .map_err(from_thrift_error)??; + + let (staged_table, new_hive_table) = + self.apply_and_prepare_update(commit, &db_name, &tbl_name, &hive_table) + .await?; + + self.client + .0 + .alter_table(db_name.into(), tbl_name.into(), new_hive_table) + .await + .map_err(from_thrift_error)?; + Ok(staged_table) + // Lock automatically released here via Drop } } } diff --git a/crates/catalog/hms/src/utils.rs b/crates/catalog/hms/src/utils.rs index 9b3d71bf7b..50efe92df4 100644 --- a/crates/catalog/hms/src/utils.rs +++ b/crates/catalog/hms/src/utils.rs @@ -357,14 +357,14 @@ fn get_current_time() -> Result { } pub(crate) fn create_lock_request( - tbl_name: &String, - db_name: &String, + db_name: &str, + tbl_name: &str, ) -> hive_metastore::LockRequest { let component = hive_metastore::LockComponent { r#type: hive_metastore::LockType::EXCLUSIVE, level: hive_metastore::LockLevel::TABLE, - dbname: db_name.clone().into(), - tablename: Some(tbl_name.clone().into()), + dbname: FastStr::from_string(db_name.to_string()), + tablename: Some(FastStr::from_string(tbl_name.to_string())), partitionname: None, operation_type: None, is_acid: Some(true), diff --git a/crates/catalog/hms/tests/hms_catalog_test.rs b/crates/catalog/hms/tests/hms_catalog_test.rs index 926c0663ee..4c4bd05ddd 100644 --- a/crates/catalog/hms/tests/hms_catalog_test.rs +++ b/crates/catalog/hms/tests/hms_catalog_test.rs @@ -59,6 +59,17 @@ fn after_all() { } async fn get_catalog() -> HmsCatalog { + get_catalog_with_props(HashMap::new()).await +} + +async fn get_catalog_with_optimistic_locking() -> HmsCatalog { + use iceberg_catalog_hms::HMS_HIVE_LOCKS_DISABLED; + let mut extra_props = HashMap::new(); + extra_props.insert(HMS_HIVE_LOCKS_DISABLED.to_string(), "true".to_string()); + get_catalog_with_props(extra_props).await +} + +async fn get_catalog_with_props(extra_props: HashMap) -> HmsCatalog { set_up(); let (hms_catalog_ip, minio_ip) = { @@ -82,7 +93,7 @@ async fn get_catalog() -> HmsCatalog { sleep(std::time::Duration::from_millis(1000)).await; } - let props = HashMap::from([ + let mut props = HashMap::from([ ( HMS_CATALOG_PROP_URI.to_string(), hms_socket_addr.to_string(), @@ -104,6 +115,9 @@ async fn get_catalog() -> HmsCatalog { (S3_REGION.to_string(), "us-east-1".to_string()), ]); + // Merge in extra properties + props.extend(extra_props); + // Wait for bucket to actually exist let file_io = iceberg::io::FileIO::from_path("s3a://") .unwrap() @@ -426,34 +440,26 @@ async fn test_update_table() -> Result<()> { assert_eq!(table.metadata_location(), expected.metadata_location()); assert_eq!(table.metadata(), expected.metadata()); let original_metadata_location = table.metadata_location(); - - // Update table properties using the transaction let tx = Transaction::new(&table); let tx = tx .update_table_properties() .set("test_property".to_string(), "test_value".to_string()) .apply(tx)?; - // Commit the transaction to the catalog let updated_table = tx.commit(&catalog).await?; - // Verify the update was successful assert_eq!( updated_table.metadata().properties().get("test_property"), Some(&"test_value".to_string()) ); - - // Verify the metadata location has been updated assert_ne!( updated_table.metadata_location(), original_metadata_location, "Metadata location should be updated after commit" ); - // Load the table again from the catalog to verify changes were persisted let reloaded_table = catalog.load_table(table.identifier()).await?; - // Verify the reloaded table matches the updated table assert_eq!( reloaded_table.metadata().properties().get("test_property"), Some(&"test_value".to_string()) @@ -466,3 +472,57 @@ async fn test_update_table() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_update_table_with_optimistic_locking() -> Result<()> { + let catalog = get_catalog_with_optimistic_locking().await; + let creation = set_table_creation(None, "my_table")?; + let namespace = Namespace::new(NamespaceIdent::new("test_update_table_optimistic".into())); + set_test_namespace(&catalog, namespace.name()).await?; + + let expected = catalog.create_table(namespace.name(), creation).await?; + + let table = catalog + .load_table(&TableIdent::new( + namespace.name().clone(), + "my_table".to_string(), + )) + .await?; + + assert_eq!(table.identifier(), expected.identifier()); + assert_eq!(table.metadata_location(), expected.metadata_location()); + assert_eq!(table.metadata(), expected.metadata()); + let original_metadata_location = table.metadata_location(); + + let tx = Transaction::new(&table); + let tx = tx + .update_table_properties() + .set("test_property_optimistic".to_string(), "test_value_optimistic".to_string()) + .apply(tx)?; + + let updated_table = tx.commit(&catalog).await?; + + assert_eq!( + updated_table.metadata().properties().get("test_property_optimistic"), + Some(&"test_value_optimistic".to_string()) + ); + + assert_ne!( + updated_table.metadata_location(), + original_metadata_location, + "Metadata location should be updated after commit with optimistic locking" + ); + + let reloaded_table = catalog.load_table(table.identifier()).await?; + assert_eq!( + reloaded_table.metadata().properties().get("test_property_optimistic"), + Some(&"test_value_optimistic".to_string()) + ); + assert_eq!( + reloaded_table.metadata_location(), + updated_table.metadata_location(), + "Reloaded table should have the same metadata location as the updated table" + ); + + Ok(()) +}