Skip to content

Commit

Permalink
DATAMONGO-2341 - Polishing.
Browse files Browse the repository at this point in the history
Inline MongoPersistentEntity.idPropertyIsShardKey() into UpdateContext. Move mapped shard key cache to QueryOperations level. Simplify conditionals. Tweak documentation.

Original pull request: #833.
  • Loading branch information
mp911de committed Feb 17, 2020
1 parent 6259cd2 commit 22ca597
Show file tree
Hide file tree
Showing 12 changed files with 138 additions and 117 deletions.
Expand Up @@ -1640,7 +1640,7 @@ protected UpdateResult doUpdate(String collectionName, Query query, UpdateDefini
collection.find(filter, Document.class).projection(updateContext.getMappedShardKey(entity)).first());
}
}

ReplaceOptions replaceOptions = updateContext.getReplaceOptions(entityClass);
return collection.replaceOne(filter, updateObj, replaceOptions);
} else {
Expand Down
Expand Up @@ -40,6 +40,7 @@
import org.springframework.data.mongodb.core.convert.UpdateMapper;
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
import org.springframework.data.mongodb.core.mapping.MongoPersistentProperty;
import org.springframework.data.mongodb.core.mapping.ShardKey;
import org.springframework.data.mongodb.core.query.BasicQuery;
import org.springframework.data.mongodb.core.query.Collation;
import org.springframework.data.mongodb.core.query.Query;
Expand All @@ -58,9 +59,10 @@
/**
* {@link QueryOperations} centralizes common operations required before an operation is actually ready to be executed.
* This involves mapping {@link Query queries} into their respective MongoDB representation, computing execution options
* for {@literal count}, {@literal remove}, ... <br />
* for {@literal count}, {@literal remove}, and other methods.
*
* @author Christoph Strobl
* @author Mark Paluch
* @since 3.0
*/
class QueryOperations {
Expand All @@ -71,6 +73,7 @@ class QueryOperations {
private final CodecRegistryProvider codecRegistryProvider;
private final MappingContext<? extends MongoPersistentEntity<?>, MongoPersistentProperty> mappingContext;
private final AggregationUtil aggregationUtil;
private final Map<Class<?>, Document> mappedShardKey = new ConcurrentHashMap<>(1);

/**
* Create a new instance of {@link QueryOperations}.
Expand Down Expand Up @@ -503,7 +506,6 @@ class UpdateContext extends QueryContext {
private final boolean upsert;
private final @Nullable UpdateDefinition update;
private final @Nullable MappedDocument mappedDocument;
private final Map<Class<?>, Document> mappedShardKey = new ConcurrentHashMap<>(1);

/**
* Create a new {@link UpdateContext} instance.
Expand Down Expand Up @@ -624,41 +626,49 @@ <T> Document getMappedQuery(@Nullable MongoPersistentEntity<T> domainType) {
return mappedQuery;
}

<T> Document applyShardKey(@Nullable MongoPersistentEntity<T> domainType, Document filter,
@Nullable Document existing) {
<T> Document applyShardKey(MongoPersistentEntity<T> domainType, Document filter, @Nullable Document existing) {

Document shardKeySource = existing != null ? existing
: mappedDocument != null ? mappedDocument.getDocument() : getMappedUpdate(domainType);

Document filterWithShardKey = new Document(filter);
for (String key : getMappedShardKeyFields(domainType)) {
if (!filterWithShardKey.containsKey(key)) {
filterWithShardKey.append(key, shardKeySource.get(key));
}
}
getMappedShardKeyFields(domainType).forEach(key -> filterWithShardKey.putIfAbsent(key, shardKeySource.get(key)));

return filterWithShardKey;
}

<T> boolean requiresShardKey(Document filter, @Nullable MongoPersistentEntity<T> domainType) {
boolean requiresShardKey(Document filter, @Nullable MongoPersistentEntity<?> domainType) {

if (multi || domainType == null || !domainType.isSharded() || domainType.idPropertyIsShardKey()) {
return !multi && domainType != null && domainType.isSharded() && !shardedById(domainType)
&& !filter.keySet().containsAll(getMappedShardKeyFields(domainType));
}

/**
* @return {@literal true} if the {@link MongoPersistentEntity#getShardKey() shard key} is the entities
* {@literal id} property.
* @since 3.0
*/
private boolean shardedById(MongoPersistentEntity<?> domainType) {

ShardKey shardKey = domainType.getShardKey();
if (shardKey.size() != 1) {
return false;
}

if (filter.keySet().containsAll(getMappedShardKeyFields(domainType))) {
return false;
String key = shardKey.getPropertyNames().iterator().next();
if ("_id".equals(key)) {
return true;
}

return true;
MongoPersistentProperty idProperty = domainType.getIdProperty();
return idProperty != null && idProperty.getName().equals(key);
}

Set<String> getMappedShardKeyFields(@Nullable MongoPersistentEntity<?> entity) {
Set<String> getMappedShardKeyFields(MongoPersistentEntity<?> entity) {
return getMappedShardKey(entity).keySet();
}

Document getMappedShardKey(@Nullable MongoPersistentEntity<?> entity) {

Document getMappedShardKey(MongoPersistentEntity<?> entity) {
return mappedShardKey.computeIfAbsent(entity.getType(),
key -> queryMapper.getMappedFields(entity.getShardKey().getDocument(), entity));
}
Expand Down
Expand Up @@ -1637,7 +1637,7 @@ protected Mono<Object> saveDocument(String collectionName, Document document, Cl
? collection //
: collection.withWriteConcern(writeConcernToUse);

Publisher<?> publisher = null;
Publisher<?> publisher;
if (!mapped.hasId()) {
publisher = collectionToUse.insertOne(document);
} else {
Expand All @@ -1647,20 +1647,23 @@ protected Mono<Object> saveDocument(String collectionName, Document document, Cl
Document filter = updateContext.getMappedQuery(entity);
Document replacement = updateContext.getMappedUpdate(entity);

Mono<Document> theFilter = Mono.just(filter);
Mono<Document> deferredFilter;

if(updateContext.requiresShardKey(filter, entity)) {
if (updateContext.requiresShardKey(filter, entity)) {
if (entity.getShardKey().isImmutable()) {
theFilter = Mono.just(updateContext.applyShardKey(entity, filter, null));
deferredFilter = Mono.just(updateContext.applyShardKey(entity, filter, null));
} else {
theFilter = Mono.from(
collection.find(filter, Document.class).projection(updateContext.getMappedShardKey(entity)).first())
deferredFilter = Mono
.from(
collection.find(filter, Document.class).projection(updateContext.getMappedShardKey(entity)).first())
.defaultIfEmpty(replacement).map(it -> updateContext.applyShardKey(entity, filter, it));
}
} else {
deferredFilter = Mono.just(filter);
}

publisher = theFilter.flatMap(
it -> Mono.from(collectionToUse.replaceOne(it, replacement, updateContext.getReplaceOptions(entityClass))));
publisher = deferredFilter.flatMapMany(
it -> collectionToUse.replaceOne(it, replacement, updateContext.getReplaceOptions(entityClass)));
}

return Mono.from(publisher).map(o -> mapped.getId());
Expand Down Expand Up @@ -1800,20 +1803,22 @@ protected Mono<UpdateResult> doUpdate(String collectionName, Query query, @Nulla
if (!UpdateMapper.isUpdateObject(updateObj)) {

Document filter = new Document(queryObj);
Mono<Document> theFilter = Mono.just(filter);
Mono<Document> deferredFilter;

if(updateContext.requiresShardKey(filter, entity)) {
if (updateContext.requiresShardKey(filter, entity)) {
if (entity.getShardKey().isImmutable()) {
theFilter = Mono.just(updateContext.applyShardKey(entity, filter, null));
deferredFilter = Mono.just(updateContext.applyShardKey(entity, filter, null));
} else {
theFilter = Mono.from(
deferredFilter = Mono.from(
collection.find(filter, Document.class).projection(updateContext.getMappedShardKey(entity)).first())
.defaultIfEmpty(updateObj).map(it -> updateContext.applyShardKey(entity, filter, it));
}
} else {
deferredFilter = Mono.just(filter);
}

ReplaceOptions replaceOptions = updateContext.getReplaceOptions(entityClass);
return theFilter.flatMap(it -> Mono.from(collectionToUse.replaceOne(it, updateObj, replaceOptions)));
return deferredFilter.flatMap(it -> Mono.from(collectionToUse.replaceOne(it, updateObj, replaceOptions)));
}

return multi ? collectionToUse.updateMany(queryObj, updateObj, updateOptions)
Expand Down
Expand Up @@ -96,7 +96,26 @@ public BasicMongoPersistentEntity(TypeInformation<T> typeInformation) {
this.collationExpression = null;
}

this.shardKey = detectShardKey(this);
this.shardKey = detectShardKey();
}

private ShardKey detectShardKey() {

if (!isAnnotationPresent(Sharded.class)) {
return ShardKey.none();
}

Sharded sharded = getRequiredAnnotation(Sharded.class);

String[] keyProperties = sharded.shardKey();
if (ObjectUtils.isEmpty(keyProperties)) {
keyProperties = new String[] { "_id" };
}

ShardKey shardKey = ShardingStrategy.HASH.equals(sharded.shardingStrategy()) ? ShardKey.hash(keyProperties)
: ShardKey.range(keyProperties);

return sharded.immutableKey() ? ShardKey.immutable(shardKey) : shardKey;
}

/*
Expand Down Expand Up @@ -307,26 +326,6 @@ private static Expression detectExpression(@Nullable String potentialExpression)
return expression instanceof LiteralExpression ? null : expression;
}

@Nullable
private static ShardKey detectShardKey(BasicMongoPersistentEntity<?> entity) {

if (!entity.isAnnotationPresent(Sharded.class)) {
return ShardKey.none();
}

Sharded sharded = entity.getRequiredAnnotation(Sharded.class);

String[] keyProperties = sharded.shardKey();
if (ObjectUtils.isEmpty(keyProperties)) {
keyProperties = new String[] { "_id" };
}

ShardKey shardKey = ShardingStrategy.HASH.equals(sharded.shardingStrategy()) ? ShardKey.hash(keyProperties)
: ShardKey.range(keyProperties);

return sharded.immutableKey() ? ShardKey.immutable(shardKey) : shardKey;
}

/**
* Handler to collect {@link MongoPersistentProperty} instances and check that each of them is mapped to a distinct
* field name.
Expand Down
Expand Up @@ -86,29 +86,11 @@ default boolean hasCollation() {
ShardKey getShardKey();

/**
* @return {@literal true} if the {@link #getShardKey() shard key} does not match {@link ShardKey#none()}.
* @return {@literal true} if the {@link #getShardKey() shard key} is sharded.
* @since 3.0
*/
default boolean isSharded() {
return !ShardKey.none().equals(getShardKey());
return getShardKey().isSharded();
}

/**
* @return {@literal true} if the {@link #getShardKey() shard key} is the entities {@literal id} property.
* @since 3.0
*/
default boolean idPropertyIsShardKey() {

ShardKey shardKey = getShardKey();
if (shardKey.size() != 1) {
return false;
}
String key = shardKey.getPropertyNames().iterator().next();
if ("_id".equals(key)) {
return true;
}

MongoPersistentProperty idProperty = getIdProperty();
return idProperty != null && idProperty.getName().equals(key);
}
}
Expand Up @@ -28,12 +28,13 @@
* Value object representing an entities <a href="https://docs.mongodb.com/manual/core/sharding-shard-key/">Shard
* Key</a> used to distribute documents across a sharded MongoDB cluster.
* <p />
* {@link ShardKey#isImmutable() Immutable} shard keys indicate a fixed value that is not updated (see
* {@link ShardKey#isImmutable() Immutable} shard keys indicates a fixed value that is not updated (see
* <a href="https://docs.mongodb.com/manual/core/sharding-shard-key/#change-a-document-s-shard-key-value">MongoDB
* Reference: Change a Documents Shard Key Value</a>), which allows to skip server round trips in cases where a
* Reference: Change a Document's Shard Key Value</a>), which allows to skip server round trips in cases where a
* potential shard key change might have occurred.
*
* @author Christoph Strobl
* @author Mark Paluch
* @since 3.0
*/
public class ShardKey {
Expand Down Expand Up @@ -68,14 +69,23 @@ public Collection<String> getPropertyNames() {
/**
* @return {@literal true} if the shard key of an document does not change.
* @see <a href="https://docs.mongodb.com/manual/core/sharding-shard-key/#change-a-document-s-shard-key-value">MongoDB
* Reference: Change a Documents Shard Key Value</a>
* Reference: Change a Document's Shard Key Value</a>
*/
public boolean isImmutable() {
return immutable;
}

/**
* Get the unmapped MongoDB representation of the {@link ShardKey}.
* Return whether the shard key represents a sharded key. Return {@literal false} if the key is not sharded.
*
* @return {@literal true} if the key is sharded; {@literal false} otherwise.
*/
public boolean isSharded() {
return !propertyNames.isEmpty();
}

/**
* Get the raw MongoDB representation of the {@link ShardKey}.
*
* @return never {@literal null}.
*/
Expand Down
Expand Up @@ -25,26 +25,29 @@
import org.springframework.data.annotation.Persistent;

/**
* The {@link Sharded} annotation provides meta information about the actual distribution of data across multiple
* machines. The {@link #shardKey()} is used to distribute documents across shards. <br />
* Please visit the <a href="https://docs.mongodb.com/manual/sharding/">MongoDB Documentation</a> for more information
* about requirements and limitations of sharding. <br />
* Spring Data will automatically add the shard key to filter queries used for
* The {@link Sharded} annotation provides meta information about the actual distribution of data. The
* {@link #shardKey()} is used to distribute documents across shards. <br />
* Please see the <a href="https://docs.mongodb.com/manual/sharding/">MongoDB Documentation</a> for more information
* about requirements and limitations of sharding.
* <p/>
* Spring Data adds the shard key to filter queries used for
* {@link com.mongodb.client.MongoCollection#replaceOne(org.bson.conversions.Bson, Object)} operations triggered by
* {@code save} operations on {@link org.springframework.data.mongodb.core.MongoOperations} and
* {@link org.springframework.data.mongodb.core.ReactiveMongoOperations} as well as {@code update/upsert} operation
* {@link org.springframework.data.mongodb.core.ReactiveMongoOperations} as well as {@code update/upsert} operations
* replacing/upserting a single existing document as long as the given
* {@link org.springframework.data.mongodb.core.query.UpdateDefinition} holds a full copy of the entity. <br />
* {@link org.springframework.data.mongodb.core.query.UpdateDefinition} holds a full copy of the entity.
* <p/>
* All other operations that require the presence of the {@literal shard key} in the filter query need to provide the
* information via the {@link org.springframework.data.mongodb.core.query.Query} parameter when invoking the method.
*
* @author Christoph Strobl
* @author Mark Paluch
* @since 3.0
*/
@Persistent
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.TYPE })
@Target({ ElementType.TYPE, ElementType.ANNOTATION_TYPE })
public @interface Sharded {

/**
Expand All @@ -57,15 +60,15 @@
String[] value() default {};

/**
* The shard key determines the distribution of the collection’s documents among the cluster’s shards. The shard key
* is either a single or multiple indexed properties that exist in every document in the collection. <br />
* The shard key determines the distribution of the collection's documents among the cluster's shards. The shard key
* is either a single or multiple indexed properties that exist in every document in the collection.
* <p/>
* By default the {@literal id} property is used for sharding. <br />
* <strong>NOTE</strong> Required indexes will not be created automatically. Use
* {@link org.springframework.data.mongodb.core.index.Indexed} or
* <strong>NOTE</strong> Required indexes are not created automatically. Create these either externally, via
* {@link org.springframework.data.mongodb.core.index.IndexOperations#ensureIndex(org.springframework.data.mongodb.core.index.IndexDefinition)}
* or by annotating your domain model with {@link org.springframework.data.mongodb.core.index.Indexed}/
* {@link org.springframework.data.mongodb.core.index.CompoundIndex} along with enabled
* {@link org.springframework.data.mongodb.config.MongoConfigurationSupport#autoIndexCreation() auto index creation}
* or set up them up via
* {@link org.springframework.data.mongodb.core.index.IndexOperations#ensureIndex(org.springframework.data.mongodb.core.index.IndexDefinition)}.
* {@link org.springframework.data.mongodb.config.MongoConfigurationSupport#autoIndexCreation() auto index creation}.
*
* @return an empty key by default. Which indicates to use the entities {@literal id} property.
*/
Expand All @@ -82,10 +85,10 @@
/**
* As of MongoDB 4.2 it is possible to change the shard key using update. Using immutable shard keys avoids server
* round trips to obtain an entities actual shard key from the database.
*
* @return {@literal false} by default;
*
* @return {@literal false} by default.
* @see <a href="https://docs.mongodb.com/manual/core/sharding-shard-key/#change-a-document-s-shard-key-value">MongoDB
* Reference: Change a Documents Shard Key Value</a>
* Reference: Change a Document's Shard Key Value</a>
*/
boolean immutableKey() default false;

Expand Down

0 comments on commit 22ca597

Please sign in to comment.