Skip to content

Commit

Permalink
create altered table
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
  • Loading branch information
BugenZhao committed Jan 4, 2023
1 parent 5a74745 commit 6f39f43
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 28 deletions.
82 changes: 62 additions & 20 deletions src/frontend/src/handler/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::Context;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::error::{ErrorCode, Result};
use risingwave_sqlparser::ast::{ColumnDef, ObjectName};
use risingwave_sqlparser::ast::{ColumnDef, Ident, ObjectName, Statement};
use risingwave_sqlparser::parser::Parser;

use super::create_table::{gen_create_table_plan, ColumnIdGenerator};
use super::{HandlerArgs, RwPgResponse};
use crate::binder::Relation;
use crate::Binder;
use crate::{build_graph, Binder, OptimizerContext};

#[expect(clippy::unused_async)]
pub async fn handle_add_column(
Expand All @@ -37,34 +41,72 @@ pub async fn handle_add_column(
}
};

let [mut definition]: [_; 1] = Parser::parse_sql(&catalog.definition)
.context("unable to parse original table definition")?
.try_into()
.unwrap();
let Statement::CreateTable { columns, .. } = &mut definition else {
panic!()
};

// Duplicated names can actually be checked by `StreamMaterialize`. We do here for better error
// reporting.
let new_column_name = new_column.name.real_value();
if catalog
.columns()
if columns
.iter()
.any(|c| c.name() == new_column_name)
.any(|c| c.name.real_value() == new_column_name)
{
Err(ErrorCode::InvalidInputSyntax(format!(
"column \"{}\" of table \"{}\" already exists",
new_column_name, table_name
)))?
}

// let _new_column = {
// let column_id_offset = catalog.version.unwrap().next_column_id.get_id();
// let (columns, pk_id) = bind_sql_columns_with_offset(vec![new_column], column_id_offset)?;
// if pk_id.is_some() {
// Err(ErrorCode::NotImplemented(
// "add a primary key column".to_owned(),
// 6903.into(),
// ))?
// }
// columns.into_iter().exactly_one().unwrap()
// };
columns.push(new_column);

let handler_args = HandlerArgs::new(session.clone(), &definition, "")?;
let mut col_id_gen = ColumnIdGenerator::new(
catalog.columns(),
catalog.version.as_ref().unwrap().next_column_id,
);
let Statement::CreateTable {
columns,
constraints,
..
} = definition else {
panic!();
};

let new_name = {
let mut idents = table_name.0;
let last_ident = idents.last_mut().unwrap();
*last_ident = Ident::new(format!("{}_add_column_{}", last_ident, new_column_name));
ObjectName(idents)
};

let (graph, source, table) = {
let context = OptimizerContext::from_handler_args(handler_args);
let (plan, source, table) = gen_create_table_plan(
&session,
context.into(),
new_name,
columns,
constraints,
&mut col_id_gen,
)?;
let graph = build_graph(plan);

(graph, source, table)
};

let catalog_writer = session.env().catalog_writer();

catalog_writer.create_table(source, table, graph).await?;

Ok(PgResponse::empty_result(StatementType::CREATE_TABLE))

