Skip to content

Commit

Permalink
Add builder for inmemproc
Browse files Browse the repository at this point in the history
Add capability for db distinct in:
- read_many
- m2m
- one2m
  • Loading branch information
Druue committed Nov 10, 2023
1 parent 9f5a5a3 commit 0de5bc2
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 32 deletions.
10 changes: 6 additions & 4 deletions query-engine/connectors/query-connector/src/query_arguments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,17 @@ impl QueryArguments {
/// retrieved by the connector or if it requires the query engine to fetch a raw set
/// of records and perform certain operations itself, in-memory.
pub fn requires_inmemory_processing(&self) -> bool {
let has_distinct = self.distinct.is_some()
self.contains_unstable_cursor() || self.contains_null_cursor()
}

pub fn requires_inmemory_distinct(&self) -> bool {
self.distinct.is_some()
&& !self
.model()
.dm
.schema
.connector
.has_capability(ConnectorCapability::Distinct);

has_distinct || self.contains_unstable_cursor() || self.contains_null_cursor()
.has_capability(ConnectorCapability::Distinct)
}

/// An unstable cursor is a cursor that is used in conjunction with an unstable (non-unique) combination of orderBys.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use connector::QueryArguments;
use connector::{Filter, QueryArguments};
use itertools::Itertools;
use prisma_models::{FieldSelection, ManyRecords, Record, SelectionResult};
use prisma_models::{FieldSelection, ManyRecords, Model, OrderBy, Record, SelectionResult};
use std::ops::Deref;

#[derive(Debug)]
Expand Down Expand Up @@ -80,7 +80,7 @@ impl InMemoryRecordProcessor {
records.records.first().map(|x| x.parent_id.is_some()).unwrap_or(false)
}

fn apply_distinct(&self, mut records: ManyRecords) -> ManyRecords {
pub(crate) fn apply_distinct(&self, mut records: ManyRecords) -> ManyRecords {
let field_names = &records.field_names;

let distinct_selection = if let Some(ref distinct) = self.distinct {
Expand Down Expand Up @@ -189,3 +189,53 @@ impl InMemoryRecordProcessor {
self.take.or(self.skip).is_some() || self.cursor.is_some()
}
}

pub struct InMemoryRecordProcessorBuilder {
args: QueryArguments,
}

impl InMemoryRecordProcessorBuilder {
pub fn new(model: Model, order_by: Vec<OrderBy>, ignore_skip: bool, ignore_take: bool) -> Self {
Self {
args: QueryArguments {
model,
cursor: None,
take: None,
skip: None,
filter: None,
order_by,
distinct: None,
ignore_skip,
ignore_take,
},
}
}
pub fn _cursor(mut self, cursor: SelectionResult) -> Self {
self.args.cursor = Some(cursor);
self
}

pub fn _take(mut self, take: i64) -> Self {
self.args.take = Some(take);
self
}

pub fn _skip(mut self, skip: i64) -> Self {
self.args.skip = Some(skip);
self
}

pub fn _filter(mut self, filter: Filter) -> Self {
self.args.filter = Some(filter);
self
}

pub fn distinct(mut self, distinct: FieldSelection) -> Self {
self.args.distinct = Some(distinct);
self
}

pub fn build(self) -> InMemoryRecordProcessor {
InMemoryRecordProcessor { args: self.args }
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{inmemory_record_processor::InMemoryRecordProcessor, read};
use super::{inmemory_record_processor::InMemoryRecordProcessorBuilder, read};
use crate::{interpreter::InterpretationResult, query_ast::*};
use connector::{
self, filter::Filter, ConditionListValue, ConnectionLike, QueryArguments, RelAggregationRow,
Expand All @@ -13,7 +13,25 @@ pub(crate) async fn m2m(
parent_result: Option<&ManyRecords>,
trace_id: Option<String>,
) -> InterpretationResult<(ManyRecords, Option<Vec<RelAggregationRow>>)> {
let processor = InMemoryRecordProcessor::new_from_query_args(&mut query.args);
let inm_builder = InMemoryRecordProcessorBuilder::new(
query.args.model.clone(),
query.args.order_by.clone(),
query.args.ignore_skip,
query.args.ignore_take,
);

let processor = match query.args.distinct {
Some(ref fs) => {
if query.args.requires_inmemory_distinct() {
inm_builder.distinct(fs.clone())
} else {
inm_builder
}
}
None => inm_builder,
}
.build();

let parent_field = &query.parent_field;
let child_link_id = parent_field.related_field().linking_fields();

Expand Down Expand Up @@ -138,7 +156,7 @@ pub async fn one2m(
parent_field: &RelationFieldRef,
parent_selections: Option<Vec<SelectionResult>>,
parent_result: Option<&ManyRecords>,
mut query_args: QueryArguments,
query_args: QueryArguments,
selected_fields: &FieldSelection,
aggr_selections: Vec<RelAggregationSelection>,
trace_id: Option<String>,
Expand Down Expand Up @@ -192,10 +210,31 @@ pub async fn one2m(
// If we're fetching related records from a single parent, then we can apply normal pagination instead of in-memory processing.
// However, we can't just apply a LIMIT/OFFSET for multiple parents as we need N related records PER parent.
// We could use ROW_NUMBER() but it requires further refactoring so we're still using in-memory processing for now.

let req_inmem_distinct = query_args.requires_inmemory_distinct();

let processor = if uniq_selections.len() == 1 && !query_args.requires_inmemory_processing() {
None
} else {
Some(InMemoryRecordProcessor::new_from_query_args(&mut query_args))
let inm_builder = InMemoryRecordProcessorBuilder::new(
query_args.model.clone(),
query_args.order_by.clone(),
query_args.ignore_skip,
query_args.ignore_take,
);

let inm_builder = match query_args.distinct {
Some(ref fs) => {
if req_inmem_distinct {
inm_builder.distinct(fs.clone())
} else {
inm_builder
}
}
None => inm_builder,
};

Some(inm_builder.build())
};

let mut scalars = {
Expand Down Expand Up @@ -265,6 +304,12 @@ pub async fn one2m(
}

let scalars = if let Some(processor) = processor {
let scalars = if req_inmem_distinct {
processor.apply_distinct(scalars)
} else {
scalars
};

processor.apply(scalars)
} else {
scalars
Expand Down
52 changes: 31 additions & 21 deletions query-engine/core/src/interpreter/query_interpreters/read.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{inmemory_record_processor::InMemoryRecordProcessor, *};
use super::{inmemory_record_processor::InMemoryRecordProcessorBuilder, *};
use crate::{interpreter::InterpretationResult, query_ast::*, result_ast::*};
use connector::{self, error::ConnectorError, ConnectionLike, RelAggregationRow, RelAggregationSelection};
use futures::future::{BoxFuture, FutureExt};
Expand Down Expand Up @@ -88,27 +88,31 @@ fn read_many(
query: ManyRecordsQuery,
trace_id: Option<String>,
) -> BoxFuture<'_, InterpretationResult<QueryResult>> {
// let req_inmem_proc = query.args.requires_inmemory_processing();

// let processor = if req_inmem_proc {
// let inm_builder = InMemoryRecordProcessorBuilder::new(
// query.args.model.clone(),
// query.args.order_by.clone(),
// query.args.ignore_skip.clone(),
// query.args.ignore_take.clone(),
// );

// let inmemory_record_processor = match query.args.distinct {
// Some(ref fs) => inm_builder.distinct(fs.clone()).build(),
// None => inm_builder.build(),
// };

// Some(inmemory_record_processor)
// } else {
// None
// };
let req_inmem_distinct = query.args.requires_inmemory_distinct();

let processor = if query.args.requires_inmemory_processing() {
let inm_builder = InMemoryRecordProcessorBuilder::new(
query.args.model.clone(),
query.args.order_by.clone(),
query.args.ignore_skip,
query.args.ignore_take,
);

let inm_builder = match query.args.distinct {
Some(ref fs) => {
if req_inmem_distinct {
inm_builder.distinct(fs.clone())
} else {
inm_builder
}
}
None => inm_builder,
};

let processor: Option<InMemoryRecordProcessor> = None;
Some(inm_builder.build())
} else {
None
};

let fut = async move {
let scalars = tx
Expand All @@ -122,6 +126,12 @@ fn read_many(
.await?;

let scalars = if let Some(p) = processor {
let scalars = if req_inmem_distinct {
p.apply_distinct(scalars)
} else {
scalars
};

p.apply(scalars)
} else {
scalars
Expand Down

0 comments on commit 0de5bc2

Please sign in to comment.