Skip to content

Commit

Permalink
fix issue #153 for zdb-rust
Browse files Browse the repository at this point in the history
This makes all of the aggregate functions capable of locating the index that actually contains the target field, and then retargets both the query and the index being queried to that field's index.
  • Loading branch information
eeeebbbbrrrr committed Oct 30, 2020
1 parent 5786e89 commit fdb939a
Show file tree
Hide file tree
Showing 41 changed files with 254 additions and 132 deletions.
2 changes: 1 addition & 1 deletion src/access_method/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub extern "C" fn amrescan(
let elasticsearch = Elasticsearch::new(&indexrel);

let response = elasticsearch
.open_search(query.prepare(&indexrel))
.open_search(query.prepare(&indexrel, None).0)
.execute()
.expect("failed to execute ES query");

Expand Down
22 changes: 17 additions & 5 deletions src/access_method/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ fn remove_aborted_xids(
}
}
})
.prepare(index),
.prepare(index, None)
.0,
)
.execute()
.expect("failed to count xmin values");
Expand All @@ -161,7 +162,8 @@ fn remove_aborted_xids(
}
}
})
.prepare(index),
.prepare(index, None)
.0,
)
.execute()
.expect("failed to count xmax values");
Expand Down Expand Up @@ -191,7 +193,9 @@ fn vacuum_xmax(
let mut cnt = 0;
let vacuum_xmax_docs = elasticsearch
.open_search(
vac_by_aborted_xmax(&es_index_name, xid_to_64bit(oldest_xmin) as i64).prepare(index),
vac_by_aborted_xmax(&es_index_name, xid_to_64bit(oldest_xmin) as i64)
.prepare(index, None)
.0,
)
.execute_with_fields(vec!["zdb_xmax"])
.expect("failed to search by xmax");
Expand Down Expand Up @@ -226,7 +230,11 @@ fn delete_by_xmax(
) -> usize {
let mut cnt = 0;
let delete_by_xmax_docs = elasticsearch
.open_search(vac_by_xmax(&es_index_name, xid_to_64bit(oldest_xmin) as i64).prepare(index))
.open_search(
vac_by_xmax(&es_index_name, xid_to_64bit(oldest_xmin) as i64)
.prepare(index, None)
.0,
)
.execute_with_fields(vec!["zdb_xmax"])
.expect("failed to search by xmax");
for (_, ctid, fields, _) in delete_by_xmax_docs.into_iter() {
Expand Down Expand Up @@ -260,7 +268,11 @@ fn delete_by_xmin(
) -> usize {
let mut cnt = 0;
let delete_by_xmin_docs = elasticsearch
.open_search(vac_by_xmin(&es_index_name, xid_to_64bit(oldest_xmin) as i64).prepare(index))
.open_search(
vac_by_xmin(&es_index_name, xid_to_64bit(oldest_xmin) as i64)
.prepare(index, None)
.0,
)
.execute_with_fields(vec!["zdb_xmin"])
.expect("failed to search by xmin");
for (_, ctid, fields, _) in delete_by_xmin_docs.into_iter() {
Expand Down
2 changes: 1 addition & 1 deletion src/elasticsearch/aggregates/adjacency_matrix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ fn adjacency_matrix(

filters_map.insert(
label,
apply_visibility_clause(&elasticsearch, filter.prepare(&index), false),
apply_visibility_clause(&elasticsearch, filter.prepare(&index, None).0, false),
);
}

Expand Down
8 changes: 6 additions & 2 deletions src/elasticsearch/aggregates/arbitrary_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@ use pgx::*;
fn arbitrary_agg(index: PgRelation, query: ZDBQuery, json: Json) -> Json {
let elasticsearch = Elasticsearch::new(&index);

let request =
elasticsearch.aggregate::<serde_json::Value>(None, false, query.prepare(&index), json.0);
let request = elasticsearch.aggregate::<serde_json::Value>(
None,
false,
query.prepare(&index, None).0,
json.0,
);

Json(
request
Expand Down
4 changes: 2 additions & 2 deletions src/elasticsearch/aggregates/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::convert::TryInto;
fn count(index: PgRelation, query: ZDBQuery) -> i64 {
let es = Elasticsearch::new(&index);

es.count(query.prepare(&index))
es.count(query.prepare(&index, None).0)
.execute()
.expect("failed to execute count query")
.try_into()
Expand All @@ -18,7 +18,7 @@ fn count(index: PgRelation, query: ZDBQuery) -> i64 {
fn raw_count(index: PgRelation, query: ZDBQuery) -> i64 {
let es = Elasticsearch::new(&index);

es.raw_count(query.prepare(&index))
es.raw_count(query.prepare(&index, None).0)
.execute()
.expect("failed to execute raw count query")
.try_into()
Expand Down
7 changes: 4 additions & 3 deletions src/elasticsearch/aggregates/date_histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ fn date_histogram(
buckets: Vec<BucketEntry>,
}

let elasticsearch = Elasticsearch::new(&index);

let date_histogram = DateHistogram {
field,
calendar_interval,
Expand All @@ -75,10 +73,13 @@ fn date_histogram(
error!("Both calendar interval and fixed interval have something. Should be mutually exclusive")
};

let (prepared_query, index) = query.prepare(&index, Some(field.into()));
let elasticsearch = Elasticsearch::new(&index);

let request = elasticsearch.aggregate::<DateHistogramAggData>(
Some(field.into()),
true,
query.prepare(&index),
prepared_query,
json! {
{
"date_histogram":
Expand Down
10 changes: 5 additions & 5 deletions src/elasticsearch/aggregates/date_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use serde_json::*;
#[pg_extern(immutable, parallel_safe)]
fn date_range(
index: PgRelation,
field_name: &str,
field: &str,
query: ZDBQuery,
date_range_array: Json,
) -> impl std::iter::Iterator<
Expand Down Expand Up @@ -36,16 +36,16 @@ fn date_range(
doc_count: i64,
}

let (prepared_query, index) = query.prepare(&index, Some(field.into()));
let elasticsearch = Elasticsearch::new(&index);

let request = elasticsearch.aggregate::<DateRangesAggData>(
Some(field_name.into()),
Some(field.into()),
true,
query.prepare(&index),
prepared_query,
json! {
{
"date_range": {
"field": field_name,
"field": field,
"ranges": date_range_array
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/elasticsearch/aggregates/extended_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ fn extended_stats(
lower: Numeric,
}

let (prepared_query, index) = query.prepare(&index, Some(field.into()));
let elasticsearch = Elasticsearch::new(&index);

let request = elasticsearch.aggregate::<ExtendedStatsAggData>(
Some(field.into()),
true,
query.prepare(&index),
prepared_query,
json! {
{
"extended_stats": {
Expand Down
2 changes: 1 addition & 1 deletion src/elasticsearch/aggregates/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ fn filters(

filters_map.insert(
label,
apply_visibility_clause(&elasticsearch, filter.prepare(&index), false),
apply_visibility_clause(&elasticsearch, filter.prepare(&index, None).0, false),
);
}

Expand Down
4 changes: 2 additions & 2 deletions src/elasticsearch/aggregates/histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ fn histogram(
buckets: Vec<BucketEntry>,
}

let (prepared_query, index) = query.prepare(&index, Some(field.into()));
let elasticsearch = Elasticsearch::new(&index);

let request = elasticsearch.aggregate::<HistogramAggData>(
Some(field.into()),
true,
query.prepare(&index),
prepared_query,
json! {
{
"histogram": {
Expand Down
10 changes: 5 additions & 5 deletions src/elasticsearch/aggregates/ip_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use serde_json::*;
#[pg_extern(immutable, parallel_safe)]
fn ip_range(
index: PgRelation,
field_name: &str,
field: &str,
query: ZDBQuery,
range_array: Json,
) -> impl std::iter::Iterator<
Expand All @@ -32,16 +32,16 @@ fn ip_range(
doc_count: i64,
}

let (prepared_query, index) = query.prepare(&index, Some(field.into()));
let elasticsearch = Elasticsearch::new(&index);

let request = elasticsearch.aggregate::<IPRangesAggData>(
Some(field_name.into()),
Some(field.into()),
true,
query.prepare(&index),
prepared_query,
json! {
{
"ip_range": {
"field": field_name,
"field": field,
"ranges": range_array
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/elasticsearch/aggregates/matrix_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ fn matrix_stats(
let request = elasticsearch.aggregate::<MatrixStatsAggData>(
None,
false,
query.prepare(&index),
query.prepare(&index, None).0,
json! {
{
"matrix_stats": {
Expand Down
28 changes: 14 additions & 14 deletions src/elasticsearch/aggregates/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ fn sum(index: PgRelation, field: &str, query: ZDBQuery) -> Numeric {
value: Numeric,
}

let (prepared_query, index) = query.prepare(&index, Some(field.into()));
let elasticsearch = Elasticsearch::new(&index);

let request = elasticsearch.aggregate::<SumAggData>(
Some(field.into()),
true,
query.prepare(&index),
prepared_query,
json! {
{
"sum": {
Expand All @@ -40,12 +40,12 @@ fn avg(index: PgRelation, field: &str, query: ZDBQuery) -> Numeric {
value: Numeric,
}

let (prepared_query, index) = query.prepare(&index, Some(field.into()));
let elasticsearch = Elasticsearch::new(&index);

let request = elasticsearch.aggregate::<AvgAggData>(
Some(field.into()),
true,
query.prepare(&index),
prepared_query,
json! {
{
"avg": {
Expand All @@ -69,12 +69,12 @@ fn cardinality(index: PgRelation, field: &str, query: ZDBQuery) -> Numeric {
value: Numeric,
}

let (prepared_query, index) = query.prepare(&index, Some(field.into()));
let elasticsearch = Elasticsearch::new(&index);

let request = elasticsearch.aggregate::<CardinalityAggData>(
Some(field.into()),
true,
query.prepare(&index),
prepared_query,
json! {
{
"cardinality": {
Expand All @@ -98,12 +98,12 @@ fn max(index: PgRelation, field: &str, query: ZDBQuery) -> Numeric {
value: Numeric,
}

let (prepared_query, index) = query.prepare(&index, Some(field.into()));
let elasticsearch = Elasticsearch::new(&index);

let request = elasticsearch.aggregate::<MaxAggData>(
Some(field.into()),
true,
query.prepare(&index),
prepared_query,
json! {
{
"max": {
Expand All @@ -127,12 +127,12 @@ fn min(index: PgRelation, field: &str, query: ZDBQuery) -> Numeric {
value: Numeric,
}

let (prepared_query, index) = query.prepare(&index, Some(field.into()));
let elasticsearch = Elasticsearch::new(&index);

let request = elasticsearch.aggregate::<MinAggData>(
Some(field.into()),
true,
query.prepare(&index),
prepared_query,
json! {
{
"min": {
Expand All @@ -156,12 +156,12 @@ fn missing(index: PgRelation, field: &str, query: ZDBQuery) -> Numeric {
doc_count: Numeric,
}

let (prepared_query, index) = query.prepare(&index, Some(field.into()));
let elasticsearch = Elasticsearch::new(&index);

let request = elasticsearch.aggregate::<MissingAggData>(
Some(field.into()),
true,
query.prepare(&index),
prepared_query,
json! {
{
"missing": {
Expand All @@ -185,12 +185,12 @@ fn value_count(index: PgRelation, field: &str, query: ZDBQuery) -> Numeric {
value: Numeric,
}

let (prepared_query, index) = query.prepare(&index, Some(field.into()));
let elasticsearch = Elasticsearch::new(&index);

let request = elasticsearch.aggregate::<ValueCountAggData>(
Some(field.into()),
true,
query.prepare(&index),
prepared_query,
json! {
{
"value_count": {
Expand Down
6 changes: 3 additions & 3 deletions src/elasticsearch/aggregates/percentile_ranks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,18 @@ fn percentile_ranks(
keyed: bool,
}

let elasticsearch = Elasticsearch::new(&index);

let percentile_ranks = PercentileRanks {
field,
values,
keyed: false,
};

let (prepared_query, index) = query.prepare(&index, Some(field.into()));
let elasticsearch = Elasticsearch::new(&index);
let request = elasticsearch.aggregate::<PercentileRankAggData>(
Some(field.into()),
true,
query.prepare(&index),
prepared_query,
json! {
{
"percentile_ranks": percentile_ranks,
Expand Down
6 changes: 3 additions & 3 deletions src/elasticsearch/aggregates/percentiles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,18 @@ fn percentiles(
keyed: bool,
}

let elasticsearch = Elasticsearch::new(&index);

let percentiles = Percentiles {
field,
percents,
keyed: false,
};

let (prepared_query, index) = query.prepare(&index, Some(field.into()));
let elasticsearch = Elasticsearch::new(&index);
let request = elasticsearch.aggregate::<PercentilesAggData>(
Some(field.into()),
true,
query.prepare(&index),
prepared_query,
json! {
{
"percentiles": percentiles,
Expand Down
2 changes: 1 addition & 1 deletion src/elasticsearch/aggregates/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use pgx::*;
fn query(index: PgRelation, query: ZDBQuery) -> impl Iterator<Item = pg_sys::ItemPointerData> {
let es = Elasticsearch::new(&index);
let result = es
.open_search(query.prepare(&index))
.open_search(query.prepare(&index, None).0)
.execute()
.expect("failed to execute search");

Expand Down

0 comments on commit fdb939a

Please sign in to comment.