Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rust/lance-arrow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ pub const ARROW_EXT_META_KEY: &str = "ARROW:extension:metadata";
/// Key used by lance to mark a field as a blob
/// TODO: Use Arrow extension mechanism instead?
pub const BLOB_META_KEY: &str = "lance-encoding:blob";
/// Key used by Lance to record the blob column format version.
/// Value "2" selects the blob v2 descriptor layout; an absent key (or any
/// other value) is treated as the legacy v1 format.
pub const BLOB_VERSION_META_KEY: &str = "lance-encoding:blob-version";

type Result<T> = std::result::Result<T, ArrowError>;

Expand Down
30 changes: 29 additions & 1 deletion rust/lance-core/src/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ mod field;
mod schema;

use crate::{Error, Result};
pub use field::{Encoding, Field, NullabilityComparison, OnTypeMismatch, SchemaCompareOptions};
pub use field::{
BlobVersion, Encoding, Field, NullabilityComparison, OnTypeMismatch, SchemaCompareOptions,
};
pub use schema::{
escape_field_path_for_project, format_field_path, parse_field_path, FieldRef, OnMissing,
Projectable, Projection, Schema,
Expand All @@ -44,6 +46,32 @@ pub static BLOB_DESC_FIELD: LazyLock<ArrowField> = LazyLock::new(|| {
pub static BLOB_DESC_LANCE_FIELD: LazyLock<Field> =
LazyLock::new(|| Field::try_from(&*BLOB_DESC_FIELD).unwrap());

/// Child fields of the blob v2 descriptor struct.
///
/// `kind` is non-nullable; the remaining columns are nullable, presumably
/// because each storage flavor fills in only the fields it needs
/// (position/size vs. blob_id/blob_uri) — TODO confirm against the writer.
pub static BLOB_V2_DESC_FIELDS: LazyLock<Fields> = LazyLock::new(|| {
    Fields::from(vec![
        ArrowField::new("kind", DataType::UInt8, false),
        ArrowField::new("position", DataType::UInt64, true),
        ArrowField::new("size", DataType::UInt64, true),
        ArrowField::new("blob_id", DataType::UInt32, true),
        ArrowField::new("blob_uri", DataType::Utf8, true),
    ])
});

/// Arrow struct type wrapping [`BLOB_V2_DESC_FIELDS`].
pub static BLOB_V2_DESC_TYPE: LazyLock<DataType> =
    LazyLock::new(|| DataType::Struct(BLOB_V2_DESC_FIELDS.clone()));

/// Arrow field describing an unloaded blob v2 column.
///
/// Carries both the blob marker metadata and the version key ("2") so the
/// v2 format survives schema round-trips.
pub static BLOB_V2_DESC_FIELD: LazyLock<ArrowField> = LazyLock::new(|| {
    ArrowField::new("description", BLOB_V2_DESC_TYPE.clone(), true).with_metadata(HashMap::from([
        (lance_arrow::BLOB_META_KEY.to_string(), "true".to_string()),
        (
            lance_arrow::BLOB_VERSION_META_KEY.to_string(),
            "2".to_string(),
        ),
    ]))
});

/// Lance-native view of [`BLOB_V2_DESC_FIELD`]; `Field::into_unloaded`
/// copies its logical type and children onto v2 blob fields.
pub static BLOB_V2_DESC_LANCE_FIELD: LazyLock<Field> =
    LazyLock::new(|| Field::try_from(&*BLOB_V2_DESC_FIELD).unwrap());
Comment on lines +72 to +73
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should just call this BLOBFILE_DESC_FIELD? This way it is clear we are not replacing BLOB_DESC_FIELD (we still need it for inline case)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My current idea is to have a blob v2 concept that includes all supported blob types like 'inline', 'packed', or 'dedicated'. This means blob v2 will cover all the uses of blob v1, which is just inline. I think this makes compatibility easier without changing any logic on the inline side.

We’ll only reuse the file encoding from blob v1 (inline), but all the table schema will be in blob v2. With this change, we can do the version check early at the table level instead of at the time of writing data.

Btw, I’m not strong on this. If you prefer to just keep packed/dedicated blob types as an extension to inline, I’m fine with making this change.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will move this discussion to #5163


/// LogicalType is a string presentation of arrow type.
/// to be serialized into protobuf.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
Expand Down
102 changes: 99 additions & 3 deletions rust/lance-core/src/datatypes/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ use super::{
schema::{compare_fields, explain_fields_difference},
Dictionary, LogicalType, Projection,
};
use crate::{datatypes::BLOB_DESC_LANCE_FIELD, Error, Result};
use crate::{
datatypes::{BLOB_DESC_LANCE_FIELD, BLOB_V2_DESC_LANCE_FIELD},
Error, Result,
};

/// Use this config key in Arrow field metadata to indicate a column is a part of the primary key.
/// The value can be any true values like `true`, `1`, `yes` (case-insensitive).
Expand Down Expand Up @@ -69,6 +72,32 @@ pub struct SchemaCompareOptions {
/// Allow out of order fields (default false)
pub ignore_field_order: bool,
}

/// Blob column format version.
///
/// Stored in field metadata under the blob-version key; an absent or
/// unrecognized value is treated as the legacy v1 format.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum BlobVersion {
    /// Legacy blob format (position / size only).
    #[default]
    V1,
    /// Blob v2 struct format.
    V2,
}

impl BlobVersion {
    /// Parse a version from the raw metadata value, defaulting to v1 for
    /// anything other than the exact string "2".
    pub fn from_metadata_value(value: Option<&str>) -> Self {
        if matches!(value, Some("2")) {
            Self::V2
        } else {
            Self::V1
        }
    }

    /// The metadata value to persist for this version; `None` means the
    /// key should be absent (v1 fields carry no version marker).
    pub fn metadata_value(self) -> Option<&'static str> {
        if self == Self::V2 {
            Some("2")
        } else {
            None
        }
    }
}
/// Encoding enum.
#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
pub enum Encoding {
Expand Down Expand Up @@ -457,14 +486,45 @@ impl Field {
self.metadata.contains_key(BLOB_META_KEY)
}

/// The blob format version of this field.
///
/// Non-blob fields always report [`BlobVersion::V1`]; blob fields read the
/// version from their metadata.
pub fn blob_version(&self) -> BlobVersion {
    if !self.is_blob() {
        BlobVersion::V1
    } else {
        let raw = self.metadata.get(BLOB_VERSION_META_KEY);
        BlobVersion::from_metadata_value(raw.map(String::as_str))
    }
}

/// Tag this field's metadata with the given blob version.
///
/// V1 clears the version key (legacy fields carry no marker) while v2
/// writes it. No-op on non-blob fields.
pub fn set_blob_version(&mut self, version: BlobVersion) {
    if !self.is_blob() {
        return;
    }
    if let Some(value) = version.metadata_value() {
        self.metadata
            .insert(BLOB_VERSION_META_KEY.to_string(), value.to_string());
    } else {
        self.metadata.remove(BLOB_VERSION_META_KEY);
    }
}

/// If the field is a blob, return a new field with the same name and id
/// but with the data type set to a struct of the blob description fields.
///
/// The descriptor layout (v1 or v2) is chosen from the field's blob
/// version metadata.
///
/// If the field is not a blob, return the field itself.
pub fn into_unloaded(mut self) -> Self {
    if self.data_type().is_binary_like() && self.is_blob() {
        // Pick the descriptor template matching this field's version, then
        // graft its type and children onto this field.
        let template = match self.blob_version() {
            BlobVersion::V1 => &*BLOB_DESC_LANCE_FIELD,
            BlobVersion::V2 => &*BLOB_V2_DESC_LANCE_FIELD,
        };
        self.logical_type = template.logical_type.clone();
        self.children = template.children.clone();
    }
    self
}
Expand Down Expand Up @@ -1466,4 +1526,40 @@ mod tests {
assert!(f1.compare_with_options(&f2, &ignore_nullability));
assert!(f2.compare_with_options(&f1, &ignore_nullability));
}

#[test]
fn blob_version_detection_and_setting() {
    // A blob field without the version key defaults to v1.
    let base_meta = HashMap::from([(BLOB_META_KEY.to_string(), "true".to_string())]);
    let arrow_v1 =
        ArrowField::new("blob", DataType::LargeBinary, true).with_metadata(base_meta.clone());
    let field_v1 = Field::try_from(&arrow_v1).unwrap();
    assert_eq!(field_v1.blob_version(), BlobVersion::V1);

    // Adding the version key flips detection to v2.
    let mut v2_meta = base_meta;
    v2_meta.insert(BLOB_VERSION_META_KEY.to_string(), "2".to_string());
    let arrow_v2 = ArrowField::new("blob", DataType::LargeBinary, true).with_metadata(v2_meta);
    let mut field_v2 = Field::try_from(&arrow_v2).unwrap();
    assert_eq!(field_v2.blob_version(), BlobVersion::V2);

    // Downgrading to v1 removes the version key entirely.
    field_v2.set_blob_version(BlobVersion::V1);
    assert_eq!(field_v2.blob_version(), BlobVersion::V1);
    assert!(!field_v2.metadata.contains_key(BLOB_VERSION_META_KEY));
}

#[test]
fn blob_into_unloaded_selects_v2_layout() {
    // A v2-tagged blob field should unload into the five-column v2
    // descriptor layout rather than the legacy position/size pair.
    let mut metadata = HashMap::new();
    metadata.insert(BLOB_META_KEY.to_string(), "true".to_string());
    metadata.insert(BLOB_VERSION_META_KEY.to_string(), "2".to_string());
    let arrow_field =
        ArrowField::new("blob", DataType::LargeBinary, true).with_metadata(metadata);
    let field = Field::try_from(&arrow_field).unwrap();

    let unloaded = field.into_unloaded();
    assert_eq!(unloaded.children.len(), 5);
    assert_eq!(unloaded.logical_type, BLOB_V2_DESC_LANCE_FIELD.logical_type);
}
}
54 changes: 53 additions & 1 deletion rust/lance-core/src/datatypes/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use deepsize::DeepSizeOf;
use lance_arrow::*;
use snafu::location;

use super::field::{Field, OnTypeMismatch, SchemaCompareOptions};
use super::field::{BlobVersion, Field, OnTypeMismatch, SchemaCompareOptions};
use crate::{Error, Result, ROW_ADDR, ROW_ADDR_FIELD, ROW_ID, ROW_ID_FIELD, WILDCARD};

/// Lance Schema.
Expand Down Expand Up @@ -146,6 +146,13 @@ impl Schema {
}
}

