Implement new CSV WITH HEADER (cols) syntax
This syntax allows users to provide header names for objects that do not yet
exist. It additionally allows Materialize to record header columns in the SQL
stored in the catalog while interacting less with the more general SQL column
alias feature -- we still put default column names in the SQL aliases if the
format is specified as `WITH n COLUMNS`.
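
For illustration, the old form keeps autogenerated names while the new form
supplies them up front. A minimal sketch using the AST type this commit adds
(the column names here are hypothetical):

    use sql::ast::{CsvColumns, Ident};

    // `FORMAT CSV WITH 3 COLUMNS`: headerless; columns get default
    // `columnN` names downstream.
    let counted = CsvColumns::Count(3);

    // `FORMAT CSV WITH HEADER (city, state, zip)`: the first row is a header
    // and must match these names.
    let named = CsvColumns::Header {
        names: vec![Ident::from("city"), Ident::from("state"), Ident::from("zip")],
    };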

Design: MaterializeInc#7407
Part of: MaterializeInc#7145
quodlibetor committed Aug 27, 2021
1 parent 9706350 commit b15125d
Showing 16 changed files with 547 additions and 151 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


78 changes: 74 additions & 4 deletions src/coord/src/catalog/migrate.rs
@@ -8,17 +8,20 @@
// by the Apache License, Version 2.0.

use anyhow::bail;
use futures::executor::block_on;
use lazy_static::lazy_static;
use semver::Version;
use tokio::fs::File;

use ore::collections::CollectionExt;
use sql::ast::display::AstDisplay;
use sql::ast::visit_mut::{self, VisitMut};
use sql::ast::{
-    AvroSchema, CreateIndexStatement, CreateSinkStatement, CreateSourceFormat,
-    CreateSourceStatement, CreateTableStatement, CreateTypeStatement, CreateViewStatement,
-    CsrConnector, DataType, Format, Function, Ident, Raw, RawName, SqlOption, Statement,
-    TableFactor, UnresolvedObjectName, Value, ViewDefinition, WithOption, WithOptionValue,
    AvroSchema, CreateIndexStatement, CreateSinkStatement, CreateSourceConnector,
    CreateSourceFormat, CreateSourceStatement, CreateTableStatement, CreateTypeStatement,
    CreateViewStatement, CsrConnector, CsvColumns, DataType, Format, Function, Ident, Raw, RawName,
    SqlOption, Statement, TableFactor, UnresolvedObjectName, Value, ViewDefinition, WithOption,
    WithOptionValue,
};
use sql::plan::resolve_names_stmt;

@@ -76,6 +79,7 @@ pub(crate) fn migrate(catalog: &mut Catalog) -> Result<(), anyhow::Error> {
ast_insert_default_confluent_wire_format_0_7_1(stmt)?;
if catalog_version < *VER_0_9_1 {
ast_rewrite_pg_catalog_char_to_text_0_9_1(stmt)?;
ast_rewrite_csv_column_aliases_0_9_1(stmt)?;
}
Ok(())
})?;
@@ -459,6 +463,72 @@ fn ast_rewrite_type_references_0_6_1(
Ok(())
}

/// Rewrite CSV sources to use the explicit `FORMAT CSV WITH HEADER (name, ...)` syntax.
///
/// This gives us an explicit check that we are reading the correct columns, and it also allows
/// us, in the future, to correctly loosen the semantics of our column alias syntax so that it
/// need not exactly match the number of columns in a source.
fn ast_rewrite_csv_column_aliases_0_9_1(
stmt: &mut sql::ast::Statement<Raw>,
) -> Result<(), anyhow::Error> {
let (connector, col_names, columns, delimiter) =
if let Statement::CreateSource(CreateSourceStatement {
connector,
col_names,
format: CreateSourceFormat::Bare(Format::Csv { columns, delimiter }),
..
}) = stmt
{
// only do anything if we have empty header names for a csv source
if !matches!(columns, CsvColumns::Header { .. }) {
return Ok(());
}
if let CsvColumns::Header { names } = columns {
if !names.is_empty() {
return Ok(());
}
}

(connector, col_names, columns, delimiter)
} else {
return Ok(());
};

// Try to load actual columns from existing file if we don't have correct data
let result = (|| -> anyhow::Result<()> {
if let CreateSourceConnector::File { path, .. } = &connector {
let file = block_on(async {
let f = File::open(&path).await?;

if f.metadata().await?.is_dir() {
bail!("expected a regular file, but {} is a directory.", path);
}
Ok(Some(f))
})?;

block_on(async { sql::pure::purify_csv(file, &connector, *delimiter, columns).await })?;
}
Ok(())
})();

// if we can't read from the file, or purification fails for some other reason, then we can
// at least use the names that may have been auto-populated from the file previously. If
// they match then everything will work out. If they don't match, then at least there isn't
// a catalog corruption error.
if let Err(e) = result {
log::warn!(
"Error retrieving column names from file ({}) \
using previously defined column aliases",
e
);
if let CsvColumns::Header { names } = columns {
names.extend_from_slice(col_names);
}
}

Ok(())
}
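
As a sketch, the fallback above could be read as the following standalone
helper (a simplified, hypothetical refactoring; `fall_back_to_aliases` is not
a real function in this commit):

    use sql::ast::{CsvColumns, Ident};

    // When the file cannot be re-read, adopt the column aliases that were
    // previously stored alongside the source.
    fn fall_back_to_aliases(columns: &mut CsvColumns, col_names: &[Ident]) {
        if let CsvColumns::Header { names } = columns {
            if names.is_empty() {
                names.extend_from_slice(col_names);
            }
        }
    }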

// ****************************************************************************
// Semantic migrations -- Weird migrations that require access to the catalog
// ****************************************************************************
52 changes: 45 additions & 7 deletions src/dataflow-types/src/types.rs
@@ -536,14 +536,19 @@ impl DataEncoding {
let ty = ScalarType::String.nullable(true);
desc.with_named_column(name, ty)
}),
-                DataEncoding::Csv(CsvEncoding { n_cols, .. }) => {
-                    (1..=*n_cols).fold(key_desc, |desc, i| {
                DataEncoding::Csv(CsvEncoding { columns, .. }) => match columns {
                    ColumnSpec::Count(n) => (1..=*n).into_iter().fold(key_desc, |desc, i| {
                        desc.with_named_column(
                            format!("column{}", i),
                            ScalarType::String.nullable(false),
                        )
-                    })
-                }
                    }),
                    ColumnSpec::Header { names } => {
                        names.iter().map(|s| &**s).fold(key_desc, |desc, name| {
                            desc.with_named_column(name, ScalarType::String.nullable(false))
                        })
                    }
                },
DataEncoding::Text => {
key_desc.with_named_column("text", ScalarType::String.nullable(false))
}
@@ -598,14 +603,47 @@ pub struct ProtobufEncoding {
pub message_name: String,
}

-/// Encoding in CSV format, with `n_cols` columns per row, with an optional header.
/// Arguments necessary to define how to decode from CSV format
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CsvEncoding {
-    pub header_row: bool,
-    pub n_cols: usize,
    pub columns: ColumnSpec,
pub delimiter: u8,
}

impl CsvEncoding {
pub fn has_header(&self) -> bool {
matches!(self.columns, ColumnSpec::Header { .. })
}
}

/// Determines the RelationDesc and decoding of CSV objects
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ColumnSpec {
/// The first row is not a header row, and all columns get default names like `columnN`.
Count(usize),
/// The first row is a header row and therefore does not become data
///
/// Each of the values in `names` becomes the default name of a column in the dataflow.
Header { names: Vec<String> },
}

impl ColumnSpec {
/// The number of columns described by the column spec.
pub fn arity(&self) -> usize {
match self {
ColumnSpec::Count(n) => *n,
ColumnSpec::Header { names } => names.len(),
}
}

pub fn into_header_names(self) -> Option<Vec<String>> {
match self {
ColumnSpec::Count(_) => None,
ColumnSpec::Header { names } => Some(names),
}
}
}
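
A small usage sketch (assuming these types are re-exported at the crate root,
as the decoder's `use dataflow_types::CsvEncoding` suggests):

    use dataflow_types::{ColumnSpec, CsvEncoding};

    // Count: three anonymous columns, named `column1`..`column3` downstream.
    let counted = ColumnSpec::Count(3);
    assert_eq!(counted.arity(), 3);
    assert_eq!(counted.into_header_names(), None);

    // Header: arity comes from the supplied names, and the encoding knows it
    // must skip (and validate) the first row of the file.
    let named = ColumnSpec::Header {
        names: vec!["city".to_string(), "state".to_string()],
    };
    assert_eq!(named.arity(), 2);
    let encoding = CsvEncoding { columns: named, delimiter: b',' };
    assert!(encoding.has_header());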

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RegexEncoding {
#[serde(with = "serde_regex")]
71 changes: 48 additions & 23 deletions src/dataflow/src/decode/csv.rs
@@ -9,15 +9,13 @@

use std::iter;

-use dataflow_types::CsvEncoding;
-use dataflow_types::LinearOperator;
-
-use dataflow_types::{DataflowError, DecodeError};
use dataflow_types::{CsvEncoding, DataflowError, DecodeError, LinearOperator};
use repr::{Datum, Row};

#[derive(Debug)]
pub struct CsvDecoderState {
-    header_row: bool,
next_row_is_header: bool,
header_names: Option<Vec<String>>,
n_cols: usize,
output: Vec<u8>,
output_cursor: usize,
@@ -36,11 +34,8 @@ impl CsvDecoderState {
}

pub fn new(format: CsvEncoding, operators: &mut Option<LinearOperator>) -> Self {
-        let CsvEncoding {
-            header_row,
-            n_cols,
-            delimiter,
-        } = format;
        let CsvEncoding { columns, delimiter } = format;
        let n_cols = columns.arity();

let operators = operators.take();
let demanded = (0..n_cols)
@@ -52,8 +47,10 @@
})
.collect::<Vec<_>>();

let header_names = columns.into_header_names();
Self {
-            header_row,
next_row_is_header: header_names.is_some(),
header_names,
n_cols,
output: vec![0],
output_cursor: 0,
@@ -117,21 +114,27 @@
row_packer.extend(
(0..self.n_cols)
.map(|i| {
-                                    Datum::String(if self.demanded[i] {
-                                        &output[self.ends[i]..self.ends[i + 1]]
-                                    } else {
-                                        ""
-                                    })
                                    Datum::String(
                                        if self.next_row_is_header || self.demanded[i] {
                                            &output[self.ends[i]..self.ends[i + 1]]
                                        } else {
                                            ""
                                        },
                                    )
})
.chain(iter::once(Datum::from(coord))),
);
} else {
row_packer.extend((0..self.n_cols).map(|i| {
-                            Datum::String(if self.demanded[i] {
-                                &output[self.ends[i]..self.ends[i + 1]]
-                            } else {
-                                ""
-                            })
                            Datum::String(
                                if self.next_row_is_header || self.demanded[i] {
                                    &output[self.ends[i]..self.ends[i + 1]]
                                } else {
                                    ""
                                },
                            )
}));
}
self.row_packer = row_packer;
@@ -150,10 +153,32 @@
}
}
};
-            if self.header_row {
-                self.header_row = false;

// skip header rows, do not send them into dataflow
if self.next_row_is_header {
self.next_row_is_header = false;

if let Ok(Some(row)) = &result {
let mismatched = row
.iter()
.zip(self.header_names.iter().flatten())
.enumerate()
.find(|(_, (actual, expected))| actual.unwrap_str() != &**expected);
if let Some((i, (actual, expected))) = mismatched {
break Err(DataflowError::DecodeError(DecodeError::Text(format!(
"source file contains incorrect columns '{:?}', \
first mismatched column at index {} expected={} actual={}",
row,
i + 1,
expected,
actual
))));
}
}
if chunk.is_empty() {
break Ok(None);
} else if result.is_err() {
break result;
}
} else {
break result;
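
The header validation above boils down to roughly this comparison (a
simplified sketch over plain string slices rather than the decoder's
`Row`/`Datum` types):

    // Compare a decoded header row against the expected names; report the
    // first mismatch with a 1-based column index, as the decoder does.
    fn check_header(actual: &[&str], expected: &[String]) -> Result<(), String> {
        for (i, (a, e)) in actual.iter().zip(expected.iter()).enumerate() {
            if *a != e.as_str() {
                return Err(format!(
                    "first mismatched column at index {} expected={} actual={}",
                    i + 1,
                    e,
                    a
                ));
            }
        }
        Ok(())
    }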
44 changes: 31 additions & 13 deletions src/sql-parser/src/ast/defs/ddl.rs
@@ -226,8 +226,7 @@ pub enum Format<T: AstInfo> {
Protobuf(ProtobufSchema<T>),
Regex(String),
Csv {
-        header_row: bool,
-        n_cols: Option<usize>,
columns: CsvColumns,
delimiter: char,
},
Json,
@@ -241,6 +240,33 @@ impl<T: AstInfo> Format<T> {
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CsvColumns {
/// `WITH count COLUMNS`
Count(usize),
/// `WITH HEADER (ident, ...)?`: `names` is empty if there are no names specified
Header { names: Vec<Ident> },
}

impl AstDisplay for CsvColumns {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
CsvColumns::Count(n) => {
f.write_str(n);
f.write_str(" COLUMNS")
}
CsvColumns::Header { names } => {
f.write_str("HEADER");
if !names.is_empty() {
f.write_str(" (");
f.write_node(&display::comma_separated(&names));
f.write_str(")");
}
}
}
}
}
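
The three possible renderings then look like this (a sketch assuming the
`sql` crate's re-exports and the `to_ast_string` helper on `AstDisplay`):

    use sql::ast::display::AstDisplay;
    use sql::ast::{CsvColumns, Ident};

    assert_eq!(CsvColumns::Count(3).to_ast_string(), "3 COLUMNS");
    assert_eq!(
        CsvColumns::Header { names: vec![] }.to_ast_string(),
        "HEADER",
    );
    assert_eq!(
        CsvColumns::Header {
            names: vec![Ident::from("a"), Ident::from("b")],
        }
        .to_ast_string(),
        "HEADER (a, b)",
    );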

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CreateSourceKeyEnvelope {
/// `INCLUDE KEY` is absent
@@ -324,18 +350,10 @@ impl<T: AstInfo> AstDisplay for Format<T> {
f.write_node(&display::escape_single_quote_string(regex));
f.write_str("'");
}
-            Self::Csv {
-                header_row,
-                n_cols,
-                delimiter,
-            } => {
            Self::Csv { columns, delimiter } => {
                f.write_str("CSV WITH ");
-                if *header_row {
-                    f.write_str("HEADER");
-                } else {
-                    f.write_str(n_cols.unwrap());
-                    f.write_str(" COLUMNS");
-                }
                f.write_node(columns);

if *delimiter != ',' {
f.write_str(" DELIMITED BY '");
f.write_node(&display::escape_single_quote_string(&delimiter.to_string()));
6 changes: 6 additions & 0 deletions src/sql-parser/src/ast/defs/name.rs
@@ -71,6 +71,12 @@ impl From<&str> for Ident {
}
}

impl From<String> for Ident {
fn from(value: String) -> Self {
Ident(value)
}
}
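
A one-line sketch of why this conversion is handy here: header names read
from a CSV file arrive as owned `String`s (variable names hypothetical):

    let header_name = String::from("city");
    let column: Ident = header_name.into();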

/// More-or-less a direct translation of the Postgres function for doing the same thing:
///
/// <https://github.com/postgres/postgres/blob/master/src/backend/utils/adt/ruleutils.c#L10730-L10812>
