Skip to content

Commit

Permalink
Merge branch 'main' into feat/17178-disable-default-validators-api
Browse files Browse the repository at this point in the history
  • Loading branch information
tepi committed Jan 30, 2024
2 parents b964c86 + ec47e68 commit d0f483f
Show file tree
Hide file tree
Showing 15 changed files with 386 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -610,8 +610,10 @@ public T getItem(int index) {
* because the backend can have the item on that index (we simply
* not yet fetched this item during the scrolling).
*/
return (T) getDataProvider().fetch(buildQuery(index, 1)).findFirst()
.orElse(null);
try (Stream<T> stream = getDataProvider()
.fetch(buildQuery(index, 1))) {
return stream.findFirst().orElse(null);
}
}
}

Expand Down Expand Up @@ -1010,18 +1012,20 @@ protected Stream<T> fetchFromProvider(int offset, int limit) {
int page = 0;
do {
final int newOffset = offset + page * pageSize;
Stream<T> dataProviderStream = doFetchFromDataProvider(
newOffset, pageSize);
// Stream.Builder is not thread safe, so for parallel stream
// we need to first collect items before adding them
if (dataProviderStream.isParallel()) {
getLogger().debug(
"Data provider {} has returned parallel stream on 'fetch' call",
getDataProvider().getClass());
dataProviderStream.collect(Collectors.toList())
.forEach(addItemAndCheckConsumer);
} else {
dataProviderStream.forEach(addItemAndCheckConsumer);
try (Stream<T> dataProviderStream = doFetchFromDataProvider(
newOffset, pageSize)) {
// Stream.Builder is not thread safe, so for parallel
// stream we need to first collect items before adding
// them
if (dataProviderStream.isParallel()) {
getLogger().debug(
"Data provider {} has returned parallel stream on 'fetch' call",
getDataProvider().getClass());
dataProviderStream.collect(Collectors.toList())
.forEach(addItemAndCheckConsumer);
} else {
dataProviderStream.forEach(addItemAndCheckConsumer);
}
}
page++;
} while (page < pages
Expand All @@ -1040,8 +1044,10 @@ protected Stream<T> fetchFromProvider(int offset, int limit) {
getLogger().debug(
"Data provider {} has returned parallel stream on 'fetch' call",
getDataProvider().getClass());
stream = stream.collect(Collectors.toList()).stream();
assert !stream.isParallel();
try (Stream<T> parallelStream = stream) {
stream = parallelStream.collect(Collectors.toList()).stream();
assert !stream.isParallel();
}
}

SizeVerifier verifier = new SizeVerifier<>(limit);
Expand Down Expand Up @@ -1476,17 +1482,20 @@ private Activation activate(Range range) {

// XXX Explicitly refresh anything that is updated
List<String> activeKeys = new ArrayList<>(range.length());
fetchFromProvider(range.getStart(), range.length()).forEach(bean -> {
boolean mapperHasKey = keyMapper.has(bean);
String key = keyMapper.key(bean);
if (mapperHasKey) {
// Ensure latest instance from provider is used
keyMapper.refresh(bean);
passivatedByUpdate.values().stream()
.forEach(set -> set.remove(key));
}
activeKeys.add(key);
});
try (Stream<T> stream = fetchFromProvider(range.getStart(),
range.length())) {
stream.forEach(bean -> {
boolean mapperHasKey = keyMapper.has(bean);
String key = keyMapper.key(bean);
if (mapperHasKey) {
// Ensure latest instance from provider is used
keyMapper.refresh(bean);
passivatedByUpdate.values().stream()
.forEach(set -> set.remove(key));
}
activeKeys.add(key);
});
}
boolean needsSizeRecheck = activeKeys.size() < range.length();
return new Activation(activeKeys, needsSizeRecheck);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,22 @@ public interface DataView<T> extends Serializable {
/**
* Get the full data available to the component. Data is filtered and sorted
* the same way as in the component.
* <p>
* Consumers of the returned stream are responsible for closing it when all
* the stream operations are done to ensure that any resources feeding the
* stream are properly released. Failure to close the stream might lead to
* resource leaks.
* <p>
* It is strongly recommended to use a try-with-resources block to
* automatically close the stream after its terminal operation has been
* executed. Below is an example of how to properly use and close the
* stream:
*
* <pre>{@code
* try (Stream<T> stream = dataView.getItems()) {
* stream.forEach(System.out::println); // Example terminal operation
* }
* }</pre>
*
* @return filtered and sorted data set
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,16 +255,18 @@ private List<String> activate(Range range) {
// XXX Explicitly refresh anything that is updated
List<String> activeKeys = new ArrayList<>(range.length());

fetchItems.apply(parentKey, range).forEach(bean -> {
boolean mapperHasKey = keyMapper.has(bean);
String key = keyMapper.key(bean);
if (mapperHasKey) {
// Ensure latest instance from provider is used
keyMapper.refresh(bean);
passivatedByUpdate.values().forEach(set -> set.remove(key));
}
activeKeys.add(key);
});
try (Stream<T> stream = fetchItems.apply(parentKey, range)) {
stream.forEach(bean -> {
boolean mapperHasKey = keyMapper.has(bean);
String key = keyMapper.key(bean);
if (mapperHasKey) {
// Ensure latest instance from provider is used
keyMapper.refresh(bean);
passivatedByUpdate.values().forEach(set -> set.remove(key));
}
activeKeys.add(key);
});
}
return activeKeys;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,8 +534,9 @@ private Stream<T> getFlatChildrenStream(T parent) {
private Stream<T> getFlatChildrenStream(T parent, boolean includeParent) {
List<T> childList = Collections.emptyList();
if (isExpanded(parent)) {
childList = doFetchDirectChildren(parent)
.collect(Collectors.toList());
try (Stream<T> stream = doFetchDirectChildren(parent)) {
childList = stream.collect(Collectors.toList());
}
if (childList.isEmpty()) {
removeChildren(parent == null ? null
: getDataProvider().getId(parent));
Expand Down Expand Up @@ -563,8 +564,9 @@ private Stream<T> getChildrenStream(T parent, Range range,
boolean includeParent) {
List<T> childList = Collections.emptyList();
if (isExpanded(parent)) {
childList = doFetchDirectChildren(parent, range)
.collect(Collectors.toList());
try (Stream<T> stream = doFetchDirectChildren(parent, range)) {
childList = stream.collect(Collectors.toList());
}
if (childList.isEmpty()) {
removeChildren(parent == null ? null
: getDataProvider().getId(parent));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,18 @@ public void getItem_withUndefinedSizeAndSorting() {
dataCommunicator.getItem(2));
}

@Test
public void getItem_streamIsClosed() {
AtomicBoolean streamIsClosed = new AtomicBoolean();
dataCommunicator.setDataProvider(createDataProvider(streamIsClosed),
null);

fakeClientCommunication();
dataCommunicator.getItem(0);

Assert.assertTrue(streamIsClosed.get());
}

@Test
public void itemCountEstimateAndStep_defaults() {
Assert.assertEquals(dataCommunicator.getItemCountEstimate(),
Expand Down Expand Up @@ -1353,6 +1365,18 @@ public void fetchFromProvider_itemCountLessThanTwoPages_correctItemsReturned() {

}

@Test
public void fetchFromProvider_streamIsClosed() {
AtomicBoolean streamIsClosed = new AtomicBoolean();
dataCommunicator.setDataProvider(createDataProvider(streamIsClosed),
null);
dataCommunicator.setRequestedRange(0, 50);

fakeClientCommunication();

Assert.assertTrue(streamIsClosed.get());
}

@Test
public void fetchEnabled_getItemCount_stillReturnsItemsCount() {
dataCommunicator.setFetchEnabled(false);
Expand Down Expand Up @@ -1737,6 +1761,11 @@ public Stream<Item> fetch(Query<Item, Object> query) {
}

private AbstractDataProvider<Item, Object> createDataProvider() {
return createDataProvider(new AtomicBoolean());
}

private AbstractDataProvider<Item, Object> createDataProvider(
AtomicBoolean streamIsClosed) {
return new AbstractDataProvider<Item, Object>() {
@Override
public boolean isInMemory() {
Expand All @@ -1752,7 +1781,8 @@ public int size(Query<Item, Object> query) {
public Stream<Item> fetch(Query<Item, Object> query) {
return asParallelIfRequired(IntStream.range(query.getOffset(),
query.getLimit() + query.getOffset()))
.mapToObj(Item::new);
.mapToObj(Item::new)
.onClose(() -> streamIsClosed.set(true));
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -221,6 +223,33 @@ public void uniqueKeyProviderIsNotSet_keysGeneratedByKeyMapper() {
communicator.getKeyMapper().get(i)));
}

@Test
public void expandRoot_streamIsClosed() {
AtomicBoolean streamIsClosed = new AtomicBoolean();

dataProvider = new TreeDataProvider<>(treeData) {

@Override
public Stream<Item> fetchChildren(
HierarchicalQuery<Item, SerializablePredicate<Item>> query) {
return super.fetchChildren(query)
.onClose(() -> streamIsClosed.set(true));
}
};

communicator.setDataProvider(dataProvider, null);

communicator.expand(ROOT);
fakeClientCommunication();

communicator.setParentRequestedRange(0, 50, ROOT);
fakeClientCommunication();

communicator.reset();

Assert.assertTrue(streamIsClosed.get());
}

@Test
public void expandRoot_filterOutAllChildren_clearCalled() {
parentClearCalled = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
Expand Down Expand Up @@ -288,6 +289,42 @@ public void getExpandedItems_tryToAddItemsToCollection_shouldThrowException() {
expandedItems.add(new TreeNode("third-1"));
}

@Test
public void fetchHierarchyItems_streamIsClosed() {
AtomicBoolean streamIsClosed = new AtomicBoolean();
mapper = new HierarchyMapper<>(new TreeDataProvider<>(data) {
@Override
public Stream<Node> fetchChildren(
HierarchicalQuery<Node, SerializablePredicate<Node>> query) {
return super.fetchChildren(query)
.onClose(() -> streamIsClosed.set(true));
}
});
Node rootNode = testData.get(0);
mapper.expand(rootNode);
mapper.fetchHierarchyItems(rootNode, Range.between(0, 10)).count();

Assert.assertTrue(streamIsClosed.get());
}

@Test
public void fetchChildItems_streamIsClosed() {
AtomicBoolean streamIsClosed = new AtomicBoolean();
mapper = new HierarchyMapper<>(new TreeDataProvider<>(data) {
@Override
public Stream<Node> fetchChildren(
HierarchicalQuery<Node, SerializablePredicate<Node>> query) {
return super.fetchChildren(query)
.onClose(() -> streamIsClosed.set(true));
}
});
Node rootNode = testData.get(0);
mapper.expand(rootNode);
mapper.fetchChildItems(rootNode, Range.between(0, 10));

Assert.assertTrue(streamIsClosed.get());
}

private void expand(Node node) {
insertRows(mapper.expand(node, mapper.getIndexOf(node).orElse(null)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,11 @@ private void doSetTree(StateTree tree) {
} else {
id = -1;
}
} else if (id > -1 && getOwner() == NullOwner.get()) {
// When id is set but owner is NullOwner, removeFromTree has been
// called, so we should clear node id and attached state, thus allow
// moving the node to another StateTree
reset(false);
}
owner = tree;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -449,8 +450,30 @@ public static boolean hashAndBundleModulesEqual(JsonObject statsJson,
}

private static boolean versionAccepted(String expected, String actual) {
FrontendVersion expectedVersion = new FrontendVersion(expected);
FrontendVersion actualVersion = new FrontendVersion(actual);
FrontendVersion expectedVersion;
try {
expectedVersion = new FrontendVersion(expected);
} catch (NumberFormatException ex) {
expectedVersion = null;
}
FrontendVersion actualVersion;
try {
actualVersion = new FrontendVersion(actual);
} catch (NumberFormatException ex) {
actualVersion = null;
}

if (expectedVersion == null && actualVersion == null) {
return Objects.equals(expected, actual);
} else if (expectedVersion == null || actualVersion == null) {
// expected or actual version is referencing a local package
// while the other one is a parsable version
getLogger().debug(
"Version '{}' cannot be parsed and compared to '{}'",
expectedVersion == null ? expected : actual,
expectedVersion == null ? actual : expected);
return false;
}

if (expected.startsWith("~")) {
boolean correctRange = expectedVersion
Expand Down
Loading

0 comments on commit d0f483f

Please sign in to comment.