/// Apply `version` to every blob column in the schema, recursively.
///
/// If a blob column already carries a different version and `allow_change`
/// is false, an error is returned instead of rewriting the metadata.
pub fn apply_blob_version(&mut self, version: BlobVersion, allow_change: bool) -> Result<()> {
    self.fields
        .iter_mut()
        .try_for_each(|field| apply_blob_version_to_field(field, version, allow_change))
}

pub fn has_dictionary_types(&self) -> bool {
self.fields.iter().any(|f| f.has_dictionary_types())
}
Expand Down Expand Up @@ -880,6 +887,31 @@ fn explain_metadata_difference(
}
}

/// Recursively stamp `version` onto `field` and its descendants.
///
/// Fails when a blob field already has a different version and changing it
/// was not permitted; otherwise the version metadata is (re)written.
fn apply_blob_version_to_field(
    field: &mut Field,
    version: BlobVersion,
    allow_change: bool,
) -> Result<()> {
    if field.is_blob() {
        let current = field.blob_version();
        if !allow_change && current != version {
            return Err(Error::InvalidInput {
                source: format!(
                    "Blob column '{}' uses version {:?}, expected {:?}",
                    field.name, current, version
                )
                .into(),
                location: location!(),
            });
        }
        field.set_blob_version(version);
    }
    field
        .children
        .iter_mut()
        .try_for_each(|child| apply_blob_version_to_field(child, version, allow_change))
}

