sqlp: various improvements #1635

Merged 2 commits on Mar 1, 2024

110 changes: 97 additions & 13 deletions src/cmd/sqlp.rs
@@ -5,8 +5,8 @@ grouping, sorting, and more - working on larger than memory CSV files.
Polars SQL is a SQL dialect, converting SQL queries to fast Polars LazyFrame expressions.

For a list of SQL functions and keywords supported by Polars SQL, see
https://github.com/pola-rs/polars/blob/rs-0.38.0/crates/polars-sql/src/functions.rs
https://github.com/pola-rs/polars/blob/rs-0.38.0/crates/polars-sql/src/keywords.rs and
https://github.com/pola-rs/polars/blob/rs-0.38.1/crates/polars-sql/src/functions.rs
https://github.com/pola-rs/polars/blob/rs-0.38.1/crates/polars-sql/src/keywords.rs and
https://github.com/pola-rs/polars/issues/7227

Returns the shape of the query result (number of rows, number of columns) to stderr.
@@ -133,6 +133,7 @@ sqlp options:
--format <arg> The output format to use. Valid values are:
csv Comma-separated values
json JSON
jsonl JSONL (JSON Lines)
parquet Apache Parquet
arrow Apache Arrow IPC
avro Apache Avro
@@ -178,10 +179,14 @@ sqlp options:
--wnull-value <arg> The string to use when WRITING null values.
(default: <empty string>)

PARQUET OUTPUT FORMAT ONLY:
--compression <arg> The compression codec to use when writing parquet files.
Valid values are: zstd, lz4raw, gzip, snappy, uncompressed
ARROW/AVRO/PARQUET OUTPUT FORMATS ONLY:
--compression <arg> The compression codec to use when writing arrow or parquet files.
For Arrow, valid values are: zstd, lz4, uncompressed
For Avro, valid values are: deflate, snappy, uncompressed (default)
For Parquet, valid values are: zstd, lz4raw, gzip, snappy, uncompressed
(default: zstd)

PARQUET OUTPUT FORMAT ONLY:
--compress-level <arg> The compression level to use when using zstd or gzip compression.
When using zstd, valid values are -7 to 22, with -7 being the
lowest compression level and 22 being the highest compression level.
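
For illustration, here are hypothetical invocations exercising the new options (data.csv and the queries are placeholders; the table name is derived from the input file stem, as in the tests below):

    qsv sqlp data.csv "select * from data" --format jsonl
    qsv sqlp data.csv "select * from data" --format arrow --compression zstd
    qsv sqlp data.csv "select * from data" --format avro --compression snappy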
@@ -212,10 +217,11 @@ use std::{
};

use polars::{
io::avro::AvroWriter,
io::avro::{AvroWriter, Compression as AvroCompression},
prelude::{
CsvWriter, DataFrame, GzipLevel, IpcWriter, JsonWriter, LazyCsvReader, LazyFileListReader,
NullValues, ParquetCompression, ParquetWriter, SerWriter, ZstdLevel,
CsvWriter, DataFrame, GzipLevel, IpcCompression, IpcWriter, JsonFormat, JsonWriter,
LazyCsvReader, LazyFileListReader, NullValues, ParquetCompression, ParquetWriter,
SerWriter, ZstdLevel,
},
sql::SQLContext,
};
@@ -263,6 +269,7 @@ enum OutputMode {
#[default]
Csv,
Json,
Jsonl,
Parquet,
Arrow,
Avro,
@@ -309,12 +316,17 @@ impl OutputMode {
.with_null_value(args.flag_wnull_value)
.include_bom(util::get_envvar_flag("QSV_OUTPUT_BOM"))
.finish(&mut df),
OutputMode::Json => JsonWriter::new(&mut w).finish(&mut df),
OutputMode::Json => JsonWriter::new(&mut w)
.with_json_format(JsonFormat::Json)
.finish(&mut df),
OutputMode::Jsonl => JsonWriter::new(&mut w)
.with_json_format(JsonFormat::JsonLines)
.finish(&mut df),
OutputMode::Parquet => {
let compression: PqtCompression = args
.flag_compression
.parse()
.unwrap_or(PqtCompression::Lz4Raw);
.unwrap_or(PqtCompression::Uncompressed);

let parquet_compression = match compression {
PqtCompression::Uncompressed => ParquetCompression::Uncompressed,
@@ -342,8 +354,38 @@ impl OutputMode {
.finish(&mut df)
.map(|_| ())
},
OutputMode::Arrow => IpcWriter::new(&mut w).finish(&mut df),
OutputMode::Avro => AvroWriter::new(&mut w).finish(&mut df),
OutputMode::Arrow => {
let compression: ArrowCompression = args
.flag_compression
.parse()
.unwrap_or(ArrowCompression::Uncompressed);

let ipc_compression: Option<IpcCompression> = match compression {
ArrowCompression::Uncompressed => None,
ArrowCompression::Lz4 => Some(IpcCompression::LZ4),
ArrowCompression::Zstd => Some(IpcCompression::ZSTD),
};

IpcWriter::new(&mut w)
.with_compression(ipc_compression)
.finish(&mut df)
},
OutputMode::Avro => {
let compression: QsvAvroCompression = args
.flag_compression
.parse()
.unwrap_or(QsvAvroCompression::Uncompressed);

let avro_compression = match compression {
QsvAvroCompression::Uncompressed => None,
QsvAvroCompression::Deflate => Some(AvroCompression::Deflate),
QsvAvroCompression::Snappy => Some(AvroCompression::Snappy),
};

AvroWriter::new(&mut w)
.with_compression(avro_compression)
.finish(&mut df)
},
OutputMode::None => Ok(()),
};
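
For context, a minimal standalone sketch (not part of this PR) of the JsonFormat distinction the new Json/Jsonl arms rely on, assuming a polars build with the json feature enabled:

    use polars::prelude::*;

    fn main() -> PolarsResult<()> {
        let mut df = df!("ward" => ["Ward 3", "3"], "cnt" => [2i64, 1])?;
        let mut buf = Vec::new();
        // JsonFormat::JsonLines writes one JSON object per row, newline-delimited;
        // JsonFormat::Json instead writes a single JSON array of row objects.
        JsonWriter::new(&mut buf)
            .with_json_format(JsonFormat::JsonLines)
            .finish(&mut df)?;
        print!("{}", String::from_utf8_lossy(&buf));
        Ok(())
    }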

@@ -367,6 +409,7 @@ impl FromStr for OutputMode {
match s.to_ascii_lowercase().as_str() {
"csv" => Ok(OutputMode::Csv),
"json" => Ok(OutputMode::Json),
"jsonl" => Ok(OutputMode::Jsonl),
"parquet" => Ok(OutputMode::Parquet),
"arrow" => Ok(OutputMode::Arrow),
"avro" => Ok(OutputMode::Avro),
@@ -384,6 +427,21 @@ enum PqtCompression {
Zstd,
Lz4Raw,
}
#[derive(Default, Copy, Clone)]
enum ArrowCompression {
#[default]
Uncompressed,
Lz4,
Zstd,
}

#[derive(Default, Copy, Clone)]
enum QsvAvroCompression {
#[default]
Uncompressed,
Deflate,
Snappy,
}

impl FromStr for PqtCompression {
type Err = String;
@@ -395,7 +453,33 @@ impl FromStr for PqtCompression {
"snappy" => Ok(PqtCompression::Snappy),
"lz4raw" => Ok(PqtCompression::Lz4Raw),
"zstd" => Ok(PqtCompression::Zstd),
_ => Err(format!("Invalid compression format: {s}")),
_ => Err(format!("Invalid Parquet compression format: {s}")),
}
}
}

impl FromStr for ArrowCompression {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_ascii_lowercase().as_str() {
"uncompressed" => Ok(ArrowCompression::Uncompressed),
"lz4" => Ok(ArrowCompression::Lz4),
"zstd" => Ok(ArrowCompression::Zstd),
_ => Err(format!("Invalid Arrow compression format: {s}")),
}
}
}

impl FromStr for QsvAvroCompression {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_ascii_lowercase().as_str() {
"uncompressed" => Ok(QsvAvroCompression::Uncompressed),
"deflate" => Ok(QsvAvroCompression::Deflate),
"snappy" => Ok(QsvAvroCompression::Snappy),
_ => Err(format!("Invalid Avro compression format: {s}")),
}
}
}
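
A hypothetical in-crate test (not part of this PR) illustrating how these FromStr impls interact with the unwrap_or fallbacks in the writer arms above — an unrecognized --compression value is rejected by parse() but then silently replaced with the per-format default:

    #[test]
    fn invalid_compression_falls_back() {
        // FromStr rejects unknown codec names...
        assert!("bogus".parse::<ArrowCompression>().is_err());
        // ...but the writer arms swallow the error with unwrap_or,
        // so the output is written uncompressed.
        let c = "bogus"
            .parse::<ArrowCompression>()
            .unwrap_or(ArrowCompression::Uncompressed);
        assert!(matches!(c, ArrowCompression::Uncompressed));
    }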
23 changes: 23 additions & 0 deletions tests/test_sqlp.rs
@@ -868,6 +868,29 @@ select ward,count(*) as cnt from temp_table2 group by ward order by cnt desc, wa
.arg("test.sql")
.args(["--format", "json"]);

let got: String = wrk.stdout(&mut cmd);
let expected = r#"[{"ward":"Ward 3","cnt":2},{"ward":" ","cnt":1},{"ward":"04","cnt":1},{"ward":"3","cnt":1},{"ward":"Ward 13","cnt":1},{"ward":"Ward 17","cnt":1},{"ward":"Ward 19","cnt":1},{"ward":"Ward 21","cnt":1},{"ward":"Ward 6","cnt":1}]"#;

assert_eq!(got, expected);
}

#[test]
fn sqlp_boston311_sql_script_jsonl() {
let wrk = Workdir::new("sqlp_boston311_sql_script_jsonl");
let test_file = wrk.load_test_file("boston311-100.csv");

wrk.create_from_string(
"test.sql",
r#"create table temp_table as select * from "boston311-100" where ontime = 'OVERDUE';
create table temp_table2 as select * from temp_table limit 10;
select ward,count(*) as cnt from temp_table2 group by ward order by cnt desc, ward asc;"#,
);

let mut cmd = wrk.command("sqlp");
cmd.arg(&test_file)
.arg("test.sql")
.args(["--format", "jsonl"]);

let got: String = wrk.stdout(&mut cmd);
let expected = r#"{"ward":"Ward 3","cnt":2}
{"ward":" ","cnt":1}