Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support multi ordered primary key #633

Merged
merged 3 commits into from
Apr 21, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 107 additions & 39 deletions src/binder/statement/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,73 +45,121 @@ impl Binder {
}
}

// column name -> index in `ordered_pk_ids` vector
let ordered_pk_pos = Binder::get_extra_pks(constraints);
// whether we have primary key declared by "primary key(c1, c2..) syntax"
let flag = !ordered_pk_pos.is_empty();
// ordered primary ids declared by "primary key(c1, c2..) syntax"
let mut ordered_pk_ids = vec![0; ordered_pk_pos.len()];
let mut col_catalogs = vec![];
let mut ordered_pk_ids = Binder::ordered_pks_from_columns(columns);
let has_pk_from_column = !ordered_pk_ids.is_empty();

for (idx, colum_def) in columns.iter().enumerate() {
let mut is_primary = false;
let mut col = ColumnCatalog::from(colum_def, &mut is_primary);
if flag && is_primary {
// we can't support sql query like
// "create table (a int primary key, b int not null, primary key(b));"
// declaring pk both in column's option and 'primary key(c1..)' syntax"
return Err(BindError::NotSupportedTSQL);
} else if is_primary {
// primary key is declared only in column's options
ordered_pk_ids.push(idx as ColumnId);
} else if ordered_pk_pos.contains_key(col.name()) {
// primary key is declared only by "primary key(c1, c2) syntax"
let pos = *(ordered_pk_pos.get(col.name()).unwrap());
ordered_pk_ids[pos] = idx as ColumnId;
col.set_primary(true); // TODO: remove this line in the future
}
col.set_id(idx as ColumnId);
col_catalogs.push(col);
if ordered_pk_ids.len() > 1 {
// multi primary key should be declared by "primary key(c1, c2...)" syntax
return Err(BindError::NotSupportedTSQL);
}

let pks_name_from_constraints = Binder::pks_name_from_constraints(constraints);
if has_pk_from_column && !pks_name_from_constraints.is_empty() {
// can't get primary key both from "primary key(c1, c2...)" syntax and
// column's option
return Err(BindError::NotSupportedTSQL);
} else if !has_pk_from_column {
ordered_pk_ids =
Binder::ordered_pks_from_constraint(&pks_name_from_constraints, columns);
}

let mut columns: Vec<ColumnCatalog> = columns
.iter()
.enumerate()
.map(|(idx, col)| {
let mut col = ColumnCatalog::from(col);
col.set_id(idx as ColumnId);
col
})
.collect();

// // TODO: when remove `is_primary` filed in `ColumnDesc`,
// // Remove this line and change `columns` above to immut.
for &index in &ordered_pk_ids {
columns[index as usize].set_primary(true);
}

Ok(BoundCreateTable {
database_id: db.id(),
schema_id: schema.id(),
table_name: table_name.into(),
columns: col_catalogs,
columns,
ordered_pk_ids,
})
}
_ => panic!("mismatched statement type"),
}
}

