Skip to content

Commit

Permalink
ESQL: Page shouldn't close a block twice (elastic#100370)
Browse files Browse the repository at this point in the history
Page now takes into account that a block can be used in multiple
 positions (such as the same column aliased under multiple names).
Introduce a newPageAndRelease method that handles clean-up of blocks that
 are not used when creating a new page.

Relates elastic#100001
Fix elastic#100365
Fix elastic#100356
  • Loading branch information
costin committed Oct 6, 2023
1 parent 8d654d0 commit 44068cb
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 51 deletions.
7 changes: 7 additions & 0 deletions docs/changelog/100370.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pr: 100370
summary: "ESQL: Page shouldn't close a block twice"
area: ES|QL
type: bug
issues:
- 100356
- 100365
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Objects;

/**
Expand Down Expand Up @@ -88,9 +90,7 @@ private Page(Page prev, Block[] toAdd) {
this.positionCount = prev.positionCount;

this.blocks = Arrays.copyOf(prev.blocks, prev.blocks.length + toAdd.length);
for (int i = 0; i < toAdd.length; i++) {
this.blocks[prev.blocks.length + i] = toAdd[i];
}
System.arraycopy(toAdd, 0, this.blocks, prev.blocks.length, toAdd.length);
}

public Page(StreamInput in) throws IOException {
Expand Down Expand Up @@ -177,8 +177,8 @@ public Page appendPage(Page toAdd) {
@Override
public int hashCode() {
int result = Objects.hash(positionCount);
for (int i = 0; i < blocks.length; i++) {
result = 31 * result + Objects.hashCode(blocks[i]);
for (Block block : blocks) {
result = 31 * result + Objects.hashCode(block);
}
return result;
}
Expand Down Expand Up @@ -229,7 +229,48 @@ public void writeTo(StreamOutput out) throws IOException {
* Release all blocks in this page, decrementing any breakers accounting for these blocks.
*/
public void releaseBlocks() {
if (blocksReleased) {
return;
}

blocksReleased = true;
Releasables.closeExpectNoException(blocks);

// blocks can be used as multiple columns
var map = new IdentityHashMap<Block, Boolean>(mapSize(blocks.length));
for (Block b : blocks) {
if (map.putIfAbsent(b, Boolean.TRUE) == null) {
Releasables.closeExpectNoException(b);
}
}
}

/**
 * Returns a new Page made of the given blocks and closes every block of the current Page that is not among them.
 * That is, allows clean-up of the current page <em>after</em> external manipulation of the blocks.
 * The current page should no longer be used and must be considered closed once this method returns.
 * <p>
 * A block that occupies several positions of the current page but is not kept is closed only once.
 *
 * @param keep the blocks that make up the returned page; these are left open
 * @return a new page with the same position count as this one, holding {@code keep}
 * @throws IllegalStateException if this page has already been released
 */
public Page newPageAndRelease(Block... keep) {
    if (blocksReleased) {
        throw new IllegalStateException("can't create new page from already released page");
    }

    // Build the new page before flagging this one, so that a failure while constructing it does not
    // leave this page marked as released with its blocks still open.
    var newPage = new Page(positionCount, keep);
    blocksReleased = true;

    // Identity-based set: seeded with the blocks to keep, then reused to remember which of the
    // remaining blocks have already been closed (the same block can back multiple columns).
    var handled = Collections.newSetFromMap(new IdentityHashMap<Block, Boolean>(mapSize(keep.length + blocks.length)));
    Collections.addAll(handled, keep);

    // close blocks that have been left out, each instance exactly once
    for (Block b : blocks) {
        if (handled.add(b)) {
            Releasables.closeExpectNoException(b);
        }
    }

    return newPage;
}

/**
 * Computes the size to request from a map that is expected to hold {@code expectedSize} entries:
 * {@code expectedSize + 1} for sizes below two, otherwise {@code expectedSize / 0.75 + 1} truncated to an int.
 */
static int mapSize(int expectedSize) {
    if (expectedSize >= 2) {
        // leave head-room for a 0.75 load factor
        return (int) (expectedSize / 0.75 + 1.0);
    }
    return expectedSize + 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -70,30 +68,7 @@ protected Page process(Page page) {
var block = page.getBlock(source);
blocks[b++] = block;
}
closeUnused(page, blocks);
return new Page(page.getPositionCount(), blocks);
}

/**
* Close all {@link Block}s that are in {@code page} but are not in {@code blocks}.
*/
public static void closeUnused(Page page, Block[] blocks) {
List<Releasable> blocksToRelease = new ArrayList<>();

for (int i = 0; i < page.getBlockCount(); i++) {
boolean used = false;
var current = page.getBlock(i);
for (int j = 0; j < blocks.length; j++) {
if (current == blocks[j]) {
used = true;
break;
}
}
if (used == false) {
blocksToRelease.add(current);
}
}
Releasables.close(blocksToRelease);
return page.newPageAndRelease(blocks);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,25 @@ public void testSerializationListPages() throws IOException {
}
}

/**
 * Releasing the same page twice must be harmless: the first call releases the block,
 * the second one is expected to complete without failing.
 */
