Skip to content

Commit

Permalink
feat: implement in-memory distinct with JOINs (#4739)
Browse files Browse the repository at this point in the history
  • Loading branch information
aqrln committed Mar 11, 2024
1 parent 715c8eb commit b1d3505
Show file tree
Hide file tree
Showing 14 changed files with 270 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,21 +179,51 @@ mod distinct {

// Returns Users 1, 3, 4, 5 top
// 1 => ["3", "1", "2"]
// 4 => ["1"]
// 3 => []
// 4 => ["1"]
// 5 => ["2", "3"]
match_connector_result!(
insta::assert_snapshot!(run_query!(
&runner,
indoc!("{
findManyUser(distinct: [first_name, last_name])
{
findManyUser(
distinct: [first_name, last_name],
orderBy: { id: asc }
) {
id
posts(distinct: [title], orderBy: { id: asc }) {
title
}
}}"),
Postgres(_) => r###"{"data":{"findManyUser":[{"id":1,"posts":[{"title":"3"},{"title":"1"},{"title":"2"}]},{"id":4,"posts":[{"title":"1"}]},{"id":3,"posts":[]},{"id":5,"posts":[{"title":"2"},{"title":"3"}]}]}}"###,
_ => r###"{"data":{"findManyUser":[{"id":1,"posts":[{"title":"3"},{"title":"1"},{"title":"2"}]},{"id":3,"posts":[]},{"id":4,"posts":[{"title":"1"}]},{"id":5,"posts":[{"title":"2"},{"title":"3"}]}]}}"###
}}")
),
@r###"{"data":{"findManyUser":[{"id":1,"posts":[{"title":"3"},{"title":"1"},{"title":"2"}]},{"id":3,"posts":[]},{"id":4,"posts":[{"title":"1"}]},{"id":5,"posts":[{"title":"2"},{"title":"3"}]}]}}"###
);

Ok(())
}

#[connector_test]
async fn nested_distinct_order_by_field(runner: Runner) -> TestResult<()> {
nested_dataset(&runner).await?;

// Returns Users 1, 3, 4, 5 top
// 1 => ["1", "2", "3"]
// 4 => ["1"]
// 3 => []
// 5 => ["2", "3"]
insta::assert_snapshot!(run_query!(
&runner,
indoc!("{
findManyUser(
distinct: [first_name, last_name],
orderBy: [{ first_name: asc }, { last_name: asc }]
) {
id
posts(distinct: [title], orderBy: { title: asc }) {
title
}
}}")
),
@r###"{"data":{"findManyUser":[{"id":1,"posts":[{"title":"1"},{"title":"2"},{"title":"3"}]},{"id":4,"posts":[{"title":"1"}]},{"id":3,"posts":[]},{"id":5,"posts":[{"title":"2"},{"title":"3"}]}]}}"###
);

Ok(())
Expand Down Expand Up @@ -228,6 +258,25 @@ mod distinct {
Ok(())
}

/// Tests nested distinct with non-matching orderBy and selection that doesn't include the
/// distinct fields.
///
/// The nested `posts` relation is made distinct on `title` while being ordered by `id`
/// descending, and only `id` is selected, so the distinct field never appears in the response.
#[connector_test]
async fn nested_distinct_not_in_selection(runner: Runner) -> TestResult<()> {
nested_dataset(&runner).await?;

// Users are ordered by `id` ascending at the top level; the inline snapshot pins the post ids
// returned for each user.
insta::assert_snapshot!(
run_query!(&runner, r#"{
findManyUser(orderBy: { id: asc }) {
id
posts(distinct: title, orderBy: { id: desc }) { id }
}
}"#),
@r###"{"data":{"findManyUser":[{"id":1,"posts":[{"id":5},{"id":4},{"id":1}]},{"id":2,"posts":[{"id":7},{"id":6}]},{"id":3,"posts":[]},{"id":4,"posts":[{"id":9}]},{"id":5,"posts":[{"id":12},{"id":11}]}]}}"###
);

Ok(())
}

/// Dataset:
/// User (id) => Posts (titles, id asc)
/// 1 => ["3", "1", "1", "2", "1"]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#[cfg(feature = "relation_joins")]
pub mod coerce;
pub mod read;
pub(crate) mod update;
pub mod upsert;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
#[cfg(feature = "relation_joins")]
mod coerce;
#[cfg(feature = "relation_joins")]
mod process;

use crate::{
column_metadata,
model_extensions::*,
Expand Down Expand Up @@ -36,7 +41,8 @@ async fn get_single_record_joins(
selected_fields: &FieldSelection,
ctx: &Context<'_>,
) -> crate::Result<Option<SingleRecord>> {
use super::coerce::coerce_record_with_json_relation;
use coerce::coerce_record_with_json_relation;

let selected_fields = selected_fields.to_virtuals_last();
let field_names: Vec<_> = selected_fields.prisma_names_grouping_virtuals().collect();
let idents = selected_fields.type_identifiers_with_arities_grouping_virtuals();
Expand Down Expand Up @@ -139,7 +145,9 @@ async fn get_many_records_joins(
selected_fields: &FieldSelection,
ctx: &Context<'_>,
) -> crate::Result<ManyRecords> {
use super::coerce::coerce_record_with_json_relation;
use coerce::coerce_record_with_json_relation;
use std::borrow::Cow;

let selected_fields = selected_fields.to_virtuals_last();
let field_names: Vec<_> = selected_fields.prisma_names_grouping_virtuals().collect();
let idents = selected_fields.type_identifiers_with_arities_grouping_virtuals();
Expand Down Expand Up @@ -177,9 +185,10 @@ async fn get_many_records_joins(
records.push(record)
}

// Reverses order when using negative take
if query_arguments.needs_reversed_order() {
records.reverse();
if query_arguments.needs_inmemory_processing_with_joins() {
records.records = process::InMemoryProcessorForJoins::new(&query_arguments, records.records)
.process(|record| Some((Cow::Borrowed(record), Cow::Borrowed(&records.field_names))))
.collect();
}

Ok(records)
Expand Down Expand Up @@ -423,8 +432,9 @@ fn get_selection_indexes<'a>(
relations: Vec<&'a RelationSelection>,
virtuals: Vec<&'a VirtualSelection>,
field_names: &'a [String],
) -> Vec<(usize, super::coerce::IndexedSelection<'a>)> {
use super::coerce::IndexedSelection;
) -> Vec<(usize, coerce::IndexedSelection<'a>)> {
use coerce::IndexedSelection;

field_names
.iter()
.enumerate()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use bigdecimal::{BigDecimal, FromPrimitive, ParseBigDecimalError};
use itertools::{Either, Itertools};
use itertools::Itertools;
use query_structure::*;
use std::{io, str::FromStr};
use std::{borrow::Cow, io, str::FromStr};

use crate::{query_arguments_ext::QueryArgumentsExt, SqlError};
use crate::SqlError;

use super::process::InMemoryProcessorForJoins;

pub(crate) enum IndexedSelection<'a> {
Relation(&'a RelationSelection),
Expand Down Expand Up @@ -69,11 +71,18 @@ fn coerce_json_relation_to_pv(value: serde_json::Value, rs: &RelationSelection)
}
});

// Reverses order when using negative take.
let iter = match rs.args.needs_reversed_order() {
true => Either::Left(iter.rev()),
false => Either::Right(iter),
};
let iter = InMemoryProcessorForJoins::new(&rs.args, iter).process(|maybe_value| {
maybe_value.as_ref().ok().map(|value| {
let object = value
.clone()
.into_object()
.expect("Expected coerced_json_relation_to_pv to return list of objects");

let (field_names, values) = object.into_iter().unzip();

(Cow::Owned(Record::new(values)), Cow::Owned(field_names))
})
});

Ok(PrismaValue::List(iter.collect::<crate::Result<Vec<_>>>()?))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
use std::borrow::Cow;

use itertools::{Either, Itertools};
use query_structure::{QueryArguments, Record};

use crate::query_arguments_ext::QueryArgumentsExt;

/// Declares a newtype iterator wrapper that marks one stage of the in-memory processing
/// pipeline.
///
/// `processor_state!(Name)` generates `struct Name<T>(T)` and forwards `Iterator` (and
/// `DoubleEndedIterator`, when the wrapped iterator supports it) to the inner value.
///
/// The optional `-> Transition(Bound)` suffix additionally implements the `Transition` trait for
/// the wrapper whenever the wrapped iterator satisfies `Bound<Item = T>`, which makes the
/// transition's provided methods callable on that state.
macro_rules! processor_state {
    ($name:ident $(-> $transition:ident($bound:ident))?) => {
        struct $name<T>(T);

        impl<T, U> Iterator for $name<T>
        where
            T: Iterator<Item = U>,
        {
            type Item = U;

            fn next(&mut self) -> Option<Self::Item> {
                self.0.next()
            }

            // Forward the hint of the wrapped iterator. The default implementation reports
            // `(0, None)`, which would hide the known length from adaptors and prevent
            // `collect()` from preallocating.
            fn size_hint(&self) -> (usize, Option<usize>) {
                self.0.size_hint()
            }
        }

        impl<T, U> DoubleEndedIterator for $name<T>
        where
            T: DoubleEndedIterator<Item = U>,
        {
            fn next_back(&mut self) -> Option<Self::Item> {
                self.0.next_back()
            }
        }

        $(
            impl<T, U> $transition<T> for $name<U> where U: $bound<Item = T> {}
        )?
    };
}

// Pipeline states, in processing order:
// `Initial` -> `WithReverseOrder` -> `WithDistinct` -> `WithPagination`.
// Each state is given only the transition trait that leads to the next state;
// `WithPagination` is the final state and has no transition.
processor_state!(Initial -> ApplyReverseOrder(DoubleEndedIterator));
processor_state!(WithReverseOrder -> ApplyDistinct(Iterator));
processor_state!(WithDistinct -> ApplyPagination(Iterator));
processor_state!(WithPagination);

/// Pipeline step that optionally flips the iteration order.
///
/// Needs a double-ended iterator because the reversal is performed with `Iterator::rev`.
trait ApplyReverseOrder<T>: DoubleEndedIterator<Item = T>
where
    Self: Sized,
{
    /// Wraps `self` in [`WithReverseOrder`], reversed when `args.needs_reversed_order()` returns
    /// `true` and passed through unchanged otherwise.
    fn apply_reverse_order(self, args: &QueryArguments) -> WithReverseOrder<impl DoubleEndedIterator<Item = T>> {
        let iter = if args.needs_reversed_order() {
            Either::Left(self.rev())
        } else {
            Either::Right(self)
        };

        WithReverseOrder(iter)
    }
}

/// Pipeline step that optionally removes duplicate items in memory.
trait ApplyDistinct<T>: Iterator<Item = T>
where
    Self: Sized,
{
    /// Wraps `self` in [`WithDistinct`].
    ///
    /// Deduplication (via `unique_by`) only happens when `args.distinct` is set and
    /// `args.requires_inmemory_distinct_with_joins()` returns `true`; otherwise the iterator is
    /// passed through unchanged.
    ///
    /// `get_record_and_fields` maps an item to its record and the matching field names. The
    /// uniqueness key is the selection result extracted from that record for the distinct
    /// fields. Items for which the callback returns `None` all share the `None` key.
    ///
    /// # Panics
    ///
    /// Panics if extracting the distinct selection from a record fails.
    fn apply_distinct<'a>(
        self,
        args: &'a QueryArguments,
        mut get_record_and_fields: impl for<'b> FnMut(&'b Self::Item) -> Option<(Cow<'b, Record>, Cow<'a, [String]>)> + 'a,
    ) -> WithDistinct<impl Iterator<Item = T>> {
        // `filter` keeps the original evaluation order: the capability check is only consulted
        // when a distinct selection is actually present.
        let inmemory_distinct = args
            .distinct
            .as_ref()
            .filter(|_| args.requires_inmemory_distinct_with_joins());

        let iter = if let Some(distinct) = inmemory_distinct {
            Either::Left(self.unique_by(move |item| {
                let (record, field_names) = get_record_and_fields(item)?;

                Some(
                    record
                        .extract_selection_result_from_prisma_name(&field_names, distinct)
                        .unwrap(),
                )
            }))
        } else {
            Either::Right(self)
        };

        WithDistinct(iter)
    }
}

/// Pipeline step that optionally applies `skip` / `take` in memory.
trait ApplyPagination<T>: Iterator<Item = T>
where
    Self: Sized,
{
    /// Wraps `self` in [`WithPagination`].
    ///
    /// `args.skip` and `args.take_abs()` are each applied only when they are set and
    /// `args.requires_inmemory_pagination_with_joins()` returns `true`. Skipping always happens
    /// before taking.
    fn apply_pagination(self, args: &QueryArguments) -> WithPagination<impl Iterator<Item = T>> {
        let to_skip = args
            .skip
            .filter(|_| args.requires_inmemory_pagination_with_joins());

        let skipped = match to_skip {
            Some(count) => Either::Left(self.skip(count as usize)),
            None => Either::Right(self),
        };

        let to_take = args
            .take_abs()
            .filter(|_| args.requires_inmemory_pagination_with_joins());

        let limited = match to_take {
            Some(count) => Either::Left(skipped.take(count as usize)),
            None => Either::Right(skipped),
        };

        WithPagination(limited)
    }
}

/// Applies, in memory, the parts of a query's arguments that this module handles for
/// join-based queries: order reversal, distinct and pagination.
pub struct InMemoryProcessorForJoins<'a, I> {
    /// Query arguments that decide which processing steps are applied.
    args: &'a QueryArguments,
    /// Iterator over the items to process.
    records: I,
}

impl<'a, T, I> InMemoryProcessorForJoins<'a, I>
where
    T: 'a,
    I: DoubleEndedIterator<Item = T> + 'a,
{
    /// Creates a processor over `records`, governed by `args`.
    pub fn new(args: &'a QueryArguments, records: impl IntoIterator<IntoIter = I>) -> Self {
        let records = records.into_iter();

        Self { args, records }
    }

    /// Runs the pipeline lazily and returns the resulting iterator.
    ///
    /// The steps are applied in a fixed order: reverse order, then distinct, then pagination.
    /// `get_record_and_fields` is used by the distinct step to obtain the record and field names
    /// of an item.
    pub fn process(
        self,
        get_record_and_fields: impl for<'b> FnMut(&'b T) -> Option<(Cow<'b, Record>, Cow<'a, [String]>)> + 'a,
    ) -> impl Iterator<Item = T> + 'a {
        let Self { args, records } = self;

        let reversed = Initial(records).apply_reverse_order(args);
        let deduplicated = reversed.apply_distinct(args, get_record_and_fields);

        deduplicated.apply_pagination(args)
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,21 @@ use query_structure::QueryArguments;
/// Extension methods for [`QueryArguments`].
pub(crate) trait QueryArgumentsExt {
/// If we need to take rows before a cursor position, then we need to reverse the order in SQL.
///
/// Returns `true` when `take` is set to a negative value.
fn needs_reversed_order(&self) -> bool;

/// Checks whether any form of in-memory processing is needed, or we could just return the
/// records as they are. This is useful to avoid turning an existing collection of records into
/// an iterator and re-collecting it back with no changes.
///
/// Returns `true` when the order must be reversed, or when distinct or pagination has to be
/// applied in memory for queries with joins.
fn needs_inmemory_processing_with_joins(&self) -> bool;
}

impl QueryArgumentsExt for QueryArguments {
    /// A negative `take` means the order has to be reversed; a missing `take` never does.
    fn needs_reversed_order(&self) -> bool {
        matches!(self.take, Some(take) if take < 0)
    }

    /// True as soon as one of the in-memory steps (reverse order, distinct, pagination) applies.
    /// The checks run in that order and stop at the first one that holds.
    fn needs_inmemory_processing_with_joins(&self) -> bool {
        if self.needs_reversed_order() {
            return true;
        }

        self.requires_inmemory_distinct_with_joins() || self.requires_inmemory_pagination_with_joins()
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,7 @@ impl JoinSelectBuilder for LateralJoinSelectBuilder {
parent_alias: Alias,
ctx: &Context<'_>,
) -> Expression<'static> {
let build_obj_params = rs
.selections
.iter()
let build_obj_params = json_obj_selections(rs)
.filter_map(|field| match field {
SelectedField::Scalar(sf) => Some((
Cow::from(sf.name().to_owned()),
Expand Down Expand Up @@ -245,7 +243,7 @@ impl LateralJoinSelectBuilder {
.with_filters(rs.args.filter.clone(), Some(m2m_join_alias), ctx) // adds query filters
.with_distinct(&rs.args, m2m_join_alias)
.with_ordering(&rs.args, Some(m2m_join_alias.to_table_string()), ctx) // adds ordering stmts
.with_pagination(rs.args.take_abs(), rs.args.skip)
.with_pagination(&rs.args, None)
.comment("inner"); // adds pagination

let mut outer = Select::from_table(Table::from(inner).alias(outer_alias.to_table_string()))
Expand Down
Loading

0 comments on commit b1d3505

Please sign in to comment.