diff --git a/Cargo.lock b/Cargo.lock
index 0308b06d72a8..0dc9f0d84c16 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3951,6 +3951,7 @@ dependencies = [
  "build-info",
  "ccsr",
  "chrono",
+ "csv",
  "datadriven",
  "dataflow-types",
  "dec",
diff --git a/src/dataflow-types/src/types.rs b/src/dataflow-types/src/types.rs
index 9a37ae0bfc3a..ed02f5773a34 100644
--- a/src/dataflow-types/src/types.rs
+++ b/src/dataflow-types/src/types.rs
@@ -536,14 +536,19 @@ impl DataEncoding {
                 let ty = ScalarType::String.nullable(true);
                 desc.with_named_column(name, ty)
             }),
-            DataEncoding::Csv(CsvEncoding { n_cols, .. }) => {
-                (1..=*n_cols).fold(key_desc, |desc, i| {
+            DataEncoding::Csv(CsvEncoding { columns, .. }) => match columns {
+                ColumnSpec::Count(n) => (1..=*n).into_iter().fold(key_desc, |desc, i| {
                     desc.with_named_column(
                         format!("column{}", i),
                         ScalarType::String.nullable(false),
                     )
-                })
-            }
+                }),
+                ColumnSpec::Header { names } => {
+                    names.iter().map(|s| &**s).fold(key_desc, |desc, name| {
+                        desc.with_named_column(name, ScalarType::String.nullable(false))
+                    })
+                }
+            },
             DataEncoding::Text => {
                 key_desc.with_named_column("text", ScalarType::String.nullable(false))
             }
@@ -598,14 +603,36 @@ pub struct ProtobufEncoding {
     pub message_name: String,
 }
 
-/// Encoding in CSV format, with `n_cols` columns per row, with an optional header.
+/// Encoding in CSV format
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct CsvEncoding {
-    pub header_row: bool,
-    pub n_cols: usize,
+    pub columns: ColumnSpec,
     pub delimiter: u8,
 }
 
+impl CsvEncoding {
+    pub fn has_header(&self) -> bool {
+        matches!(self.columns, ColumnSpec::Header { .. })
+    }
+}
+
+/// What we know about the CSV columns
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub enum ColumnSpec {
+    Count(usize),
+    Header { names: Vec<String> },
+}
+
+impl ColumnSpec {
+    /// The number of columns described by the column spec.
+    pub fn arity(&self) -> usize {
+        match self {
+            ColumnSpec::Count(n) => *n,
+            ColumnSpec::Header { names } => names.len(),
+        }
+    }
+}
+
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct RegexEncoding {
     #[serde(with = "serde_regex")]
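The new `ColumnSpec` gives the engine a single place to answer both "how many columns?" (`arity`) and "what are they called?". A minimal, dependency-free sketch (stand-in types, not Materialize code) of how a spec resolves to output column names, mirroring the `match` added to `DataEncoding::desc` above:

```rust
// Standalone sketch: a column count yields synthetic names
// (column1, column2, ...), a header yields the recorded names.
#[derive(Clone, Debug)]
enum ColumnSpec {
    Count(usize),
    Header { names: Vec<String> },
}

impl ColumnSpec {
    fn column_names(&self) -> Vec<String> {
        match self {
            ColumnSpec::Count(n) => (1..=*n).map(|i| format!("column{}", i)).collect(),
            ColumnSpec::Header { names } => names.clone(),
        }
    }
}

fn main() {
    assert_eq!(
        ColumnSpec::Count(3).column_names(),
        vec!["column1", "column2", "column3"]
    );
    let header = ColumnSpec::Header {
        names: vec!["city".into(), "zip".into()],
    };
    assert_eq!(header.column_names(), vec!["city", "zip"]);
}
```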
diff --git a/src/dataflow/src/decode/csv.rs b/src/dataflow/src/decode/csv.rs
index 7716dfe4db16..ce446ccce31f 100644
--- a/src/dataflow/src/decode/csv.rs
+++ b/src/dataflow/src/decode/csv.rs
@@ -36,11 +36,9 @@ impl CsvDecoderState {
     }
 
     pub fn new(format: CsvEncoding, operators: &mut Option<LinearOperator>) -> Self {
-        let CsvEncoding {
-            header_row,
-            n_cols,
-            delimiter,
-        } = format;
+        let header_row = format.has_header();
+        let CsvEncoding { columns, delimiter } = format;
+        let n_cols = columns.arity();
 
         let operators = operators.take();
         let demanded = (0..n_cols)
diff --git a/src/sql-parser/src/ast/defs/ddl.rs b/src/sql-parser/src/ast/defs/ddl.rs
index e8c7dc03b2fa..3995a65c5432 100644
--- a/src/sql-parser/src/ast/defs/ddl.rs
+++ b/src/sql-parser/src/ast/defs/ddl.rs
@@ -226,8 +226,7 @@ pub enum Format<T: AstInfo> {
     Protobuf(ProtobufSchema<T>),
     Regex(String),
     Csv {
-        header_row: bool,
-        n_cols: Option<usize>,
+        columns: CsvColumns,
         delimiter: char,
     },
     Json,
@@ -241,6 +240,33 @@ impl<T: AstInfo> Format<T> {
     }
 }
 
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub enum CsvColumns {
+    /// `WITH count COLUMNS`
+    Count(usize),
+    /// `WITH HEADER (ident, ...)?`: `names` is empty if there are no names specified
+    Header { names: Vec<Ident> },
+}
+
+impl AstDisplay for CsvColumns {
+    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
+        match self {
+            CsvColumns::Count(n) => {
+                f.write_str(n);
+                f.write_str(" COLUMNS")
+            }
+            CsvColumns::Header { names } => {
+                f.write_str("HEADER");
+                if !names.is_empty() {
+                    f.write_str(" (");
+                    f.write_node(&display::comma_separated(&names));
+                    f.write_str(")");
+                }
+            }
+        }
+    }
+}
+
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub enum CreateSourceKeyEnvelope {
     /// `INCLUDE KEY` is absent
@@ -324,18 +350,10 @@ impl<T: AstInfo> AstDisplay for Format<T> {
             f.write_node(&display::escape_single_quote_string(regex));
             f.write_str("'");
         }
-        Self::Csv {
-            header_row,
-            n_cols,
-            delimiter,
-        } => {
+        Self::Csv { columns, delimiter } => {
             f.write_str("CSV WITH ");
-            if *header_row {
-                f.write_str("HEADER");
-            } else {
-                f.write_str(n_cols.unwrap());
-                f.write_str(" COLUMNS");
-            }
+            f.write_node(columns);
+
             if *delimiter != ',' {
                 f.write_str(" DELIMITED BY '");
                 f.write_node(&display::escape_single_quote_string(&delimiter.to_string()));
diff --git a/src/sql-parser/src/ast/defs/name.rs b/src/sql-parser/src/ast/defs/name.rs
index b02ab775adfd..955bf8673f4a 100644
--- a/src/sql-parser/src/ast/defs/name.rs
+++ b/src/sql-parser/src/ast/defs/name.rs
@@ -71,6 +71,12 @@ impl From<&str> for Ident {
     }
 }
 
+impl From<String> for Ident {
+    fn from(value: String) -> Self {
+        Ident(value)
+    }
+}
+
 /// More-or-less a direct translation of the Postgres function for doing the same thing:
 ///
 ///
diff --git a/src/sql-parser/src/parser.rs b/src/sql-parser/src/parser.rs
index 75cff63c4cc0..f62b94b7f6da 100644
--- a/src/sql-parser/src/parser.rs
+++ b/src/sql-parser/src/parser.rs
@@ -1456,13 +1456,14 @@ impl<'a> Parser<'a> {
             Format::Regex(regex)
         } else if self.parse_keyword(CSV) {
             self.expect_keyword(WITH)?;
-            let (header_row, n_cols) = if self.parse_keyword(HEADER) || self.parse_keyword(HEADERS)
-            {
-                (true, None)
+            let columns = if self.parse_keyword(HEADER) || self.parse_keyword(HEADERS) {
+                CsvColumns::Header {
+                    names: self.parse_parenthesized_column_list(Optional)?,
+                }
             } else {
                 let n_cols = self.parse_literal_uint()? as usize;
                 self.expect_keyword(COLUMNS)?;
-                (false, Some(n_cols))
+                CsvColumns::Count(n_cols)
             };
             let delimiter = if self.parse_keywords(&[DELIMITED, BY]) {
                 let s = self.parse_literal_string()?;
@@ -1473,11 +1474,7 @@ impl<'a> Parser<'a> {
             } else {
                 ','
             };
-            Format::Csv {
-                header_row,
-                n_cols,
-                delimiter,
-            }
+            Format::Csv { columns, delimiter }
         } else if self.parse_keyword(JSON) {
            Format::Json
         } else if self.parse_keyword(TEXT) {
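The parser change is mechanical: `HEADER` may now be followed by an optional parenthesized column list, while the `n COLUMNS` branch is unchanged. A hedged round-trip sketch, assuming `sql_parser::parser::parse_statements` and the `Display` impl derived from `AstDisplay` (both may differ across versions); this is the same parse-then-redisplay property the testdata blocks below verify in bulk:

```rust
use sql_parser::parser::parse_statements;

// Each statement should survive a parse -> display round trip unchanged.
fn round_trips(sql: &str) {
    let stmts = parse_statements(sql).expect("valid SQL");
    assert_eq!(stmts.len(), 1);
    assert_eq!(stmts[0].to_string(), sql);
}

fn main() {
    round_trips("CREATE SOURCE foo FROM FILE 'bar' FORMAT CSV WITH HEADER (a, b, c)");
    round_trips("CREATE SOURCE foo FROM FILE 'bar' FORMAT CSV WITH 3 COLUMNS");
}
```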
diff --git a/src/sql-parser/tests/testdata/ddl b/src/sql-parser/tests/testdata/ddl
index a2d2d9d83f84..1cb003d1fdfa 100644
--- a/src/sql-parser/tests/testdata/ddl
+++ b/src/sql-parser/tests/testdata/ddl
@@ -410,28 +410,35 @@ CREATE SOURCE foo FROM FILE 'bar' WITH (tail = false) FORMAT CSV WITH HEADER
 ----
 CREATE SOURCE foo FROM FILE 'bar' WITH (tail = false) FORMAT CSV WITH HEADER
 =>
-CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [], connector: File { path: "bar", compression: None }, with_options: [Value { name: Ident("tail"), value: Boolean(false) }], format: Bare(Csv { header_row: true, n_cols: None, delimiter: ',' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None })
+CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [], connector: File { path: "bar", compression: None }, with_options: [Value { name: Ident("tail"), value: Boolean(false) }], format: Bare(Csv { columns: Header { names: [] }, delimiter: ',' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None })
+
+parse-statement
+CREATE SOURCE foo FROM FILE 'bar' WITH (tail = false) FORMAT CSV WITH HEADER (a, b, c)
+----
+CREATE SOURCE foo FROM FILE 'bar' WITH (tail = false) FORMAT CSV WITH HEADER (a, b, c)
+=>
+CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [], connector: File { path: "bar", compression: None }, with_options: [Value { name: Ident("tail"), value: Boolean(false) }], format: Bare(Csv { columns: Header { names: [Ident("a"), Ident("b"), Ident("c")] }, delimiter: ',' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None })
 
 parse-statement
 CREATE SOURCE foo FROM FILE 'bar' WITH (tail = false) FORMAT CSV WITH 3 COLUMNS
 ----
 CREATE SOURCE foo FROM FILE 'bar' WITH (tail = false) FORMAT CSV WITH 3 COLUMNS
 =>
-CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [], connector: File { path: "bar", compression: None }, with_options: [Value { name: Ident("tail"), value: Boolean(false) }], format: Bare(Csv { header_row: false, n_cols: Some(3), delimiter: ',' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None })
+CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [], connector: File { path: "bar", compression: None }, with_options: [Value { name: Ident("tail"), value: Boolean(false) }], format: Bare(Csv { columns: Count(3), delimiter: ',' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None })
 
 parse-statement
 CREATE SOURCE foo (one, two) FROM FILE 'bar' FORMAT CSV WITH HEADER
 ----
 CREATE SOURCE foo (one, two) FROM FILE 'bar' FORMAT CSV WITH HEADER
 =>
-CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [Ident("one"), Ident("two")], connector: File { path: "bar", compression: None }, with_options: [], format: Bare(Csv { header_row: true, n_cols: None, delimiter: ',' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None })
+CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [Ident("one"), Ident("two")], connector: File { path: "bar", compression: None }, with_options: [], format: Bare(Csv { columns: Header { names: [] }, delimiter: ',' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None })
 
 parse-statement
 CREATE SOURCE foo FROM FILE 'bar' WITH (tail = true) FORMAT CSV WITH 3 COLUMNS DELIMITED BY '|'
 ----
 CREATE SOURCE foo FROM FILE 'bar' WITH (tail = true) FORMAT CSV WITH 3 COLUMNS DELIMITED BY '|'
 =>
-CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [], connector: File { path: "bar", compression: None }, with_options: [Value { name: Ident("tail"), value: Boolean(true) }], format: Bare(Csv { header_row: false, n_cols: Some(3), delimiter: '|' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None })
+CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("foo")]), col_names: [], connector: File { path: "bar", compression: None }, with_options: [Value { name: Ident("tail"), value: Boolean(true) }], format: Bare(Csv { columns: Count(3), delimiter: '|' }), key_envelope: None, envelope: None, if_not_exists: false, materialized: false, key_constraint: None })
 
 parse-statement
 CREATE MATERIALIZED OR VIEW foo as SELECT * from bar
diff --git a/src/sql/Cargo.toml b/src/sql/Cargo.toml
index ab6353430cdb..ba85bcf96a3e 100644
--- a/src/sql/Cargo.toml
+++ b/src/sql/Cargo.toml
@@ -12,6 +12,7 @@ aws-util = { path = "../aws-util" }
 build-info = { path = "../build-info" }
 ccsr = { path = "../ccsr" }
 chrono = { version = "0.4.0", default-features = false, features = ["clock", "std"] }
+csv = "1.1.6"
 dataflow-types = { path = "../dataflow-types" }
 dec = "0.4.5"
 enum-kinds = "0.5.0"
diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs
index f2e6a48cb023..959b0469b12a 100644
--- a/src/sql/src/plan/statement/ddl.rs
+++ b/src/sql/src/plan/statement/ddl.rs
@@ -27,12 +27,12 @@ use regex::Regex;
 use reqwest::Url;
 
 use dataflow_types::{
-    AvroEncoding, AvroOcfEncoding, AvroOcfSinkConnectorBuilder, BringYourOwn, Consistency,
-    CsvEncoding, DataEncoding, DebeziumMode, ExternalSourceConnector, FileSourceConnector,
-    KafkaSinkConnectorBuilder, KafkaSinkFormat, KafkaSourceConnector, KeyEnvelope,
-    KinesisSourceConnector, PostgresSourceConnector, ProtobufEncoding, PubNubSourceConnector,
-    RegexEncoding, S3SourceConnector, SinkConnectorBuilder, SinkEnvelope, SourceConnector,
-    SourceDataEncoding, SourceEnvelope, Timeline,
+    AvroEncoding, AvroOcfEncoding, AvroOcfSinkConnectorBuilder, BringYourOwn, ColumnSpec,
+    Consistency, CsvEncoding, DataEncoding, DebeziumMode, ExternalSourceConnector,
+    FileSourceConnector, KafkaSinkConnectorBuilder, KafkaSinkFormat, KafkaSourceConnector,
+    KeyEnvelope, KinesisSourceConnector, PostgresSourceConnector, ProtobufEncoding,
+    PubNubSourceConnector, RegexEncoding, S3SourceConnector, SinkConnectorBuilder, SinkEnvelope,
+    SourceConnector, SourceDataEncoding, SourceEnvelope, Timeline,
 };
 use expr::{GlobalId, MirRelationExpr, TableFunc, UnaryFunc};
 use interchange::avro::{self, AvroSchemaGenerator, DebeziumDeduplicationStrategy};
@@ -40,7 +40,7 @@ use interchange::envelopes;
 use ore::collections::CollectionExt;
 use ore::str::StrExt;
 use repr::{strconv, ColumnName, ColumnType, Datum, RelationDesc, RelationType, Row, ScalarType};
-use sql_parser::ast::{CreateSourceFormat, KeyConstraint};
+use sql_parser::ast::{CreateSourceFormat, CsvColumns, KeyConstraint};
 
 use crate::ast::display::AstDisplay;
 use crate::ast::{
@@ -461,7 +461,7 @@ pub fn plan_create_source(
                 Some(v) => bail!("invalid start_offset value: {}", v),
             }
 
-            let encoding = get_encoding(format, envelope, with_options_original, col_names)?;
+            let encoding = get_encoding(format, envelope, with_options_original)?;
             let key_envelope = get_key_envelope(key_envelope, envelope, &encoding)?;
 
             let connector = ExternalSourceConnector::Kafka(KafkaSourceConnector {
@@ -505,7 +505,7 @@ pub fn plan_create_source(
                 stream_name,
                 aws_info,
             });
-            let encoding = get_encoding(format, envelope, with_options_original, col_names)?;
+            let encoding = get_encoding(format, envelope, with_options_original)?;
             (connector, encoding, KeyEnvelope::None)
         }
         CreateSourceConnector::File { path, compression } => {
@@ -527,7 +527,7 @@ pub fn plan_create_source(
                 },
                 tail,
             });
-            let encoding = get_encoding(format, envelope, with_options_original, col_names)?;
+            let encoding = get_encoding(format, envelope, with_options_original)?;
             (connector, encoding, KeyEnvelope::None)
         }
         CreateSourceConnector::S3 {
@@ -569,7 +569,7 @@ pub fn plan_create_source(
                     Compression::None => dataflow_types::Compression::None,
                 },
             });
-            let encoding = get_encoding(format, envelope, with_options_original, col_names)?;
+            let encoding = get_encoding(format, envelope, with_options_original)?;
             (connector, encoding, KeyEnvelope::None)
         }
         CreateSourceConnector::Postgres {
@@ -809,6 +809,7 @@ pub fn plan_create_source(
     } else {
         None
     };
+
     bare_desc =
         plan_utils::maybe_rename_columns(format!("source {}", name), bare_desc, &col_names)?;
 
@@ -919,17 +920,16 @@ fn get_encoding(
     format: &CreateSourceFormat<Raw>,
     envelope: &Envelope,
     with_options: &Vec<SqlOption<Raw>>,
-    col_names: &[Ident],
 ) -> Result<SourceDataEncoding, anyhow::Error> {
     let encoding = match format {
         CreateSourceFormat::None => bail!("Source format must be specified"),
-        CreateSourceFormat::Bare(format) => get_encoding_inner(format, with_options, col_names)?,
+        CreateSourceFormat::Bare(format) => get_encoding_inner(format, with_options)?,
         CreateSourceFormat::KeyValue { key, value } => {
-            let key = match get_encoding_inner(key, with_options, col_names)? {
+            let key = match get_encoding_inner(key, with_options)? {
                 SourceDataEncoding::Single(key) => key,
                 SourceDataEncoding::KeyValue { key, .. } => key,
             };
-            let value = match get_encoding_inner(value, with_options, col_names)? {
+            let value = match get_encoding_inner(value, with_options)? {
                 SourceDataEncoding::Single(value) => value,
                 SourceDataEncoding::KeyValue { value, .. } => value,
             };
@@ -952,7 +952,6 @@ fn get_encoding(
 fn get_encoding_inner(
     format: &Format<Raw>,
     with_options: &Vec<SqlOption<Raw>>,
-    col_names: &[Ident],
 ) -> Result<SourceDataEncoding, anyhow::Error> {
     // Avro/CSR can return a `SourceDataEncoding::KeyValue`
     Ok(SourceDataEncoding::Single(match format {
@@ -1067,25 +1066,20 @@ fn get_encoding_inner(
             let regex = Regex::new(&regex)?;
             DataEncoding::Regex(RegexEncoding { regex })
         }
-        Format::Csv {
-            header_row,
-            n_cols,
-            delimiter,
-        } => {
-            let n_cols = if col_names.is_empty() {
-                match n_cols {
-                    Some(n) => *n,
-                    None => bail!(
-                        "Cannot determine number of columns in CSV source; specify using \
-                        CREATE SOURCE...FORMAT CSV WITH X COLUMNS"
-                    ),
+        Format::Csv { columns, delimiter } => {
+            let columns = match columns {
+                CsvColumns::Header { names } => {
+                    if names.is_empty() {
+                        bail!("[internal error] column spec should get names in purify")
+                    }
+                    ColumnSpec::Header {
+                        names: names.iter().cloned().map(|n| n.into_string()).collect(),
+                    }
                 }
-            } else {
-                col_names.len()
+                CsvColumns::Count(n) => ColumnSpec::Count(*n),
             };
             DataEncoding::Csv(CsvEncoding {
-                header_row: *header_row,
-                n_cols,
+                columns,
                 delimiter: match *delimiter as u32 {
                     0..=127 => *delimiter as u8,
                     _ => bail!("CSV delimiter must be an ASCII character"),
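The planning-time behavior above reduces to a small total function from the AST-level `CsvColumns` to the engine-level `ColumnSpec`; an empty `Header` is an internal error because purification (next file) is responsible for filling names in. A standalone sketch with local stand-in types:

```rust
// Stand-in types; the real ones live in sql_parser::ast and dataflow_types.
use anyhow::bail;

enum CsvColumns {
    Count(usize),
    Header { names: Vec<String> },
}

enum ColumnSpec {
    Count(usize),
    Header { names: Vec<String> },
}

fn plan_csv_columns(columns: &CsvColumns) -> Result<ColumnSpec, anyhow::Error> {
    Ok(match columns {
        CsvColumns::Count(n) => ColumnSpec::Count(*n),
        // Purify must have resolved HEADER to concrete names by now.
        CsvColumns::Header { names } if names.is_empty() => {
            bail!("[internal error] column spec should get names in purify")
        }
        CsvColumns::Header { names } => ColumnSpec::Header {
            names: names.clone(),
        },
    })
}

fn main() -> Result<(), anyhow::Error> {
    assert!(matches!(
        plan_csv_columns(&CsvColumns::Count(3))?,
        ColumnSpec::Count(3)
    ));
    assert!(plan_csv_columns(&CsvColumns::Header { names: vec![] }).is_err());
    Ok(())
}
```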
diff --git a/src/sql/src/pure.rs b/src/sql/src/pure.rs
index b3dd4ecc784c..cf59a2e0a80c 100644
--- a/src/sql/src/pure.rs
+++ b/src/sql/src/pure.rs
@@ -16,6 +16,7 @@ use std::future::Future;
 
 use anyhow::{anyhow, bail, ensure, Context};
 use aws_arn::ARN;
+use csv::ReaderBuilder;
 use itertools::Itertools;
 use tokio::fs::File;
 use tokio::io::AsyncBufReadExt;
@@ -28,8 +29,8 @@ use repr::strconv;
 use sql_parser::ast::{
     display::AstDisplay, AvroSchema, CreateSourceConnector, CreateSourceFormat,
     CreateSourceStatement, CreateViewsDefinitions, CreateViewsSourceTarget, CreateViewsStatement,
-    CsrConnector, CsrSeed, DbzMode, Envelope, Expr, Format, Ident, ProtobufSchema, Query, Raw,
-    RawName, Select, SelectItem, SetExpr, Statement, TableFactor, TableWithJoins,
+    CsrConnector, CsrSeed, CsvColumns, DbzMode, Envelope, Expr, Format, Ident, ProtobufSchema,
+    Query, Raw, RawName, Select, SelectItem, SetExpr, Statement, TableFactor, TableWithJoins,
     UnresolvedObjectName, Value, ViewDefinition, WithOption, WithOptionValue,
 };
 use sql_parser::parser::parse_columns;
@@ -75,7 +76,6 @@ pub fn purify(
     async move {
         if let Statement::CreateSource(CreateSourceStatement {
-            col_names,
             connector,
             format,
             envelope,
@@ -197,15 +197,7 @@ pub fn purify(
             CreateSourceConnector::PubNub { .. } => (),
         }
 
-        purify_source_format(
-            format,
-            connector,
-            &envelope,
-            col_names,
-            file,
-            &config_options,
-        )
-        .await?;
+        purify_source_format(format, connector, &envelope, file, &config_options).await?;
     }
 
     if let Statement::CreateViews(CreateViewsStatement { definitions, .. }) = &mut stmt {
         if let CreateViewsDefinitions::Source {
@@ -360,7 +352,6 @@ async fn purify_source_format(
     format: &mut CreateSourceFormat<Raw>,
     connector: &mut CreateSourceConnector,
     envelope: &Envelope,
-    col_names: &mut Vec<Ident>,
     file: Option<File>,
     connector_options: &BTreeMap<String, String>,
 ) -> Result<(), anyhow::Error> {
@@ -393,15 +384,8 @@ async fn purify_source_format(
     match format {
         CreateSourceFormat::None => {}
         CreateSourceFormat::Bare(format) => {
-            purify_source_format_single(
-                format,
-                connector,
-                envelope,
-                col_names,
-                file,
-                connector_options,
-            )
-            .await?
+            purify_source_format_single(format, connector, envelope, file, connector_options)
+                .await?
         }
         CreateSourceFormat::KeyValue { key, value: val } => {
@@ -410,24 +394,8 @@ async fn purify_source_format(
                 anyhow!("[internal-error] File sources cannot be key-value sources")
             );
 
-            purify_source_format_single(
-                key,
-                connector,
-                envelope,
-                col_names,
-                None,
-                connector_options,
-            )
-            .await?;
-            purify_source_format_single(
-                val,
-                connector,
-                envelope,
-                col_names,
-                None,
-                connector_options,
-            )
-            .await?;
+            purify_source_format_single(key, connector, envelope, None, connector_options).await?;
+            purify_source_format_single(val, connector, envelope, None, connector_options).await?;
         }
     }
     Ok(())
@@ -437,7 +405,6 @@ async fn purify_source_format_single(
     format: &mut Format<Raw>,
     connector: &mut CreateSourceConnector,
     envelope: &Envelope,
-    col_names: &mut Vec<Ident>,
     file: Option<File>,
     connector_options: &BTreeMap<String, String>,
 ) -> Result<(), anyhow::Error> {
@@ -488,24 +455,110 @@ async fn purify_source_format_single(
             }
         },
         Format::Csv {
-            header_row,
             delimiter,
-            ..
+            ref mut columns,
         } => {
-            if *header_row && col_names.is_empty() {
-                if let Some(file) = file {
-                    let file = tokio::io::BufReader::new(file);
-                    let csv_header = file.lines().next_line().await?;
-                    match csv_header {
-                        Some(csv_header) => {
-                            csv_header
-                                .split(*delimiter as char)
-                                .for_each(|v| col_names.push(Ident::from(v)));
+            if matches!(columns, CsvColumns::Header { .. })
+                && !matches!(connector, CreateSourceConnector::File { .. })
+            {
+                unimplemented!("CSV WITH HEADER with non-file sources");
+            }
+
+            let first_row = if let Some(file) = file {
+                let file = tokio::io::BufReader::new(file);
+                let csv_header = file.lines().next_line().await;
+                if !delimiter.is_ascii() {
+                    bail!("CSV delimiter must be ascii");
+                }
+                match csv_header {
+                    Ok(Some(csv_header)) => {
+                        let mut reader = ReaderBuilder::new()
+                            .delimiter(*delimiter as u8)
+                            .has_headers(false)
+                            .from_reader(csv_header.as_bytes());
+
+                        if let Some(result) = reader.records().next() {
+                            match result {
+                                Ok(headers) => Some(headers),
+                                Err(e) => bail!("Unable to parse header row: {}", e),
+                            }
+                        } else {
+                            None
+                        }
+                    }
+                    Ok(None) => {
+                        if let CsvColumns::Header { names } = columns {
+                            if names.is_empty() {
+                                bail!(
+                                    "CSV file expected to have at least one line \
+                                     to determine column names, but is empty"
+                                );
+                            } else {
+                                None
+                            }
+                        } else {
+                            None
+                        }
+                    }
+                    Err(e) => {
+                        // TODO(#7562): support compressed files
+                        if let CsvColumns::Header { names } = columns {
+                            if names.is_empty() {
+                                bail!("Cannot determine header by reading CSV file: {}", e);
+                            } else {
+                                None
+                            }
+                        } else {
+                            None
+                        }
+                    }
+                }
+            } else {
+                None
+            };
+
+            match (&columns, first_row) {
+                (CsvColumns::Header { names }, Some(headers)) if names.is_empty() => {
+                    *columns = CsvColumns::Header {
+                        names: headers.into_iter().map(Ident::from).collect(),
+                    };
+                }
+                (CsvColumns::Header { names }, Some(headers)) => {
+                    if names.len() != headers.len() {
+                        bail!(
+                            "Named column count ({}) does not match \
+                             number of columns discovered ({})",
+                            names.len(),
+                            headers.len()
+                        );
+                    } else if let Some((sql, csv)) = names
+                        .iter()
+                        .zip(headers.iter())
+                        .find(|(sql, csv)| sql.as_str() != &**csv)
+                    {
+                        bail!("Header columns do not match named columns from CREATE SOURCE statement. \
+                            First mismatched columns: {} != {}", sql, csv);
+                    }
+                }
+                (CsvColumns::Header { names }, None) if names.is_empty() => {
+                    bail!("WITH HEADER requires a way to determine the header row, but file does not exist");
+                }
+                (CsvColumns::Header { names }, None) => {
+                    // we don't need to do any verification if we are told the names of the headers
+                    assert!(!names.is_empty(), "match arm moved into the wrong spot");
+                }
+
+                (CsvColumns::Count(n), first_line) => {
+                    if let Some(columns) = first_line {
+                        if *n != columns.len() {
+                            bail!(
+                                "Specified column count (WITH {} COLUMNS) \
+                                 does not match number of columns in CSV file ({})",
+                                n,
+                                columns.len()
+                            );
                         }
-                        None => bail!("CSV file expected header line, but is empty"),
                     }
-                } else {
-                    bail!("CSV format with headers only works with file connectors")
                 }
             }
         }
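One detail worth calling out in the purification above: the header row is now parsed with the `csv` crate (`ReaderBuilder` with `has_headers(false)`) instead of naively splitting on the delimiter, so a quoted header field may itself contain the delimiter. That is exactly what the `header-delimiter.csv` test below exercises. A minimal standalone demonstration against csv 1.1:

```rust
// Why the purify code uses ReaderBuilder rather than str::split:
// CSV quoting protects embedded delimiters in header names.
use csv::ReaderBuilder;

fn main() {
    let header = r#""interesting,id",name"#;
    let mut reader = ReaderBuilder::new()
        .delimiter(b',')
        .has_headers(false)
        .from_reader(header.as_bytes());
    let record = reader.records().next().unwrap().unwrap();
    let names: Vec<&str> = record.iter().collect();
    // Naive splitting on ',' would yield three fields; CSV-aware
    // parsing yields the intended two.
    assert_eq!(names, vec!["interesting,id", "name"]);
}
```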
diff --git a/test/testdrive/csv-sources.td b/test/testdrive/csv-sources.td
index a43783433fcd..311ad3d297e2 100644
--- a/test/testdrive/csv-sources.td
+++ b/test/testdrive/csv-sources.td
@@ -21,6 +21,39 @@ place""",CA,92679
   FORMAT CSV WITH 3 COLUMNS
 unexpected parameters for CREATE SOURCE: badoption
 
+! CREATE SOURCE mismatched_column_count
+  FROM FILE '${testdrive.temp-dir}/static.csv'
+  FORMAT CSV WITH 2 COLUMNS
+Specified column count (WITH 2 COLUMNS) does not match number of columns in CSV file (3)
+
+> CREATE MATERIALIZED SOURCE matching_column_names
+  FROM FILE '${testdrive.temp-dir}/static.csv'
+  FORMAT CSV WITH HEADER (city, state, zip)
+
+> SELECT * FROM matching_column_names where zip = '14618'
+city state zip mz_line_no
+-------------------------
+Rochester NY 14618 1
+
+> CREATE MATERIALIZED SOURCE matching_column_names_alias (a, b, c)
+  FROM FILE '${testdrive.temp-dir}/static.csv'
+  FORMAT CSV WITH HEADER (city, state, zip)
+
+> SELECT * FROM matching_column_names_alias where c = '14618'
+a b c mz_line_no
+----------------
+Rochester NY 14618 1
+
+! CREATE SOURCE mismatched_column_names
+  FROM FILE '${testdrive.temp-dir}/static.csv'
+  FORMAT CSV WITH HEADER (cities, country, zip)
+Header columns do not match named columns from CREATE SOURCE statement. First mismatched columns: cities != city
+
+! CREATE SOURCE mismatched_column_names_count
+  FROM FILE '${testdrive.temp-dir}/static.csv'
+  FORMAT CSV WITH HEADER (cities, state)
+Named column count (2) does not match number of columns discovered (3)
+
 # Static CSV without headers.
 > CREATE MATERIALIZED SOURCE static_csv
   FROM FILE '${testdrive.temp-dir}/static.csv'
@@ -136,6 +169,21 @@ Dollars,Category
 ! SELECT * FROM bad_text_csv
 Decode error: Text: CSV error at record number 2: invalid UTF-8
 
+# CSV file with comma in header
+
+$ file-append path=header-delimiter.csv
+"interesting,id",name
+1,blat
+
+> CREATE MATERIALIZED SOURCE header_delimited
+  FROM FILE '${testdrive.temp-dir}/header-delimiter.csv'
+  FORMAT CSV WITH HEADER
+
+> SELECT * FROM header_delimited
+interesting,id name mz_line_no
+------------------------------
+1 blat 1
+
 # Declare a key constraint (PRIMARY KEY NOT ENFORCED)
 
 $ kafka-create-topic topic=static-csv-pkne-sink
diff --git a/test/upgrade/check-from-any_version-csv.td b/test/upgrade/check-from-any_version-csv.td
new file mode 100644
index 000000000000..344cbc3f52d0
--- /dev/null
+++ b/test/upgrade/check-from-any_version-csv.td
@@ -0,0 +1,30 @@
+# Copyright Materialize, Inc. and contributors. All rights reserved.
+#
+# Use of this software is governed by the Business Source License
+# included in the LICENSE file at the root of this repository.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0.
+
+> select * from csv_upgrade_with_header
+id value
+---------
+1 person
+
+> select * from csv_upgrade_with_header_alias
+al sia
+--------
+1 person
+
+> select * from csv_upgrade_no_header
+column1 column2
+----------------
+id value
+1 person
+
+> select * from csv_upgrade_no_header_alias
+al sia
+----------
+id value
+1 person
diff --git a/test/upgrade/check-from-latest_version-csv-header-explicit.td b/test/upgrade/check-from-latest_version-csv-header-explicit.td
new file mode 100644
index 000000000000..e208e1f88e13
--- /dev/null
+++ b/test/upgrade/check-from-latest_version-csv-header-explicit.td
@@ -0,0 +1,18 @@
+# Copyright Materialize, Inc. and contributors. All rights reserved.
+#
+# Use of this software is governed by the Business Source License
+# included in the LICENSE file at the root of this repository.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0.
+
+> select * from csv_upgrade_explicit_header
+id value
+--------
+1 person
+
+> select * from csv_upgrade_explicit_header_alias
+foo bar
+--------
+1 person
diff --git a/test/upgrade/create-in-any_version-csv.td b/test/upgrade/create-in-any_version-csv.td
new file mode 100644
index 000000000000..63e4e8dc3bdd
--- /dev/null
+++ b/test/upgrade/create-in-any_version-csv.td
@@ -0,0 +1,28 @@
+# Copyright Materialize, Inc. and contributors. All rights reserved.
+#
+# Use of this software is governed by the Business Source License
+# included in the LICENSE file at the root of this repository.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0.
+
+$ file-append path=csv.csv
+id,value
+1,person
+
+> CREATE MATERIALIZED SOURCE csv_upgrade_with_header
+  FROM FILE '${testdrive.temp-dir}/csv.csv'
+  FORMAT CSV WITH HEADER
+
+> CREATE MATERIALIZED SOURCE csv_upgrade_with_header_alias (al, sia)
+  FROM FILE '${testdrive.temp-dir}/csv.csv'
+  FORMAT CSV WITH HEADER
+
+> CREATE MATERIALIZED SOURCE csv_upgrade_no_header
+  FROM FILE '${testdrive.temp-dir}/csv.csv'
+  FORMAT CSV WITH 2 COLUMNS
+
+> CREATE MATERIALIZED SOURCE csv_upgrade_no_header_alias (al, sia)
+  FROM FILE '${testdrive.temp-dir}/csv.csv'
+  FORMAT CSV WITH 2 COLUMNS
diff --git a/test/upgrade/create-in-latest_version-csv-with-header-explicit.td b/test/upgrade/create-in-latest_version-csv-with-header-explicit.td
new file mode 100644
index 000000000000..3a63e207fee0
--- /dev/null
+++ b/test/upgrade/create-in-latest_version-csv-with-header-explicit.td
@@ -0,0 +1,20 @@
+# Copyright Materialize, Inc. and contributors. All rights reserved.
+#
+# Use of this software is governed by the Business Source License
+# included in the LICENSE file at the root of this repository.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0.
+
+$ file-append path=upgrade-csv-with-header-explicit.csv
+id,value
+1,person
+
+> CREATE MATERIALIZED SOURCE csv_upgrade_explicit_header
+  FROM FILE '${testdrive.temp-dir}/upgrade-csv-with-header-explicit.csv'
+  FORMAT CSV WITH HEADER (id, value)
+
+> CREATE MATERIALIZED SOURCE csv_upgrade_explicit_header_alias (foo, bar)
+  FROM FILE '${testdrive.temp-dir}/upgrade-csv-with-header-explicit.csv'
+  FORMAT CSV WITH HEADER (id, value)
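
Taken together, the purification rules form a small decision table over (declared names, readable first row). A condensed, dependency-free restatement of the `Header` arms of the `match` in src/sql/src/pure.rs (error messages abbreviated for the sketch):

```rust
#[derive(Debug, PartialEq)]
enum Outcome {
    UseDiscovered(Vec<String>), // HEADER with no names: adopt the file's header row
    KeepDeclared,               // names given and consistent (or file unreadable)
    Error(&'static str),
}

fn purify_header(declared: &[String], first_row: Option<Vec<String>>) -> Outcome {
    match (declared.is_empty(), first_row) {
        (true, Some(row)) => Outcome::UseDiscovered(row),
        (false, Some(row)) if declared.len() != row.len() => {
            Outcome::Error("column count mismatch")
        }
        (false, Some(row)) if declared.iter().zip(&row).any(|(d, r)| d != r) => {
            Outcome::Error("header columns do not match named columns")
        }
        (false, Some(_)) => Outcome::KeepDeclared,
        (true, None) => Outcome::Error("no way to determine the header row"),
        // Declared names need no verification if the file cannot be read.
        (false, None) => Outcome::KeepDeclared,
    }
}

fn main() {
    assert_eq!(
        purify_header(&[], Some(vec!["id".into(), "value".into()])),
        Outcome::UseDiscovered(vec!["id".into(), "value".into()])
    );
    assert_eq!(
        purify_header(&["id".into()], Some(vec!["id".into(), "value".into()])),
        Outcome::Error("column count mismatch")
    );
}
```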