From a6a8228ab17959c339f0f865c76e1f90a30a0cd4 Mon Sep 17 00:00:00 2001 From: Brandon W Maister Date: Tue, 27 Jul 2021 13:34:29 -0400 Subject: [PATCH] Implement new CSV WITH HEADER (cols) syntax This syntax allows users to provide header names for objects that do not yet exist. It additionally allows Materialize to record the header columns in the SQL stored in the catalog, interacting less with the more general SQL column-alias feature -- we still put default column names in the SQL aliases if the format is specified as `WITH n COLUMNS`. Design: #7407 Part of: #7145 --- Cargo.lock | 1 + src/dataflow-types/src/types.rs | 41 ++++- src/dataflow/src/decode/csv.rs | 8 +- src/sql-parser/src/ast/defs/ddl.rs | 44 +++-- src/sql-parser/src/ast/defs/name.rs | 6 + src/sql-parser/src/parser.rs | 15 +- src/sql-parser/tests/testdata/ddl | 15 +- src/sql/Cargo.toml | 1 + src/sql/src/plan/statement/ddl.rs | 58 +++---- src/sql/src/pure.rs | 163 ++++++++++++------ test/testdrive/csv-sources.td | 48 ++++++ test/upgrade/check-from-any_version-csv.td | 30 ++++ ...from-latest_version-csv-header-explicit.td | 18 ++ test/upgrade/create-in-any_version-csv.td | 28 +++ ...latest_version-csv-with-header-explicit.td | 20 +++ 15 files changed, 371 insertions(+), 125 deletions(-) create mode 100644 test/upgrade/check-from-any_version-csv.td create mode 100644 test/upgrade/check-from-latest_version-csv-header-explicit.td create mode 100644 test/upgrade/create-in-any_version-csv.td create mode 100644 test/upgrade/create-in-latest_version-csv-with-header-explicit.td diff --git a/Cargo.lock b/Cargo.lock index 0308b06d72a8..0dc9f0d84c16 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3951,6 +3951,7 @@ dependencies = [ "build-info", "ccsr", "chrono", + "csv", "datadriven", "dataflow-types", "dec", diff --git a/src/dataflow-types/src/types.rs b/src/dataflow-types/src/types.rs index 9a37ae0bfc3a..ed02f5773a34 100644 --- a/src/dataflow-types/src/types.rs +++ b/src/dataflow-types/src/types.rs @@ -536,14 +536,19 @@ impl DataEncoding { let ty = ScalarType::String.nullable(true); desc.with_named_column(name, ty) }), - DataEncoding::Csv(CsvEncoding { n_cols, .. }) => { - (1..=*n_cols).fold(key_desc, |desc, i| { + DataEncoding::Csv(CsvEncoding { columns, .. }) => match columns { + ColumnSpec::Count(n) => (1..=*n).into_iter().fold(key_desc, |desc, i| { desc.with_named_column( format!("column{}", i), ScalarType::String.nullable(false), ) - }) - } + }), + ColumnSpec::Header { names } => { + names.iter().map(|s| &**s).fold(key_desc, |desc, name| { + desc.with_named_column(name, ScalarType::String.nullable(false)) + }) + } + }, DataEncoding::Text => { key_desc.with_named_column("text", ScalarType::String.nullable(false)) } @@ -598,14 +603,36 @@ pub struct ProtobufEncoding { pub message_name: String, } -/// Encoding in CSV format, with `n_cols` columns per row, with an optional header. +/// Encoding in CSV format #[derive(Clone, Debug, Serialize, Deserialize)] pub struct CsvEncoding { - pub header_row: bool, - pub n_cols: usize, + pub columns: ColumnSpec, pub delimiter: u8, } +impl CsvEncoding { + pub fn has_header(&self) -> bool { + matches!(self.columns, ColumnSpec::Header { .. }) + } +} + +/// What we know about the CSV columns +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum ColumnSpec { + Count(usize), + Header { names: Vec<String> }, +} + +impl ColumnSpec { + /// The number of columns described by the column spec.
+ pub fn arity(&self) -> usize { + match self { + ColumnSpec::Count(n) => *n, + ColumnSpec::Header { names } => names.len(), + } + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct RegexEncoding { #[serde(with = "serde_regex")] diff --git a/src/dataflow/src/decode/csv.rs b/src/dataflow/src/decode/csv.rs index 7716dfe4db16..ce446ccce31f 100644 --- a/src/dataflow/src/decode/csv.rs +++ b/src/dataflow/src/decode/csv.rs @@ -36,11 +36,9 @@ impl CsvDecoderState { } pub fn new(format: CsvEncoding, operators: &mut Option) -> Self { - let CsvEncoding { - header_row, - n_cols, - delimiter, - } = format; + let header_row = format.has_header(); + let CsvEncoding { columns, delimiter } = format; + let n_cols = columns.arity(); let operators = operators.take(); let demanded = (0..n_cols) diff --git a/src/sql-parser/src/ast/defs/ddl.rs b/src/sql-parser/src/ast/defs/ddl.rs index e8c7dc03b2fa..3995a65c5432 100644 --- a/src/sql-parser/src/ast/defs/ddl.rs +++ b/src/sql-parser/src/ast/defs/ddl.rs @@ -226,8 +226,7 @@ pub enum Format { Protobuf(ProtobufSchema), Regex(String), Csv { - header_row: bool, - n_cols: Option<usize>, + columns: CsvColumns, delimiter: char, }, Json, @@ -241,6 +240,33 @@ impl Format { } } +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum CsvColumns { + /// `WITH count COLUMNS` + Count(usize), + /// `WITH HEADER (ident, ...)?`: `names` is empty if there are no names specified + Header { names: Vec<Ident> }, +} + +impl AstDisplay for CsvColumns { + fn fmt(&self, f: &mut AstFormatter) { + match self { + CsvColumns::Count(n) => { + f.write_str(n); + f.write_str(" COLUMNS") + } + CsvColumns::Header { names } => { + f.write_str("HEADER"); + if !names.is_empty() { + f.write_str(" ("); + f.write_node(&display::comma_separated(&names)); + f.write_str(")"); + } + } + } + } +} + #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum CreateSourceKeyEnvelope { /// `INCLUDE KEY` is absent @@ -324,18 +350,10 @@ impl AstDisplay for Format { f.write_node(&display::escape_single_quote_string(regex)); f.write_str("'"); } - Self::Csv { - header_row, - n_cols, - delimiter, - } => { + Self::Csv { columns, delimiter } => { f.write_str("CSV WITH "); - if *header_row { - f.write_str("HEADER"); - } else { - f.write_str(n_cols.unwrap()); - f.write_str(" COLUMNS"); - } + f.write_node(columns); + if *delimiter != ',' { f.write_str(" DELIMITED BY '"); f.write_node(&display::escape_single_quote_string(&delimiter.to_string())); diff --git a/src/sql-parser/src/ast/defs/name.rs b/src/sql-parser/src/ast/defs/name.rs index b02ab775adfd..955bf8673f4a 100644 --- a/src/sql-parser/src/ast/defs/name.rs +++ b/src/sql-parser/src/ast/defs/name.rs @@ -71,6 +71,12 @@ impl From<&str> for Ident { } } +impl From<String> for Ident { + fn from(value: String) -> Self { + Ident(value) + } +} + /// More-or-less a direct translation of the Postgres function for doing the same thing: /// /// diff --git a/src/sql-parser/src/parser.rs b/src/sql-parser/src/parser.rs index 75cff63c4cc0..f62b94b7f6da 100644 --- a/src/sql-parser/src/parser.rs +++ b/src/sql-parser/src/parser.rs @@ -1456,13 +1456,14 @@ impl<'a> Parser<'a> { Format::Regex(regex) } else if self.parse_keyword(CSV) { self.expect_keyword(WITH)?; - let (header_row, n_cols) = if self.parse_keyword(HEADER) || self.parse_keyword(HEADERS) - { - (true, None) + let columns = if self.parse_keyword(HEADER) || self.parse_keyword(HEADERS) { + CsvColumns::Header { + names: self.parse_parenthesized_column_list(Optional)?, + } } else { let n_cols = self.parse_literal_uint()?
as usize; self.expect_keyword(COLUMNS)?; - (false, Some(n_cols)) + CsvColumns::Count(n_cols) }; let delimiter = if self.parse_keywords(&[DELIMITED, BY]) { let s = self.parse_literal_string()?; @@ -1473,11 +1474,7 @@ impl<'a> Parser<'a> { } else { ',' }; - Format::Csv { - header_row, - n_cols, - delimiter, - } + Format::Csv { columns, delimiter } } else if self.parse_keyword(JSON) { Format::Json } else if self.parse_keyword(TEXT) { diff --git a/src/sql-parser/tests/testdata/ddl b/src/sql-parser/tests/testdata/ddl index a2d2d9d83f84..1cb003d1fdfa 100644 --- a/src/sql-parser/tests/testdata/ddl +++ b/src/sql-parser/tests/testdata/ddl @@ -410,28 +410,35 @@ CREATE SOURCE foo FROM FILE 'bar' WITH (tail = false) FORMAT CSV WITH HEADER ---- CREATE SOURCE foo FROM FILE 'bar' WITH (tail = false) FORMAT CSV WITH HEADER => -CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [], connector: File { path: "bar", compression: None }, with_options: [Value { name: Ident("tail"), value: Boolean(false) }], format: Bare(Csv { header_row: true, n_cols: None, delimiter: ',' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None }) +CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [], connector: File { path: "bar", compression: None }, with_options: [Value { name: Ident("tail"), value: Boolean(false) }], format: Bare(Csv { columns: Header { names: [] }, delimiter: ',' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None }) + +parse-statement +CREATE SOURCE foo FROM FILE 'bar' WITH (tail = false) FORMAT CSV WITH HEADER (a, b, c) +---- +CREATE SOURCE foo FROM FILE 'bar' WITH (tail = false) FORMAT CSV WITH HEADER (a, b, c) +=> +CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [], connector: File { path: "bar", compression: None }, with_options: [Value { name: Ident("tail"), value: Boolean(false) }], format: Bare(Csv { columns: Header { names: [Ident("a"), Ident("b"), Ident("c")] }, delimiter: ',' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None }) parse-statement CREATE SOURCE foo FROM FILE 'bar' WITH (tail = false) FORMAT CSV WITH 3 COLUMNS ---- CREATE SOURCE foo FROM FILE 'bar' WITH (tail = false) FORMAT CSV WITH 3 COLUMNS => -CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [], connector: File { path: "bar", compression: None }, with_options: [Value { name: Ident("tail"), value: Boolean(false) }], format: Bare(Csv { header_row: false, n_cols: Some(3), delimiter: ',' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None }) +CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [], connector: File { path: "bar", compression: None }, with_options: [Value { name: Ident("tail"), value: Boolean(false) }], format: Bare(Csv { columns: Count(3), delimiter: ',' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None }) parse-statement CREATE SOURCE foo (one, two) FROM FILE 'bar' FORMAT CSV WITH HEADER ---- CREATE SOURCE foo (one, two) FROM FILE 'bar' FORMAT CSV WITH HEADER => -CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [Ident("one"), Ident("two")], connector: File { path: "bar", compression: None }, with_options: [], format: 
Bare(Csv { header_row: true, n_cols: None, delimiter: ',' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None }) +CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [Ident("one"), Ident("two")], connector: File { path: "bar", compression: None }, with_options: [], format: Bare(Csv { columns: Header { names: [] }, delimiter: ',' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None }) parse-statement CREATE SOURCE foo FROM FILE 'bar' WITH (tail = true) FORMAT CSV WITH 3 COLUMNS DELIMITED BY '|' ---- CREATE SOURCE foo FROM FILE 'bar' WITH (tail = true) FORMAT CSV WITH 3 COLUMNS DELIMITED BY '|' => -CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [], connector: File { path: "bar", compression: None }, with_options: [Value { name: Ident("tail"), value: Boolean(true) }], format: Bare(Csv { header_row: false, n_cols: Some(3), delimiter: '|' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None }) +CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [], connector: File { path: "bar", compression: None }, with_options: [Value { name: Ident("tail"), value: Boolean(true) }], format: Bare(Csv { columns: Count(3), delimiter: '|' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None }) parse-statement CREATE MATERIALIZED OR VIEW foo as SELECT * from bar diff --git a/src/sql/Cargo.toml b/src/sql/Cargo.toml index ab6353430cdb..ba85bcf96a3e 100644 --- a/src/sql/Cargo.toml +++ b/src/sql/Cargo.toml @@ -12,6 +12,7 @@ aws-util = { path = "../aws-util" } build-info = { path = "../build-info" } ccsr = { path = "../ccsr" } chrono = { version = "0.4.0", default-features = false, features = ["clock", "std"] } +csv = "1.1.6" dataflow-types = { path = "../dataflow-types" } dec = "0.4.5" enum-kinds = "0.5.0" diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs index f2e6a48cb023..959b0469b12a 100644 --- a/src/sql/src/plan/statement/ddl.rs +++ b/src/sql/src/plan/statement/ddl.rs @@ -27,12 +27,12 @@ use regex::Regex; use reqwest::Url; use dataflow_types::{ - AvroEncoding, AvroOcfEncoding, AvroOcfSinkConnectorBuilder, BringYourOwn, Consistency, - CsvEncoding, DataEncoding, DebeziumMode, ExternalSourceConnector, FileSourceConnector, - KafkaSinkConnectorBuilder, KafkaSinkFormat, KafkaSourceConnector, KeyEnvelope, - KinesisSourceConnector, PostgresSourceConnector, ProtobufEncoding, PubNubSourceConnector, - RegexEncoding, S3SourceConnector, SinkConnectorBuilder, SinkEnvelope, SourceConnector, - SourceDataEncoding, SourceEnvelope, Timeline, + AvroEncoding, AvroOcfEncoding, AvroOcfSinkConnectorBuilder, BringYourOwn, ColumnSpec, + Consistency, CsvEncoding, DataEncoding, DebeziumMode, ExternalSourceConnector, + FileSourceConnector, KafkaSinkConnectorBuilder, KafkaSinkFormat, KafkaSourceConnector, + KeyEnvelope, KinesisSourceConnector, PostgresSourceConnector, ProtobufEncoding, + PubNubSourceConnector, RegexEncoding, S3SourceConnector, SinkConnectorBuilder, SinkEnvelope, + SourceConnector, SourceDataEncoding, SourceEnvelope, Timeline, }; use expr::{GlobalId, MirRelationExpr, TableFunc, UnaryFunc}; use interchange::avro::{self, AvroSchemaGenerator, DebeziumDeduplicationStrategy}; @@ -40,7 +40,7 @@ use interchange::envelopes; use ore::collections::CollectionExt; use 
ore::str::StrExt; use repr::{strconv, ColumnName, ColumnType, Datum, RelationDesc, RelationType, Row, ScalarType}; -use sql_parser::ast::{CreateSourceFormat, KeyConstraint}; +use sql_parser::ast::{CreateSourceFormat, CsvColumns, KeyConstraint}; use crate::ast::display::AstDisplay; use crate::ast::{ @@ -461,7 +461,7 @@ pub fn plan_create_source( Some(v) => bail!("invalid start_offset value: {}", v), } - let encoding = get_encoding(format, envelope, with_options_original, col_names)?; + let encoding = get_encoding(format, envelope, with_options_original)?; let key_envelope = get_key_envelope(key_envelope, envelope, &encoding)?; let connector = ExternalSourceConnector::Kafka(KafkaSourceConnector { @@ -505,7 +505,7 @@ pub fn plan_create_source( stream_name, aws_info, }); - let encoding = get_encoding(format, envelope, with_options_original, col_names)?; + let encoding = get_encoding(format, envelope, with_options_original)?; (connector, encoding, KeyEnvelope::None) } CreateSourceConnector::File { path, compression } => { @@ -527,7 +527,7 @@ pub fn plan_create_source( }, tail, }); - let encoding = get_encoding(format, envelope, with_options_original, col_names)?; + let encoding = get_encoding(format, envelope, with_options_original)?; (connector, encoding, KeyEnvelope::None) } CreateSourceConnector::S3 { @@ -569,7 +569,7 @@ pub fn plan_create_source( Compression::None => dataflow_types::Compression::None, }, }); - let encoding = get_encoding(format, envelope, with_options_original, col_names)?; + let encoding = get_encoding(format, envelope, with_options_original)?; (connector, encoding, KeyEnvelope::None) } CreateSourceConnector::Postgres { @@ -809,6 +809,7 @@ pub fn plan_create_source( } else { None }; + bare_desc = plan_utils::maybe_rename_columns(format!("source {}", name), bare_desc, &col_names)?; @@ -919,17 +920,16 @@ fn get_encoding( format: &CreateSourceFormat, envelope: &Envelope, with_options: &Vec>, - col_names: &[Ident], ) -> Result { let encoding = match format { CreateSourceFormat::None => bail!("Source format must be specified"), - CreateSourceFormat::Bare(format) => get_encoding_inner(format, with_options, col_names)?, + CreateSourceFormat::Bare(format) => get_encoding_inner(format, with_options)?, CreateSourceFormat::KeyValue { key, value } => { - let key = match get_encoding_inner(key, with_options, col_names)? { + let key = match get_encoding_inner(key, with_options)? { SourceDataEncoding::Single(key) => key, SourceDataEncoding::KeyValue { key, .. } => key, }; - let value = match get_encoding_inner(value, with_options, col_names)? { + let value = match get_encoding_inner(value, with_options)? { SourceDataEncoding::Single(value) => value, SourceDataEncoding::KeyValue { value, .. 
} => value, }; @@ -952,7 +952,6 @@ fn get_encoding( fn get_encoding_inner( format: &Format, with_options: &Vec>, - col_names: &[Ident], ) -> Result { // Avro/CSR can return a `SourceDataEncoding::KeyValue` Ok(SourceDataEncoding::Single(match format { @@ -1067,25 +1066,20 @@ fn get_encoding_inner( let regex = Regex::new(®ex)?; DataEncoding::Regex(RegexEncoding { regex }) } - Format::Csv { - header_row, - n_cols, - delimiter, - } => { - let n_cols = if col_names.is_empty() { - match n_cols { - Some(n) => *n, - None => bail!( - "Cannot determine number of columns in CSV source; specify using \ - CREATE SOURCE...FORMAT CSV WITH X COLUMNS" - ), + Format::Csv { columns, delimiter } => { + let columns = match columns { + CsvColumns::Header { names } => { + if names.is_empty() { + bail!("[internal error] column spec should get names in purify") + } + ColumnSpec::Header { + names: names.iter().cloned().map(|n| n.into_string()).collect(), + } } - } else { - col_names.len() + CsvColumns::Count(n) => ColumnSpec::Count(*n), }; DataEncoding::Csv(CsvEncoding { - header_row: *header_row, - n_cols, + columns, delimiter: match *delimiter as u32 { 0..=127 => *delimiter as u8, _ => bail!("CSV delimiter must be an ASCII character"), diff --git a/src/sql/src/pure.rs b/src/sql/src/pure.rs index b3dd4ecc784c..cf59a2e0a80c 100644 --- a/src/sql/src/pure.rs +++ b/src/sql/src/pure.rs @@ -16,6 +16,7 @@ use std::future::Future; use anyhow::{anyhow, bail, ensure, Context}; use aws_arn::ARN; +use csv::ReaderBuilder; use itertools::Itertools; use tokio::fs::File; use tokio::io::AsyncBufReadExt; @@ -28,8 +29,8 @@ use repr::strconv; use sql_parser::ast::{ display::AstDisplay, AvroSchema, CreateSourceConnector, CreateSourceFormat, CreateSourceStatement, CreateViewsDefinitions, CreateViewsSourceTarget, CreateViewsStatement, - CsrConnector, CsrSeed, DbzMode, Envelope, Expr, Format, Ident, ProtobufSchema, Query, Raw, - RawName, Select, SelectItem, SetExpr, Statement, TableFactor, TableWithJoins, + CsrConnector, CsrSeed, CsvColumns, DbzMode, Envelope, Expr, Format, Ident, ProtobufSchema, + Query, Raw, RawName, Select, SelectItem, SetExpr, Statement, TableFactor, TableWithJoins, UnresolvedObjectName, Value, ViewDefinition, WithOption, WithOptionValue, }; use sql_parser::parser::parse_columns; @@ -75,7 +76,6 @@ pub fn purify( async move { if let Statement::CreateSource(CreateSourceStatement { - col_names, connector, format, envelope, @@ -197,15 +197,7 @@ pub fn purify( CreateSourceConnector::PubNub { .. } => (), } - purify_source_format( - format, - connector, - &envelope, - col_names, - file, - &config_options, - ) - .await?; + purify_source_format(format, connector, &envelope, file, &config_options).await?; } if let Statement::CreateViews(CreateViewsStatement { definitions, .. }) = &mut stmt { if let CreateViewsDefinitions::Source { @@ -360,7 +352,6 @@ async fn purify_source_format( format: &mut CreateSourceFormat, connector: &mut CreateSourceConnector, envelope: &Envelope, - col_names: &mut Vec, file: Option, connector_options: &BTreeMap, ) -> Result<(), anyhow::Error> { @@ -393,15 +384,8 @@ async fn purify_source_format( match format { CreateSourceFormat::None => {} CreateSourceFormat::Bare(format) => { - purify_source_format_single( - format, - connector, - envelope, - col_names, - file, - connector_options, - ) - .await? + purify_source_format_single(format, connector, envelope, file, connector_options) + .await? 
} CreateSourceFormat::KeyValue { key, value: val } => { @@ -410,24 +394,8 @@ async fn purify_source_format( anyhow!("[internal-error] File sources cannot be key-value sources") ); - purify_source_format_single( - key, - connector, - envelope, - col_names, - None, - connector_options, - ) - .await?; - purify_source_format_single( - val, - connector, - envelope, - col_names, - None, - connector_options, - ) - .await?; + purify_source_format_single(key, connector, envelope, None, connector_options).await?; + purify_source_format_single(val, connector, envelope, None, connector_options).await?; } } Ok(()) @@ -437,7 +405,6 @@ async fn purify_source_format_single( format: &mut Format, connector: &mut CreateSourceConnector, envelope: &Envelope, - col_names: &mut Vec, file: Option, connector_options: &BTreeMap, ) -> Result<(), anyhow::Error> { @@ -488,24 +455,110 @@ async fn purify_source_format_single( } }, Format::Csv { - header_row, delimiter, - .. + ref mut columns, } => { - if *header_row && col_names.is_empty() { - if let Some(file) = file { - let file = tokio::io::BufReader::new(file); - let csv_header = file.lines().next_line().await?; - match csv_header { - Some(csv_header) => { - csv_header - .split(*delimiter as char) - .for_each(|v| col_names.push(Ident::from(v))); + if matches!(columns, CsvColumns::Header { .. }) + && !matches!(connector, CreateSourceConnector::File { .. }) + { + unimplemented!("CSV WITH HEADER with non-file sources"); + } + + let first_row = if let Some(file) = file { + let file = tokio::io::BufReader::new(file); + let csv_header = file.lines().next_line().await; + if !delimiter.is_ascii() { + bail!("CSV delimiter must be ascii"); + } + match csv_header { + Ok(Some(csv_header)) => { + let mut reader = ReaderBuilder::new() + .delimiter(*delimiter as u8) + .has_headers(false) + .from_reader(csv_header.as_bytes()); + + if let Some(result) = reader.records().next() { + match result { + Ok(headers) => Some(headers), + Err(e) => bail!("Unable to parse header row: {}", e), + } + } else { + None + } + } + Ok(None) => { + if let CsvColumns::Header { names } = columns { + if names.is_empty() { + bail!( + "CSV file expected to have at least one line \ + to determine column names, but is empty" + ); + } else { + None + } + } else { + None + } + } + Err(e) => { + // TODO(#7562): support compressed files + if let CsvColumns::Header { names } = columns { + if names.is_empty() { + bail!("Cannot determine header by reading CSV file: {}", e); + } else { + None + } + } else { + None + } + } + } + } else { + None + }; + + match (&columns, first_row) { + (CsvColumns::Header { names }, Some(headers)) if names.is_empty() => { + *columns = CsvColumns::Header { + names: headers.into_iter().map(Ident::from).collect(), + }; + } + (CsvColumns::Header { names }, Some(headers)) => { + if names.len() != headers.len() { + bail!( + "Named column count ({}) does not match \ + number of columns discovered ({})", + names.len(), + headers.len() + ); + } else if let Some((sql, csv)) = names + .iter() + .zip(headers.iter()) + .find(|(sql, csv)| sql.as_str() != &**csv) + { + bail!("Header columns do not match named columns from CREATE SOURCE statement. 
\ + First mismatched columns: {} != {}", sql, csv); + } + } + (CsvColumns::Header { names }, None) if names.is_empty() => { + bail!("WITH HEADER requires a way to determine the header row, but file does not exist"); + } + (CsvColumns::Header { names }, None) => { + // we don't need to do any verification if we are told the names of the headers + assert!(!names.is_empty(), "match arm moved into the wrong spot"); + } + + (CsvColumns::Count(n), first_line) => { + if let Some(columns) = first_line { + if *n != columns.len() { + bail!( + "Specified column count (WITH {} COLUMNS) \ + does not match number of columns in CSV file ({})", + n, + columns.len() + ); } - None => bail!("CSV file expected header line, but is empty"), } - } else { - bail!("CSV format with headers only works with file connectors") } } } diff --git a/test/testdrive/csv-sources.td b/test/testdrive/csv-sources.td index a43783433fcd..311ad3d297e2 100644 --- a/test/testdrive/csv-sources.td +++ b/test/testdrive/csv-sources.td @@ -21,6 +21,39 @@ place""",CA,92679 FORMAT CSV WITH 3 COLUMNS unexpected parameters for CREATE SOURCE: badoption +! CREATE SOURCE mismatched_column_count + FROM FILE '${testdrive.temp-dir}/static.csv' + FORMAT CSV WITH 2 COLUMNS +Specified column count (WITH 2 COLUMNS) does not match number of columns in CSV file (3) + +> CREATE MATERIALIZED SOURCE matching_column_names + FROM FILE '${testdrive.temp-dir}/static.csv' + FORMAT CSV WITH HEADER (city, state, zip) + +> SELECT * FROM matching_column_names where zip = '14618' +city state zip mz_line_no +------------------------- +Rochester NY 14618 1 + +> CREATE MATERIALIZED SOURCE matching_column_names_alias (a, b, c) + FROM FILE '${testdrive.temp-dir}/static.csv' + FORMAT CSV WITH HEADER (city, state, zip) + +> SELECT * FROM matching_column_names_alias where c = '14618' +a b c mz_line_no +---------------- +Rochester NY 14618 1 + +! CREATE SOURCE mismatched_column_names + FROM FILE '${testdrive.temp-dir}/static.csv' + FORMAT CSV WITH HEADER (cities, country, zip) +Header columns do not match named columns from CREATE SOURCE statement. First mismatched columns: cities != city + +! CREATE SOURCE mismatched_column_names_count + FROM FILE '${testdrive.temp-dir}/static.csv' + FORMAT CSV WITH HEADER (cities, state) +Named column count (2) does not match number of columns discovered (3) + # Static CSV without headers. > CREATE MATERIALIZED SOURCE static_csv FROM FILE '${testdrive.temp-dir}/static.csv' @@ -136,6 +169,21 @@ Dollars,Category ! SELECT * FROM bad_text_csv Decode error: Text: CSV error at record number 2: invalid UTF-8 +# CSV file with comma in header + +$ file-append path=header-delimiter.csv +"interesting,id",name +1,blat + +> CREATE MATERIALIZED SOURCE header_delimited + FROM FILE '${testdrive.temp-dir}/header-delimiter.csv' + FORMAT CSV WITH HEADER + +> SELECT * FROM header_delimited +interesting,id name mz_line_no +------------------------------ +1 blat 1 + # Declare a key constraint (PRIMARY KEY NOT ENFORCED) $ kafka-create-topic topic=static-csv-pkne-sink diff --git a/test/upgrade/check-from-any_version-csv.td b/test/upgrade/check-from-any_version-csv.td new file mode 100644 index 000000000000..344cbc3f52d0 --- /dev/null +++ b/test/upgrade/check-from-any_version-csv.td @@ -0,0 +1,30 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. 
+# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +> select * from csv_upgrade_with_header +id value +--------- +1 person + +> select * from csv_upgrade_with_header_alias +al sia +-------- +1 person + +> select * from csv_upgrade_no_header +column1 column2 +---------------- +id value +1 person + +> select * from csv_upgrade_no_header_alias +al sia +---------- +id value +1 person diff --git a/test/upgrade/check-from-latest_version-csv-header-explicit.td b/test/upgrade/check-from-latest_version-csv-header-explicit.td new file mode 100644 index 000000000000..e208e1f88e13 --- /dev/null +++ b/test/upgrade/check-from-latest_version-csv-header-explicit.td @@ -0,0 +1,18 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +> select * from csv_upgrade_explicit_header +id value +-------- +1 person + +> select * from csv_upgrade_explicit_header_alias +foo bar +-------- +1 person diff --git a/test/upgrade/create-in-any_version-csv.td b/test/upgrade/create-in-any_version-csv.td new file mode 100644 index 000000000000..63e4e8dc3bdd --- /dev/null +++ b/test/upgrade/create-in-any_version-csv.td @@ -0,0 +1,28 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +$ file-append path=csv.csv +id,value +1,person + +> CREATE MATERIALIZED SOURCE csv_upgrade_with_header + FROM FILE '${testdrive.temp-dir}/csv.csv' + FORMAT CSV WITH HEADER + +> CREATE MATERIALIZED SOURCE csv_upgrade_with_header_alias (al, sia) + FROM FILE '${testdrive.temp-dir}/csv.csv' + FORMAT CSV WITH HEADER + +> CREATE MATERIALIZED SOURCE csv_upgrade_no_header + FROM FILE '${testdrive.temp-dir}/csv.csv' + FORMAT CSV WITH 2 COLUMNS + +> CREATE MATERIALIZED SOURCE csv_upgrade_no_header_alias (al, sia) + FROM FILE '${testdrive.temp-dir}/csv.csv' + FORMAT CSV WITH 2 COLUMNS diff --git a/test/upgrade/create-in-latest_version-csv-with-header-explicit.td b/test/upgrade/create-in-latest_version-csv-with-header-explicit.td new file mode 100644 index 000000000000..3a63e207fee0 --- /dev/null +++ b/test/upgrade/create-in-latest_version-csv-with-header-explicit.td @@ -0,0 +1,20 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. 
+ +$ file-append path=upgrade-csv-with-header-explicit.csv +id,value +1,person + +> CREATE MATERIALIZED SOURCE csv_upgrade_explicit_header + FROM FILE '${testdrive.temp-dir}/upgrade-csv-with-header-explicit.csv' + FORMAT CSV WITH HEADER (id, value) + +> CREATE MATERIALIZED SOURCE csv_upgrade_explicit_header_alias (foo, bar) + FROM FILE '${testdrive.temp-dir}/upgrade-csv-with-header-explicit.csv' + FORMAT CSV WITH HEADER (id, value)
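
For reference, a minimal sketch of the statements this patch accepts, modeled on the cases exercised in test/testdrive/csv-sources.td; the file path and the source/column names below are illustrative only, not taken from the test suite:

    -- Discover header names from the file's first row (pre-existing behavior,
    -- now recorded as an explicit column spec during purification).
    CREATE MATERIALIZED SOURCE csv_discovered
        FROM FILE '/path/to/data.csv'
        FORMAT CSV WITH HEADER;

    -- New in this patch: name the header columns up front, even if the file
    -- does not yet exist; when the file is readable, purification checks the
    -- given names against the actual header row and errors on a mismatch.
    CREATE MATERIALIZED SOURCE csv_named
        FROM FILE '/path/to/data.csv'
        FORMAT CSV WITH HEADER (city, state, zip);

    -- Headerless files still use a column count; columns are named
    -- column1..columnN unless SQL aliases are provided.
    CREATE MATERIALIZED SOURCE csv_counted
        FROM FILE '/path/to/data.csv'
        FORMAT CSV WITH 3 COLUMNS;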