Implement new CSV WITH HEADER COLUMNS syntax
This syntax allows users to provide header names for objects that do not yet
exist. It additionally allows Materialize to record header columns in the SQL
stored in the catalog, interacting less with the more general SQL column
aliases feature -- we still put default column names in the SQL aliases if the
format is specified as `WITH n COLUMNS`.

Design: MaterializeInc#7407
Part of: MaterializeInc#7145
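
For orientation, a minimal sketch of the three FORMAT CSV forms this change distinguishes; the source name and file path are hypothetical, and the statements mirror the parser testdata further down.

fn main() {
    for stmt in [
        // New: provide header names for objects that do not yet exist.
        "CREATE SOURCE t FROM FILE 'data.csv' FORMAT CSV WITH HEADER COLUMNS (a, b, c)",
        // Unchanged: discover the names from the file's header row during purification.
        "CREATE SOURCE t FROM FILE 'data.csv' FORMAT CSV WITH HEADER",
        // Unchanged: default names column1..column3 are recorded as SQL aliases.
        "CREATE SOURCE t FROM FILE 'data.csv' FORMAT CSV WITH 3 COLUMNS",
    ] {
        println!("{}", stmt);
    }
}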
quodlibetor committed Jul 28, 2021
1 parent 3b49792 commit 77c7b63
Showing 9 changed files with 237 additions and 84 deletions.
41 changes: 34 additions & 7 deletions src/dataflow-types/src/types.rs
@@ -526,14 +526,19 @@ impl DataEncoding {
let ty = ScalarType::String.nullable(true);
desc.with_named_column(name, ty)
}),
DataEncoding::Csv(CsvEncoding { n_cols, .. }) => {
(1..=*n_cols).fold(key_desc, |desc, i| {
DataEncoding::Csv(CsvEncoding { columns, .. }) => match columns {
ColumnSpec::Count(n) => (1..=*n).into_iter().fold(key_desc, |desc, i| {
desc.with_named_column(
format!("column{}", i),
ScalarType::String.nullable(false),
)
})
}
}),
ColumnSpec::Names(names) => {
names.iter().map(|s| &**s).fold(key_desc, |desc, name| {
desc.with_named_column(name, ScalarType::String.nullable(false))
})
}
},
DataEncoding::Text => {
key_desc.with_named_column("text", ScalarType::String.nullable(false))
}
@@ -588,14 +593,36 @@ pub struct ProtobufEncoding {
pub message_name: String,
}

/// Encoding in CSV format, with `n_cols` columns per row, with an optional header.
/// Encoding in CSV format, with the given columns and an optional header row.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CsvEncoding {
pub header_row: bool,
pub n_cols: usize,
pub columns: ColumnSpec,
pub delimiter: u8,
}

impl CsvEncoding {
/// True when this source's objects have header rows
pub fn has_header_rows(&self) -> bool {
matches!(self.columns, ColumnSpec::Names(_))
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ColumnSpec {
Count(usize),
Names(Vec<String>),
}

impl ColumnSpec {
/// The number of columns, whether specified as a count or a list of names
pub fn len(&self) -> usize {
match self {
ColumnSpec::Count(n) => *n,
ColumnSpec::Names(ns) => ns.len(),
}
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RegexEncoding {
#[serde(with = "serde_regex")]
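
A self-contained sketch of how the two new types in this file interact: `len` yields a uniform column count for both variants, and named columns are what mark a source as having header rows. The serde derives and the surrounding crate are omitted here.

enum ColumnSpec {
    Count(usize),
    Names(Vec<String>),
}

impl ColumnSpec {
    /// The number of columns, however they were specified.
    fn len(&self) -> usize {
        match self {
            ColumnSpec::Count(n) => *n,
            ColumnSpec::Names(ns) => ns.len(),
        }
    }
}

struct CsvEncoding {
    columns: ColumnSpec,
    delimiter: u8,
}

impl CsvEncoding {
    /// Named columns imply a header row that the decoder must skip.
    fn has_header_rows(&self) -> bool {
        matches!(self.columns, ColumnSpec::Names(_))
    }
}

fn main() {
    let counted = CsvEncoding { columns: ColumnSpec::Count(3), delimiter: b',' };
    let named = CsvEncoding {
        columns: ColumnSpec::Names(vec!["a".into(), "b".into(), "c".into()]),
        delimiter: b',',
    };
    assert_eq!(counted.delimiter, named.delimiter);
    assert_eq!(counted.columns.len(), named.columns.len());
    assert!(!counted.has_header_rows());
    assert!(named.has_header_rows());
}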
8 changes: 3 additions & 5 deletions src/dataflow/src/decode/csv.rs
@@ -36,11 +36,9 @@ impl CsvDecoderState {
}

pub fn new(format: CsvEncoding, operators: &mut Option<LinearOperator>) -> Self {
let CsvEncoding {
header_row,
n_cols,
delimiter,
} = format;
let header_row = format.has_header_rows();
let CsvEncoding { columns, delimiter } = format;
let n_cols = columns.len();

let operators = operators.take();
let demanded = (0..n_cols)
52 changes: 39 additions & 13 deletions src/sql-parser/src/ast/defs/ddl.rs
@@ -187,8 +187,7 @@ pub enum Format<T: AstInfo> {
},
Regex(String),
Csv {
header_row: bool,
n_cols: Option<usize>,
columns: CsvColumns,
delimiter: char,
},
Json,
@@ -202,6 +201,41 @@ impl<T: AstInfo> Format<T> {
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CsvColumns {
/// `WITH count COLUMNS`
Count(usize),
/// Column count not specified by syntax
Header,
/// `WITH HEADER COLUMNS (ident, ...)`
HeaderNamed(Vec<Ident>),
}

impl AstDisplay for CsvColumns {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
CsvColumns::Header => {
f.write_str("HEADER");
}
CsvColumns::Count(n) => {
f.write_str(n);
f.write_str(" COLUMNS")
}
CsvColumns::HeaderNamed(names) => {
f.write_str("HEADER COLUMNS (");
let last_name = names.len() - 1;
for (i, name) in names.iter().enumerate() {
f.write_node(name);
if i < last_name {
f.write_str(", ")
}
}
f.write_str(")");
}
}
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CreateSourceKeyEnvelope {
/// `INCLUDE KEY` is absent
@@ -290,18 +324,10 @@ impl<T: AstInfo> AstDisplay for Format<T> {
f.write_node(&display::escape_single_quote_string(regex));
f.write_str("'");
}
Self::Csv {
header_row,
n_cols,
delimiter,
} => {
Self::Csv { columns, delimiter } => {
f.write_str("CSV WITH ");
if *header_row {
f.write_str("HEADER");
} else {
f.write_str(n_cols.unwrap());
f.write_str(" COLUMNS");
}
f.write_node(columns);

if *delimiter != ',' {
f.write_str(" DELIMITED BY '");
f.write_node(&display::escape_single_quote_string(&delimiter.to_string()));
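
A minimal sketch of what the new AstDisplay logic renders for each variant, using plain strings in place of the crate's AstFormatter; the enum is mirrored from the diff above.

enum CsvColumns {
    Count(usize),
    Header,
    HeaderNamed(Vec<String>),
}

fn render(columns: &CsvColumns) -> String {
    match columns {
        CsvColumns::Header => "HEADER".to_string(),
        CsvColumns::Count(n) => format!("{} COLUMNS", n),
        CsvColumns::HeaderNamed(names) => format!("HEADER COLUMNS ({})", names.join(", ")),
    }
}

fn main() {
    assert_eq!(render(&CsvColumns::Header), "HEADER");
    assert_eq!(render(&CsvColumns::Count(3)), "3 COLUMNS");
    let named = CsvColumns::HeaderNamed(vec!["a".into(), "b".into(), "c".into()]);
    assert_eq!(render(&named), "HEADER COLUMNS (a, b, c)");
}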
6 changes: 6 additions & 0 deletions src/sql-parser/src/ast/defs/name.rs
@@ -71,6 +71,12 @@ impl From<&str> for Ident {
}
}

impl From<String> for Ident {
fn from(value: String) -> Self {
Ident(value)
}
}

/// More-or-less a direct translation of the Postgres function for doing the same thing:
///
/// <https://github.com/postgres/postgres/blob/master/src/backend/utils/adt/ruleutils.c#L10730-L10812>
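
A tiny sketch of what the added impl enables: building an Ident directly from an owned String, e.g. a header name read from a CSV file, without a round trip through &str. Ident is mirrored as a bare newtype here.

#[derive(Debug, PartialEq)]
struct Ident(String);

impl From<&str> for Ident {
    fn from(value: &str) -> Self {
        Ident(value.to_string())
    }
}

impl From<String> for Ident {
    fn from(value: String) -> Self {
        Ident(value)
    }
}

fn main() {
    let from_file: String = String::from("price"); // e.g. a parsed header field
    assert_eq!(Ident::from(from_file), Ident::from("price"));
}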
18 changes: 9 additions & 9 deletions src/sql-parser/src/parser.rs
@@ -1463,13 +1463,17 @@ impl<'a> Parser<'a> {
Format::Regex(regex)
} else if self.parse_keyword(CSV) {
self.expect_keyword(WITH)?;
let (header_row, n_cols) = if self.parse_keyword(HEADER) || self.parse_keyword(HEADERS)
{
(true, None)
let columns = if self.parse_keyword(HEADER) || self.parse_keyword(HEADERS) {
if self.parse_keyword(COLUMNS) {
let columns = self.parse_parenthesized_column_list(Mandatory)?;
CsvColumns::HeaderNamed(columns)
} else {
CsvColumns::Header
}
} else {
let n_cols = self.parse_literal_uint()? as usize;
self.expect_keyword(COLUMNS)?;
(false, Some(n_cols))
CsvColumns::Count(n_cols)
};
let delimiter = if self.parse_keywords(&[DELIMITED, BY]) {
let s = self.parse_literal_string()?;
@@ -1480,11 +1484,7 @@
} else {
','
};
Format::Csv {
header_row,
n_cols,
delimiter,
}
Format::Csv { columns, delimiter }
} else if self.parse_keyword(JSON) {
Format::Json
} else if self.parse_keyword(TEXT) {
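
A toy version of the new parser branch, operating on a pre-split token slice instead of the crate's Parser; CsvColumns is mirrored from the AST above and everything else here is hypothetical. It shows which clause shape maps to which variant.

#[derive(Debug, PartialEq)]
enum CsvColumns {
    Count(usize),
    Header,
    HeaderNamed(Vec<String>),
}

fn parse_csv_columns(tokens: &[&str]) -> Result<CsvColumns, String> {
    match tokens {
        // HEADER and HEADERS are both accepted, as in the real parser.
        ["HEADER"] | ["HEADERS"] => Ok(CsvColumns::Header),
        ["HEADER", "COLUMNS", names @ ..] if !names.is_empty() => Ok(
            CsvColumns::HeaderNamed(names.iter().map(|s| s.to_string()).collect()),
        ),
        [n, "COLUMNS"] => n
            .parse::<usize>()
            .map(CsvColumns::Count)
            .map_err(|e| e.to_string()),
        other => Err(format!("unexpected tokens: {:?}", other)),
    }
}

fn main() {
    assert_eq!(parse_csv_columns(&["HEADER"]), Ok(CsvColumns::Header));
    assert_eq!(
        parse_csv_columns(&["HEADER", "COLUMNS", "a", "b", "c"]),
        Ok(CsvColumns::HeaderNamed(vec!["a".into(), "b".into(), "c".into()]))
    );
    assert_eq!(parse_csv_columns(&["3", "COLUMNS"]), Ok(CsvColumns::Count(3)));
}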
15 changes: 11 additions & 4 deletions src/sql-parser/tests/testdata/ddl
@@ -410,28 +410,35 @@ CREATE SOURCE foo FROM FILE 'bar' WITH (tail = false) FORMAT CSV WITH HEADER
----
CREATE SOURCE foo FROM FILE 'bar' WITH (tail = false) FORMAT CSV WITH HEADER
=>
CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [], connector: File { path: "bar", compression: None }, with_options: [Value { name: Ident("tail"), value: Boolean(false) }], format: Bare(Csv { header_row: true, n_cols: None, delimiter: ',' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None })
CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [], connector: File { path: "bar", compression: None }, with_options: [Value { name: Ident("tail"), value: Boolean(false) }], format: Bare(Csv { columns: Header, delimiter: ',' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None })

parse-statement
CREATE SOURCE foo FROM FILE 'bar' WITH (tail = false) FORMAT CSV WITH HEADER COLUMNS (a, b, c)
----
CREATE SOURCE foo FROM FILE 'bar' WITH (tail = false) FORMAT CSV WITH HEADER COLUMNS (a, b, c)
=>
CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [], connector: File { path: "bar", compression: None }, with_options: [Value { name: Ident("tail"), value: Boolean(false) }], format: Bare(Csv { columns: HeaderNamed([Ident("a"), Ident("b"), Ident("c")]), delimiter: ',' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None })

parse-statement
CREATE SOURCE foo FROM FILE 'bar' WITH (tail = false) FORMAT CSV WITH 3 COLUMNS
----
CREATE SOURCE foo FROM FILE 'bar' WITH (tail = false) FORMAT CSV WITH 3 COLUMNS
=>
CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [], connector: File { path: "bar", compression: None }, with_options: [Value { name: Ident("tail"), value: Boolean(false) }], format: Bare(Csv { header_row: false, n_cols: Some(3), delimiter: ',' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None })
CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [], connector: File { path: "bar", compression: None }, with_options: [Value { name: Ident("tail"), value: Boolean(false) }], format: Bare(Csv { columns: Count(3), delimiter: ',' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None })

parse-statement
CREATE SOURCE foo (one, two) FROM FILE 'bar' FORMAT CSV WITH HEADER
----
CREATE SOURCE foo (one, two) FROM FILE 'bar' FORMAT CSV WITH HEADER
=>
CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [Ident("one"), Ident("two")], connector: File { path: "bar", compression: None }, with_options: [], format: Bare(Csv { header_row: true, n_cols: None, delimiter: ',' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None })
CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [Ident("one"), Ident("two")], connector: File { path: "bar", compression: None }, with_options: [], format: Bare(Csv { columns: Header, delimiter: ',' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None })

parse-statement
CREATE SOURCE foo FROM FILE 'bar' WITH (tail = true) FORMAT CSV WITH 3 COLUMNS DELIMITED BY '|'
----
CREATE SOURCE foo FROM FILE 'bar' WITH (tail = true) FORMAT CSV WITH 3 COLUMNS DELIMITED BY '|'
=>
CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [], connector: File { path: "bar", compression: None }, with_options: [Value { name: Ident("tail"), value: Boolean(true) }], format: Bare(Csv { header_row: false, n_cols: Some(3), delimiter: '|' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None })
CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [], connector: File { path: "bar", compression: None }, with_options: [Value { name: Ident("tail"), value: Boolean(true) }], format: Bare(Csv { columns: Count(3), delimiter: '|' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None })

parse-statement
CREATE MATERIALIZED OR VIEW foo as SELECT * from bar
56 changes: 24 additions & 32 deletions src/sql/src/plan/statement/ddl.rs
@@ -27,20 +27,20 @@ use regex::Regex;
use reqwest::Url;

use dataflow_types::{
AvroEncoding, AvroOcfEncoding, AvroOcfSinkConnectorBuilder, BringYourOwn, Consistency,
CsvEncoding, DataEncoding, DebeziumMode, ExternalSourceConnector, FileSourceConnector,
KafkaSinkConnectorBuilder, KafkaSinkFormat, KafkaSourceConnector, KeyEnvelope,
KinesisSourceConnector, PostgresSourceConnector, ProtobufEncoding, PubNubSourceConnector,
RegexEncoding, S3SourceConnector, SinkConnectorBuilder, SinkEnvelope, SourceConnector,
SourceDataEncoding, SourceEnvelope, Timeline,
AvroEncoding, AvroOcfEncoding, AvroOcfSinkConnectorBuilder, BringYourOwn, ColumnSpec,
Consistency, CsvEncoding, DataEncoding, DebeziumMode, ExternalSourceConnector,
FileSourceConnector, KafkaSinkConnectorBuilder, KafkaSinkFormat, KafkaSourceConnector,
KeyEnvelope, KinesisSourceConnector, PostgresSourceConnector, ProtobufEncoding,
PubNubSourceConnector, RegexEncoding, S3SourceConnector, SinkConnectorBuilder, SinkEnvelope,
SourceConnector, SourceDataEncoding, SourceEnvelope, Timeline,
};
use expr::{GlobalId, MirRelationExpr, TableFunc, UnaryFunc};
use interchange::avro::{self, AvroSchemaGenerator, DebeziumDeduplicationStrategy};
use interchange::envelopes;
use ore::collections::CollectionExt;
use ore::str::StrExt;
use repr::{strconv, ColumnName, ColumnType, Datum, RelationDesc, RelationType, Row, ScalarType};
use sql_parser::ast::{CreateSourceFormat, KeyConstraint};
use sql_parser::ast::{CreateSourceFormat, CsvColumns, KeyConstraint};

use crate::ast::display::AstDisplay;
use crate::ast::{
@@ -460,7 +460,7 @@ pub fn plan_create_source(
Some(v) => bail!("invalid start_offset value: {}", v),
}

let encoding = get_encoding(format, envelope, with_options_original, col_names)?;
let encoding = get_encoding(format, envelope, with_options_original)?;
let key_envelope = get_key_envelope(key_envelope, envelope, &encoding)?;

let connector = ExternalSourceConnector::Kafka(KafkaSourceConnector {
@@ -504,7 +504,7 @@
stream_name,
aws_info,
});
let encoding = get_encoding(format, envelope, with_options_original, col_names)?;
let encoding = get_encoding(format, envelope, with_options_original)?;
(connector, encoding, KeyEnvelope::None)
}
Connector::File { path, compression } => {
@@ -526,7 +526,7 @@
},
tail,
});
let encoding = get_encoding(format, envelope, with_options_original, col_names)?;
let encoding = get_encoding(format, envelope, with_options_original)?;
(connector, encoding, KeyEnvelope::None)
}
Connector::S3 {
@@ -568,7 +568,7 @@
Compression::None => dataflow_types::Compression::None,
},
});
let encoding = get_encoding(format, envelope, with_options_original, col_names)?;
let encoding = get_encoding(format, envelope, with_options_original)?;
(connector, encoding, KeyEnvelope::None)
}
Connector::Postgres {
@@ -808,6 +808,7 @@ pub fn plan_create_source(
} else {
None
};

bare_desc =
plan_utils::maybe_rename_columns(format!("source {}", name), bare_desc, &col_names)?;

@@ -918,17 +919,16 @@ fn get_encoding<T: sql_parser::ast::AstInfo>(
format: &CreateSourceFormat<Raw>,
envelope: &Envelope,
with_options: &Vec<SqlOption<T>>,
col_names: &[Ident],
) -> Result<SourceDataEncoding, anyhow::Error> {
let encoding = match format {
CreateSourceFormat::None => bail!("Source format must be specified"),
CreateSourceFormat::Bare(format) => get_encoding_inner(format, with_options, col_names)?,
CreateSourceFormat::Bare(format) => get_encoding_inner(format, with_options)?,
CreateSourceFormat::KeyValue { key, value } => {
let key = match get_encoding_inner(key, with_options, col_names)? {
let key = match get_encoding_inner(key, with_options)? {
SourceDataEncoding::Single(key) => key,
SourceDataEncoding::KeyValue { key, .. } => key,
};
let value = match get_encoding_inner(value, with_options, col_names)? {
let value = match get_encoding_inner(value, with_options)? {
SourceDataEncoding::Single(value) => value,
SourceDataEncoding::KeyValue { value, .. } => value,
};
@@ -951,7 +951,6 @@
fn get_encoding_inner<T: sql_parser::ast::AstInfo>(
format: &Format<Raw>,
with_options: &Vec<SqlOption<T>>,
col_names: &[Ident],
) -> Result<SourceDataEncoding, anyhow::Error> {
// Avro/CSR can return a `SourceDataEncoding::KeyValue`
Ok(SourceDataEncoding::Single(match format {
@@ -1058,25 +1057,18 @@ fn get_encoding_inner<T: sql_parser::ast::AstInfo>(
let regex = Regex::new(&regex)?;
DataEncoding::Regex(RegexEncoding { regex })
}
Format::Csv {
header_row,
n_cols,
delimiter,
} => {
let n_cols = if col_names.is_empty() {
match n_cols {
Some(n) => *n,
None => bail!(
"Cannot determine number of columns in CSV source; specify using \
CREATE SOURCE...FORMAT CSV WITH X COLUMNS"
),
Format::Csv { columns, delimiter } => {
let columns = match columns {
CsvColumns::HeaderNamed(ns) => {
ColumnSpec::Names(ns.iter().cloned().map(|n| n.into_string()).collect())
}
CsvColumns::Count(n) => ColumnSpec::Count(*n),
CsvColumns::Header => {
bail!("[internal error] column spec should get names in purify")
}
} else {
col_names.len()
};
DataEncoding::Csv(CsvEncoding {
header_row: *header_row,
n_cols,
columns,
delimiter: match *delimiter as u32 {
0..=127 => *delimiter as u8,
_ => bail!("CSV delimiter must be an ASCII character"),
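
The heart of this hunk as a standalone sketch: planning now maps the parsed CsvColumns into the dataflow layer's ColumnSpec, and a bare HEADER reaching the planner is an internal error because purification should already have replaced it with concrete names. Both enums are mirrored, the Ident-to-String conversion is elided, and bail! is replaced with a plain Result.

enum CsvColumns {
    Count(usize),
    Header,
    HeaderNamed(Vec<String>),
}

enum ColumnSpec {
    Count(usize),
    Names(Vec<String>),
}

fn plan_columns(columns: &CsvColumns) -> Result<ColumnSpec, String> {
    match columns {
        CsvColumns::HeaderNamed(ns) => Ok(ColumnSpec::Names(ns.clone())),
        CsvColumns::Count(n) => Ok(ColumnSpec::Count(*n)),
        CsvColumns::Header => {
            Err("[internal error] column spec should get names in purify".into())
        }
    }
}

fn main() {
    assert!(matches!(plan_columns(&CsvColumns::Count(3)), Ok(ColumnSpec::Count(3))));
    let named = plan_columns(&CsvColumns::HeaderNamed(vec!["a".into()]));
    assert!(matches!(named, Ok(ColumnSpec::Names(ns)) if ns == ["a"]));
    assert!(plan_columns(&CsvColumns::Header).is_err());
}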