Err(ErrorCode::NotImplemented(
"ADD COLUMN".to_owned(),
6903.into(),
))?
// Err(ErrorCode::NotImplemented(
// "ADD COLUMN".to_owned(),
// 6903.into(),
// ))?
}
4 changes: 3 additions & 1 deletion src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use super::RwPgResponse;
use crate::binder::Binder;
use crate::catalog::column_catalog::ColumnCatalog;
use crate::catalog::ColumnId;
use crate::handler::create_table::ColumnIdGenerator;
use crate::handler::HandlerArgs;
use crate::optimizer::plan_node::KAFKA_TIMESTAMP_COLUMN_NAME;
use crate::optimizer::OptimizerContext;
Expand Down Expand Up @@ -96,7 +97,8 @@ pub async fn handle_create_source(
.to_lowercase()
.eq("kafka");

let (mut column_descs, pk_column_id_from_columns) = bind_sql_columns(stmt.columns)?;
let (mut column_descs, pk_column_id_from_columns) =
bind_sql_columns(stmt.columns, &mut ColumnIdGenerator::initial())?;

// Add hidden column `_rw_kafka_timestamp` to each message
if is_kafka && !is_materialized {
Expand Down
51 changes: 46 additions & 5 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,54 @@ pub enum DmlFlag {
AppendOnly,
}

pub struct ColumnIdGenerator {
pub existing: HashMap<String, ColumnId>,
pub next_id: ColumnId,
}

impl ColumnIdGenerator {
pub fn new(existing_columns: &[ColumnCatalog], next_column_id: ColumnId) -> Self {
let existing = existing_columns
.iter()
.map(|col| (col.name().to_owned(), col.column_id()))
.collect();
Self {
existing,
next_id: next_column_id,
}
}

pub fn initial() -> Self {
Self {
existing: HashMap::new(),
next_id: ColumnId::from(USER_COLUMN_ID_OFFSET),
}
}

pub fn gen(&mut self, name: &str) -> ColumnId {
if let Some(id) = self.existing.get(name) {
*id
} else {
let id = self.next_id;
self.next_id = self.next_id.next();
id
}
}
}

/// Binds the column schemas declared in CREATE statement into `ColumnDesc`.
/// If a column is marked as `primary key`, its `ColumnId` is also returned.
/// This primary key is not combined with table constraints yet.
pub fn bind_sql_columns(columns: Vec<ColumnDef>) -> Result<(Vec<ColumnDesc>, Option<ColumnId>)> {
pub fn bind_sql_columns(
columns: Vec<ColumnDef>,
col_id_gen: &mut ColumnIdGenerator,
) -> Result<(Vec<ColumnDesc>, Option<ColumnId>)> {
// In `ColumnDef`, pk can contain only one column. So we use `Option` rather than `Vec`.
let mut pk_column_id = None;
let mut column_descs = Vec::with_capacity(columns.len());

for (i, column) in columns.into_iter().enumerate() {
let column_id = ColumnId::from(i as i32 + USER_COLUMN_ID_OFFSET);
for column in columns {
let column_id = col_id_gen.gen(&column.name.real_value());
// Destruct to make sure all fields are properly handled rather than ignored.
// Do NOT use `..` to ignore fields you do not want to deal with.
// Reject them with a clear NotImplemented error.
Expand Down Expand Up @@ -218,9 +256,10 @@ pub(crate) fn gen_create_table_plan(
table_name: ObjectName,
columns: Vec<ColumnDef>,
constraints: Vec<TableConstraint>,
col_id_gen: &mut ColumnIdGenerator,
) -> Result<(PlanRef, Option<ProstSource>, ProstTable)> {
let definition = context.normalized_sql().to_owned();
let (column_descs, pk_column_id_from_columns) = bind_sql_columns(columns)?;
let (column_descs, pk_column_id_from_columns) = bind_sql_columns(columns, col_id_gen)?;
gen_create_table_plan_without_bind(
session,
context,
Expand Down Expand Up @@ -428,6 +467,7 @@ pub async fn handle_create_table(
table_name.clone(),
columns,
constraints,
&mut ColumnIdGenerator::initial(),
)?;
let graph = build_graph(plan);

Expand Down Expand Up @@ -540,7 +580,8 @@ mod tests {
..
} = ast.remove(0) else { panic!("test case should be create table") };
let actual: Result<_> = (|| {
let (column_descs, pk_column_id_from_columns) = bind_sql_columns(columns)?;
let (column_descs, pk_column_id_from_columns) =
bind_sql_columns(columns, &mut ColumnIdGenerator::initial())?;
let (_, pk_column_ids, _) = bind_sql_table_constraints(
column_descs,
pk_column_id_from_columns,
Expand Down
14 changes: 12 additions & 2 deletions src/frontend/src/handler/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use risingwave_sqlparser::ast::{ExplainOptions, ExplainType, Statement};
use super::create_index::gen_create_index_plan;
use super::create_mv::gen_create_mv_plan;
use super::create_sink::gen_sink_plan;
use super::create_table::gen_create_table_plan;
use super::create_table::{gen_create_table_plan, ColumnIdGenerator};
use super::query::gen_batch_query_plan;
use super::RwPgResponse;
use crate::handler::HandlerArgs;
Expand Down Expand Up @@ -63,7 +63,17 @@ pub fn handle_explain(
columns,
constraints,
..
} => gen_create_table_plan(&session, context.into(), name, columns, constraints)?.0,
} => {
gen_create_table_plan(
&session,
context.into(),
name,
columns,
constraints,
&mut ColumnIdGenerator::initial(),
)?
.0
}

Statement::CreateIndex {
name,
Expand Down

0 comments on commit 6f39f43

Please sign in to comment.