Skip to content

Commit

Permalink
fetch results for Trino more efficiently
Browse files Browse the repository at this point in the history
  • Loading branch information
domnikl committed Apr 19, 2024
1 parent 6979c65 commit 52da49a
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 8 deletions.
39 changes: 33 additions & 6 deletions connectorx/src/sources/trino/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ where
fn fetch_metadata(&mut self) {
assert!(!self.queries.is_empty());

// TODO: prevent from running the same query multiple times (limit1 + no limit)
let first_query = &self.queries[0];
let cxq = limit1_query(first_query, &GenericDialect {})?;

Expand Down Expand Up @@ -238,6 +237,9 @@ impl SourcePartition for TrinoSourcePartition {
}

pub struct TrinoSourcePartitionParser<'a> {
rt: Arc<Runtime>,
client: Arc<Client>,
next_uri: Option<String>,
rows: Vec<Row>,
ncols: usize,
current_col: usize,
Expand All @@ -253,11 +255,19 @@ impl<'a> TrinoSourcePartitionParser<'a> {
query: CXQuery,
schema: &[TrinoTypeSystem],
) -> Self {
let rows = client.get_all::<Row>(query.to_string());
let data = rt.block_on(rows).map_err(TrinoSourceError::PrustoError)?;
let rows = data.clone().into_vec();
let results = rt
.block_on(client.get::<Row>(query.to_string()))
.map_err(TrinoSourceError::PrustoError)?;

let rows = match results.data_set {
Some(x) => x.into_vec(),
_ => vec![],
};

Self {
rt,
client,
next_uri: results.next_uri,
rows,
ncols: schema.len(),
current_row: 0,
Expand All @@ -283,8 +293,25 @@ impl<'a> PartitionParser<'a> for TrinoSourcePartitionParser<'a> {
fn fetch_next(&mut self) -> (usize, bool) {
assert!(self.current_col == 0);

// results are always fetched in a single batch for Prusto
(self.rows.len(), true)
match self.next_uri.clone() {
Some(uri) => {
let results = self
.rt
.block_on(self.client.get_next::<Row>(&uri))
.map_err(TrinoSourceError::PrustoError)?;

self.rows = match results.data_set {
Some(x) => x.into_vec(),
_ => vec![],
};

self.current_row = 0;
self.next_uri = results.next_uri;

(self.rows.len(), false)
}
None => return (self.rows.len(), true),
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions connectorx/src/sources/trino/typesystem.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use super::errors::TrinoSourceError;
use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
use fehler::{throw, throws};
use prusto::{Presto, PrestoFloat, PrestoInt, PrestoTy};
use prusto::{PrestoFloat, PrestoInt, PrestoTy};
use std::convert::TryFrom;

// TODO: implement Tuple, Row, Array and Map as well as UUID
// TODO: implement Tuple, Row, Array and Map
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum TrinoTypeSystem {
Date(bool),
Expand Down

0 comments on commit 52da49a

Please sign in to comment.