/// What to do when a column is missing in the schema
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OnMissing {
Expand Down Expand Up @@ -1478,6 +1510,7 @@ mod tests {
use std::sync::Arc;

use super::*;
use crate::datatypes::BlobVersion;

#[test]
fn test_resolve_with_quoted_fields() {
Expand Down Expand Up @@ -2498,4 +2531,23 @@ mod tests {
.contains(error_message_contains[idx]));
}
}

#[test]
fn apply_blob_version_requires_consistent_metadata() {
    // Schema with a single v2-tagged blob column.
    let metadata = HashMap::from([
        (BLOB_META_KEY.to_string(), "true".to_string()),
        (BLOB_VERSION_META_KEY.to_string(), "2".to_string()),
    ]);
    let arrow_field =
        ArrowField::new("blob", ArrowDataType::LargeBinary, true).with_metadata(metadata);
    let arrow_schema = ArrowSchema::new(vec![arrow_field]);
    let mut schema = Schema::try_from(&arrow_schema).expect("schema creation");

    // Downgrading without permission must fail...
    assert!(schema.apply_blob_version(BlobVersion::V1, false).is_err());

    // ...but succeeds (stripping the version marker) when allowed.
    schema
        .apply_blob_version(BlobVersion::V1, true)
        .expect("allow metadata change when permitted");
    assert_eq!(schema.fields[0].blob_version(), BlobVersion::V1);
}
}
17 changes: 15 additions & 2 deletions rust/lance/src/dataset/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::{Stream, StreamExt, TryStreamExt};
use lance_core::datatypes::{
NullabilityComparison, OnMissing, OnTypeMismatch, SchemaCompareOptions,
BlobVersion, NullabilityComparison, OnMissing, OnTypeMismatch, SchemaCompareOptions,
};
use lance_core::error::LanceOptionExt;
use lance_core::utils::tempfile::TempDir;
Expand Down Expand Up @@ -41,6 +41,14 @@ use super::transaction::Transaction;
use super::utils::SchemaAdapter;
use super::DATA_DIR;