fn get_extra_pks(constraints: &Vec<TableConstraint>) -> HashMap<String, usize> {
let mut ordered_pk_pos: HashMap<String, usize> = HashMap::new();
// get primary keys' id in declared order。
// we use index in columns vector as column id
fn ordered_pks_from_columns(columns: &[ColumnDef]) -> Vec<ColumnId> {
let mut ordered_pks = Vec::new();

for (index, col_def) in columns.iter().enumerate() {
for option_def in &col_def.options {
let is_primary_ = if let ColumnOption::Unique { is_primary } = option_def.option {
is_primary
} else {
false
};
if is_primary_ {
ordered_pks.push(index as ColumnId);
}
}
}
ordered_pks
}

// We have used `pks_name_from_constraints` to get the primary keys' name sorted by declaration
skyzh marked this conversation as resolved.
Show resolved Hide resolved
// order in "primary key(c1, c2..)" syntax. Now we transfer the name to id to get the sorted
// ID
fn ordered_pks_from_constraint(pks_name: &[String], columns: &[ColumnDef]) -> Vec<ColumnId> {
let mut ordered_pks = vec![0; pks_name.len()];
let mut pos_in_ordered_pk = HashMap::new(); // used to get pos from column name
pks_name.iter().enumerate().for_each(|(pos, name)| {
pos_in_ordered_pk.insert(name, pos);
});

columns.iter().enumerate().for_each(|(index, colum_desc)| {
let column_name = &colum_desc.name.value;
if pos_in_ordered_pk.contains_key(column_name) {
let id = index as ColumnId;
let pos = *(pos_in_ordered_pk.get(column_name).unwrap());
ordered_pks[pos] = id;
}
});
ordered_pks
}
// get the primary keys' name sorted by declaration order in "primary key(c1, c2..)" syntax.
fn pks_name_from_constraints(constraints: &[TableConstraint]) -> Vec<String> {
let mut pks_name_from_constraints = vec![];

for constraint in constraints {
match constraint {
TableConstraint::Unique {
is_primary,
columns,
..
} if *is_primary => columns.iter().enumerate().for_each(|(index, indent)| {
ordered_pk_pos.insert(indent.value.clone(), index);
} if *is_primary => columns.iter().for_each(|ident| {
pks_name_from_constraints.push(ident.value.clone());
}),
_ => todo!(),
_ => continue,
}
}
ordered_pk_pos
pks_name_from_constraints
}
}

impl ColumnCatalog {
fn from(cdef: &ColumnDef, is_primary_: &mut bool) -> Self {
impl From<&ColumnDef> for ColumnCatalog {
fn from(cdef: &ColumnDef) -> Self {
let mut is_nullable = true;
let mut is_primary_ = false;
for opt in &cdef.options {
match opt.option {
ColumnOption::Null => is_nullable = true,
ColumnOption::NotNull => is_nullable = false,
ColumnOption::Unique { is_primary } => *is_primary_ = is_primary,
ColumnOption::Unique { is_primary } => is_primary_ = is_primary,
_ => todo!("column options"),
}
}
Expand All @@ -120,7 +168,7 @@ impl ColumnCatalog {
ColumnDesc::new(
DataType::new(cdef.data_type.clone(), is_nullable),
cdef.name.value.to_lowercase(),
*is_primary_,
is_primary_,
),
)
}
Expand All @@ -146,7 +194,8 @@ mod tests {
create table t4 (a int not null, b int not null, c int, primary key(a, b));
create table t5 (a int not null, b int not null, c int, primary key(b, a));
create table t6 (a int primary key, b int not null, c int not null, primary key(b, c));
create table t7 (a int primary key, b int)";
create table t7 (a int primary key, b int);
create table t8 (a int not null, b int, primary key(a));";

let stmts = parse(sql).unwrap();

Expand Down Expand Up @@ -253,7 +302,26 @@ mod tests {
.nullable()
.to_column_primary_key("a".into()),
),
ColumnCatalog::new(1, DataTypeKind::Int(None).nullable().to_column("b".into()),),
ColumnCatalog::new(1, DataTypeKind::Int(None).nullable().to_column("b".into())),
],
ordered_pk_ids: vec![0],
}
);

assert_eq!(
binder.bind_create_table(&stmts[7]).unwrap(),
BoundCreateTable {
database_id: 0,
schema_id: 0,
table_name: "t8".into(),
columns: vec![
ColumnCatalog::new(
0,
DataTypeKind::Int(None)
.not_null()
.to_column_primary_key("a".into()),
),
ColumnCatalog::new(1, DataTypeKind::Int(None).nullable().to_column("b".into())),
],
ordered_pk_ids: vec![0],
}
Expand Down
1 change: 1 addition & 0 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ impl Database {
// TODO: parallelize
let mut outputs: Vec<Chunk> = vec![];
for stmt in stmts {
debug!("{:#?}", stmt);
let stmt = binder.bind(&stmt)?;
debug!("{:#?}", stmt);
let logical_plan = logical_planner.plan(stmt)?;
Expand Down
4 changes: 2 additions & 2 deletions src/optimizer/plan_nodes/logical_create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ impl LogicalCreateTable {
}

/// Get the logical create table's `ordered_pk_ids`.
pub fn ordered_pk_ids(&self) -> Vec<ColumnId> {
self.ordered_pk_ids.clone()
pub fn ordered_pk_ids(&self) -> &[ColumnId] {
self.ordered_pk_ids.as_ref()
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/storage/memory/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl Storage for InMemoryStorage {
schema_id: SchemaId,
table_name: &'a str,
column_descs: &'a [ColumnCatalog],
ordered_pk_ids: Vec<ColumnId>,
ordered_pk_ids: &'a [ColumnId],
) -> Self::CreateTableResultFuture<'a> {
async move {
let db = self
Expand All @@ -92,7 +92,7 @@ impl Storage for InMemoryStorage {
table_name.into(),
column_descs.to_vec(),
false,
ordered_pk_ids,
ordered_pk_ids.to_vec(),
)
.map_err(|_| StorageError::Duplicated("table", table_name.into()))?;

Expand Down
2 changes: 1 addition & 1 deletion src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub trait Storage: Sync + Send + 'static {
schema_id: SchemaId,
table_name: &'a str,
column_descs: &'a [ColumnCatalog],
ordered_pk_ids: Vec<ColumnId>,
ordered_pk_ids: &'a [ColumnId],
) -> Self::CreateTableResultFuture<'a>;

fn get_table(&self, table_id: TableRefId) -> StorageResult<Self::TableType>;
Expand Down
4 changes: 2 additions & 2 deletions src/storage/secondary/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,14 +199,14 @@ impl SecondaryStorage {
schema_id: SchemaId,
table_name: &str,
column_descs: &[ColumnCatalog],
ordered_pk_ids: Vec<ColumnId>,
ordered_pk_ids: &[ColumnId],
) -> StorageResult<()> {
let entry = CreateTableEntry {
database_id,
schema_id,
table_name: table_name.to_string(),
column_descs: column_descs.to_vec(),
ordered_pk_ids,
ordered_pk_ids: ordered_pk_ids.to_vec(),
};

// persist to manifest first
Expand Down
2 changes: 1 addition & 1 deletion src/storage/secondary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ impl Storage for SecondaryStorage {
schema_id: SchemaId,
table_name: &'a str,
column_descs: &'a [ColumnCatalog],
ordered_pk_ids: Vec<ColumnId>,
ordered_pk_ids: &'a [ColumnId],
) -> Self::CreateTableResultFuture<'a> {
async move {
self.create_table_inner(
Expand Down