public void testPageMultiRelease() {
    int positionCount = randomInt(1024);
    int[] values = IntStream.range(0, positionCount).toArray();
    var onlyBlock = new IntArrayVector(values, positionCount).asBlock();
    Page singleBlockPage = new Page(onlyBlock);

    singleBlockPage.releaseBlocks();
    assertThat(onlyBlock.isReleased(), is(true));

    // releasing again must not throw
    singleBlockPage.releaseBlocks();
}

/**
 * Creating a new page from a subset of the blocks must close only the blocks that were left out;
 * the kept block stays open until the new page itself is released.
 */
public void testNewPageAndRelease() {
    int positions = randomInt(1024);
    var blockA = new IntArrayVector(IntStream.range(0, positions).toArray(), positions).asBlock();
    var blockB = new IntArrayVector(IntStream.range(0, positions).toArray(), positions).asBlock();
    Page page = new Page(blockA, blockB);

    Page newPage = page.newPageAndRelease(blockA);
    assertThat(blockA.isReleased(), is(false));
    assertThat(blockB.isReleased(), is(true));

    // the returned page holds only the kept block, and releasing it finally closes that block
    assertThat(newPage.getBlockCount(), is(1));
    newPage.releaseBlocks();
    assertThat(blockA.isReleased(), is(true));
}

BytesRefArray bytesRefArrayOf(String... values) {
var array = new BytesRefArray(values.length, bigArrays);
Arrays.stream(values).map(BytesRef::new).forEach(array::append);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,11 @@
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.indices.breaker.CircuitBreakerService;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.LongStream;

import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -59,15 +56,12 @@ public void testProjection() {
var out = projection.getOutput();
assertThat(randomProjection.size(), lessThanOrEqualTo(out.getBlockCount()));

Set<Block> blks = new HashSet<>();
for (int i = 0; i < out.getBlockCount(); i++) {
var block = out.<IntBlock>getBlock(i);
assertEquals(block, page.getBlock(randomProjection.get(i)));
blks.add(block);
assertEquals(blocks[randomProjection.get(i)], block);
}

// close all blocks separately since the same block can be used by multiple columns (aliased)
Releasables.closeWhileHandlingException(blks.toArray(new Block[0]));
out.releaseBlocks();
}

private List<Integer> randomProjection(int size) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ x:integer | z:integer
4 | 8
;

# AwaitsFix https://github.com/elastic/elasticsearch/issues/100356
renameProjectEval-Ignore
renameProjectEval
from employees | sort emp_no | eval y = languages | rename languages as x | keep x, y | eval x2 = x + 1 | eval y2 = y + 2 | limit 3;

x:integer | y:integer | x2:integer | y2:integer
Expand All @@ -94,8 +93,7 @@ x:integer | y:integer | x2:integer | y2:integer
4 | 4 | 5 | 6
;

# AwaitsFix https://github.com/elastic/elasticsearch/issues/100356
duplicateProjectEval-Ignore
duplicateProjectEval
from employees | eval y = languages, x = languages | keep x, y | eval x2 = x + 1 | eval y2 = y + 2 | limit 3;

x:integer | y:integer | x2:integer | y2:integer
Expand Down Expand Up @@ -160,8 +158,7 @@ y:integer | x:date
10061 | 1985-09-17T00:00:00.000Z
;

# AwaitsFix https://github.com/elastic/elasticsearch/issues/100356
renameIntertwinedWithSort-Ignore
renameIntertwinedWithSort
FROM employees | eval x = salary | rename x as y | rename y as x | sort x | rename x as y | limit 10;

avg_worked_seconds:l | birth_date:date | emp_no:i | first_name:s | gender:s | height:d | height.float:d | height.half_float:d | height.scaled_float:d| hire_date:date | is_rehired:bool | job_positions:s | languages:i | languages.byte:i | languages.long:l | languages.short:i | last_name:s | salary:i | salary_change:d | salary_change.int:i | salary_change.keyword:s | salary_change.long:l | still_hired:bool | y:i
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

package org.elasticsearch.xpack.esql.action;

import org.apache.lucene.tests.util.LuceneTestCase;
import org.elasticsearch.Build;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
Expand Down Expand Up @@ -67,7 +66,6 @@
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.nullValue;

@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/100365")
public class EsqlActionIT extends AbstractEsqlIntegTestCase {

long epoch = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.Operator.OperatorFactory;
import org.elasticsearch.compute.operator.OutputOperator.OutputOperatorFactory;
import org.elasticsearch.compute.operator.ProjectOperator;
import org.elasticsearch.compute.operator.RowOperator.RowOperatorFactory;
import org.elasticsearch.compute.operator.ShowOperator;
import org.elasticsearch.compute.operator.SinkOperator;
Expand Down Expand Up @@ -334,8 +333,7 @@ private static Function<Page, Page> alignPageToAttributes(List<Attribute> attrs,
for (int i = 0; i < blocks.length; i++) {
blocks[i] = p.getBlock(mappedPosition[i]);
}
ProjectOperator.closeUnused(p, blocks);
return new Page(blocks);
return p.newPageAndRelease(blocks);
} : Function.identity();

return transformer;
Expand Down

0 comments on commit 44068cb

Please sign in to comment.