/// Map a Lance file storage version to the blob column format it writes:
/// file format 2.2 and newer use the v2 blob layout, older versions v1.
fn blob_version_for(storage_version: LanceFileVersion) -> BlobVersion {
    if storage_version < LanceFileVersion::V2_2 {
        BlobVersion::V1
    } else {
        BlobVersion::V2
    }
}

mod commit;
pub mod delete;
mod insert;
Expand Down Expand Up @@ -580,7 +588,9 @@ pub async fn write_fragments_internal(
// Make sure the max rows per group is not larger than the max rows per file
params.max_rows_per_group = std::cmp::min(params.max_rows_per_group, params.max_rows_per_file);

let (schema, storage_version) = if let Some(dataset) = dataset {
let allow_blob_version_change =
dataset.is_none() || matches!(params.mode, WriteMode::Overwrite);
let (mut schema, storage_version) = if let Some(dataset) = dataset {
match params.mode {
WriteMode::Append | WriteMode::Create => {
// Append mode, so we need to check compatibility
Expand Down Expand Up @@ -627,6 +637,9 @@ pub async fn write_fragments_internal(
(converted_schema, params.storage_version_or_default())
};

let target_blob_version = blob_version_for(storage_version);
schema.apply_blob_version(target_blob_version, allow_blob_version_change)?;

let fragments = do_write_fragments(
object_store,
base_dir,
Expand Down
Loading