Implement new CSV WITH HEADER (cols) syntax
This syntax allows users to provide header names for objects that do not yet
exist. It additionally allows Materialize to record header columns in the SQL
it writes to the catalog while relying less on the more general SQL
column-alias feature -- we still put default column names in the SQL aliases
if the format is specified as `WITH n COLUMNS`.

Design: MaterializeInc#7407
Part of: MaterializeInc#7145
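
For reference, a sketch of the two forms as exercised by the parser test data
below (source and file names are illustrative):

    CREATE SOURCE foo FROM FILE 'bar' FORMAT CSV WITH HEADER (a, b, c)
    CREATE SOURCE foo FROM FILE 'bar' FORMAT CSV WITH 3 COLUMNS

The first records the column names a, b, and c directly; the second falls
back to the generated names column1, column2, and column3.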
quodlibetor committed Aug 18, 2021
1 parent bd2bded commit b537525
Showing 15 changed files with 371 additions and 125 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


41 changes: 34 additions & 7 deletions src/dataflow-types/src/types.rs
@@ -536,14 +536,19 @@ impl DataEncoding {
let ty = ScalarType::String.nullable(true);
desc.with_named_column(name, ty)
}),
DataEncoding::Csv(CsvEncoding { n_cols, .. }) => {
(1..=*n_cols).fold(key_desc, |desc, i| {
DataEncoding::Csv(CsvEncoding { columns, .. }) => match columns {
ColumnSpec::Count(n) => (1..=*n).into_iter().fold(key_desc, |desc, i| {
desc.with_named_column(
format!("column{}", i),
ScalarType::String.nullable(false),
)
})
}
}),
ColumnSpec::Header { names } => {
names.iter().map(|s| &**s).fold(key_desc, |desc, name| {
desc.with_named_column(name, ScalarType::String.nullable(false))
})
}
},
DataEncoding::Text => {
key_desc.with_named_column("text", ScalarType::String.nullable(false))
}
@@ -598,14 +603,36 @@ pub struct ProtobufEncoding
pub message_name: String,
}

/// Encoding in CSV format, with `n_cols` columns per row, with an optional header.
/// Encoding in CSV format
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CsvEncoding {
pub header_row: bool,
pub n_cols: usize,
pub columns: ColumnSpec,
pub delimiter: u8,
}

impl CsvEncoding {
pub fn has_header(&self) -> bool {
matches!(self.columns, ColumnSpec::Header { .. })
}
}

/// What we know about the CSV columns
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ColumnSpec {
Count(usize),
Header { names: Vec<String> },
}

impl ColumnSpec {
/// The number of columns described by the column spec.
pub fn arity(&self) -> usize {
match self {
ColumnSpec::Count(n) => *n,
ColumnSpec::Header { names } => names.len(),
}
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RegexEncoding {
#[serde(with = "serde_regex")]
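A minimal standalone sketch of how the new `ColumnSpec` drives column naming,
under the variants above. The `column_names` helper is hypothetical and for
illustration only; the real code folds names into a `RelationDesc` via
`with_named_column`:

#[derive(Clone, Debug)]
enum ColumnSpec {
    Count(usize),
    Header { names: Vec<String> },
}

impl ColumnSpec {
    /// The number of columns described by the column spec.
    fn arity(&self) -> usize {
        match self {
            ColumnSpec::Count(n) => *n,
            ColumnSpec::Header { names } => names.len(),
        }
    }

    /// Hypothetical helper: explicit header names, or generated defaults.
    fn column_names(&self) -> Vec<String> {
        match self {
            ColumnSpec::Count(n) => (1..=*n).map(|i| format!("column{}", i)).collect(),
            ColumnSpec::Header { names } => names.clone(),
        }
    }
}

fn main() {
    let by_count = ColumnSpec::Count(3);
    assert_eq!(by_count.arity(), 3);
    assert_eq!(by_count.column_names(), ["column1", "column2", "column3"]);

    let by_header = ColumnSpec::Header {
        names: vec!["a".into(), "b".into(), "c".into()],
    };
    assert_eq!(by_header.arity(), 3);
    assert_eq!(by_header.column_names(), ["a", "b", "c"]);
}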
8 changes: 3 additions & 5 deletions src/dataflow/src/decode/csv.rs
@@ -36,11 +36,9 @@ impl CsvDecoderState {
}

pub fn new(format: CsvEncoding, operators: &mut Option<LinearOperator>) -> Self {
let CsvEncoding {
header_row,
n_cols,
delimiter,
} = format;
let header_row = format.has_header();
let CsvEncoding { columns, delimiter } = format;
let n_cols = columns.arity();

let operators = operators.take();
let demanded = (0..n_cols)
44 changes: 31 additions & 13 deletions src/sql-parser/src/ast/defs/ddl.rs
@@ -226,8 +226,7 @@ pub enum Format<T: AstInfo> {
Protobuf(ProtobufSchema<T>),
Regex(String),
Csv {
header_row: bool,
n_cols: Option<usize>,
columns: CsvColumns,
delimiter: char,
},
Json,
@@ -241,6 +240,33 @@ impl<T: AstInfo> Format<T> {
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CsvColumns {
/// `WITH count COLUMNS`
Count(usize),
/// `WITH HEADER (ident, ...)?`: `names` is empty if there are no names specified
Header { names: Vec<Ident> },
}

impl AstDisplay for CsvColumns {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
CsvColumns::Count(n) => {
f.write_str(n);
f.write_str(" COLUMNS")
}
CsvColumns::Header { names } => {
f.write_str("HEADER");
if !names.is_empty() {
f.write_str(" (");
f.write_node(&display::comma_separated(&names));
f.write_str(")");
}
}
}
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CreateSourceKeyEnvelope {
/// `INCLUDE KEY` is absent
@@ -324,18 +350,10 @@ impl<T: AstInfo> AstDisplay for Format<T> {
f.write_node(&display::escape_single_quote_string(regex));
f.write_str("'");
}
Self::Csv {
header_row,
n_cols,
delimiter,
} => {
Self::Csv { columns, delimiter } => {
f.write_str("CSV WITH ");
if *header_row {
f.write_str("HEADER");
} else {
f.write_str(n_cols.unwrap());
f.write_str(" COLUMNS");
}
f.write_node(columns);

if *delimiter != ',' {
f.write_str(" DELIMITED BY '");
f.write_node(&display::escape_single_quote_string(&delimiter.to_string()));
6 changes: 6 additions & 0 deletions src/sql-parser/src/ast/defs/name.rs
@@ -71,6 +71,12 @@ impl From<&str> for Ident {
}
}

impl From<String> for Ident {
fn from(value: String) -> Self {
Ident(value)
}
}

/// More-or-less a direct translation of the Postgres function for doing the same thing:
///
/// <https://github.com/postgres/postgres/blob/master/src/backend/utils/adt/ruleutils.c#L10730-L10812>
15 changes: 6 additions & 9 deletions src/sql-parser/src/parser.rs
@@ -1456,13 +1456,14 @@ impl<'a> Parser<'a> {
Format::Regex(regex)
} else if self.parse_keyword(CSV) {
self.expect_keyword(WITH)?;
let (header_row, n_cols) = if self.parse_keyword(HEADER) || self.parse_keyword(HEADERS)
{
(true, None)
let columns = if self.parse_keyword(HEADER) || self.parse_keyword(HEADERS) {
CsvColumns::Header {
names: self.parse_parenthesized_column_list(Optional)?,
}
} else {
let n_cols = self.parse_literal_uint()? as usize;
self.expect_keyword(COLUMNS)?;
(false, Some(n_cols))
CsvColumns::Count(n_cols)
};
let delimiter = if self.parse_keywords(&[DELIMITED, BY]) {
let s = self.parse_literal_string()?;
@@ -1473,11 +1474,7 @@
} else {
','
};
Format::Csv {
header_row,
n_cols,
delimiter,
}
Format::Csv { columns, delimiter }
} else if self.parse_keyword(JSON) {
Format::Json
} else if self.parse_keyword(TEXT) {
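Taken together, the parser now accepts these CSV format clauses (`HEADERS` is
accepted as an alias and prints back as `HEADER`; the rest round-trip in the
test data below):

    FORMAT CSV WITH HEADER
    FORMAT CSV WITH HEADERS
    FORMAT CSV WITH HEADER (a, b, c)
    FORMAT CSV WITH 3 COLUMNS
    FORMAT CSV WITH 3 COLUMNS DELIMITED BY '|'

`DELIMITED BY` composes with either column spec.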
15 changes: 11 additions & 4 deletions src/sql-parser/tests/testdata/ddl
@@ -410,28 +410,35 @@ CREATE SOURCE foo FROM FILE 'bar' WITH (tail = false) FORMAT CSV WITH HEADER
----
CREATE SOURCE foo FROM FILE 'bar' WITH (tail = false) FORMAT CSV WITH HEADER
=>
CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [], connector: File { path: "bar", compression: None }, with_options: [Value { name: Ident("tail"), value: Boolean(false) }], format: Bare(Csv { header_row: true, n_cols: None, delimiter: ',' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None })
CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [], connector: File { path: "bar", compression: None }, with_options: [Value { name: Ident("tail"), value: Boolean(false) }], format: Bare(Csv { columns: Header { names: [] }, delimiter: ',' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None })

parse-statement
CREATE SOURCE foo FROM FILE 'bar' WITH (tail = false) FORMAT CSV WITH HEADER (a, b, c)
----
CREATE SOURCE foo FROM FILE 'bar' WITH (tail = false) FORMAT CSV WITH HEADER (a, b, c)
=>
CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [], connector: File { path: "bar", compression: None }, with_options: [Value { name: Ident("tail"), value: Boolean(false) }], format: Bare(Csv { columns: Header { names: [Ident("a"), Ident("b"), Ident("c")] }, delimiter: ',' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None })

parse-statement
CREATE SOURCE foo FROM FILE 'bar' WITH (tail = false) FORMAT CSV WITH 3 COLUMNS
----
CREATE SOURCE foo FROM FILE 'bar' WITH (tail = false) FORMAT CSV WITH 3 COLUMNS
=>
CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [], connector: File { path: "bar", compression: None }, with_options: [Value { name: Ident("tail"), value: Boolean(false) }], format: Bare(Csv { header_row: false, n_cols: Some(3), delimiter: ',' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None })
CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [], connector: File { path: "bar", compression: None }, with_options: [Value { name: Ident("tail"), value: Boolean(false) }], format: Bare(Csv { columns: Count(3), delimiter: ',' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None })

parse-statement
CREATE SOURCE foo (one, two) FROM FILE 'bar' FORMAT CSV WITH HEADER
----
CREATE SOURCE foo (one, two) FROM FILE 'bar' FORMAT CSV WITH HEADER
=>
CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [Ident("one"), Ident("two")], connector: File { path: "bar", compression: None }, with_options: [], format: Bare(Csv { header_row: true, n_cols: None, delimiter: ',' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None })
CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [Ident("one"), Ident("two")], connector: File { path: "bar", compression: None }, with_options: [], format: Bare(Csv { columns: Header { names: [] }, delimiter: ',' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None })

parse-statement
CREATE SOURCE foo FROM FILE 'bar' WITH (tail = true) FORMAT CSV WITH 3 COLUMNS DELIMITED BY '|'
----
CREATE SOURCE foo FROM FILE 'bar' WITH (tail = true) FORMAT CSV WITH 3 COLUMNS DELIMITED BY '|'
=>
CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [], connector: File { path: "bar", compression: None }, with_options: [Value { name: Ident("tail"), value: Boolean(true) }], format: Bare(Csv { header_row: false, n_cols: Some(3), delimiter: '|' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None })
CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [], connector: File { path: "bar", compression: None }, with_options: [Value { name: Ident("tail"), value: Boolean(true) }], format: Bare(Csv { columns: Count(3), delimiter: '|' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None })

parse-statement
CREATE MATERIALIZED OR VIEW foo as SELECT * from bar
1 change: 1 addition & 0 deletions src/sql/Cargo.toml
@@ -12,6 +12,7 @@ aws-util = { path = "../aws-util" }
build-info = { path = "../build-info" }
ccsr = { path = "../ccsr" }
chrono = { version = "0.4.0", default-features = false, features = ["clock", "std"] }
csv = "1.1.6"
dataflow-types = { path = "../dataflow-types" }
dec = "0.4.5"
enum-kinds = "0.5.0"
58 changes: 26 additions & 32 deletions src/sql/src/plan/statement/ddl.rs
@@ -27,20 +27,20 @@ use regex::Regex;
use reqwest::Url;

use dataflow_types::{
AvroEncoding, AvroOcfEncoding, AvroOcfSinkConnectorBuilder, BringYourOwn, Consistency,
CsvEncoding, DataEncoding, DebeziumMode, ExternalSourceConnector, FileSourceConnector,
KafkaSinkConnectorBuilder, KafkaSinkFormat, KafkaSourceConnector, KeyEnvelope,
KinesisSourceConnector, PostgresSourceConnector, ProtobufEncoding, PubNubSourceConnector,
RegexEncoding, S3SourceConnector, SinkConnectorBuilder, SinkEnvelope, SourceConnector,
SourceDataEncoding, SourceEnvelope, Timeline,
AvroEncoding, AvroOcfEncoding, AvroOcfSinkConnectorBuilder, BringYourOwn, ColumnSpec,
Consistency, CsvEncoding, DataEncoding, DebeziumMode, ExternalSourceConnector,
FileSourceConnector, KafkaSinkConnectorBuilder, KafkaSinkFormat, KafkaSourceConnector,
KeyEnvelope, KinesisSourceConnector, PostgresSourceConnector, ProtobufEncoding,
PubNubSourceConnector, RegexEncoding, S3SourceConnector, SinkConnectorBuilder, SinkEnvelope,
SourceConnector, SourceDataEncoding, SourceEnvelope, Timeline,
};
use expr::{GlobalId, MirRelationExpr, TableFunc, UnaryFunc};
use interchange::avro::{self, AvroSchemaGenerator, DebeziumDeduplicationStrategy};
use interchange::envelopes;
use ore::collections::CollectionExt;
use ore::str::StrExt;
use repr::{strconv, ColumnName, ColumnType, Datum, RelationDesc, RelationType, Row, ScalarType};
use sql_parser::ast::{CreateSourceFormat, KeyConstraint};
use sql_parser::ast::{CreateSourceFormat, CsvColumns, KeyConstraint};

use crate::ast::display::AstDisplay;
use crate::ast::{
@@ -461,7 +461,7 @@ pub fn plan_create_source(
Some(v) => bail!("invalid start_offset value: {}", v),
}

let encoding = get_encoding(format, envelope, with_options_original, col_names)?;
let encoding = get_encoding(format, envelope, with_options_original)?;
let key_envelope = get_key_envelope(key_envelope, envelope, &encoding)?;

let connector = ExternalSourceConnector::Kafka(KafkaSourceConnector {
@@ -505,7 +505,7 @@ pub fn plan_create_source(
stream_name,
aws_info,
});
let encoding = get_encoding(format, envelope, with_options_original, col_names)?;
let encoding = get_encoding(format, envelope, with_options_original)?;
(connector, encoding, KeyEnvelope::None)
}
CreateSourceConnector::File { path, compression } => {
@@ -527,7 +527,7 @@
},
tail,
});
let encoding = get_encoding(format, envelope, with_options_original, col_names)?;
let encoding = get_encoding(format, envelope, with_options_original)?;
(connector, encoding, KeyEnvelope::None)
}
CreateSourceConnector::S3 {
@@ -569,7 +569,7 @@
Compression::None => dataflow_types::Compression::None,
},
});
let encoding = get_encoding(format, envelope, with_options_original, col_names)?;
let encoding = get_encoding(format, envelope, with_options_original)?;
(connector, encoding, KeyEnvelope::None)
}
CreateSourceConnector::Postgres {
@@ -809,6 +809,7 @@ pub fn plan_create_source(
} else {
None
};

bare_desc =
plan_utils::maybe_rename_columns(format!("source {}", name), bare_desc, &col_names)?;

Expand Down Expand Up @@ -919,17 +920,16 @@ fn get_encoding<T: sql_parser::ast::AstInfo>(
format: &CreateSourceFormat<Raw>,
envelope: &Envelope,
with_options: &Vec<SqlOption<T>>,
col_names: &[Ident],
) -> Result<SourceDataEncoding, anyhow::Error> {
let encoding = match format {
CreateSourceFormat::None => bail!("Source format must be specified"),
CreateSourceFormat::Bare(format) => get_encoding_inner(format, with_options, col_names)?,
CreateSourceFormat::Bare(format) => get_encoding_inner(format, with_options)?,
CreateSourceFormat::KeyValue { key, value } => {
let key = match get_encoding_inner(key, with_options, col_names)? {
let key = match get_encoding_inner(key, with_options)? {
SourceDataEncoding::Single(key) => key,
SourceDataEncoding::KeyValue { key, .. } => key,
};
let value = match get_encoding_inner(value, with_options, col_names)? {
let value = match get_encoding_inner(value, with_options)? {
SourceDataEncoding::Single(value) => value,
SourceDataEncoding::KeyValue { value, .. } => value,
};
@@ -952,7 +952,6 @@
fn get_encoding_inner<T: sql_parser::ast::AstInfo>(
format: &Format<Raw>,
with_options: &Vec<SqlOption<T>>,
col_names: &[Ident],
) -> Result<SourceDataEncoding, anyhow::Error> {
// Avro/CSR can return a `SourceDataEncoding::KeyValue`
Ok(SourceDataEncoding::Single(match format {
@@ -1067,25 +1066,20 @@ fn get_encoding_inner<T: sql_parser::ast::AstInfo>(
let regex = Regex::new(&regex)?;
DataEncoding::Regex(RegexEncoding { regex })
}
Format::Csv {
header_row,
n_cols,
delimiter,
} => {
let n_cols = if col_names.is_empty() {
match n_cols {
Some(n) => *n,
None => bail!(
"Cannot determine number of columns in CSV source; specify using \
CREATE SOURCE...FORMAT CSV WITH X COLUMNS"
),
Format::Csv { columns, delimiter } => {
let columns = match columns {
CsvColumns::Header { names } => {
if names.is_empty() {
bail!("[internal error] column spec should get names in purify")
}
ColumnSpec::Header {
names: names.iter().cloned().map(|n| n.into_string()).collect(),
}
}
} else {
col_names.len()
CsvColumns::Count(n) => ColumnSpec::Count(*n),
};
DataEncoding::Csv(CsvEncoding {
header_row: *header_row,
n_cols,
columns,
delimiter: match *delimiter as u32 {
0..=127 => *delimiter as u8,
_ => bail!("CSV delimiter must be an ASCII character"),
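Note: the `[internal error]` bail above encodes a contract with purification --
presumably, when `WITH HEADER` is written without explicit names, purify fills
them in (e.g. by reading the header row from the source) before planning; that
code lives in files not rendered in this view.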
(The remaining changed files are not rendered in this view.)