This repository has been archived by the owner on Nov 19, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 7
/
duckdb.rs
199 lines (178 loc) · 6.91 KB
/
duckdb.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
use std::io::prelude::*;
use anyhow::{anyhow, Result};
use log::{debug, error, info, warn};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use arrow::{csv, json};
use parquet::arrow::arrow_writer;
use chrono::{DateTime, Utc};
use duckdb::{
types::{FromSql, ValueRef},
Connection,
};
use regex::Regex;
use crate::{get_dest_from_to, get_sql_from_query, OutputFormat, OutputWriter, SourcesType};
pub fn query(
query: &str,
sources: &SourcesType,
to: &str,
database: &str,
format: &OutputFormat,
writer: &OutputWriter,
) -> Result<()> {
let mut query = query.to_string();
if query.starts_with("prql ") {
// prepend CTEs for the source aliases
let mut lines: Vec<String> = query.split("\n").map(|s| s.to_string()).collect();
debug!("sources = {sources:?}");
for (alias, source) in sources.iter() {
debug!("alias = {alias:?}; source = {source:?}");
// Needs the _{}_ on the LHS for _{}_.*
lines.insert(
1,
format!("table {alias} = (from __{alias}__=__file_{alias}__)"),
);
}
query = lines.join("\n");
debug!("query = {query:?}");
}
// compile the PRQL to SQL
let mut sql = get_sql_from_query(&query)?;
debug!(
"sql = {:?}",
sql.split_whitespace().collect::<Vec<&str>>().join(" ")
);
if query.starts_with("prql ") {
// replace the table placeholders again
for (alias, source) in sources.iter() {
debug!("alias = {alias:?}; source = {source:?}");
let placeholder = format!("__file_{alias}__");
debug!("placeholder = {placeholder:?}");
let quoted_source = if source.ends_with(".csv") {
format!("read_csv_auto('{source}')")
} else if source.ends_with(".parquet") {
format!("read_parquet('{source}')")
} else if database.starts_with("postgres") {
let mut parts: Vec<&str> = source.split(".").collect();
if parts.len() == 1 {
parts.insert(0, "public");
}
let table = parts
.pop()
.ok_or(anyhow!("Couldn't extract table name from {source}."))?;
let schema = parts
.pop()
.ok_or(anyhow!("Couldn't extract schema name from {source}."))?;
format!("postgres_scan('{database}', '{schema}', '{table}')")
} else {
format!(r#"'{source}'"#)
};
debug!("quoted_source = {quoted_source:?}");
sql = sql.replace(&placeholder, "ed_source);
}
debug!("sql = {sql:?}");
}
// prepare the connection and statement
let conn = if database == "" {
debug!("Opening in-memory DuckDB database");
Connection::open_in_memory()?
} else if database.starts_with("sqlite://") {
let con = Connection::open_in_memory()?;
// Install and load the sqlite_scanner extension
let load_extension = "INSTALL sqlite_scanner; LOAD sqlite_scanner;";
con.execute_batch(load_extension)?;
let dbpath = database.strip_prefix("sqlite://").map_or(database, |p| p);
let attach_sql = format!("CALL sqlite_attach('{dbpath}')");
con.execute_batch(&attach_sql)?;
con
} else if database.starts_with("postgres") {
let con = Connection::open_in_memory()?;
// Check if a schema was specified
let re = Regex::new(r"^(?P<uri>[^?]+)(?P<schema>\?currentSchema=.+)?$")?;
let caps = re
.captures(database)
.ok_or(anyhow!("Couldn't match regex!"))?;
let uri = caps
.name("uri")
.ok_or(anyhow!("Couldn't extract URI!"))?
.as_str();
debug!("uri={:?}", uri);
let schema_param = caps
.name("schema")
.map_or("?currentSchema=public", |p| p.as_str());
let schema = schema_param.split("=").last().map_or("public", |p| p);
debug!("schema={:?}", schema);
// Install and load the postgres_scanner extension
let load_extension = "INSTALL postgres_scanner; LOAD postgres_scanner;";
con.execute_batch(load_extension)?;
let attach_sql = format!("CALL postgres_attach('{uri}', source_schema='{schema}')");
debug!("attach_sql={:?}", attach_sql);
con.execute_batch(&attach_sql)?;
con
} else {
let dbpath = database.strip_prefix("duckdb://").map_or(database, |p| p);
debug!("Opening DuckDB database: dbpath={:?}", dbpath);
Connection::open(dbpath)?
};
// Install and load the parquet extension
// FIXME: Be smarter about this and only do it where required
let load_parquet_extension = "INSTALL parquet; LOAD parquet;";
conn.execute_batch(load_parquet_extension)?;
// Execute the query
let mut stmt = conn.prepare(&sql)?;
let rbs = stmt.query_arrow([])?.collect::<Vec<RecordBatch>>();
match writer {
OutputWriter::arrow => write_results_with_arrow(&rbs, to, format),
OutputWriter::backend => write_results_with_duckdb(&rbs, to, format),
}
}
/// Write result batches using DuckDB itself as the serializer.
///
/// Not yet implemented; always panics. Parameters are underscore-prefixed to
/// suppress unused-variable warnings until the implementation lands.
fn write_results_with_duckdb(_rbs: &[RecordBatch], _to: &str, _format: &OutputFormat) -> Result<()> {
    unimplemented!("write_results_with_duckdb");
}
/// Serialize the record batches to `to` in the requested `format` using the
/// Arrow-based writers.
fn write_results_with_arrow(rbs: &[RecordBatch], to: &str, format: &OutputFormat) -> Result<()> {
    // Resolve the destination once, then dispatch on the output format.
    let mut out: Box<dyn Write> = get_dest_from_to(to)?;
    match format {
        OutputFormat::csv => write_record_batches_to_csv(rbs, &mut out),
        OutputFormat::json => write_record_batches_to_json(rbs, &mut out),
        OutputFormat::parquet => write_record_batches_to_parquet(rbs, &mut out),
        OutputFormat::table => write_record_batches_to_table(rbs, &mut out),
    }
}
/// Serialize every record batch to `dest` as CSV.
fn write_record_batches_to_csv(rbs: &[RecordBatch], dest: &mut dyn Write) -> Result<()> {
    let mut csv_writer = csv::Writer::new(dest);
    for batch in rbs.iter() {
        csv_writer.write(batch)?;
    }
    Ok(())
}
/// Serialize the record batches to `dest` as newline-delimited JSON.
fn write_record_batches_to_json(rbs: &[RecordBatch], dest: &mut dyn Write) -> Result<()> {
    let mut writer = json::LineDelimitedWriter::new(dest);
    // `rbs` is already a slice; the original `&rbs` was a needless double borrow.
    writer.write_batches(rbs)?;
    writer.finish()?;
    Ok(())
}
/// Serialize the record batches to `dest` as a single Parquet stream.
fn write_record_batches_to_parquet(rbs: &[RecordBatch], dest: &mut dyn Write) -> Result<()> {
    // With no batches there is no schema to derive, so write nothing.
    let schema = match rbs.first() {
        Some(batch) => batch.schema(),
        None => return Ok(()),
    };
    let mut parquet_writer = arrow_writer::ArrowWriter::try_new(dest, schema, None)?;
    for batch in rbs.iter() {
        parquet_writer.write(batch)?;
    }
    parquet_writer.close()?;
    Ok(())
}
/// Pretty-print the record batches to `dest` as an ASCII table plus a
/// trailing newline.
fn write_record_batches_to_table(rbs: &[RecordBatch], dest: &mut dyn Write) -> Result<()> {
    let table = pretty_format_batches(rbs)?.to_string();
    // BUG FIX: `write` may perform a partial write and its Result was being
    // ignored; `write_all` writes the whole buffer and `?` propagates errors.
    dest.write_all(table.as_bytes())?;
    dest.write_all(b"\n")?;
    Ok(())
}