Skip to content

Commit

Permalink
Add support for readPreferenceTags in mongodb
Browse files Browse the repository at this point in the history
Add support for readPreferenceTags that lead the mongodb connector to read a specific sharded cluster to configuration properties. The properties are split tag sets as a character '&' and specified a tag set as a comma-separated list of colon-separated key-value paris. For example, mongodb.read-preference-tags=dc:east,use:reporting&use:reporting
  • Loading branch information
ciscoring authored and rschlussel committed Apr 18, 2019
1 parent 56722ff commit a0f64ed
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import com.google.common.collect.ImmutableList;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.Tag;
import com.mongodb.TagSet;
import io.airlift.configuration.Config;
import io.airlift.configuration.DefunctConfig;

Expand All @@ -36,6 +38,8 @@ public class MongoClientConfig
private static final Splitter SPLITTER = Splitter.on(',').trimResults().omitEmptyStrings();
private static final Splitter PORT_SPLITTER = Splitter.on(':').trimResults().omitEmptyStrings();
private static final Splitter USER_SPLITTER = Splitter.onPattern("[:@]").trimResults().omitEmptyStrings();
private static final Splitter TAGSET_SPLITTER = Splitter.on('&').trimResults().omitEmptyStrings();
private static final Splitter TAG_SPLITTER = Splitter.on(':').trimResults().omitEmptyStrings();

private String schemaCollection = "_schema";
private List<ServerAddress> seeds = ImmutableList.of();
Expand All @@ -53,6 +57,7 @@ public class MongoClientConfig
private int cursorBatchSize; // use driver default

private ReadPreferenceType readPreference = ReadPreferenceType.PRIMARY;
private List<TagSet> readPreferenceTagSets = ImmutableList.of();
private WriteConcernType writeConcern = WriteConcernType.ACKNOWLEDGED;
private String requiredReplicaSetName;
private String implicitRowFieldPrefix = "_pos";
Expand Down Expand Up @@ -226,6 +231,38 @@ public MongoClientConfig setReadPreference(ReadPreferenceType readPreference)
return this;
}

public List<TagSet> getReadPreferenceTags()
{
return readPreferenceTagSets;
}

@Config("mongodb.read-preference-tags")
public MongoClientConfig setReadPreferenceTags(String readPreferenceTags)
{
this.readPreferenceTagSets = buildTagSets(TAGSET_SPLITTER.split(readPreferenceTags));
return this;
}

private List<TagSet> buildTagSets(Iterable<String> tagSets)
{
ImmutableList.Builder<TagSet> builder = ImmutableList.builder();
for (String tagSet : tagSets) {
builder.add(new TagSet(buildTags(SPLITTER.split(tagSet))));
}
return builder.build();
}

private List<Tag> buildTags(Iterable<String> tags)
{
ImmutableList.Builder<Tag> builder = ImmutableList.builder();
for (String tag : tags) {
List<String> values = TAG_SPLITTER.splitToList(tag);
checkArgument(values.size() == 2, "Invalid Tag format. Requires tagName:tagValue");
builder.add(new Tag(values.get(0), values.get(1)));
}
return builder.build();
}

public WriteConcernType getWriteConcern()
{
return writeConcern;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,19 @@ public static MongoSession createMongoSession(TypeManager typeManager, MongoClie
.sslEnabled(config.getSslEnabled())
.maxWaitTime(config.getMaxWaitTime())
.minConnectionsPerHost(config.getMinConnectionsPerHost())
.readPreference(config.getReadPreference().getReadPreference())
.writeConcern(config.getWriteConcern().getWriteConcern());

if (config.getRequiredReplicaSetName() != null) {
options.requiredReplicaSetName(config.getRequiredReplicaSetName());
}

if (config.getReadPreferenceTags().isEmpty()) {
options.readPreference(config.getReadPreference().getReadPreference());
}
else {
options.readPreference(config.getReadPreference().getReadPreferenceWithTags(config.getReadPreferenceTags()));
}

MongoClient client = new MongoClient(config.getSeeds(), config.getCredentials(), options.build());

return new MongoSession(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,13 @@
*/
package com.facebook.presto.mongodb;

import com.facebook.presto.spi.PrestoException;
import com.mongodb.ReadPreference;
import com.mongodb.TagSet;

import java.util.List;

import static com.facebook.presto.spi.StandardErrorCode.INVALID_FUNCTION_ARGUMENT;
import static java.util.Objects.requireNonNull;

public enum ReadPreferenceType
Expand All @@ -36,4 +41,13 @@ public ReadPreference getReadPreference()
{
return readPreference;
}

public ReadPreference getReadPreferenceWithTags(List<TagSet> tagSets)
{
if (PRIMARY.equals(this)) {
throw new PrestoException(INVALID_FUNCTION_ARGUMENT, "Primary read preference can not specify tag sets");
}

return ReadPreference.valueOf(readPreference.getName(), tagSets);
}
}

0 comments on commit a0f64ed

Please sign in to comment.