feat: Add different output support to queries
This commit adds the ability to choose the output format of a query via
the v3 API. A user can choose how the data will be returned to them,
either through the Accept header or the format URL parameter.

Prior to this commit the default was a pretty-printed text format, but
the default has now been changed to JSON.

There are multiple formats one can choose from (a request sketch follows the list):

1. json
2. csv
3. pretty printed text
4. parquet
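
A minimal sketch of how a client might select each format, assuming a `reqwest`-based client and a locally running server. The host, port, database name, and query are illustrative assumptions; the `/api/v3/query_sql` endpoint, the `format` parameter, and the Accept-header handling are what this commit adds.

```rust
// Sketch only: `reqwest` and the URL below are assumptions, not part of this commit.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::blocking::Client::new();

    // Select CSV explicitly via the `format` URL parameter.
    let csv = client
        .get("http://127.0.0.1:8181/api/v3/query_sql")
        .query(&[("db", "mydb"), ("q", "SELECT * FROM cpu"), ("format", "csv")])
        .send()?
        .text()?;
    println!("{csv}");

    // Or pick a format via the Accept header; with neither, JSON is the default.
    let parquet = client
        .get("http://127.0.0.1:8181/api/v3/query_sql")
        .query(&[("db", "mydb"), ("q", "SELECT * FROM cpu")])
        .header(reqwest::header::ACCEPT, "application/vnd.apache.parquet")
        .send()?
        .bytes()?;
    println!("received {} bytes of parquet", parquet.len());
    Ok(())
}
```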

I've tested each of these out and it works well. In particular the
parquet output is exciting, as users will be able to perform a query and
receive back parquet data that they can then load into, say, a Python
script or another tool to work with. As we extend what data can be
queried, as well as how it is persisted, what people will be able to do
with Edge will be really cool, and I'm interested to see how users end
up using this functionality in the future.
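
The paragraph above mentions loading the parquet result into a Python script; to keep every example in the language of this repository, here is a hedged Rust sketch that does the equivalent with the `parquet` and `bytes` crates. The function name and the source of the bytes are assumptions.

```rust
use bytes::Bytes;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

// Decode the body of a `format=parquet` response back into Arrow RecordBatches.
// `parquet_bytes` is assumed to hold the raw response body from the sketch above.
fn read_parquet_response(parquet_bytes: Bytes) -> Result<(), Box<dyn std::error::Error>> {
    let reader = ParquetRecordBatchReaderBuilder::try_new(parquet_bytes)?.build()?;
    for batch in reader {
        let batch = batch?;
        println!("{} rows x {} columns", batch.num_rows(), batch.num_columns());
    }
    Ok(())
}
```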
mgattozzi committed Feb 1, 2024
1 parent ff567cd commit 16c4475
Showing 4 changed files with 117 additions and 7 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions influxdb3_server/Cargo.toml
@@ -48,6 +48,7 @@ flate2 = "1.0.27"
workspace-hack = { version = "0.1", path = "../workspace-hack" }
arrow-json = "49.0.0"
arrow-schema = "49.0.0"
arrow-csv = "49.0.0"

[dev-dependencies]
parquet_file = { path = "../parquet_file" }
117 changes: 111 additions & 6 deletions influxdb3_server/src/http.rs
@@ -6,11 +6,14 @@ use arrow::util::pretty;
use authz::http::AuthorizationHeaderExtension;
use bytes::{Bytes, BytesMut};
use data_types::NamespaceName;
use datafusion::execution::memory_pool::UnboundedMemoryPool;
use futures::StreamExt;
use hyper::header::ACCEPT;
use hyper::header::CONTENT_ENCODING;
use hyper::http::HeaderValue;
use hyper::server::conn::{AddrIncoming, AddrStream};
use hyper::{Body, Method, Request, Response, StatusCode};
use influxdb3_write::persister::TrackedMemoryArrowWriter;
use influxdb3_write::WriteBuffer;
use iox_time::{SystemProvider, TimeProvider};
use observability_deps::tracing::{debug, error, info};
@@ -117,6 +120,18 @@ pub enum Error {
/// WriteBuffer error
#[error("write buffer error: {0}")]
WriteBuffer(#[from] influxdb3_write::write_buffer::Error),

// ToStrError
#[error("to str error: {0}")]
ToStr(#[from] hyper::header::ToStrError),

// SerdeJsonError
#[error("serde json error: {0}")]
SerdeJson(#[from] serde_json::Error),

// Influxdb3 Write
#[error("serde json error: {0}")]
Influxdb3Write(#[from] influxdb3_write::Error),
}

impl Error {
@@ -200,13 +215,102 @@ where
.into_iter()
.map(|b| b.unwrap())
.collect();
let pretty_string = format!("{}", pretty::pretty_format_batches(&batches)?);

// Create a response with the pretty-printed string as the body.
Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "text/plain; charset=utf-8")
.body(Body::from(pretty_string))?) // Handle this unwrap in production.
fn to_json(batches: Vec<RecordBatch>) -> Result<Bytes> {
let batches: Vec<&RecordBatch> = batches.iter().collect();
Ok(Bytes::from(serde_json::to_string(
&arrow_json::writer::record_batches_to_json_rows(batches.as_slice())?,
)?))
}

fn to_csv(batches: Vec<RecordBatch>) -> Result<Bytes> {
let mut writer = arrow_csv::writer::Writer::new(Vec::new());
for batch in batches {
writer.write(&batch)?;
}

Ok(Bytes::from(writer.into_inner()))
}

fn to_pretty(batches: Vec<RecordBatch>) -> Result<Bytes> {
Ok(Bytes::from(format!(
"{}",
pretty::pretty_format_batches(&batches)?
)))
}

fn to_parquet(batches: Vec<RecordBatch>) -> Result<Bytes> {
let mut bytes = Vec::new();
let mem_pool = Arc::new(UnboundedMemoryPool::default());
let mut writer =
TrackedMemoryArrowWriter::try_new(&mut bytes, batches[0].schema(), mem_pool)?;
for batch in batches {
writer.write(batch)?;
}
writer.close()?;
Ok(Bytes::from(bytes))
}

enum Format {
Parquet,
Csv,
Pretty,
Json,
Error,
}

let (body, format) = match params.format {
None => match req
.headers()
.get(ACCEPT)
.map(HeaderValue::to_str)
.transpose()?
{
// Accept Headers use the MIME types maintained by IANA here:
// https://www.iana.org/assignments/media-types/media-types.xhtml
// Note parquet hasn't been accepted yet just Arrow, but there
// is the possibility it will be:
// https://issues.apache.org/jira/browse/PARQUET-1889
Some("application/vnd.apache.parquet") => {
(to_parquet(batches)?, Format::Parquet)
}
Some("text/csv") => (to_csv(batches)?, Format::Csv),
Some("text/plain") => (to_pretty(batches)?, Format::Pretty),
Some("application/json") => (to_json(batches)?, Format::Json),
Some(_) => (Bytes::from("{ \"error\": \"Available mime types are: application/vnd.apache.parquet, text/csv, text/plain, and application/json\" }"), Format::Error),
None => (to_json(batches)?, Format::Json),
},
Some(format) => match format.as_str() {
"parquet" => (to_parquet(batches)?, Format::Parquet),
"csv" => (to_csv(batches)?, Format::Csv),
"pretty" => (to_pretty(batches)?, Format::Pretty),
"json" => (to_json(batches)?, Format::Json),
_ => (Bytes::from("{ \"error\": \"Available formats are: parquet, csv, pretty, and json\" }"), Format::Error),
},
};

match format {
Format::Parquet => Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/vnd.apache.parquet")
.body(Body::from(body))?),
Format::Csv => Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "text/csv")
.body(Body::from(body))?),
Format::Pretty => Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "text/plain; charset=utf-8")
.body(Body::from(body))?),
Format::Json => Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(Body::from(body))?),
Format::Error => Ok(Response::builder()
.status(StatusCode::BAD_REQUEST)
.header("Content-Type", "application/json")
.body(Body::from(body))?),
}
}

fn health(&self) -> Result<Response<Body>> {
@@ -284,6 +388,7 @@ where
pub(crate) struct QuerySqlParams {
pub(crate) db: String,
pub(crate) q: String,
pub(crate) format: Option<String>,
}

#[derive(Debug, Deserialize)]
5 changes: 4 additions & 1 deletion influxdb3_server/src/lib.rs
@@ -39,6 +39,9 @@ pub enum Error {

#[error("datafusion error: {0}")]
DataFusion(#[from] datafusion::error::DataFusionError),

#[error("influxdb3_write error: {0}")]
InfluxDB3Write(#[from] influxdb3_write::Error),
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -281,7 +284,7 @@ mod tests {
// query escaped for uri
let query = urlencoding::encode(&query.into());
let url = format!(
"{}/api/v3/query_sql?db={}&q={}",
"{}/api/v3/query_sql?db={}&q={}&format=pretty",
server.into(),
database.into(),
query