Skip to content

Commit

Permalink
Introduced ObjectSet abstraction into internal APIs, to allow indexes…
Browse files Browse the repository at this point in the history
… to close iterators/connections preemptively. Closes #74.
  • Loading branch information
npgall committed Jul 6, 2016
1 parent 1036183 commit cf34091
Show file tree
Hide file tree
Showing 24 changed files with 794 additions and 369 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.googlecode.cqengine.index.support.CloseableRequestResources;
import com.googlecode.cqengine.persistence.Persistence;
import com.googlecode.cqengine.persistence.onheap.OnHeapPersistence;
import com.googlecode.cqengine.persistence.support.ObjectSet;
import com.googlecode.cqengine.persistence.support.ObjectStore;
import com.googlecode.cqengine.persistence.support.ObjectStoreAsSet;
import com.googlecode.cqengine.persistence.support.PersistenceFlags;
Expand Down Expand Up @@ -313,7 +314,7 @@ public O next() {
@Override
public void remove() {
collectionIterator.remove();
indexEngine.removeAll(singleton(currentObject), queryOptions);
indexEngine.removeAll(ObjectSet.fromCollection(singleton(currentObject)), queryOptions);
}

@Override
Expand All @@ -334,7 +335,7 @@ public boolean add(O o) {
// Add the object to the index.
// Indexes handle gracefully the case that the objects supplied already exist in the index...
boolean modified = objectStore.add(o, queryOptions);
indexEngine.addAll(singleton(o), queryOptions);
indexEngine.addAll(ObjectSet.fromCollection(singleton(o)), queryOptions);
return modified;
}
finally {
Expand All @@ -352,7 +353,7 @@ public boolean remove(Object object) {
@SuppressWarnings({"unchecked"})
O o = (O) object;
boolean modified = objectStore.remove(o, queryOptions);
indexEngine.removeAll(singleton(o), queryOptions);
indexEngine.removeAll(ObjectSet.fromCollection(singleton(o)), queryOptions);
return modified;
}
finally {
Expand All @@ -370,7 +371,7 @@ public boolean addAll(Collection<? extends O> c) {
@SuppressWarnings({"unchecked"})
Collection<O> objects = (Collection<O>) c;
boolean modified = objectStore.addAll(objects, queryOptions);
indexEngine.addAll(objects, queryOptions);
indexEngine.addAll(ObjectSet.fromCollection(objects), queryOptions);
return modified;
}
finally {
Expand All @@ -388,7 +389,7 @@ public boolean removeAll(Collection<?> c) {
@SuppressWarnings({"unchecked"})
Collection<O> objects = (Collection<O>) c;
boolean modified = objectStore.removeAll(objects, queryOptions);
indexEngine.removeAll(objects, queryOptions);
indexEngine.removeAll(ObjectSet.fromCollection(objects), queryOptions);
return modified;
}
finally {
Expand Down Expand Up @@ -440,14 +441,14 @@ boolean doAddAll(Iterable<O> objects, QueryOptions queryOptions) {
if (objects instanceof Collection) {
Collection<O> c = (Collection<O>) objects;
boolean modified = objectStore.addAll(c, queryOptions);
indexEngine.addAll(c, queryOptions);
indexEngine.addAll(ObjectSet.fromCollection(c), queryOptions);
return modified;
}
else {
boolean modified = false;
for (O object : objects) {
boolean added = objectStore.add(object, queryOptions);
indexEngine.addAll(singleton(object), queryOptions);
indexEngine.addAll(ObjectSet.fromCollection(singleton(object)), queryOptions);
modified = added || modified;
}
return modified;
Expand All @@ -458,13 +459,13 @@ boolean doRemoveAll(Iterable<O> objects, QueryOptions queryOptions) {
if (objects instanceof Collection) {
Collection<O> c = (Collection<O>) objects;
boolean modified = objectStore.removeAll(c, queryOptions);
indexEngine.removeAll(c, queryOptions);
indexEngine.removeAll(ObjectSet.fromCollection(c), queryOptions);
return modified;
} else {
boolean modified = false;
for (O object : objects) {
boolean removed = objectStore.remove(object, queryOptions);
indexEngine.removeAll(singleton(object), queryOptions);
indexEngine.removeAll(ObjectSet.fromCollection(singleton(object)), queryOptions);
modified = removed || modified;
}
return modified;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.googlecode.cqengine.index.support.CloseableRequestResources;
import com.googlecode.cqengine.persistence.Persistence;
import com.googlecode.cqengine.persistence.onheap.OnHeapPersistence;
import com.googlecode.cqengine.persistence.support.ObjectSet;
import com.googlecode.cqengine.query.option.QueryOptions;

import java.util.Collection;
Expand Down Expand Up @@ -160,7 +161,7 @@ public void remove() {
lock.lock();
try {
collectionIterator.remove();
indexEngine.removeAll(Collections.singleton(currentObject), queryOptions);
indexEngine.removeAll(ObjectSet.fromCollection(Collections.singleton(currentObject)), queryOptions);
}
finally {
lock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.googlecode.cqengine.persistence.Persistence;
import com.googlecode.cqengine.persistence.onheap.OnHeapPersistence;
import com.googlecode.cqengine.persistence.support.ObjectStore;
import com.googlecode.cqengine.query.Query;
import com.googlecode.cqengine.query.option.ArgumentValidationOption;
import com.googlecode.cqengine.query.option.FlagsEnabled;
Expand Down Expand Up @@ -239,7 +240,7 @@ public boolean update(final Iterable<O> objectsToRemove, final Iterable<O> objec
return false;
}
if (FlagsEnabled.isFlagEnabled(queryOptions, STRICT_REPLACEMENT)) {
if (!collectionContainsAllIterable(getObjectStoreAsSet(queryOptions), objectsToRemove)) {
if (!objectStoreContainsAllIterable(objectStore, objectsToRemove, queryOptions)) {
return false;
}
}
Expand Down Expand Up @@ -410,12 +411,12 @@ else if (objects instanceof ResultSet) {
}
}

static <O> boolean collectionContainsAllIterable(Collection<O> collection, Iterable<O> candidates) {
static <O> boolean objectStoreContainsAllIterable(ObjectStore<O> objectStore, Iterable<O> candidates, QueryOptions queryOptions) {
if (candidates instanceof Collection) {
return collection.containsAll((Collection<?>) candidates);
return objectStore.containsAll((Collection<?>) candidates, queryOptions);
}
for (O candidate : candidates) {
if (!collection.contains(candidate)) {
if (!objectStore.contains(candidate, queryOptions)) {
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.googlecode.cqengine.index.fallback.FallbackIndex;
import com.googlecode.cqengine.index.standingquery.StandingQueryIndex;
import com.googlecode.cqengine.persistence.Persistence;
import com.googlecode.cqengine.persistence.support.ObjectSet;
import com.googlecode.cqengine.persistence.support.ObjectStore;
import com.googlecode.cqengine.persistence.support.ObjectStoreResultSet;
import com.googlecode.cqengine.persistence.support.sqlite.SQLiteObjectStore;
Expand Down Expand Up @@ -1115,13 +1116,13 @@ public ResultSet<O> next() {
* {@inheritDoc}
*/
@Override
public boolean addAll(final Collection<O> objects, final QueryOptions queryOptions) {
public boolean addAll(final ObjectSet<O> objectSet, final QueryOptions queryOptions) {
ensureMutable();
final FlagHolder modified = new FlagHolder();
forEachIndexDo(new IndexOperation<O>() {
@Override
public boolean perform(Index<O> index) {
modified.value |= index.addAll(objects, queryOptions);
modified.value |= index.addAll(objectSet, queryOptions);
return true;
}
});
Expand All @@ -1132,13 +1133,13 @@ public boolean perform(Index<O> index) {
* {@inheritDoc}
*/
@Override
public boolean removeAll(final Collection<O> objects, final QueryOptions queryOptions) {
public boolean removeAll(final ObjectSet<O> objectSet, final QueryOptions queryOptions) {
ensureMutable();
final FlagHolder modified = new FlagHolder();
forEachIndexDo(new IndexOperation<O>() {
@Override
public boolean perform(Index<O> index) {
modified.value |= index.removeAll(objects, queryOptions);
modified.value |= index.removeAll(objectSet, queryOptions);
return true;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,10 @@
*/
package com.googlecode.cqengine.engine;

import com.googlecode.cqengine.persistence.Persistence;
import com.googlecode.cqengine.persistence.support.ObjectSet;
import com.googlecode.cqengine.persistence.support.ObjectStore;
import com.googlecode.cqengine.query.option.QueryOptions;

import java.util.Collection;
import java.util.Map;
import java.util.Set;

/**
* @author Niall Gallagher
*/
Expand All @@ -31,20 +27,18 @@ public interface ModificationListener<O> {
/**
* Notifies the listener that the specified objects are being added to the collection, and so it can take action
* and update its internal data structures.
*
* @param objects The objects being added
* @param objectSet The objects being added
* @param queryOptions Optional parameters for the update
*/
public boolean addAll(Collection<O> objects, QueryOptions queryOptions);
public boolean addAll(ObjectSet<O> objectSet, QueryOptions queryOptions);

/**
* Notifies the listener that the specified objects are being removed from the collection, and so it can take action
* and update its internal data structures.
*
* @param objects The objects being removed
* @param objectSet The objects being removed
* @param queryOptions Optional parameters for the update
*/
public boolean removeAll(Collection<O> objects, QueryOptions queryOptions);
public boolean removeAll(ObjectSet<O> objectSet, QueryOptions queryOptions);

/**
* Notifies the listener that all objects have been removed from the collection, and so it can take action
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.googlecode.cqengine.index.fallback;

import com.googlecode.cqengine.index.Index;
import com.googlecode.cqengine.persistence.support.ObjectSet;
import com.googlecode.cqengine.persistence.support.ObjectStore;
import com.googlecode.cqengine.query.Query;
import com.googlecode.cqengine.query.option.QueryOptions;
Expand Down Expand Up @@ -92,17 +93,18 @@ public boolean isQuantized() {
*/
@Override
public ResultSet<O> retrieve(final Query<O> query, final QueryOptions queryOptions) {
final ObjectSet<O> objectSet = ObjectSet.fromObjectStore(objectStore, queryOptions);
return new ResultSet<O>() {
@Override
public Iterator<O> iterator() {
if (query instanceof All) {
return IteratorUtil.wrapAsUnmodifiable(objectStore.iterator(queryOptions));
return IteratorUtil.wrapAsUnmodifiable(objectSet.iterator());
}
else if (query instanceof None) {
return Collections.<O>emptyList().iterator();
}
else {
return new FilteringIterator<O>(objectStore.iterator(queryOptions), queryOptions) {
return new FilteringIterator<O>(objectSet.iterator(), queryOptions) {
@Override
public boolean isValid(O object, QueryOptions queryOptions) {
return query.matches(object, queryOptions);
Expand Down Expand Up @@ -134,7 +136,7 @@ public int getMergeCost() {
}
@Override
public void close() {
// No op.
objectSet.close();
}
@Override
public Query<O> getQuery() {
Expand All @@ -153,7 +155,7 @@ public QueryOptions getQueryOptions() {
* <b>In this implementation, does nothing.</b>
*/
@Override
public boolean addAll(Collection<O> objects, QueryOptions queryOptions) {
public boolean addAll(ObjectSet<O> objectSet, QueryOptions queryOptions) {
// No need to take any action
return false;
}
Expand All @@ -164,7 +166,7 @@ public boolean addAll(Collection<O> objects, QueryOptions queryOptions) {
* <b>In this implementation, does nothing.</b>
*/
@Override
public boolean removeAll(Collection<O> objects, QueryOptions queryOptions) {
public boolean removeAll(ObjectSet<O> objectSet, QueryOptions queryOptions) {
// No need to take any action
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import com.googlecode.cqengine.index.Index;
import com.googlecode.cqengine.index.support.AbstractAttributeIndex;
import com.googlecode.cqengine.index.support.indextype.OnHeapTypeIndex;
import com.googlecode.cqengine.persistence.support.ObjectSet;
import com.googlecode.cqengine.persistence.support.ObjectStore;
import com.googlecode.cqengine.persistence.support.ObjectStoreAsSet;
import com.googlecode.cqengine.query.Query;
import com.googlecode.cqengine.query.option.QueryOptions;
import com.googlecode.cqengine.query.simple.Equal;
Expand Down Expand Up @@ -246,60 +246,70 @@ public StoredResultSet<O> createValueSet() {
* {@inheritDoc}
*/
@Override
public boolean addAll(Collection<O> objects, QueryOptions queryOptions) {
boolean modified = false;
final RadixTree<StoredResultSet<O>> tree = this.tree;
for (O object : objects) {
Iterable<A> attributeValues = getAttribute().getValues(object, queryOptions);
for (A attributeValue : attributeValues) {
public boolean addAll(ObjectSet<O> objectSet, QueryOptions queryOptions) {
try {
boolean modified = false;
final RadixTree<StoredResultSet<O>> tree = this.tree;
for (O object : objectSet) {
Iterable<A> attributeValues = getAttribute().getValues(object, queryOptions);
for (A attributeValue : attributeValues) {

// Look up StoredResultSet for the value...
StoredResultSet<O> valueSet = tree.getValueForExactKey(attributeValue);
if (valueSet == null) {
// No StoredResultSet, create and add one...
valueSet = createValueSet();
StoredResultSet<O> existingValueSet = tree.putIfAbsent(attributeValue, valueSet);
if (existingValueSet != null) {
// Another thread won race to add new value set, use that one...
valueSet = existingValueSet;
// Look up StoredResultSet for the value...
StoredResultSet<O> valueSet = tree.getValueForExactKey(attributeValue);
if (valueSet == null) {
// No StoredResultSet, create and add one...
valueSet = createValueSet();
StoredResultSet<O> existingValueSet = tree.putIfAbsent(attributeValue, valueSet);
if (existingValueSet != null) {
// Another thread won race to add new value set, use that one...
valueSet = existingValueSet;
}
}
// Add the object to the StoredResultSet for this value...
modified |= valueSet.add(object);
}
// Add the object to the StoredResultSet for this value...
modified |= valueSet.add(object);
}
return modified;
}
finally {
objectSet.close();
}
return modified;
}

/**
* {@inheritDoc}
*/
@Override
public boolean removeAll(Collection<O> objects, QueryOptions queryOptions) {
boolean modified = false;
final RadixTree<StoredResultSet<O>> tree = this.tree;
for (O object : objects) {
Iterable<A> attributeValues = getAttribute().getValues(object, queryOptions);
for (A attributeValue : attributeValues) {
StoredResultSet<O> valueSet = tree.getValueForExactKey(attributeValue);
if (valueSet == null) {
continue;
}
modified |= valueSet.remove(object);
if (valueSet.isEmpty()) {
tree.remove(attributeValue);
public boolean removeAll(ObjectSet<O> objectSet, QueryOptions queryOptions) {
try {
boolean modified = false;
final RadixTree<StoredResultSet<O>> tree = this.tree;
for (O object : objectSet) {
Iterable<A> attributeValues = getAttribute().getValues(object, queryOptions);
for (A attributeValue : attributeValues) {
StoredResultSet<O> valueSet = tree.getValueForExactKey(attributeValue);
if (valueSet == null) {
continue;
}
modified |= valueSet.remove(object);
if (valueSet.isEmpty()) {
tree.remove(attributeValue);
}
}
}
return modified;
}
finally {
objectSet.close();
}
return modified;
}

/**
* {@inheritDoc}
*/
@Override
public void init(ObjectStore<O> objectStore, QueryOptions queryOptions) {
addAll(new ObjectStoreAsSet<O>(objectStore, queryOptions), queryOptions);
addAll(ObjectSet.fromObjectStore(objectStore, queryOptions), queryOptions);
}

/**
Expand Down
Loading

0 comments on commit cf34091

Please sign in to comment.