diff --git a/pom.xml b/pom.xml index 7c8054f4eacb..e603a28a5b07 100644 --- a/pom.xml +++ b/pom.xml @@ -90,6 +90,7 @@ presto-base-jdbc presto-mysql presto-postgresql + presto-mongodb presto-bytecode presto-client presto-parser diff --git a/presto-docs/src/main/sphinx/connector.rst b/presto-docs/src/main/sphinx/connector.rst index 10dbb9d6628e..e11859910119 100644 --- a/presto-docs/src/main/sphinx/connector.rst +++ b/presto-docs/src/main/sphinx/connector.rst @@ -14,6 +14,7 @@ from different data sources. connector/jmx connector/kafka connector/kafka-tutorial + connector/mongodb connector/mysql connector/postgresql connector/redis diff --git a/presto-docs/src/main/sphinx/connector/mongodb.rst b/presto-docs/src/main/sphinx/connector/mongodb.rst new file mode 100644 index 000000000000..cda6a20f20fc --- /dev/null +++ b/presto-docs/src/main/sphinx/connector/mongodb.rst @@ -0,0 +1,244 @@ +================= +MongoDB Connector +================= + +This connector allows the use of Mongodb collections as tables in Presto. + +.. note:: + + Mongodb 2.6+ is supported although it is highly recommend to use 3.0 or later. + +Configuration +------------- + +To configure the MongoDB connector, create a catalog properties file +``etc/catalog/mongodb.properties`` with the following contents, +replacing the properties as appropriate: + +.. code-block:: none + + connector.name=mongodb + mongodb.seeds=host1,host:port + +Multiple MongoDB Clusters +^^^^^^^^^^^^^^^^^^^^^^^^^ + +You can have as many catalogs as you need, so if you have additional +MongoDB clusters, simply add another properties file to ``etc/catalog`` +with a different name (making sure it ends in ``.properties``). For +example, if you name the property file ``sales.properties``, Presto +will create a catalog named ``sales`` using the configured connector. + +Configuration Properties +------------------------ + +The following configuration properties are available: + +===================================== ============================================================== +Property Name Description +===================================== ============================================================== +``mongodb.seeds`` List of all mongod servers +``mongodb.schema-collection`` A collection which contains schema information +``mongodb.credentials`` List of credentials +``mongodb.min-connections-per-host`` The minimum size of the connection pool per host +``mongodb.connections-per-host`` The maximum size of the connection pool per host +``mongodb.max-wait-time`` The maximum wait time +``mongodb.connection-timeout`` The socket connect timeout +``mongodb.socket-timeout`` The socket timeout +``mongodb.socket-keep-alive`` Whether keep-alive is enabled on each socket +``mongodb.read-preference`` The read preference +``mongodb.write-concern`` The write concern +``mongodb.required-replica-set`` The required replica set name +``mongodb.cursor-batch-size`` The number of elements to return in a batch +===================================== ============================================================== + +``Mongodb.seeds`` +^^^^^^^^^^^^^^^^^ + +Comma-separated list of ``hostname[:port]`` all mongod servers in the same replica set or a list of mongos servers in the same sharded cluster. If port is not specified, port 27017 will be used. + +This property is required; there is no default and at least one seed must be defined. + +``mongodb.schema-collection`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +As the MongoDB is a document database, there's no fixed schema information in the system. So a special collection in each MongoDB database should defines the schema of all tables. Please refer the :ref:`table-definition-label` section for the details. + +At startup, this plugin tries guessin fields' types, but it might not be correct for your collection. In that case, you need to modify it manually. ``CREATE TABLE`` and ``CREATE TABLE AS SELECT`` will create an entry for you. + +This property is optional; the default is ``_schema``. + +``mongodb.credentials`` +^^^^^^^^^^^^^^^^^^^^^^^ + +A comma separated list of ``username:password@collection`` credentials + +This property is optional; no default value + +``mongodb.min-connections-per-host`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The minimum number of connections per host for this MongoClient instance. Those connections will be kept in a pool when idle, and the pool will ensure over time that it contains at least this minimum number. + +This property is optional; the default is ``0``. + +``mongodb.connections-per-host`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The maximum number of connections allowed per host for this MongoClient instance. Those connections will be kept in a pool when idle. Once the pool is exhausted, any operation requiring a connection will block waiting for an available connection. + +This property is optional; the default is ``100``. + +``mongodb.max-wait-time`` +^^^^^^^^^^^^^^^^^^^^^^^^^ + +The maximum wait time in milliseconds that a thread may wait for a connection to become available. +A value of 0 means that it will not wait. A negative value means to wait indefinitely for a connection to become available. + +This property is optional; the default is ``120000``. + +``mongodb.connection-timeout`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The connection timeout in milliseconds. A value of 0 means no timeout. It is used solely when establishing a new connection Socket.connect(java.net.SocketAddress, int) + +This property is optional; the default is ``10000``. + +``mongodb.socket-timeout`` +^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The socket timeout in milliseconds. It is used for I/O socket read and write operations Socket.setSoTimeout(int) + +This property is optional; the default is ``0`` and means no timeout. + +``mongodb.socket-keep-alive`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +This flag controls the socket keep alive feature that keeps a connection alive through firewalls Socket.setKeepAlive(boolean) + +This property is optional; the default is ``false``. + +``mongodb.read-preference`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The read preference to use for queries, map-reduce, aggregation, and count. The available values are PRIMARY, PRIMARY_PREFERRED, SECONDARY, SECONDARY_PREFERRED and NEAREST. + +This property is optional; the default is ``PRIMARY``. + +``mongodb.write-concern`` +^^^^^^^^^^^^^^^^^^^^^^^^^ + +The write concern to use. The available values are ACKNOWLEDGED, FSYNC_SAFE, FSYNCED, JOURNAL_SAFEY, JOURNALED, MAJORITY, NORMAL, REPLICA_ACKNOWLEDGED , REPLICAS_SAFE and UNACKNOWLEDGED. + +This property is optional; the default is ``ACKNOWLEDGED``. + +``mongodb.required-replica-set`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The required replica set name. With this option set, the MongoClient instance will + +#. Connect in replica set mode, and discover all members of the set based on the given servers +#. Make sure that the set name reported by all members matches the required set name. +#. Refuse to service any requests if any member of the seed list is not part of a replica set with the required name. + +This property is optional; no default value + +``mongodb.required-replica-set`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Limits the number of elements returned in one batch. A cursor typically fetches a batch of result objects and stores them locally. +If batchSize is 0, Driver's default will be used. +If batchSize is positive, it represents the size of each batch of objects retrieved. It can be adjusted to optimize performance and limit data transfer. +If batchSize is negative, it will limit of number objects returned, that fit within the max batch size limit (usually 4MB), and cursor will be closed. For example if batchSize is -10, then the server will return a maximum of 10 documents and as many as can fit in 4MB, then close the cursor. + +.. note:: + Do not use a batch size of 1. + +This property is optional; the default is ``0``. + +.. _table-definition-label: + +Table Definition +---------------- + +MongoDB maintains table definitions on the special collection where ``mongodb.schema-collection`` configuration value specifies. + +.. note:: + There's no way for the plugin to detect a collection is deleted. + You need to delete the entry by ``db.getCollection("_schema").remove( { table: deleted_table_name })`` at the Mongo Shell. Or please drop a collection by ``drop table table_name`` at Presto shell. + +A schema collection consists of a MongoDB document for a table. + +.. code-block:: json + + { + "table": ..., + "fields": [ + { "name" : ..., + "type" : "varchar|bigint|boolean|double|date|array|...", + "hidden" : false }, + ... + ] + } + } + +=============== ========= ============== ============================= +Field Required Type Description +=============== ========= ============== ============================= +``table`` required string Presto table name +``fields`` required array A list of field definitions. Each field definition creates a new column in the Presto table. +=============== ========= ============== ============================= + +Each field definition: + +.. code-block:: json + + { + "name": ..., + "type": ..., + "hidden": ... + } + +=============== ========= ========= ============================= +Field Required Type Description +=============== ========= ========= ============================= +``name`` required string Name of the column in the Presto table. +``type`` required string Presto type of the column. +``hidden`` optional boolean Hides the column from ``DESCRIBE `` and ``SELECT *``. Defaults to ``false``. +=============== ========= ========= ============================= + +There is no limit on field descriptions for either key or message. + +ObjectId +-------- +MongoDB collection has the special filed ``_id``. The plugin tries to follow the same rules for this special field, so there will be hidden field ``_id``. + + +.. code-block:: sql + + CREATE TABLE IF NOT EXISTS orders ( + orderkey bigint, + orderstatus varchar, + totalprice double, + orderdate date + ); + + insert into orders values( 1, 'bad', 50.0, current_date); + insert into orders values( 2, 'good', 100.0, current_date); + select _id, * from orders3; + + _id | orderkey | orderstatus | totalprice | orderdate + -------------------------------------+----------+-------------+------------+------------ + 55 b1 51 63 38 64 d6 43 8c 61 a9 ce | 1 | bad | 50.0 | 2015-07-23 + 55 b1 51 67 38 64 d6 43 8c 61 a9 cf | 2 | good | 100.0 | 2015-07-23 + (2 rows) + + select _id, * from orders3 where _id = ObjectId('55b151633864d6438c61a9ce'); + + _id | orderkey | orderstatus | totalprice | orderdate + -------------------------------------+----------+-------------+------------+------------ + 55 b1 51 63 38 64 d6 43 8c 61 a9 ce | 1 | bad | 50.0 | 2015-07-23 + (1 row) + +.. note:: + Unfortunately there's no way to represent _id field more fancy like `55b151633864d6438c61a9ce` diff --git a/presto-mongodb/pom.xml b/presto-mongodb/pom.xml new file mode 100644 index 000000000000..ca6cd02d55ad --- /dev/null +++ b/presto-mongodb/pom.xml @@ -0,0 +1,218 @@ + + + 4.0.0 + + com.facebook.presto + presto-root + 0.145-SNAPSHOT + + + presto-mongodb + Presto - mongodb Connector + presto-plugin + + + ${project.parent.basedir} + 3.1.0 + 1.5.0 + 4.0.32.Final + + + + + org.mongodb + mongo-java-driver + ${mongo-java.version} + + + + joda-time + joda-time + + + + javax.validation + validation-api + + + + io.airlift + bootstrap + + + + io.airlift + json + + + + io.airlift + log + + + + io.airlift + configuration + + + + + com.google.guava + guava + + + + com.google.inject + guice + + + + + com.facebook.presto + presto-spi + provided + + + + com.facebook.presto + presto-main + provided + + + + io.airlift + slice + provided + + + + javax.inject + javax.inject + provided + + + + com.fasterxml.jackson.core + jackson-annotations + provided + + + + com.fasterxml.jackson.core + jackson-core + provided + + + + com.fasterxml.jackson.core + jackson-databind + provided + + + + + com.facebook.presto + presto-tests + test + + + + com.facebook.presto + presto-tpch + test + + + + io.airlift.tpch + tpch + test + + + + io.airlift + testing + test + + + + javax.annotation + javax.annotation-api + test + + + + de.bwaldvogel + mongo-java-server + ${mongo-server.version} + + + org.mongodb + mongo-java-driver + + + test + + + + de.bwaldvogel + mongo-java-server-core + ${mongo-server.version} + test + + + + de.bwaldvogel + mongo-java-server-memory-backend + ${mongo-server.version} + test + + + + org.testng + testng + test + + + + io.netty + netty-transport + ${netty.version} + test + + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + + 1 + + **/TestMongoDistributed.java + + + + + + + + + ci + + + + org.apache.maven.plugins + maven-surefire-plugin + + + + + + + + + diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoClientConfig.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoClientConfig.java new file mode 100644 index 000000000000..11af6423e6a0 --- /dev/null +++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoClientConfig.java @@ -0,0 +1,279 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import com.google.common.base.Splitter; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.mongodb.MongoCredential; +import com.mongodb.ServerAddress; +import io.airlift.configuration.Config; + +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; +import javax.validation.constraints.Size; + +import java.util.Arrays; +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.mongodb.MongoCredential.createCredential; + +public class MongoClientConfig +{ + private static final Splitter SPLITTER = Splitter.on(',').trimResults().omitEmptyStrings(); + private static final Splitter PORT_SPLITTER = Splitter.on(':').trimResults().omitEmptyStrings(); + private static final Splitter USER_SPLITTER = Splitter.onPattern("[:@]").trimResults().omitEmptyStrings(); + + private String schemaCollection = "_schema"; + private List seeds = ImmutableList.of(); + private List credentials = ImmutableList.of(); + + private int minConnectionsPerHost = 0; + private int connectionsPerHost = 100; + private int maxWaitTime = 120_000; + private int connectionTimeout = 10_000; + private int socketTimeout = 0; + private boolean socketKeepAlive = false; + + // query configurations + private int cursorBatchSize = 0; // use driver default + + private ReadPreferenceType readPreference = ReadPreferenceType.PRIMARY; + private WriteConcernType writeConcern = WriteConcernType.ACKNOWLEDGED; + private String requiredReplicaSetName; + private String implicitRowFieldPrefix = "_pos"; + + @NotNull + public String getSchemaCollection() + { + return schemaCollection; + } + + @Config("mongodb.schema-collection") + public MongoClientConfig setSchemaCollection(String schemaCollection) + { + this.schemaCollection = schemaCollection; + return this; + } + + @NotNull + @Size(min = 1) + public List getSeeds() + { + return seeds; + } + + @Config("mongodb.seeds") + public MongoClientConfig setSeeds(String commaSeparatedList) + { + this.seeds = buildSeeds(SPLITTER.split(commaSeparatedList)); + return this; + } + + public MongoClientConfig setSeeds(String... seeds) + { + this.seeds = buildSeeds(Arrays.asList(seeds)); + return this; + } + + @NotNull + @Size(min = 0) + public List getCredentials() + { + return credentials; + } + + @Config("mongodb.credentials") + public MongoClientConfig setCredentials(String credentials) + { + this.credentials = buildCredentials(SPLITTER.split(credentials)); + return this; + } + + public MongoClientConfig setCredentils(String... credentials) + { + this.credentials = buildCredentials(Arrays.asList(credentials)); + return this; + } + + private List buildSeeds(Iterable hostPorts) + { + ImmutableList.Builder builder = ImmutableList.builder(); + for (String hostPort : hostPorts) { + List values = PORT_SPLITTER.splitToList(hostPort); + checkArgument(values.size() == 1 || values.size() == 2, "Invalid ServerAddress format. Requires host[:port]"); + try { + if (values.size() == 1) { + builder.add(new ServerAddress(values.get(0))); + } + else { + builder.add(new ServerAddress(values.get(0), Integer.parseInt(values.get(1)))); + } + } + catch (NumberFormatException e) { + throw Throwables.propagate(e); + } + } + return builder.build(); + } + + private List buildCredentials(Iterable userPasses) + { + ImmutableList.Builder builder = ImmutableList.builder(); + for (String userPass : userPasses) { + List values = USER_SPLITTER.splitToList(userPass); + checkArgument(values.size() == 3, "Invalid Credential format. Requires user:password@collection"); + builder.add(createCredential(values.get(0), values.get(2), values.get(1).toCharArray())); + } + return builder.build(); + } + + @Min(0) + public int getMinConnectionsPerHost() + { + return minConnectionsPerHost; + } + + @Config("mongodb.min-connections-per-host") + public MongoClientConfig setMinConnectionsPerHost(int minConnectionsPerHost) + { + this.minConnectionsPerHost = minConnectionsPerHost; + return this; + } + + @Min(1) + public int getConnectionsPerHost() + { + return connectionsPerHost; + } + + @Config("mongodb.connection-per-host") + public MongoClientConfig setConnectionsPerHost(int connectionsPerHost) + { + this.connectionsPerHost = connectionsPerHost; + return this; + } + + @Min(0) + public int getMaxWaitTime() + { + return maxWaitTime; + } + + @Config("mongodb.max-wait-time") + public MongoClientConfig setMaxWaitTime(int maxWaitTime) + { + this.maxWaitTime = maxWaitTime; + return this; + } + + @Min(0) + public int getConnectionTimeout() + { + return connectionTimeout; + } + + @Config("mongodb.connection-timeout") + public MongoClientConfig setConnectionTimeout(int connectionTimeout) + { + this.connectionTimeout = connectionTimeout; + return this; + } + + @Min(0) + public int getSocketTimeout() + { + return socketTimeout; + } + + @Config("mongodb.socket-timeout") + public MongoClientConfig setSocketTimeout(int socketTimeout) + { + this.socketTimeout = socketTimeout; + return this; + } + + public boolean getSocketKeepAlive() + { + return socketKeepAlive; + } + + @Config("mongodb.socket-keep-alive") + public MongoClientConfig setSocketKeepAlive(boolean socketKeepAlive) + { + this.socketKeepAlive = socketKeepAlive; + return this; + } + + public ReadPreferenceType getReadPreference() + { + return readPreference; + } + + @Config("mongodb.read-preference") + public MongoClientConfig setReadPreference(ReadPreferenceType readPreference) + { + this.readPreference = readPreference; + return this; + } + + public WriteConcernType getWriteConcern() + { + return writeConcern; + } + + @Config("mongodb.write-concern") + public MongoClientConfig setWriteConcern(WriteConcernType writeConcern) + { + this.writeConcern = writeConcern; + return this; + } + + public String getRequiredReplicaSetName() + { + return requiredReplicaSetName; + } + + @Config("mongodb.required-replica-set") + public MongoClientConfig setRequiredReplicaSetName(String requiredReplicaSetName) + { + this.requiredReplicaSetName = requiredReplicaSetName; + return this; + } + + public int getCursorBatchSize() + { + return cursorBatchSize; + } + + @Config("mongodb.cursor-batch-size") + public MongoClientConfig setCursorBatchSize(int cursorBatchSize) + { + this.cursorBatchSize = cursorBatchSize; + return this; + } + + public String getImplicitRowFieldPrefix() + { + return implicitRowFieldPrefix; + } + + @Config("mongodb.implicit-row-field-prefix") + public MongoClientConfig setImplicitRowFieldPrefix(String implicitRowFieldPrefix) + { + this.implicitRowFieldPrefix = implicitRowFieldPrefix; + return this; + } +} diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoClientModule.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoClientModule.java new file mode 100644 index 000000000000..0efb3322722b --- /dev/null +++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoClientModule.java @@ -0,0 +1,76 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import com.facebook.presto.spi.type.TypeManager; +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.google.inject.Scopes; +import com.mongodb.MongoClient; +import com.mongodb.MongoClientOptions; + +import javax.inject.Singleton; + +import static io.airlift.configuration.ConfigBinder.configBinder; +import static java.util.Objects.requireNonNull; + +public class MongoClientModule + implements Module +{ + @Override + public void configure(Binder binder) + { + binder.bind(MongoConnector.class).in(Scopes.SINGLETON); + binder.bind(MongoMetadata.class).in(Scopes.SINGLETON); + binder.bind(MongoSplitManager.class).in(Scopes.SINGLETON); + binder.bind(MongoPageSourceProvider.class).in(Scopes.SINGLETON); + binder.bind(MongoPageSinkProvider.class).in(Scopes.SINGLETON); + + configBinder(binder).bindConfig(MongoClientConfig.class); + } + + @Singleton + @Provides + public static MongoSession createMongoSession( + TypeManager typeManager, + MongoConnectorId connectorId, + MongoClientConfig config) + { + requireNonNull(config, "config is null"); + + MongoClientOptions.Builder options = MongoClientOptions.builder(); + + options.connectionsPerHost(config.getConnectionsPerHost()) + .connectTimeout(config.getConnectionTimeout()) + .socketTimeout(config.getSocketTimeout()) + .socketKeepAlive(config.getSocketKeepAlive()) + .maxWaitTime(config.getMaxWaitTime()) + .minConnectionsPerHost(config.getMinConnectionsPerHost()) + .readPreference(config.getReadPreference().getReadPreference()) + .writeConcern(config.getWriteConcern().getWriteConcern()); + + if (config.getRequiredReplicaSetName() != null) { + options.requiredReplicaSetName(config.getRequiredReplicaSetName()); + } + + MongoClient client = new MongoClient(config.getSeeds(), config.getCredentials(), options.build()); + + return new MongoSession( + typeManager, + connectorId.toString(), + client, + config); + } +} diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoColumnHandle.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoColumnHandle.java new file mode 100644 index 000000000000..e7effdf0bb98 --- /dev/null +++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoColumnHandle.java @@ -0,0 +1,123 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ColumnMetadata; +import com.facebook.presto.spi.type.Type; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.MoreObjects.ToStringHelper; +import org.bson.Document; + +import java.util.Objects; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static java.util.Objects.requireNonNull; + +public class MongoColumnHandle + implements ColumnHandle +{ + public static final String SAMPLE_WEIGHT_COLUMN_NAME = "presto_sample_weight"; + + private final String connectorId; + private final String name; + private final Type type; + private final boolean hidden; + + @JsonCreator + public MongoColumnHandle( + @JsonProperty("connectorId") String connectorId, + @JsonProperty("name") String name, + @JsonProperty("columnType") Type type, + @JsonProperty("hidden") boolean hidden) + { + this.connectorId = requireNonNull(connectorId, "connectorId is null"); + this.name = requireNonNull(name, "name is null"); + this.type = requireNonNull(type, "columnType is null"); + this.hidden = hidden; + } + + @JsonProperty + public String getConnectorId() + { + return connectorId; + } + + @JsonProperty + public String getName() + { + return name; + } + + @JsonProperty("columnType") + public Type getType() + { + return type; + } + + @JsonProperty + public boolean isHidden() + { + return hidden; + } + + public ColumnMetadata toColumnMetadata() + { + return new ColumnMetadata(name, type, null, hidden); + } + + public Document getDocument() + { + return new Document().append("name", name) + .append("type", type.getTypeSignature().toString()) + .append("hidden", hidden); + } + + @Override + public int hashCode() + { + return Objects.hash( + connectorId, + name, + hidden); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + MongoColumnHandle other = (MongoColumnHandle) obj; + return Objects.equals(this.connectorId, other.connectorId) && + Objects.equals(this.type, other.type) && + Objects.equals(this.name, other.name); + } + + @Override + public String toString() + { + ToStringHelper helper = toStringHelper(this) + .add("connectorId", connectorId) + .add("name", name) + .add("type", type) + .add("hidden", hidden); + + return helper.toString(); + } +} diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoConnector.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoConnector.java new file mode 100644 index 000000000000..354ffdbbf44f --- /dev/null +++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoConnector.java @@ -0,0 +1,79 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import com.facebook.presto.spi.Connector; +import com.facebook.presto.spi.ConnectorMetadata; +import com.facebook.presto.spi.ConnectorPageSinkProvider; +import com.facebook.presto.spi.ConnectorPageSourceProvider; +import com.facebook.presto.spi.ConnectorSplitManager; + +import javax.inject.Inject; + +import static java.util.Objects.requireNonNull; + +public class MongoConnector + implements Connector +{ + private final MongoSession mongoSession; + private final MongoMetadata metadata; + private final MongoSplitManager splitManager; + private final MongoPageSourceProvider pageSourceProvider; + private final MongoPageSinkProvider pageSinkProvider; + + @Inject + public MongoConnector( + MongoSession mongoSession, + MongoMetadata metadata, + MongoSplitManager splitManager, + MongoPageSourceProvider pageSourceProvider, + MongoPageSinkProvider pageSinkProvider) + { + this.mongoSession = mongoSession; + this.metadata = requireNonNull(metadata, "metadata is null"); + this.splitManager = requireNonNull(splitManager, "splitManager is null"); + this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null"); + this.pageSinkProvider = requireNonNull(pageSinkProvider, "pageSinkProvider is null"); + } + + @Override + public ConnectorMetadata getMetadata() + { + return metadata; + } + + @Override + public ConnectorSplitManager getSplitManager() + { + return splitManager; + } + + @Override + public ConnectorPageSourceProvider getPageSourceProvider() + { + return pageSourceProvider; + } + + @Override + public ConnectorPageSinkProvider getPageSinkProvider() + { + return pageSinkProvider; + } + + @Override + public void shutdown() + { + mongoSession.shutdown(); + } +} diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoConnectorFactory.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoConnectorFactory.java new file mode 100644 index 000000000000..aff295ef16cb --- /dev/null +++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoConnectorFactory.java @@ -0,0 +1,83 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import com.facebook.presto.spi.Connector; +import com.facebook.presto.spi.ConnectorFactory; +import com.facebook.presto.spi.ConnectorHandleResolver; +import com.facebook.presto.spi.type.TypeManager; +import com.google.common.base.Throwables; +import com.google.inject.Injector; +import io.airlift.bootstrap.Bootstrap; +import io.airlift.json.JsonModule; + +import java.util.Map; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Strings.isNullOrEmpty; +import static java.util.Objects.requireNonNull; + +public class MongoConnectorFactory + implements ConnectorFactory +{ + private final String name; + private final Map optionalConfig; + private final TypeManager typeManager; + + public MongoConnectorFactory(String name, TypeManager typeManager, Map optionalConfig) + { + checkArgument(!isNullOrEmpty(name), "name is null or empty"); + this.name = name; + this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.optionalConfig = requireNonNull(optionalConfig, "optionalConfig is null"); + } + + @Override + public String getName() + { + return name; + } + + @Override + public ConnectorHandleResolver getHandleResolver() + { + return new MongoHandleResolver(); + } + + @Override + public Connector create(String connectorId, Map config) + { + requireNonNull(config, "config is null"); + + try { + Bootstrap app = new Bootstrap( + new JsonModule(), + new MongoClientModule(), + binder -> { + binder.bind(TypeManager.class).toInstance(typeManager); + binder.bind(MongoConnectorId.class).toInstance(new MongoConnectorId(connectorId)); + }); + + Injector injector = app.strictConfig().doNotInitializeLogging() + .setRequiredConfigurationProperties(config) + .setOptionalConfigurationProperties(optionalConfig) + .initialize(); + + return injector.getInstance(MongoConnector.class); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } +} diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoConnectorId.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoConnectorId.java new file mode 100644 index 000000000000..e2ceb4ece1a4 --- /dev/null +++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoConnectorId.java @@ -0,0 +1,54 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +public class MongoConnectorId +{ + private final String connectorId; + + public MongoConnectorId(String connectorId) + { + requireNonNull(connectorId, "connectorId is null"); + checkArgument(!connectorId.isEmpty(), "connectorId is empty"); + this.connectorId = connectorId; + } + + @Override + public int hashCode() + { + return connectorId.hashCode(); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + MongoConnectorId other = (MongoConnectorId) obj; + return this.connectorId.equals(other.connectorId); + } + + @Override + public String toString() + { + return connectorId; + } +} diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoFunctionFactory.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoFunctionFactory.java new file mode 100644 index 000000000000..1a84224b3562 --- /dev/null +++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoFunctionFactory.java @@ -0,0 +1,42 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import com.facebook.presto.metadata.FunctionFactory; +import com.facebook.presto.metadata.FunctionListBuilder; +import com.facebook.presto.metadata.SqlFunction; +import com.facebook.presto.spi.type.TypeManager; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + +public class MongoFunctionFactory + implements FunctionFactory +{ + private final TypeManager typeManager; + + public MongoFunctionFactory(TypeManager typeManager) + { + this.typeManager = requireNonNull(typeManager, "typeManager is null"); + } + + @Override + public List listFunctions() + { + return new FunctionListBuilder(typeManager) + .scalar(ObjectIdFunctions.class) + .getFunctions(); + } +} diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoHandleResolver.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoHandleResolver.java new file mode 100644 index 000000000000..956d1062b51c --- /dev/null +++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoHandleResolver.java @@ -0,0 +1,63 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorHandleResolver; +import com.facebook.presto.spi.ConnectorInsertTableHandle; +import com.facebook.presto.spi.ConnectorOutputTableHandle; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.ConnectorTableHandle; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; + +public class MongoHandleResolver + implements ConnectorHandleResolver +{ + public MongoHandleResolver() + { + } + + @Override + public Class getTableHandleClass() + { + return MongoTableHandle.class; + } + + @Override + public Class getColumnHandleClass() + { + return MongoColumnHandle.class; + } + + @Override + public Class getSplitClass() + { + return MongoSplit.class; + } + + public Class getOutputTableHandleClass() + { + return MongoOutputTableHandle.class; + } + + public Class getInsertTableHandleClass() + { + return MongoInsertTableHandle.class; + } + + public Class getTableLayoutHandleClass() + { + return MongoTableLayoutHandle.class; + } +} diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoIndex.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoIndex.java new file mode 100644 index 000000000000..dd12d72b99c3 --- /dev/null +++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoIndex.java @@ -0,0 +1,133 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import com.facebook.presto.spi.block.SortOrder; +import com.google.common.collect.ImmutableList; +import com.mongodb.client.ListIndexesIterable; +import org.bson.Document; + +import java.util.List; +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + +public class MongoIndex +{ + private final String name; + private final List keys; + private final boolean unique; + + public static List parse(ListIndexesIterable indexes) + { + ImmutableList.Builder builder = ImmutableList.builder(); + for (Document index : indexes) { + // TODO: v, ns, sparse fields + Document key = (Document) index.get("key"); + String name = index.getString("name"); + boolean unique = index.getBoolean("unique", false); + + if (key.containsKey("_fts")) { // Full Text Search + continue; + } + builder.add(new MongoIndex(name, parseKey(key), unique)); + } + + return builder.build(); + } + + private static List parseKey(Document key) + { + ImmutableList.Builder builder = ImmutableList.builder(); + + for (String name : key.keySet()) { + Object value = key.get(name); + if (value instanceof Number) { + int order = ((Number) value).intValue(); + checkState(order == 1 || order == -1, "Unknown index sort order"); + builder.add(new MongodbIndexKey(name, order == 1 ? SortOrder.ASC_NULLS_LAST : SortOrder.DESC_NULLS_LAST)); + } + else if (value instanceof String) { + builder.add(new MongodbIndexKey(name, (String) value)); + } + else { + throw new UnsupportedOperationException("Unknown index type: " + value.toString()); + } + } + + return builder.build(); + } + + public MongoIndex(String name, List keys, boolean unique) + { + this.name = name; + this.keys = keys; + this.unique = unique; + } + + public String getName() + { + return name; + } + + public List getKeys() + { + return keys; + } + + public boolean isUnique() + { + return unique; + } + + public static class MongodbIndexKey + { + private final String name; + private final Optional sortOrder; + private final Optional type; + + public MongodbIndexKey(String name, SortOrder sortOrder) + { + this(name, Optional.of(sortOrder), Optional.empty()); + } + + public MongodbIndexKey(String name, String type) + { + this(name, Optional.empty(), Optional.of(type)); + } + + public MongodbIndexKey(String name, Optional sortOrder, Optional type) + { + this.name = requireNonNull(name, "name is null"); + this.sortOrder = sortOrder; + this.type = type; + } + + public String getName() + { + return name; + } + + public Optional getSortOrder() + { + return sortOrder; + } + + public Optional getType() + { + return type; + } + } +} diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoInsertTableHandle.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoInsertTableHandle.java new file mode 100644 index 000000000000..413780d640a7 --- /dev/null +++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoInsertTableHandle.java @@ -0,0 +1,60 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import com.facebook.presto.spi.ConnectorInsertTableHandle; +import com.facebook.presto.spi.SchemaTableName; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + +public class MongoInsertTableHandle + implements ConnectorInsertTableHandle +{ + private final String connectorId; + private final SchemaTableName schemaTableName; + private final List columns; + + @JsonCreator + public MongoInsertTableHandle(@JsonProperty("connectorId") String connectorId, + @JsonProperty("schemaTableName") SchemaTableName schemaTableName, + @JsonProperty("columns") List columns) + { + this.connectorId = requireNonNull(connectorId, "connectorId is null"); + this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null"); + this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null")); + } + + @JsonProperty + public String getConnectorId() + { + return connectorId; + } + + @JsonProperty + public SchemaTableName getSchemaTableName() + { + return schemaTableName; + } + + @JsonProperty + public List getColumns() + { + return columns; + } +} diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoMetadata.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoMetadata.java new file mode 100644 index 000000000000..e164704ecc8d --- /dev/null +++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoMetadata.java @@ -0,0 +1,320 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import com.facebook.presto.mongodb.MongoIndex.MongodbIndexKey; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ColumnMetadata; +import com.facebook.presto.spi.ConnectorInsertTableHandle; +import com.facebook.presto.spi.ConnectorMetadata; +import com.facebook.presto.spi.ConnectorOutputTableHandle; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorTableHandle; +import com.facebook.presto.spi.ConnectorTableLayout; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.ConnectorTableLayoutResult; +import com.facebook.presto.spi.ConnectorTableMetadata; +import com.facebook.presto.spi.Constraint; +import com.facebook.presto.spi.LocalProperty; +import com.facebook.presto.spi.NotFoundException; +import com.facebook.presto.spi.SchemaTableName; +import com.facebook.presto.spi.SchemaTablePrefix; +import com.facebook.presto.spi.SortingProperty; +import com.facebook.presto.spi.TableNotFoundException; +import com.facebook.presto.spi.predicate.TupleDomain; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.airlift.log.Logger; +import io.airlift.slice.Slice; + +import javax.inject.Inject; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static com.facebook.presto.mongodb.MongoColumnHandle.SAMPLE_WEIGHT_COLUMN_NAME; +import static com.facebook.presto.mongodb.TypeUtils.checkType; +import static com.google.common.base.MoreObjects.toStringHelper; +import static java.util.Locale.ENGLISH; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toList; + +public class MongoMetadata + implements ConnectorMetadata +{ + private static final Logger log = Logger.get(MongoMetadata.class); + + private final String connectorId; + private final MongoSession mongoSession; + + @Inject + public MongoMetadata(MongoConnectorId connectorId, + MongoSession mongoSession) + { + this.connectorId = requireNonNull(connectorId, "connectorId is null").toString(); + this.mongoSession = requireNonNull(mongoSession, "mongoSession is null"); + } + + @Override + public List listSchemaNames(ConnectorSession session) + { + return mongoSession.getAllSchemas(); + } + + @Override + public MongoTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) + { + requireNonNull(tableName, "tableName is null"); + try { + return mongoSession.getTable(tableName).getTableHandle(); + } + catch (TableNotFoundException e) { + log.debug(e, "Table(%s) not found", tableName); + return null; + } + } + + @Override + public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle tableHandle) + { + requireNonNull(tableHandle, "tableHandle is null"); + SchemaTableName tableName = getTableName(tableHandle); + return getTableMetadata(session, tableName); + } + + @Override + public List listTables(ConnectorSession session, String schemaNameOrNull) + { + ImmutableList.Builder tableNames = ImmutableList.builder(); + + for (String schemaName : listSchemas(session, schemaNameOrNull)) { + for (String tableName : mongoSession.getAllTables(schemaName)) { + tableNames.add(new SchemaTableName(schemaName, tableName.toLowerCase(ENGLISH))); + } + } + return tableNames.build(); + } + + @Override + public ColumnHandle getSampleWeightColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle) + { + return getColumnHandles(tableHandle, true).get(SAMPLE_WEIGHT_COLUMN_NAME); + } + + @Override + public Map getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle) + { + return getColumnHandles(tableHandle, false); + } + + private Map getColumnHandles(ConnectorTableHandle tableHandle, boolean includeSampleWeight) + { + MongoTableHandle table = checkType(tableHandle, MongoTableHandle.class, "tableHandle"); + List columns = mongoSession.getTable(table.getSchemaTableName()).getColumns(); + + ImmutableMap.Builder columnHandles = ImmutableMap.builder(); + for (MongoColumnHandle columnHandle : columns) { + if (includeSampleWeight || !columnHandle.getName().equals(SAMPLE_WEIGHT_COLUMN_NAME)) { + columnHandles.put(columnHandle.getName(), columnHandle); + } + } + return columnHandles.build(); + } + + @Override + public Map> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix) + { + requireNonNull(prefix, "prefix is null"); + ImmutableMap.Builder> columns = ImmutableMap.builder(); + for (SchemaTableName tableName : listTables(session, prefix)) { + try { + columns.put(tableName, getTableMetadata(session, tableName).getColumns()); + } + catch (NotFoundException e) { + // table disappeared during listing operation + } + } + return columns.build(); + } + + @Override + public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle) + { + checkType(tableHandle, MongoTableHandle.class, "tableHandle"); + return checkType(columnHandle, MongoColumnHandle.class, "columnHandle").toColumnMetadata(); + } + + @Override + public List getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint constraint, Optional> desiredColumns) + { + MongoTableHandle tableHandle = checkType(table, MongoTableHandle.class, "table"); + + Optional> partitioningColumns = Optional.empty(); //TODO: sharding key + ImmutableList.Builder> localProperties = ImmutableList.builder(); + + MongoTable tableInfo = mongoSession.getTable(tableHandle.getSchemaTableName()); + Map columns = getColumnHandles(session, tableHandle); + + for (MongoIndex index : tableInfo.getIndexes()) { + for (MongodbIndexKey key : index.getKeys()) { + if (!key.getSortOrder().isPresent()) { + continue; + } + if (columns.get(key.getName()) != null) { + localProperties.add(new SortingProperty<>(columns.get(key.getName()), key.getSortOrder().get())); + } + } + } + + ConnectorTableLayout layout = new ConnectorTableLayout( + new MongoTableLayoutHandle(tableHandle, constraint.getSummary()), + Optional.empty(), + TupleDomain.all(), + Optional.empty(), + partitioningColumns, + Optional.empty(), + localProperties.build()); + + return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary())); + } + + @Override + public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle) + { + MongoTableLayoutHandle layout = checkType(handle, MongoTableLayoutHandle.class, "layout"); + + // tables in this connector have a single layout + return getTableLayouts(session, layout.getTable(), Constraint.alwaysTrue(), Optional.empty()) + .get(0) + .getTableLayout(); + } + + @Override + public boolean canCreateSampledTables(ConnectorSession session) + { + return true; + } + + @Override + public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata) + { + mongoSession.createTable(tableMetadata.getTable(), buildColumnHandles(tableMetadata)); + } + + @Override + public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle) + { + MongoTableHandle table = checkType(tableHandle, MongoTableHandle.class, "tableHandle"); + + mongoSession.dropTable(table.getSchemaTableName()); + } + + @Override + public void renameTable(ConnectorSession session, ConnectorTableHandle tableHandle, SchemaTableName newTableName) + { + throw new UnsupportedOperationException(); + } + + @Override + public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata) + { + List columns = buildColumnHandles(tableMetadata); + + mongoSession.createTable(tableMetadata.getTable(), columns); + + return new MongoOutputTableHandle(connectorId, + tableMetadata.getTable(), + columns.stream().filter(c -> !c.isHidden()).collect(toList())); + } + + @Override + public void commitCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection fragments) + { + } + + @Override + public void rollbackCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle) + { + MongoOutputTableHandle table = checkType(tableHandle, MongoOutputTableHandle.class, "tableHandle"); + mongoSession.dropTable(table.getSchemaTableName()); + } + + @Override + public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle) + { + MongoTableHandle table = checkType(tableHandle, MongoTableHandle.class, "tableHandle"); + List columns = mongoSession.getTable(table.getSchemaTableName()).getColumns(); + + return new MongoInsertTableHandle(connectorId, + table.getSchemaTableName(), + columns.stream().filter(c -> !c.isHidden()).collect(toList())); + } + + @Override + public void commitInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection fragments) + { + } + + private static SchemaTableName getTableName(ConnectorTableHandle tableHandle) + { + return checkType(tableHandle, MongoTableHandle.class, "tableHandle").getSchemaTableName(); + } + + private ConnectorTableMetadata getTableMetadata(ConnectorSession session, SchemaTableName tableName) + { + MongoTableHandle tableHandle = mongoSession.getTable(tableName).getTableHandle(); + + List columns = ImmutableList.copyOf( + getColumnHandles(session, tableHandle).values().stream() + .map(MongoColumnHandle.class::cast) + .map(MongoColumnHandle::toColumnMetadata) + .collect(toList())); + + return new ConnectorTableMetadata(tableName, columns); + } + + private List listSchemas(ConnectorSession session, String schemaNameOrNull) + { + if (schemaNameOrNull == null) { + return listSchemaNames(session); + } + return ImmutableList.of(schemaNameOrNull); + } + + private List listTables(ConnectorSession session, SchemaTablePrefix prefix) + { + if (prefix.getSchemaName() == null) { + return listTables(session, prefix.getSchemaName()); + } + return ImmutableList.of(new SchemaTableName(prefix.getSchemaName(), prefix.getTableName())); + } + + private List buildColumnHandles(ConnectorTableMetadata tableMetadata) + { + return tableMetadata.getColumns().stream() + .map(m -> new MongoColumnHandle(connectorId, m.getName(), m.getType(), m.isHidden())) + .collect(toList()); + } + + @Override + public String toString() + { + return toStringHelper(this) + .add("connectorId", connectorId) + .toString(); + } +} diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoOutputTableHandle.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoOutputTableHandle.java new file mode 100644 index 000000000000..d542f028a849 --- /dev/null +++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoOutputTableHandle.java @@ -0,0 +1,60 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import com.facebook.presto.spi.ConnectorOutputTableHandle; +import com.facebook.presto.spi.SchemaTableName; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + +public class MongoOutputTableHandle + implements ConnectorOutputTableHandle +{ + private final String connectorId; + private final SchemaTableName schemaTableName; + private final List columns; + + @JsonCreator + public MongoOutputTableHandle(@JsonProperty("connectorId") String connectorId, + @JsonProperty("schemaTableName") SchemaTableName schemaTableName, + @JsonProperty("columns") List columns) + { + this.connectorId = requireNonNull(connectorId, "connectorId is null"); + this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null"); + this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null")); + } + + @JsonProperty + public String getConnectorId() + { + return connectorId; + } + + @JsonProperty + public SchemaTableName getSchemaTableName() + { + return schemaTableName; + } + + @JsonProperty + public List getColumns() + { + return columns; + } +} diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoPageSink.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoPageSink.java new file mode 100644 index 000000000000..d6db05ccb0d7 --- /dev/null +++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoPageSink.java @@ -0,0 +1,207 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import com.facebook.presto.spi.ConnectorPageSink; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.Page; +import com.facebook.presto.spi.SchemaTableName; +import com.facebook.presto.spi.block.Block; +import com.facebook.presto.spi.type.NamedTypeSignature; +import com.facebook.presto.spi.type.SqlDate; +import com.facebook.presto.spi.type.SqlTime; +import com.facebook.presto.spi.type.SqlTimestamp; +import com.facebook.presto.spi.type.SqlTimestampWithTimeZone; +import com.facebook.presto.spi.type.SqlVarbinary; +import com.facebook.presto.spi.type.Type; +import com.facebook.presto.spi.type.TypeSignatureParameter; +import com.google.common.collect.ImmutableList; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.model.InsertManyOptions; +import io.airlift.slice.Slice; +import org.bson.Document; +import org.bson.types.Binary; +import org.bson.types.ObjectId; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static com.facebook.presto.mongodb.ObjectIdType.OBJECT_ID; +import static com.facebook.presto.mongodb.TypeUtils.containsType; +import static com.facebook.presto.mongodb.TypeUtils.isArrayType; +import static com.facebook.presto.mongodb.TypeUtils.isMapType; +import static com.facebook.presto.mongodb.TypeUtils.isRowType; +import static com.facebook.presto.spi.type.VarbinaryType.VARBINARY; +import static java.util.stream.Collectors.toList; + +public class MongoPageSink + implements ConnectorPageSink +{ + private final MongoSession mongoSession; + private final ConnectorSession session; + private final SchemaTableName schemaTableName; + private final List columns; + private final List requireTranslate; + private final String implicitPrefix; + + public MongoPageSink(MongoClientConfig config, + MongoSession mongoSession, + ConnectorSession session, + SchemaTableName schemaTableName, + List columns) + { + this.mongoSession = mongoSession; + this.session = session; + this.schemaTableName = schemaTableName; + this.columns = columns; + this.requireTranslate = columns.stream() + .map(c -> containsType(c.getType(), TypeUtils::isDateType, + TypeUtils::isMapType, TypeUtils::isRowType, + OBJECT_ID::equals, VARBINARY::equals)) + .collect(toList()); + this.implicitPrefix = config.getImplicitRowFieldPrefix(); + } + + @Override + public CompletableFuture appendPage(Page page, Block sampleWeightBlock) + { + // TODO: handle sampleWeightBlock + MongoCollection collection = mongoSession.getCollection(schemaTableName); + List batch = new ArrayList<>(page.getPositionCount()); + + for (int position = 0; position < page.getPositionCount(); position++) { + Document doc = new Document(); + + for (int channel = 0; channel < page.getChannelCount(); channel++) { + MongoColumnHandle column = columns.get(channel); + doc.append(column.getName(), getObjectValue(columns.get(channel).getType(), page.getBlock(channel), position, requireTranslate.get(channel))); + } + batch.add(doc); + } + + collection.insertMany(batch, new InsertManyOptions().ordered(true)); + return NOT_BLOCKED; + } + + private Object getObjectValue(Type type, Block block, int position, boolean translate) + { + if (block.isNull(position)) { + return null; + } + + Object value = type.getObjectValue(session, block, position); + + if (translate) { + value = translateValue(type, value); + } + + return value; + } + + private Object translateValue(Type type, Object value) + { + if (type.equals(OBJECT_ID)) { + value = value == null ? new ObjectId() : new ObjectId(((SqlVarbinary) value).getBytes()); + } + + if (value == null) { + return null; + } + + if (type.getJavaType() == long.class) { + if (value instanceof SqlDate) { + return new Date(TimeUnit.DAYS.toMillis(((SqlDate) value).getDays())); + } + if (value instanceof SqlTime) { + return new Date(((SqlTime) value).getMillisUtc()); + } + if (value instanceof SqlTimestamp) { + return new Date(((SqlTimestamp) value).getMillisUtc()); + } + if (value instanceof SqlTimestampWithTimeZone) { + return new Date(((SqlTimestampWithTimeZone) value).getMillisUtc()); + } + } + else if (type.getJavaType() == Slice.class) { + if (type.equals(VARBINARY)) { + value = new Binary(((SqlVarbinary) value).getBytes()); + } + } + else if (type.getJavaType() == Block.class) { + if (isArrayType(type)) { + value = ((List) value).stream() + .map(v -> translateValue(type.getTypeParameters().get(0), v)) + .collect(toList()); + } + else if (isMapType(type)) { + // map type is converted into list of fixed keys document + ImmutableList.Builder> builder = ImmutableList.builder(); + for (Map.Entry entry : ((Map) value).entrySet()) { + Map mapValue = new HashMap<>(); + mapValue.put("key", translateValue(type.getTypeParameters().get(0), entry.getKey())); + mapValue.put("value", translateValue(type.getTypeParameters().get(1), entry.getValue())); + + builder.add(mapValue); + } + value = builder.build(); + } + else if (isRowType(type)) { + List fieldValues = (List) value; + if (isImplicitRowType(type)) { + ArrayList rowValue = new ArrayList<>(); + for (int index = 0; index < fieldValues.size(); index++) { + rowValue.add(translateValue(type.getTypeParameters().get(index), fieldValues.get(index))); + } + value = rowValue; + } + else { + HashMap rowValue = new HashMap<>(); + for (int index = 0; index < fieldValues.size(); index++) { + rowValue.put(type.getTypeSignature().getParameters().get(index).getNamedTypeSignature().getName(), + translateValue(type.getTypeParameters().get(index), fieldValues.get(index))); + } + value = rowValue; + } + } + } + + return value; + } + + private boolean isImplicitRowType(Type type) + { + return type.getTypeSignature().getParameters() + .stream() + .map(TypeSignatureParameter::getNamedTypeSignature) + .map(NamedTypeSignature::getName) + .allMatch(name -> name.startsWith(implicitPrefix)); + } + + @Override + public Collection finish() + { + return ImmutableList.of(); + } + + @Override + public void abort() + { + } +} diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoPageSinkProvider.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoPageSinkProvider.java new file mode 100644 index 000000000000..3654afeb8858 --- /dev/null +++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoPageSinkProvider.java @@ -0,0 +1,51 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import com.facebook.presto.spi.ConnectorInsertTableHandle; +import com.facebook.presto.spi.ConnectorOutputTableHandle; +import com.facebook.presto.spi.ConnectorPageSink; +import com.facebook.presto.spi.ConnectorPageSinkProvider; +import com.facebook.presto.spi.ConnectorSession; +import com.google.inject.Inject; + +import static com.facebook.presto.mongodb.TypeUtils.checkType; + +public class MongoPageSinkProvider + implements ConnectorPageSinkProvider +{ + private final MongoClientConfig config; + private final MongoSession mongoSession; + + @Inject + public MongoPageSinkProvider(MongoClientConfig config, MongoSession mongoSession) + { + this.config = config; + this.mongoSession = mongoSession; + } + + @Override + public ConnectorPageSink createPageSink(ConnectorSession session, ConnectorOutputTableHandle outputTableHandle) + { + MongoOutputTableHandle handle = checkType(outputTableHandle, MongoOutputTableHandle.class, "outputTableHandle"); + return new MongoPageSink(config, mongoSession, session, handle.getSchemaTableName(), handle.getColumns()); + } + + @Override + public ConnectorPageSink createPageSink(ConnectorSession session, ConnectorInsertTableHandle insertTableHandle) + { + MongoInsertTableHandle handle = checkType(insertTableHandle, MongoInsertTableHandle.class, "insertTableHandle"); + return new MongoPageSink(config, mongoSession, session, handle.getSchemaTableName(), handle.getColumns()); + } +} diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoPageSource.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoPageSource.java new file mode 100644 index 000000000000..5be7af41614b --- /dev/null +++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoPageSource.java @@ -0,0 +1,298 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import com.facebook.presto.spi.ConnectorPageSource; +import com.facebook.presto.spi.Page; +import com.facebook.presto.spi.PageBuilder; +import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.spi.block.Block; +import com.facebook.presto.spi.block.BlockBuilder; +import com.facebook.presto.spi.block.BlockBuilderStatus; +import com.facebook.presto.spi.block.InterleavedBlockBuilder; +import com.facebook.presto.spi.type.NamedTypeSignature; +import com.facebook.presto.spi.type.StandardTypes; +import com.facebook.presto.spi.type.Type; +import com.facebook.presto.spi.type.TypeSignatureParameter; +import com.mongodb.client.MongoCursor; +import io.airlift.slice.Slice; +import org.bson.Document; +import org.bson.types.Binary; +import org.bson.types.ObjectId; +import org.joda.time.chrono.ISOChronology; + +import java.io.IOException; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static com.facebook.presto.mongodb.ObjectIdType.OBJECT_ID; +import static com.facebook.presto.mongodb.TypeUtils.isArrayType; +import static com.facebook.presto.mongodb.TypeUtils.isMapType; +import static com.facebook.presto.mongodb.TypeUtils.isRowType; +import static com.facebook.presto.spi.StandardErrorCode.INTERNAL_ERROR; +import static com.facebook.presto.spi.type.BigintType.BIGINT; +import static com.facebook.presto.spi.type.DateType.DATE; +import static com.facebook.presto.spi.type.IntegerType.INTEGER; +import static com.facebook.presto.spi.type.TimeType.TIME; +import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP; +import static com.facebook.presto.spi.type.VarbinaryType.VARBINARY; +import static com.google.common.base.Preconditions.checkState; +import static io.airlift.slice.Slices.utf8Slice; +import static io.airlift.slice.Slices.wrappedBuffer; +import static java.util.stream.Collectors.toList; +import static org.joda.time.DateTimeZone.UTC; + +public class MongoPageSource + implements ConnectorPageSource +{ + private static final ISOChronology UTC_CHRONOLOGY = ISOChronology.getInstance(UTC); + private static final int ROWS_PER_REQUEST = 1024; + + private final MongoCursor cursor; + private final List columnNames; + private final List columnTypes; + private Document currentDoc; + private long count; + private long totalCount; + private boolean finished; + + public MongoPageSource( + MongoSession mongoSession, + MongoSplit split, + List columns) + { + this.columnNames = columns.stream().map(MongoColumnHandle::getName).collect(toList()); + this.columnTypes = columns.stream().map(MongoColumnHandle::getType).collect(toList()); + this.cursor = mongoSession.execute(split, columns); + currentDoc = null; + } + + @Override + public long getTotalBytes() + { + return totalCount; + } + + @Override + public long getCompletedBytes() + { + return count; + } + + @Override + public long getReadTimeNanos() + { + return 0; + } + + @Override + public boolean isFinished() + { + return finished; + } + + @Override + public long getSystemMemoryUsage() + { + return 0L; + } + + @Override + public Page getNextPage() + { + PageBuilder pageBuilder = new PageBuilder(columnTypes); + + count = 0; + for (int i = 0; i < ROWS_PER_REQUEST; i++) { + if (!cursor.hasNext()) { + finished = true; + break; + } + currentDoc = cursor.next(); + count++; + + pageBuilder.declarePosition(); + for (int column = 0; column < columnTypes.size(); column++) { + BlockBuilder output = pageBuilder.getBlockBuilder(column); + appendTo(columnTypes.get(column), currentDoc.get(columnNames.get(column)), output); + } + } + + totalCount += count; + return pageBuilder.build(); + } + + private void appendTo(Type type, Object value, BlockBuilder output) + { + if (value == null) { + output.appendNull(); + return; + } + + Class javaType = type.getJavaType(); + try { + if (javaType == boolean.class) { + type.writeBoolean(output, (Boolean) value); + } + else if (javaType == long.class) { + if (type.equals(BIGINT)) { + type.writeLong(output, ((Number) value).longValue()); + } + else if (type.equals(INTEGER)) { + type.writeLong(output, ((Number) value).intValue()); + } + else if (type.equals(DATE)) { + long utcMillis = ((Date) value).getTime(); + type.writeLong(output, TimeUnit.MILLISECONDS.toDays(utcMillis)); + } + else if (type.equals(TIME)) { + type.writeLong(output, UTC_CHRONOLOGY.millisOfDay().get(((Date) value).getTime())); + } + else if (type.equals(TIMESTAMP)) { + type.writeLong(output, ((Date) value).getTime()); + } + else { + throw new PrestoException(INTERNAL_ERROR, "Unhandled type for " + javaType.getSimpleName() + ":" + type.getTypeSignature()); + } + } + else if (javaType == double.class) { + type.writeDouble(output, ((Number) value).doubleValue()); + } + else if (javaType == Slice.class) { + writeSlice(output, type, value); + } + else if (javaType == Block.class) { + writeBlock(output, type, value); + } + else { + throw new PrestoException(INTERNAL_ERROR, "Unhandled type for " + javaType.getSimpleName() + ":" + type.getTypeSignature()); + } + } + catch (ClassCastException ignore) { + // returns null instead of raising exception + output.appendNull(); + } + } + + private void writeSlice(BlockBuilder output, Type type, Object value) + { + String base = type.getTypeSignature().getBase(); + if (base.equals(StandardTypes.VARCHAR)) { + type.writeSlice(output, utf8Slice(value.toString())); + } + else if (type.equals(OBJECT_ID)) { + type.writeSlice(output, wrappedBuffer(((ObjectId) value).toByteArray())); + } + else if (type.equals(VARBINARY)) { + if (value instanceof Binary) { + type.writeSlice(output, wrappedBuffer(((Binary) value).getData())); + } + else { + output.appendNull(); + } + } + else { + throw new PrestoException(INTERNAL_ERROR, "Unhandled type for Slice: " + type.getTypeSignature()); + } + } + + private void writeBlock(BlockBuilder output, Type type, Object value) + { + if (isArrayType(type)) { + if (value instanceof List) { + BlockBuilder builder = createParametersBlockBuilder(type, ((List) value).size()); + + ((List) value).forEach(element -> + appendTo(type.getTypeParameters().get(0), element, builder)); + + type.writeObject(output, builder.build()); + return; + } + } + else if (isMapType(type)) { + if (value instanceof List) { + BlockBuilder builder = createParametersBlockBuilder(type, ((List) value).size()); + for (Object element : (List) value) { + if (!(element instanceof Map)) { + continue; + } + + Map document = (Map) element; + if (document.containsKey("key") && document.containsKey("value")) { + appendTo(type.getTypeParameters().get(0), document.get("key"), builder); + appendTo(type.getTypeParameters().get(1), document.get("value"), builder); + } + } + + type.writeObject(output, builder.build()); + return; + } + } + else if (isRowType(type)) { + if (value instanceof Map) { + Map mapValue = (Map) value; + BlockBuilder builder = createParametersBlockBuilder(type, mapValue.size()); + List fieldNames = type.getTypeSignature().getParameters().stream() + .map(TypeSignatureParameter::getNamedTypeSignature) + .map(NamedTypeSignature::getName) + .collect(Collectors.toList()); + checkState(fieldNames.size() == type.getTypeParameters().size(), "fieldName doesn't match with type size : %s", type); + for (int index = 0; index < type.getTypeParameters().size(); index++) { + appendTo(type.getTypeParameters().get(index), mapValue.get(fieldNames.get(index).toString()), builder); + } + type.writeObject(output, builder.build()); + return; + } + else if (value instanceof List) { + List listValue = (List) value; + BlockBuilder builder = createParametersBlockBuilder(type, listValue.size()); + for (int index = 0; index < type.getTypeParameters().size(); index++) { + if (index < listValue.size()) { + appendTo(type.getTypeParameters().get(index), listValue.get(index), builder); + } + else { + builder.appendNull(); + } + } + type.writeObject(output, builder.build()); + return; + } + } + else { + throw new PrestoException(INTERNAL_ERROR, "Unhandled type for Block: " + type.getTypeSignature()); + } + + // not a convertible value + output.appendNull(); + } + + private BlockBuilder createParametersBlockBuilder(Type type, int size) + { + List params = type.getTypeParameters(); + if (isArrayType(type)) { + return params.get(0).createBlockBuilder(new BlockBuilderStatus(), size); + } + + return new InterleavedBlockBuilder(params, new BlockBuilderStatus(), size * params.size()); + } + + @Override + public void close() throws IOException + { + cursor.close(); + } +} diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoPageSourceProvider.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoPageSourceProvider.java new file mode 100644 index 000000000000..ec52cdf33d52 --- /dev/null +++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoPageSourceProvider.java @@ -0,0 +1,54 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorPageSource; +import com.facebook.presto.spi.ConnectorPageSourceProvider; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorSplit; +import com.google.common.collect.ImmutableList; +import com.google.inject.Inject; + +import java.util.List; + +import static com.facebook.presto.mongodb.TypeUtils.checkType; +import static java.util.Objects.requireNonNull; + +public class MongoPageSourceProvider + implements ConnectorPageSourceProvider +{ + private final String connectorId; + private final MongoSession mongoSession; + + @Inject + public MongoPageSourceProvider(MongoConnectorId connectorId, MongoSession mongoSession) + { + this.connectorId = requireNonNull(connectorId, "connectorId is null").toString(); + this.mongoSession = requireNonNull(mongoSession, "mongoSession is null"); + } + + @Override + public ConnectorPageSource createPageSource(ConnectorSession session, ConnectorSplit split, List columns) + { + MongoSplit mongodbSplit = checkType(split, MongoSplit.class, "split"); + + ImmutableList.Builder handles = ImmutableList.builder(); + for (ColumnHandle handle : requireNonNull(columns, "columns is null")) { + handles.add(checkType(handle, MongoColumnHandle.class, "columnHandle")); + } + + return new MongoPageSource(mongoSession, mongodbSplit, handles.build()); + } +} diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoPlugin.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoPlugin.java new file mode 100644 index 000000000000..ea73b2808895 --- /dev/null +++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoPlugin.java @@ -0,0 +1,64 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import com.facebook.presto.metadata.FunctionFactory; +import com.facebook.presto.spi.ConnectorFactory; +import com.facebook.presto.spi.Plugin; +import com.facebook.presto.spi.type.Type; +import com.facebook.presto.spi.type.TypeManager; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import javax.inject.Inject; + +import java.util.List; +import java.util.Map; + +import static com.facebook.presto.mongodb.ObjectIdType.OBJECT_ID; +import static java.util.Objects.requireNonNull; + +public class MongoPlugin + implements Plugin +{ + private Map optionalConfig = ImmutableMap.of(); + private TypeManager typeManager; + + @Override + public void setOptionalConfig(Map optionalConfig) + { + this.optionalConfig = ImmutableMap.copyOf(requireNonNull(optionalConfig, "optionalConfig is null")); + } + + @Inject + public synchronized void setTypeManager(TypeManager typeManager) + { + this.typeManager = requireNonNull(typeManager, "typeManager is null"); + } + + @Override + public List getServices(Class type) + { + if (type == ConnectorFactory.class) { + return ImmutableList.of(type.cast(new MongoConnectorFactory("mongodb", typeManager, optionalConfig))); + } + if (type == Type.class) { + return ImmutableList.of(type.cast(OBJECT_ID)); + } + if (type == FunctionFactory.class) { + return ImmutableList.of(type.cast(new MongoFunctionFactory(typeManager))); + } + return ImmutableList.of(); + } +} diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoSession.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoSession.java new file mode 100644 index 000000000000..d6225c574c53 --- /dev/null +++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoSession.java @@ -0,0 +1,576 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.SchemaNotFoundException; +import com.facebook.presto.spi.SchemaTableName; +import com.facebook.presto.spi.TableNotFoundException; +import com.facebook.presto.spi.predicate.Domain; +import com.facebook.presto.spi.predicate.Range; +import com.facebook.presto.spi.predicate.TupleDomain; +import com.facebook.presto.spi.type.NamedTypeSignature; +import com.facebook.presto.spi.type.StandardTypes; +import com.facebook.presto.spi.type.Type; +import com.facebook.presto.spi.type.TypeManager; +import com.facebook.presto.spi.type.TypeSignature; +import com.facebook.presto.spi.type.TypeSignatureParameter; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Throwables; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.UncheckedExecutionException; +import com.mongodb.MongoClient; +import com.mongodb.client.FindIterable; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.model.IndexOptions; +import com.mongodb.client.result.DeleteResult; +import io.airlift.log.Logger; +import io.airlift.slice.Slice; +import org.bson.Document; +import org.bson.types.ObjectId; + +import javax.inject.Inject; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static com.facebook.presto.mongodb.ObjectIdType.OBJECT_ID; +import static com.facebook.presto.mongodb.TypeUtils.checkType; +import static com.facebook.presto.spi.type.BigintType.BIGINT; +import static com.facebook.presto.spi.type.BooleanType.BOOLEAN; +import static com.facebook.presto.spi.type.DoubleType.DOUBLE; +import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP; +import static com.facebook.presto.spi.type.VarcharType.VARCHAR; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.HOURS; +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toSet; + +public class MongoSession +{ + private static final Logger log = Logger.get(MongoSession.class); + private static final List SYSTEM_TABLES = Arrays.asList("system.indexes", "system.users", "system.version"); + + private static final String TABLE_NAME_KEY = "table"; + private static final String FIELDS_KEY = "fields"; + private static final String FIELDS_NAME_KEY = "name"; + private static final String FIELDS_TYPE_KEY = "type"; + private static final String FIELDS_HIDDEN_KEY = "hidden"; + + private static final String OR_OP = "$or"; + private static final String AND_OP = "$and"; + private static final String NOT_OP = "$not"; + private static final String NOR_OP = "$nor"; + + private static final String EQ_OP = "$eq"; + private static final String NOT_EQ_OP = "$ne"; + private static final String EXISTS_OP = "$exists"; + private static final String GTE_OP = "$gte"; + private static final String GT_OP = "$gt"; + private static final String LT_OP = "$lt"; + private static final String LTE_OP = "$lte"; + private static final String IN_OP = "$in"; + private static final String NOTIN_OP = "$nin"; + + protected final String connectorId; + + private final TypeManager typeManager; + private final MongoClient client; + + private final String schemaCollection; + private final int cursorBatchSize; + + private final LoadingCache tableCache; + private final String implicitPrefix; + + @Inject + public MongoSession(TypeManager typeManager, + String connectorId, + final MongoClient client, + MongoClientConfig config) + { + this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.connectorId = requireNonNull(connectorId, "connectorId is null"); + this.client = requireNonNull(client, "client is null"); + this.schemaCollection = config.getSchemaCollection(); + this.cursorBatchSize = config.getCursorBatchSize(); + this.implicitPrefix = config.getImplicitRowFieldPrefix(); + + this.tableCache = CacheBuilder.newBuilder() + .expireAfterWrite(1, HOURS) // TODO: Configure + .refreshAfterWrite(1, MINUTES) + .build(new CacheLoader() + { + @Override + public MongoTable load(SchemaTableName key) + throws TableNotFoundException + { + return loadTableSchema(key); + } + }); + } + + public void shutdown() + { + client.close(); + } + + public List getAllSchemas() + { + return ImmutableList.copyOf(client.listDatabaseNames()); + } + + public Set getAllTables(String schema) + throws SchemaNotFoundException + { + ImmutableSet.Builder builder = ImmutableSet.builder(); + + builder.addAll(ImmutableList.copyOf(client.getDatabase(schema).listCollectionNames()).stream() + .filter(name -> !name.equals(schemaCollection)) + .filter(name -> !SYSTEM_TABLES.contains(name)) + .collect(toSet())); + builder.addAll(getTableMetadataNames(schema)); + + return builder.build(); + } + + public MongoTable getTable(SchemaTableName tableName) + throws TableNotFoundException + { + return getCacheValue(tableCache, tableName, TableNotFoundException.class); + } + + public void createTable(SchemaTableName name, List columns) + { + createTableMetadata(name, columns); + // collection is created implicitly + } + + public void dropTable(SchemaTableName tableName) + { + deleteTableMetadata(tableName); + getCollection(tableName).drop(); + + tableCache.invalidate(tableName); + } + + private MongoTable loadTableSchema(SchemaTableName tableName) + throws TableNotFoundException + { + Document tableMeta = getTableMetadata(tableName); + + ImmutableList.Builder columnHandles = ImmutableList.builder(); + + for (Document columnMetadata : getColumnMetadata(tableMeta)) { + MongoColumnHandle columnHandle = buildColumnHandle(columnMetadata); + columnHandles.add(columnHandle); + } + + MongoTableHandle tableHandle = new MongoTableHandle(connectorId, tableName); + return new MongoTable(tableHandle, columnHandles.build(), getIndexes(tableName)); + } + + private MongoColumnHandle buildColumnHandle(Document columnMeta) + { + String name = columnMeta.getString(FIELDS_NAME_KEY); + String typeString = columnMeta.getString(FIELDS_TYPE_KEY); + boolean hidden = columnMeta.getBoolean(FIELDS_HIDDEN_KEY, false); + + Type type = typeManager.getType(TypeSignature.parseTypeSignature(typeString)); + + return new MongoColumnHandle(connectorId, name, type, hidden); + } + + private List getColumnMetadata(Document doc) + { + if (!doc.containsKey(FIELDS_KEY)) { + return ImmutableList.of(); + } + + return (List) doc.get(FIELDS_KEY); + } + + public MongoCollection getCollection(SchemaTableName tableName) + { + return getCollection(tableName.getSchemaName(), tableName.getTableName()); + } + + private MongoCollection getCollection(String schema, String table) + { + return client.getDatabase(schema).getCollection(table); + } + + public List getIndexes(SchemaTableName tableName) + { + return MongoIndex.parse(getCollection(tableName).listIndexes()); + } + + private static V getCacheValue(LoadingCache cache, K key, Class exceptionClass) + throws E + { + try { + return cache.get(key); + } + catch (ExecutionException | UncheckedExecutionException e) { + Throwable t = e.getCause(); + Throwables.propagateIfInstanceOf(t, exceptionClass); + throw Throwables.propagate(t); + } + } + + public MongoCursor execute(MongoSplit split, List columns) + { + Document output = new Document(); + for (MongoColumnHandle column : columns) { + output.append(column.getName(), 1); + } + MongoCollection collection = getCollection(split.getSchemaTableName()); + FindIterable iterable = collection.find(buildQuery(split.getTupleDomain())).projection(output); + + if (cursorBatchSize != 0) { + iterable.batchSize(cursorBatchSize); + } + + return iterable.iterator(); + } + + @VisibleForTesting + static Document buildQuery(TupleDomain tupleDomain) + { + Document query = new Document(); + if (tupleDomain.getDomains().isPresent()) { + for (Map.Entry entry : tupleDomain.getDomains().get().entrySet()) { + MongoColumnHandle column = checkType(entry.getKey(), MongoColumnHandle.class, "columnHandle"); + query.putAll(buildPredicate(column, entry.getValue())); + } + } + + return query; + } + + private static Document buildPredicate(MongoColumnHandle column, Domain domain) + { + String name = column.getName(); + if (domain.getValues().isNone() && domain.isNullAllowed()) { + return documentOf(name, isNullPredicate()); + } + if (domain.getValues().isAll() && !domain.isNullAllowed()) { + return documentOf(name, isNotNullPredicate()); + } + + List singleValues = new ArrayList<>(); + List disjuncts = new ArrayList<>(); + for (Range range : domain.getValues().getRanges().getOrderedRanges()) { + if (range.isSingleValue()) { + singleValues.add(range.getSingleValue()); + } + else { + Document rangeConjuncts = new Document(); + if (!range.getLow().isLowerUnbounded()) { + switch (range.getLow().getBound()) { + case ABOVE: + rangeConjuncts.put(GT_OP, range.getLow().getValue()); + break; + case EXACTLY: + rangeConjuncts.put(GTE_OP, range.getLow().getValue()); + break; + case BELOW: + throw new IllegalArgumentException("Low Marker should never use BELOW bound: " + range); + default: + throw new AssertionError("Unhandled bound: " + range.getLow().getBound()); + } + } + if (!range.getHigh().isUpperUnbounded()) { + switch (range.getHigh().getBound()) { + case ABOVE: + throw new IllegalArgumentException("High Marker should never use ABOVE bound: " + range); + case EXACTLY: + rangeConjuncts.put(LTE_OP, range.getHigh().getValue()); + break; + case BELOW: + rangeConjuncts.put(LT_OP, range.getHigh().getValue()); + break; + default: + throw new AssertionError("Unhandled bound: " + range.getHigh().getBound()); + } + } + // If rangeConjuncts is null, then the range was ALL, which should already have been checked for + verify(!rangeConjuncts.isEmpty()); + disjuncts.add(rangeConjuncts); + } + } + + // Add back all of the possible single values either as an equality or an IN predicate + if (singleValues.size() == 1) { + disjuncts.add(documentOf(EQ_OP, translateValue(singleValues.get(0)))); + } + else if (singleValues.size() > 1) { + disjuncts.add(documentOf(IN_OP, singleValues.stream() + .map(MongoSession::translateValue) + .collect(toList()))); + } + + if (domain.isNullAllowed()) { + disjuncts.add(isNullPredicate()); + } + + return orPredicate(disjuncts.stream() + .map(disjunct -> new Document(name, disjunct)) + .collect(toList())); + } + + private static Object translateValue(Object source) + { + if (source instanceof Slice) { + return ((Slice) source).toStringUtf8(); + } + + return source; + } + + private static Document documentOf(String key, Object value) + { + return new Document(key, value); + } + + private static Document orPredicate(List values) + { + checkState(!values.isEmpty()); + if (values.size() == 1) { + return values.get(0); + } + return new Document(OR_OP, values); + } + + private static Document isNullPredicate() + { + return documentOf(EXISTS_OP, true).append(EQ_OP, null); + } + + private static Document isNotNullPredicate() + { + return documentOf(NOT_EQ_OP, null); + } + + // Internal Schema management + private Document getTableMetadata(SchemaTableName schemaTableName) + throws TableNotFoundException + { + String schemaName = schemaTableName.getSchemaName(); + String tableName = schemaTableName.getTableName(); + + MongoDatabase db = client.getDatabase(schemaName); + MongoCollection schema = db.getCollection(schemaCollection); + + Document doc = schema + .find(new Document(TABLE_NAME_KEY, tableName)).first(); + + if (doc == null) { + if (!collectionExists(db, tableName)) { + throw new TableNotFoundException(schemaTableName); + } + else { + Document metadata = new Document(TABLE_NAME_KEY, tableName); + metadata.append(FIELDS_KEY, guessTableFields(schemaTableName)); + + schema.createIndex(new Document(TABLE_NAME_KEY, 1), new IndexOptions().unique(true)); + schema.insertOne(metadata); + + return metadata; + } + } + + return doc; + } + + public boolean collectionExists(MongoDatabase db, String collectionName) + { + for (String name : db.listCollectionNames()) { + if (name.equalsIgnoreCase(collectionName)) { + return true; + } + } + return false; + } + + private Set getTableMetadataNames(String schemaName) + throws TableNotFoundException + { + MongoDatabase db = client.getDatabase(schemaName); + MongoCursor cursor = db.getCollection(schemaCollection) + .find().projection(new Document(TABLE_NAME_KEY, true)).iterator(); + + HashSet names = new HashSet<>(); + while (cursor.hasNext()) { + names.add((cursor.next()).getString(TABLE_NAME_KEY)); + } + + return names; + } + + private void createTableMetadata(SchemaTableName schemaTableName, List columns) + throws TableNotFoundException + { + String schemaName = schemaTableName.getSchemaName(); + String tableName = schemaTableName.getTableName(); + + MongoDatabase db = client.getDatabase(schemaName); + Document metadata = new Document(TABLE_NAME_KEY, tableName); + + ArrayList fields = new ArrayList<>(); + if (!columns.stream().anyMatch(c -> c.getName().equals("_id"))) { + fields.add(new MongoColumnHandle(connectorId, "_id", OBJECT_ID, true).getDocument()); + } + + fields.addAll(columns.stream() + .map(MongoColumnHandle::getDocument) + .collect(toList())); + + metadata.append(FIELDS_KEY, fields); + + MongoCollection schema = db.getCollection(schemaCollection); + schema.createIndex(new Document(TABLE_NAME_KEY, 1), new IndexOptions().unique(true)); + schema.insertOne(metadata); + } + + private boolean deleteTableMetadata(SchemaTableName schemaTableName) + { + String schemaName = schemaTableName.getSchemaName(); + String tableName = schemaTableName.getTableName(); + + MongoDatabase db = client.getDatabase(schemaName); + if (!collectionExists(db, tableName)) { + return false; + } + + DeleteResult result = db.getCollection(schemaCollection) + .deleteOne(new Document(TABLE_NAME_KEY, tableName)); + + return result.getDeletedCount() == 1; + } + + private List guessTableFields(SchemaTableName schemaTableName) + { + String schemaName = schemaTableName.getSchemaName(); + String tableName = schemaTableName.getTableName(); + + MongoDatabase db = client.getDatabase(schemaName); + Document doc = db.getCollection(tableName).find().first(); + if (doc == null) { + // no records at the collection + return ImmutableList.of(); + } + + ImmutableList.Builder builder = ImmutableList.builder(); + + for (String key : doc.keySet()) { + Object value = doc.get(key); + Optional fieldType = guessFieldType(value); + if (fieldType.isPresent()) { + Document metadata = new Document(); + metadata.append(FIELDS_NAME_KEY, key); + metadata.append(FIELDS_TYPE_KEY, fieldType.get().toString()); + metadata.append(FIELDS_HIDDEN_KEY, + key.equals("_id") && fieldType.get().equals(OBJECT_ID.getTypeSignature())); + + builder.add(metadata); + } + else { + log.debug("Unable to guess field type from %s : %s", value.getClass().getName(), value); + } + } + + return builder.build(); + } + + private Optional guessFieldType(Object value) + { + TypeSignature typeSignature = null; + if (value instanceof String) { + typeSignature = VARCHAR.getTypeSignature(); + } + else if (value instanceof Integer || value instanceof Long) { + typeSignature = BIGINT.getTypeSignature(); + } + else if (value instanceof Boolean) { + typeSignature = BOOLEAN.getTypeSignature(); + } + else if (value instanceof Float || value instanceof Double) { + typeSignature = DOUBLE.getTypeSignature(); + } + else if (value instanceof Date) { + typeSignature = TIMESTAMP.getTypeSignature(); + } + else if (value instanceof ObjectId) { + typeSignature = OBJECT_ID.getTypeSignature(); + } + else if (value instanceof List) { + List> subTypes = ((List) value).stream() + .map(this::guessFieldType) + .collect(toList()); + + if (subTypes.isEmpty() || subTypes.stream().anyMatch(t -> !t.isPresent())) { + return Optional.empty(); + } + + Set signatures = subTypes.stream().map(t -> t.get()).collect(toSet()); + if (signatures.size() == 1) { + typeSignature = new TypeSignature(StandardTypes.ARRAY, signatures.stream() + .map(s -> TypeSignatureParameter.of(s)) + .collect(Collectors.toList())); + } + else { + // TODO: presto cli doesn't handle empty field name row type yet + typeSignature = new TypeSignature(StandardTypes.ROW, + IntStream.range(0, subTypes.size()) + .mapToObj(idx -> TypeSignatureParameter.of( + new NamedTypeSignature(String.format("%s%d", implicitPrefix, idx + 1), subTypes.get(idx).get()))) + .collect(toList())); + } + } + else if (value instanceof Document) { + List parameters = new ArrayList<>(); + + for (String key : ((Document) value).keySet()) { + Optional fieldType = guessFieldType(((Document) value).get(key)); + if (!fieldType.isPresent()) { + return Optional.empty(); + } + + parameters.add(TypeSignatureParameter.of(new NamedTypeSignature(key, fieldType.get()))); + } + typeSignature = new TypeSignature(StandardTypes.ROW, parameters); + } + + return Optional.ofNullable(typeSignature); + } +} diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoSplit.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoSplit.java new file mode 100644 index 000000000000..9fdabe7dce7a --- /dev/null +++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoSplit.java @@ -0,0 +1,86 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.HostAddress; +import com.facebook.presto.spi.SchemaTableName; +import com.facebook.presto.spi.predicate.TupleDomain; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + +public class MongoSplit + implements ConnectorSplit +{ + private final String connectorId; + private final SchemaTableName schemaTableName; + private final TupleDomain tupleDomain; + private final List addresses; + + @JsonCreator + public MongoSplit( + @JsonProperty("connectorId") String connectorId, + @JsonProperty("schemaTableName") SchemaTableName schemaTableName, + @JsonProperty("tupleDomain") TupleDomain tupleDomain, + @JsonProperty("addresses") List addresses) + { + this.connectorId = requireNonNull(connectorId, "connector id is null"); + this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null"); + this.tupleDomain = requireNonNull(tupleDomain, "tupleDomain is null"); + this.addresses = ImmutableList.copyOf(requireNonNull(addresses, "addresses is null")); + } + + @JsonProperty + public String getConnectorId() + { + return connectorId; + } + + @JsonProperty + public SchemaTableName getSchemaTableName() + { + return schemaTableName; + } + + @JsonProperty + public TupleDomain getTupleDomain() + { + return tupleDomain; + } + + @Override + public boolean isRemotelyAccessible() + { + return true; + } + + @Override + @JsonProperty + public List getAddresses() + { + return addresses; + } + + @Override + public Object getInfo() + { + return this; + } +} diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoSplitManager.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoSplitManager.java new file mode 100644 index 000000000000..8f3f99e3f9d5 --- /dev/null +++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoSplitManager.java @@ -0,0 +1,73 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorSplitManager; +import com.facebook.presto.spi.ConnectorSplitSource; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.FixedSplitSource; +import com.facebook.presto.spi.HostAddress; +import com.google.common.collect.ImmutableList; + +import javax.inject.Inject; + +import java.util.List; + +import static com.facebook.presto.mongodb.TypeUtils.checkType; +import static com.facebook.presto.spi.HostAddress.fromParts; +import static com.google.common.base.MoreObjects.toStringHelper; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toList; + +public class MongoSplitManager + implements ConnectorSplitManager +{ + private final String connectorId; + private final List addresses; + + @Inject + public MongoSplitManager(MongoConnectorId connectorId, + MongoClientConfig config) + { + this.connectorId = requireNonNull(connectorId, "connectorId is null").toString(); + + this.addresses = config.getSeeds().stream() + .map(s -> fromParts(s.getHost(), s.getPort())) + .collect(toList()); + } + + @Override + public ConnectorSplitSource getSplits(ConnectorSession session, ConnectorTableLayoutHandle layout) + { + MongoTableLayoutHandle tableLayout = checkType(layout, MongoTableLayoutHandle.class, "layout"); + MongoTableHandle tableHandle = tableLayout.getTable(); + + MongoSplit split = new MongoSplit( + connectorId, + tableHandle.getSchemaTableName(), + tableLayout.getTupleDomain(), + addresses); + + return new FixedSplitSource(connectorId, ImmutableList.of(split)); + } + + @Override + public String toString() + { + return toStringHelper(this) + .add("clientId", connectorId) + .toString(); + } +} diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoTable.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoTable.java new file mode 100644 index 000000000000..909737e1e836 --- /dev/null +++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoTable.java @@ -0,0 +1,76 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import com.google.common.collect.ImmutableList; + +import java.util.List; + +import static com.google.common.base.MoreObjects.toStringHelper; + +public class MongoTable +{ + private final MongoTableHandle tableHandle; + private final List columns; + private final List indexes; + + public MongoTable(MongoTableHandle tableHandle, List columns, List indexes) + { + this.tableHandle = tableHandle; + this.columns = ImmutableList.copyOf(columns); + this.indexes = ImmutableList.copyOf(indexes); + } + + public MongoTableHandle getTableHandle() + { + return tableHandle; + } + + public List getColumns() + { + return columns; + } + + public List getIndexes() + { + return indexes; + } + + @Override + public int hashCode() + { + return tableHandle.hashCode(); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if (!(obj instanceof MongoTable)) { + return false; + } + MongoTable that = (MongoTable) obj; + return this.tableHandle.equals(that.tableHandle); + } + + @Override + public String toString() + { + return toStringHelper(this) + .add("tableHandle", tableHandle) + .toString(); + } +} diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoTableHandle.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoTableHandle.java new file mode 100644 index 000000000000..a816125225b7 --- /dev/null +++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoTableHandle.java @@ -0,0 +1,77 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import com.facebook.presto.spi.ConnectorTableHandle; +import com.facebook.presto.spi.SchemaTableName; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +public class MongoTableHandle + implements ConnectorTableHandle +{ + private final String connectorId; + private final SchemaTableName schemaTableName; + + @JsonCreator + public MongoTableHandle( + @JsonProperty("connectorId") String connectorId, + @JsonProperty("schemaTableName") SchemaTableName schemaTableName) + { + this.connectorId = requireNonNull(connectorId, "connectorId is null"); + this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null"); + } + + @JsonProperty + public String getConnectorId() + { + return connectorId; + } + + @JsonProperty + public SchemaTableName getSchemaTableName() + { + return schemaTableName; + } + + @Override + public int hashCode() + { + return Objects.hash(connectorId, schemaTableName); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + MongoTableHandle other = (MongoTableHandle) obj; + return Objects.equals(this.connectorId, other.connectorId) && + Objects.equals(this.schemaTableName, other.schemaTableName); + } + + @Override + public String toString() + { + return connectorId + ":" + schemaTableName; + } +} diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoTableLayoutHandle.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoTableLayoutHandle.java new file mode 100644 index 000000000000..1b8eadb3bb46 --- /dev/null +++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoTableLayoutHandle.java @@ -0,0 +1,76 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.predicate.TupleDomain; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +public class MongoTableLayoutHandle + implements ConnectorTableLayoutHandle +{ + private final MongoTableHandle table; + private final TupleDomain tupleDomain; + + @JsonCreator + public MongoTableLayoutHandle(@JsonProperty("table") MongoTableHandle table, + @JsonProperty("tupleDomain") TupleDomain tupleDomain) + { + this.table = requireNonNull(table, "table is null"); + this.tupleDomain = requireNonNull(tupleDomain, "tuple is null"); + } + + public String getConnectorId() + { + return table.getConnectorId(); + } + + @JsonProperty + public MongoTableHandle getTable() + { + return table; + } + + @JsonProperty + public TupleDomain getTupleDomain() + { + return tupleDomain; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + MongoTableLayoutHandle that = (MongoTableLayoutHandle) o; + return Objects.equals(table, that.table) && + Objects.equals(tupleDomain, that.tupleDomain); + } + + @Override + public int hashCode() + { + return Objects.hash(table, tupleDomain); + } +} diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/ObjectIdFunctions.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/ObjectIdFunctions.java new file mode 100644 index 000000000000..0f1acd43855f --- /dev/null +++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/ObjectIdFunctions.java @@ -0,0 +1,115 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import com.facebook.presto.operator.Description; +import com.facebook.presto.operator.scalar.ScalarFunction; +import com.facebook.presto.operator.scalar.ScalarOperator; +import com.facebook.presto.spi.type.StandardTypes; +import com.facebook.presto.type.SqlType; +import com.google.common.base.CharMatcher; +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; +import org.bson.types.ObjectId; + +import static com.facebook.presto.metadata.OperatorType.BETWEEN; +import static com.facebook.presto.metadata.OperatorType.EQUAL; +import static com.facebook.presto.metadata.OperatorType.GREATER_THAN; +import static com.facebook.presto.metadata.OperatorType.GREATER_THAN_OR_EQUAL; +import static com.facebook.presto.metadata.OperatorType.HASH_CODE; +import static com.facebook.presto.metadata.OperatorType.LESS_THAN; +import static com.facebook.presto.metadata.OperatorType.LESS_THAN_OR_EQUAL; +import static com.facebook.presto.metadata.OperatorType.NOT_EQUAL; + +public class ObjectIdFunctions +{ + private ObjectIdFunctions() {} + + @Description("mongodb ObjectId") + @ScalarFunction("objectid") + @SqlType("ObjectId") + public static Slice ObjectId() + { + return Slices.wrappedBuffer(new ObjectId().toByteArray()); + } + + @Description("mongodb ObjectId from the given string") + @ScalarFunction("objectid") + @SqlType("ObjectId") + public static Slice ObjectId(@SqlType(StandardTypes.VARCHAR) Slice value) + { + return Slices.wrappedBuffer(new ObjectId(CharMatcher.is(' ').removeFrom(value.toStringUtf8())).toByteArray()); + } + + @ScalarOperator(EQUAL) + @SqlType(StandardTypes.BOOLEAN) + public static boolean equal(@SqlType("ObjectId") Slice left, @SqlType("ObjectId") Slice right) + { + return left.equals(right); + } + + @ScalarOperator(NOT_EQUAL) + @SqlType(StandardTypes.BOOLEAN) + public static boolean notEqual(@SqlType("ObjectId") Slice left, @SqlType("ObjectId") Slice right) + { + return !left.equals(right); + } + + @ScalarOperator(GREATER_THAN) + @SqlType(StandardTypes.BOOLEAN) + public static boolean greaterThan(@SqlType("ObjectId") Slice left, @SqlType("ObjectId") Slice right) + { + return compareTo(left, right) > 0; + } + + @ScalarOperator(GREATER_THAN_OR_EQUAL) + @SqlType(StandardTypes.BOOLEAN) + public static boolean greaterThanOrEqual(@SqlType("ObjectId") Slice left, @SqlType("ObjectId") Slice right) + { + return compareTo(left, right) >= 0; + } + + @ScalarOperator(LESS_THAN) + @SqlType(StandardTypes.BOOLEAN) + public static boolean lessThan(@SqlType("ObjectId") Slice left, @SqlType("ObjectId") Slice right) + { + return compareTo(left, right) < 0; + } + + @ScalarOperator(LESS_THAN_OR_EQUAL) + @SqlType(StandardTypes.BOOLEAN) + public static boolean lessThanOrEqual(@SqlType("ObjectId") Slice left, @SqlType("ObjectId") Slice right) + { + return compareTo(left, right) <= 0; + } + + @ScalarOperator(BETWEEN) + @SqlType(StandardTypes.BOOLEAN) + public static boolean between(@SqlType("ObjectId") Slice value, @SqlType("ObjectId") Slice min, @SqlType("ObjectId") Slice max) + { + return compareTo(value, min) >= 0 && compareTo(value, max) <= 0; + } + + @ScalarOperator(HASH_CODE) + @SqlType(StandardTypes.BIGINT) + public static long hashCode(@SqlType("ObjectId") Slice value) + { + return new ObjectId(value.getBytes()).hashCode(); + } + + private static int compareTo(Slice left, Slice right) + { + return new ObjectId(left.getBytes()).compareTo(new ObjectId(right.getBytes())); + } +} diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/ObjectIdType.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/ObjectIdType.java new file mode 100644 index 000000000000..b4ceb6907281 --- /dev/null +++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/ObjectIdType.java @@ -0,0 +1,131 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.block.Block; +import com.facebook.presto.spi.block.BlockBuilder; +import com.facebook.presto.spi.type.AbstractVariableWidthType; +import com.facebook.presto.spi.type.SqlVarbinary; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import io.airlift.slice.Slice; +import org.bson.types.ObjectId; + +import java.io.IOException; + +import static com.facebook.presto.spi.type.TypeSignature.parseTypeSignature; + +public class ObjectIdType + extends AbstractVariableWidthType +{ + public static final ObjectIdType OBJECT_ID = new ObjectIdType(); + + @JsonCreator + public ObjectIdType() + { + super(parseTypeSignature("ObjectId"), Slice.class); + } + + @Override + public boolean isComparable() + { + return true; + } + + @Override + public boolean isOrderable() + { + return true; + } + + @Override + public Object getObjectValue(ConnectorSession session, Block block, int position) + { + if (block.isNull(position)) { + return null; + } + + // TODO: There's no way to represent string value of a custom type + return new SqlVarbinary(block.getSlice(position, 0, block.getLength(position)).getBytes()); + } + + @Override + public boolean equalTo(Block leftBlock, int leftPosition, Block rightBlock, int rightPosition) + { + int leftLength = leftBlock.getLength(leftPosition); + int rightLength = rightBlock.getLength(rightPosition); + if (leftLength != rightLength) { + return false; + } + return leftBlock.equals(leftPosition, 0, rightBlock, rightPosition, 0, leftLength); + } + + @Override + public long hash(Block block, int position) + { + return block.hash(position, 0, block.getLength(position)); + } + + @Override + public int compareTo(Block leftBlock, int leftPosition, Block rightBlock, int rightPosition) + { + int leftLength = leftBlock.getLength(leftPosition); + int rightLength = rightBlock.getLength(rightPosition); + return leftBlock.compareTo(leftPosition, 0, leftLength, rightBlock, rightPosition, 0, rightLength); + } + + @Override + public void appendTo(Block block, int position, BlockBuilder blockBuilder) + { + if (block.isNull(position)) { + blockBuilder.appendNull(); + } + else { + block.writeBytesTo(position, 0, block.getLength(position), blockBuilder); + blockBuilder.closeEntry(); + } + } + + @Override + public Slice getSlice(Block block, int position) + { + return block.getSlice(position, 0, block.getLength(position)); + } + + @Override + public void writeSlice(BlockBuilder blockBuilder, Slice value) + { + writeSlice(blockBuilder, value, 0, value.length()); + } + + @Override + public void writeSlice(BlockBuilder blockBuilder, Slice value, int offset, int length) + { + blockBuilder.writeBytes(value, offset, length).closeEntry(); + } + + public static class ObjectIdSerializer + extends JsonSerializer + { + @Override + public void serialize(ObjectId objectId, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) + throws IOException + { + jsonGenerator.writeString(objectId.toString()); + } + } +} diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/ReadPreferenceType.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/ReadPreferenceType.java new file mode 100644 index 000000000000..0c779f6ff45f --- /dev/null +++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/ReadPreferenceType.java @@ -0,0 +1,39 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import com.mongodb.ReadPreference; + +import static java.util.Objects.requireNonNull; + +public enum ReadPreferenceType +{ + PRIMARY(ReadPreference.primary()), + PRIMARY_PREFERRED(ReadPreference.primaryPreferred()), + SECONDARY(ReadPreference.secondary()), + SECONDARY_PREFERRED(ReadPreference.secondaryPreferred()), + NEAREST(ReadPreference.nearest()); + + private final ReadPreference readPreference; + + ReadPreferenceType(ReadPreference readPreference) + { + this.readPreference = requireNonNull(readPreference, "readPreference is null"); + } + + public ReadPreference getReadPreference() + { + return readPreference; + } +} diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/TypeUtils.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/TypeUtils.java new file mode 100644 index 000000000000..49d9fb56db82 --- /dev/null +++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/TypeUtils.java @@ -0,0 +1,77 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import com.facebook.presto.spi.type.StandardTypes; +import com.facebook.presto.spi.type.Type; + +import java.util.function.Predicate; + +import static com.facebook.presto.spi.type.DateType.DATE; +import static com.facebook.presto.spi.type.TimeType.TIME; +import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP; +import static com.facebook.presto.spi.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE; +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +public final class TypeUtils +{ + private TypeUtils() {} + + public static B checkType(A value, Class target, String name) + { + requireNonNull(value, String.format("%s is null", name)); + checkArgument(target.isInstance(value), + "%s must be of type %s, not %s", + name, + target.getName(), + value.getClass().getName()); + return target.cast(value); + } + + public static boolean isArrayType(Type type) + { + return type.getTypeSignature().getBase().equals(StandardTypes.ARRAY); + } + + public static boolean isMapType(Type type) + { + return type.getTypeSignature().getBase().equals(StandardTypes.MAP); + } + + public static boolean isRowType(Type type) + { + return type.getTypeSignature().getBase().equals(StandardTypes.ROW); + } + + public static boolean isDateType(Type type) + { + return type.equals(DATE) || + type.equals(TIME) || + type.equals(TIMESTAMP) || + type.equals(TIMESTAMP_WITH_TIME_ZONE); + } + + public static boolean containsType(Type type, Predicate predicate, Predicate... orPredicates) + { + for (Predicate orPredicate : orPredicates) { + predicate = predicate.or(orPredicate); + } + if (predicate.test(type)) { + return true; + } + + return type.getTypeParameters().stream().anyMatch(predicate); + } +} diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/WriteConcernType.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/WriteConcernType.java new file mode 100644 index 000000000000..2216480996b7 --- /dev/null +++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/WriteConcernType.java @@ -0,0 +1,45 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import com.mongodb.WriteConcern; + +import static java.util.Objects.requireNonNull; + +public enum WriteConcernType +{ + ACKNOWLEDGED(WriteConcern.ACKNOWLEDGED), + FSYNC_SAFE(WriteConcern.FSYNC_SAFE), + FSYNCED(WriteConcern.FSYNCED), + JOURNAL_SAFEY(WriteConcern.JOURNAL_SAFE), + JOURNALED(WriteConcern.JOURNALED), + MAJORITY(WriteConcern.MAJORITY), + NORMAL(WriteConcern.NORMAL), + REPLICA_ACKNOWLEDGED(WriteConcern.REPLICA_ACKNOWLEDGED), + REPLICAS_SAFE(WriteConcern.REPLICAS_SAFE), + SAFE(WriteConcern.SAFE), + UNACKNOWLEDGED(WriteConcern.UNACKNOWLEDGED); + + private final WriteConcern writeConcern; + + WriteConcernType(WriteConcern writeConcern) + { + this.writeConcern = requireNonNull(writeConcern, "writeConcern is null"); + } + + public WriteConcern getWriteConcern() + { + return writeConcern; + } +} diff --git a/presto-mongodb/src/test/java/com/facebook/presto/mongodb/MongoQueryRunner.java b/presto-mongodb/src/test/java/com/facebook/presto/mongodb/MongoQueryRunner.java new file mode 100644 index 000000000000..e1d9e61ed8c4 --- /dev/null +++ b/presto-mongodb/src/test/java/com/facebook/presto/mongodb/MongoQueryRunner.java @@ -0,0 +1,104 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import com.facebook.presto.Session; +import com.facebook.presto.tests.DistributedQueryRunner; +import com.facebook.presto.tpch.TpchPlugin; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import de.bwaldvogel.mongo.MongoServer; +import io.airlift.tpch.TpchTable; + +import java.net.InetSocketAddress; +import java.util.Map; + +import static com.facebook.presto.spi.type.TimeZoneKey.UTC_KEY; +import static com.facebook.presto.testing.TestingSession.testSessionBuilder; +import static com.facebook.presto.tests.QueryAssertions.copyTpchTables; +import static com.facebook.presto.tpch.TpchMetadata.TINY_SCHEMA_NAME; +import static io.airlift.testing.Closeables.closeAllSuppress; +import static java.util.Locale.ENGLISH; + +public class MongoQueryRunner + extends DistributedQueryRunner +{ + private static final String TPCH_SCHEMA = "tpch"; + + private final MongoServer server; + private final InetSocketAddress address; + + private MongoQueryRunner(Session session, int workers) throws Exception + { + super(session, workers); + + server = new MongoServer(new SyncMemoryBackend()); + address = server.bind(); + } + + public static MongoQueryRunner createMongoQueryRunner(TpchTable... tables) + throws Exception + { + return createMongoQueryRunner(ImmutableList.copyOf(tables)); + } + + public static MongoQueryRunner createMongoQueryRunner(Iterable> tables) + throws Exception + { + MongoQueryRunner queryRunner = null; + try { + queryRunner = new MongoQueryRunner(createSession(), 3); + + queryRunner.installPlugin(new TpchPlugin()); + queryRunner.createCatalog("tpch", "tpch"); + + Map properties = ImmutableMap.of( + "mongodb.seeds", queryRunner.getAddress().getHostString() + ":" + queryRunner.getAddress().getPort(), + "mongodb.socket-keep-alive", "true" + ); + + queryRunner.installPlugin(new MongoPlugin()); + queryRunner.createCatalog("mongodb", "mongodb", properties); + + copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, createSession(), tables); + + return queryRunner; + } + catch (Throwable e) { + closeAllSuppress(e, queryRunner); + throw e; + } + } + + public static Session createSession() + { + return testSessionBuilder() + .setCatalog("mongodb") + .setSchema(TPCH_SCHEMA) + .setTimeZoneKey(UTC_KEY) + .setLocale(ENGLISH) + .build(); + } + + public InetSocketAddress getAddress() + { + return address; + } + + public void shutdown() + { + close(); + server.shutdown(); + } +} diff --git a/presto-mongodb/src/test/java/com/facebook/presto/mongodb/SyncMemoryBackend.java b/presto-mongodb/src/test/java/com/facebook/presto/mongodb/SyncMemoryBackend.java new file mode 100644 index 000000000000..259c808e0399 --- /dev/null +++ b/presto-mongodb/src/test/java/com/facebook/presto/mongodb/SyncMemoryBackend.java @@ -0,0 +1,45 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import de.bwaldvogel.mongo.MongoBackend; +import de.bwaldvogel.mongo.backend.memory.MemoryBackend; +import de.bwaldvogel.mongo.backend.memory.MemoryDatabase; +import de.bwaldvogel.mongo.exception.MongoServerException; +import io.netty.channel.Channel; +import org.bson.BSONObject; + +public class SyncMemoryBackend + extends MemoryBackend +{ + @Override + public MemoryDatabase openOrCreateDatabase(String databaseName) throws MongoServerException + { + return new SyncMemoryDatabase(this, databaseName); + } + + private static class SyncMemoryDatabase extends MemoryDatabase + { + public SyncMemoryDatabase(MongoBackend backend, String databaseName) throws MongoServerException + { + super(backend, databaseName); + } + + @Override + public synchronized BSONObject handleCommand(Channel channel, String command, BSONObject query) throws MongoServerException + { + return super.handleCommand(channel, command, query); + } + } +} diff --git a/presto-mongodb/src/test/java/com/facebook/presto/mongodb/TestMongoDistributedQueries.java b/presto-mongodb/src/test/java/com/facebook/presto/mongodb/TestMongoDistributedQueries.java new file mode 100644 index 000000000000..e1641b58e125 --- /dev/null +++ b/presto-mongodb/src/test/java/com/facebook/presto/mongodb/TestMongoDistributedQueries.java @@ -0,0 +1,46 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import com.facebook.presto.tests.AbstractTestQueries; +import io.airlift.tpch.TpchTable; +import org.testng.annotations.AfterClass; +import org.testng.annotations.Test; + +import static com.facebook.presto.mongodb.MongoQueryRunner.createMongoQueryRunner; + +@Test(singleThreaded = true) +public class TestMongoDistributedQueries + extends AbstractTestQueries +{ + private final MongoQueryRunner runner; + + public TestMongoDistributedQueries() + throws Exception + { + this(createMongoQueryRunner(TpchTable.getTables())); + } + + private TestMongoDistributedQueries(MongoQueryRunner runner) + { + super(runner); + this.runner = runner; + } + + @AfterClass(alwaysRun = true) + public final void destroy() + { + runner.shutdown(); + } +} diff --git a/presto-mongodb/src/test/java/com/facebook/presto/mongodb/TestMongoIntegrationSmokeTest.java b/presto-mongodb/src/test/java/com/facebook/presto/mongodb/TestMongoIntegrationSmokeTest.java new file mode 100644 index 000000000000..186b024d569e --- /dev/null +++ b/presto-mongodb/src/test/java/com/facebook/presto/mongodb/TestMongoIntegrationSmokeTest.java @@ -0,0 +1,166 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import com.facebook.presto.testing.MaterializedResult; +import com.facebook.presto.testing.MaterializedRow; +import com.facebook.presto.tests.AbstractTestIntegrationSmokeTest; +import org.joda.time.DateTime; +import org.testng.annotations.AfterClass; +import org.testng.annotations.Test; + +import java.sql.Timestamp; +import java.util.Date; + +import static com.facebook.presto.mongodb.MongoQueryRunner.createMongoQueryRunner; +import static io.airlift.tpch.TpchTable.ORDERS; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.joda.time.DateTimeZone.UTC; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; + +@Test(singleThreaded = true) +public class TestMongoIntegrationSmokeTest + extends AbstractTestIntegrationSmokeTest +{ + private final MongoQueryRunner runner; + + public TestMongoIntegrationSmokeTest() + throws Exception + { + this(createMongoQueryRunner(ORDERS)); + } + + public TestMongoIntegrationSmokeTest(MongoQueryRunner runner) + { + super(runner); + this.runner = runner; + } + + @Test + public void createTableWithEveryType() + throws Exception + { + String query = "" + + "CREATE TABLE test_types_table AS " + + "SELECT" + + " 'foo' _varchar" + + ", cast('bar' as varbinary) _varbinary" + + ", 1 _bigint" + + ", 3.14 _double" + + ", true _boolean" + + ", DATE '1980-05-07' _date" + + ", TIMESTAMP '1980-05-07 11:22:33.456' _timestamp"; + + assertUpdate(query, 1); + + MaterializedResult results = queryRunner.execute(getSession(), "SELECT * FROM test_types_table").toJdbcTypes(); + assertEquals(results.getRowCount(), 1); + MaterializedRow row = results.getMaterializedRows().get(0); + assertEquals(row.getField(0), "foo"); + assertEquals(row.getField(1), "bar".getBytes(UTF_8)); + assertEquals(row.getField(2), 1L); + assertEquals(row.getField(3), 3.14); + assertEquals(row.getField(4), true); + assertEquals(row.getField(5), new Date(new DateTime(1980, 5, 7, 0, 0, 0, UTC).getMillis())); + assertEquals(row.getField(6), new Timestamp(new DateTime(1980, 5, 7, 11, 22, 33, 456, UTC).getMillis())); + assertUpdate("DROP TABLE test_types_table"); + + assertFalse(queryRunner.tableExists(getSession(), "test_types_table")); + } + + @Test + public void testArrays() + throws Exception + { + assertUpdate("CREATE TABLE tmp_array1 AS SELECT ARRAY[1, 2, NULL] AS col", 1); + assertQuery("SELECT col[2] FROM tmp_array1", "SELECT 2"); + assertQuery("SELECT col[3] FROM tmp_array1", "SELECT NULL"); + + assertUpdate("CREATE TABLE tmp_array2 AS SELECT ARRAY[1.0, 2.5, 3.5] AS col", 1); + assertQuery("SELECT col[2] FROM tmp_array2", "SELECT 2.5"); + + assertUpdate("CREATE TABLE tmp_array3 AS SELECT ARRAY['puppies', 'kittens', NULL] AS col", 1); + assertQuery("SELECT col[2] FROM tmp_array3", "SELECT 'kittens'"); + assertQuery("SELECT col[3] FROM tmp_array3", "SELECT NULL"); + + assertUpdate("CREATE TABLE tmp_array4 AS SELECT ARRAY[TRUE, NULL] AS col", 1); + assertQuery("SELECT col[1] FROM tmp_array4", "SELECT TRUE"); + assertQuery("SELECT col[2] FROM tmp_array4", "SELECT NULL"); + + assertUpdate("CREATE TABLE tmp_array5 AS SELECT ARRAY[ARRAY[1, 2], NULL, ARRAY[3, 4]] AS col", 1); + assertQuery("SELECT col[1][2] FROM tmp_array5", "SELECT 2"); + + assertUpdate("CREATE TABLE tmp_array6 AS SELECT ARRAY[ARRAY['\"hi\"'], NULL, ARRAY['puppies']] AS col", 1); + assertQuery("SELECT col[1][1] FROM tmp_array6", "SELECT '\"hi\"'"); + assertQuery("SELECT col[3][1] FROM tmp_array6", "SELECT 'puppies'"); + } + + @Test + public void testTemporalArrays() + throws Exception + { + assertUpdate("CREATE TABLE tmp_array7 AS SELECT ARRAY[DATE '2014-09-30'] AS col", 1); + assertOneNotNullResult("SELECT col[1] FROM tmp_array7"); + assertUpdate("CREATE TABLE tmp_array8 AS SELECT ARRAY[TIMESTAMP '2001-08-22 03:04:05.321'] AS col", 1); + assertOneNotNullResult("SELECT col[1] FROM tmp_array8"); + } + + @Test + public void testMaps() + throws Exception + { + assertUpdate("CREATE TABLE tmp_map1 AS SELECT MAP(ARRAY[0,1], ARRAY[2,NULL]) AS col", 1); + assertQuery("SELECT col[0] FROM tmp_map1", "SELECT 2"); + assertQuery("SELECT col[1] FROM tmp_map1", "SELECT NULL"); + + assertUpdate("CREATE TABLE tmp_map2 AS SELECT MAP(ARRAY[1.0], ARRAY[2.5]) AS col", 1); + assertQuery("SELECT col[1.0] FROM tmp_map2", "SELECT 2.5"); + + assertUpdate("CREATE TABLE tmp_map3 AS SELECT MAP(ARRAY['puppies'], ARRAY['kittens']) AS col", 1); + assertQuery("SELECT col['puppies'] FROM tmp_map3", "SELECT 'kittens'"); + + assertUpdate("CREATE TABLE tmp_map4 AS SELECT MAP(ARRAY[TRUE], ARRAY[FALSE]) AS col", "SELECT 1"); + assertQuery("SELECT col[TRUE] FROM tmp_map4", "SELECT FALSE"); + + assertUpdate("CREATE TABLE tmp_map5 AS SELECT MAP(ARRAY[1.0], ARRAY[ARRAY[1, 2]]) AS col", 1); + assertQuery("SELECT col[1.0][2] FROM tmp_map5", "SELECT 2"); + + assertUpdate("CREATE TABLE tmp_map6 AS SELECT MAP(ARRAY[DATE '2014-09-30'], ARRAY[DATE '2014-09-29']) AS col", 1); + assertOneNotNullResult("SELECT col[DATE '2014-09-30'] FROM tmp_map6"); + assertUpdate("CREATE TABLE tmp_map7 AS SELECT MAP(ARRAY[TIMESTAMP '2001-08-22 03:04:05.321'], ARRAY[TIMESTAMP '2001-08-22 03:04:05.321']) AS col", 1); + assertOneNotNullResult("SELECT col[TIMESTAMP '2001-08-22 03:04:05.321'] FROM tmp_map7"); + } + + private void assertOneNotNullResult(String query) + { + MaterializedResult results = queryRunner.execute(getSession(), query).toJdbcTypes(); + assertEquals(results.getRowCount(), 1); + assertEquals(results.getMaterializedRows().get(0).getFieldCount(), 1); + assertNotNull(results.getMaterializedRows().get(0).getField(0)); + } + + @Override + public void testViewAccessControl() + { + // does not support views + } + + @AfterClass(alwaysRun = true) + public final void destroy() + { + runner.shutdown(); + } +} diff --git a/presto-mongodb/src/test/java/com/facebook/presto/mongodb/TestMongoPlugin.java b/presto-mongodb/src/test/java/com/facebook/presto/mongodb/TestMongoPlugin.java new file mode 100644 index 000000000000..ac63fe53f5e8 --- /dev/null +++ b/presto-mongodb/src/test/java/com/facebook/presto/mongodb/TestMongoPlugin.java @@ -0,0 +1,112 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import com.facebook.presto.spi.Connector; +import com.facebook.presto.spi.ConnectorFactory; +import com.facebook.presto.spi.type.Type; +import com.facebook.presto.spi.type.TypeManager; +import com.facebook.presto.spi.type.TypeSignature; +import com.facebook.presto.spi.type.TypeSignatureParameter; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import de.bwaldvogel.mongo.MongoServer; +import de.bwaldvogel.mongo.backend.memory.MemoryBackend; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Optional; + +import static com.facebook.presto.mongodb.ObjectIdType.OBJECT_ID; +import static com.google.common.collect.Iterables.getOnlyElement; +import static org.testng.Assert.assertEquals; + +public class TestMongoPlugin +{ + private MongoServer server; + private String seed; + + @BeforeClass + public void start() + { + server = new MongoServer(new MemoryBackend()); + + InetSocketAddress address = server.bind(); + seed = String.format("%s:%d", address.getHostString(), address.getPort()); + } + + @Test + public void testCreateConnector() + throws Exception + { + MongoPlugin plugin = new MongoPlugin(); + plugin.setTypeManager(new TestingTypeManager()); + ConnectorFactory factory = getOnlyElement(plugin.getServices(ConnectorFactory.class)); + Connector connector = factory.create("test", ImmutableMap.of("mongodb.seeds", seed)); + + Type type = getOnlyElement(plugin.getServices(Type.class)); + assertEquals(type, OBJECT_ID); + + connector.shutdown(); + } + + @AfterClass + public void destory() + { + server.shutdown(); + } + + private static class TestingTypeManager + implements TypeManager + { + @Override + public Type getType(TypeSignature signature) + { + return null; + } + + @Override + public Type getParameterizedType(String baseTypeName, List typeParameters) + { + return null; + } + + @Override + public Type getParameterizedType(String baseTypeName, List typeParameters, List literalParameters) + { + return null; + } + + @Override + public List getTypes() + { + return ImmutableList.of(); + } + + @Override + public Optional getCommonSuperType(List types) + { + return Optional.empty(); + } + + @Override + public Optional getCommonSuperType(Type firstType, Type secondType) + { + return Optional.empty(); + } + } +} diff --git a/presto-mongodb/src/test/java/com/facebook/presto/mongodb/TestMongoSession.java b/presto-mongodb/src/test/java/com/facebook/presto/mongodb/TestMongoSession.java new file mode 100644 index 000000000000..ec1f04e1cdf0 --- /dev/null +++ b/presto-mongodb/src/test/java/com/facebook/presto/mongodb/TestMongoSession.java @@ -0,0 +1,96 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.predicate.Domain; +import com.facebook.presto.spi.predicate.TupleDomain; +import com.facebook.presto.spi.predicate.ValueSet; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.bson.Document; +import org.testng.annotations.Test; + +import static com.facebook.presto.spi.predicate.Range.equal; +import static com.facebook.presto.spi.predicate.Range.greaterThan; +import static com.facebook.presto.spi.predicate.Range.lessThan; +import static com.facebook.presto.spi.predicate.Range.range; +import static com.facebook.presto.spi.type.BigintType.BIGINT; +import static com.facebook.presto.spi.type.VarcharType.VARCHAR; +import static io.airlift.slice.Slices.utf8Slice; +import static java.util.Arrays.asList; +import static org.testng.Assert.assertEquals; + +public class TestMongoSession +{ + private static final MongoColumnHandle COL1 = new MongoColumnHandle("mongodb", "col1", BIGINT, false); + private static final MongoColumnHandle COL2 = new MongoColumnHandle("mongodb", "col2", VARCHAR, false); + + @Test + public void testBuildQuery() + { + TupleDomain tupleDomain = TupleDomain.withColumnDomains(ImmutableMap.of( + COL1, Domain.create(ValueSet.ofRanges(range(BIGINT, 100L, false, 200L, true)), false), + COL2, Domain.singleValue(VARCHAR, utf8Slice("a value")) + )); + + Document query = MongoSession.buildQuery(tupleDomain); + Document expected = new Document() + .append(COL1.getName(), new Document().append("$gt", 100L).append("$lte", 200L)) + .append(COL2.getName(), new Document("$eq", "a value")); + assertEquals(query, expected); + } + + @Test + public void testBuildQueryIn() + { + TupleDomain tupleDomain = TupleDomain.withColumnDomains(ImmutableMap.of( + COL2, Domain.create(ValueSet.ofRanges(equal(VARCHAR, utf8Slice("hello")), equal(VARCHAR, utf8Slice("world"))), false) + )); + + Document query = MongoSession.buildQuery(tupleDomain); + Document expected = new Document(COL2.getName(), new Document("$in", ImmutableList.of("hello", "world"))); + assertEquals(query, expected); + } + + @Test + public void testBuildQueryOr() + { + TupleDomain tupleDomain = TupleDomain.withColumnDomains(ImmutableMap.of( + COL1, Domain.create(ValueSet.ofRanges(lessThan(BIGINT, 100L), greaterThan(BIGINT, 200L)), false) + )); + + Document query = MongoSession.buildQuery(tupleDomain); + Document expected = new Document("$or", asList( + new Document(COL1.getName(), new Document("$lt", 100L)), + new Document(COL1.getName(), new Document("$gt", 200L)) + )); + assertEquals(query, expected); + } + + @Test + public void testBuildQueryNull() + { + TupleDomain tupleDomain = TupleDomain.withColumnDomains(ImmutableMap.of( + COL1, Domain.create(ValueSet.ofRanges(greaterThan(BIGINT, 200L)), true) + )); + + Document query = MongoSession.buildQuery(tupleDomain); + Document expected = new Document("$or", asList( + new Document(COL1.getName(), new Document("$gt", 200L)), + new Document(COL1.getName(), new Document("$exists", true).append("$eq", null)) + )); + assertEquals(query, expected); + } +} diff --git a/presto-mongodb/src/test/java/com/facebook/presto/mongodb/TestMongoSplit.java b/presto-mongodb/src/test/java/com/facebook/presto/mongodb/TestMongoSplit.java new file mode 100644 index 000000000000..cb8cf2d7e573 --- /dev/null +++ b/presto-mongodb/src/test/java/com/facebook/presto/mongodb/TestMongoSplit.java @@ -0,0 +1,42 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.SchemaTableName; +import com.facebook.presto.spi.predicate.TupleDomain; +import com.google.common.collect.ImmutableList; +import io.airlift.json.JsonCodec; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + +public class TestMongoSplit +{ + private final JsonCodec codec = JsonCodec.jsonCodec(MongoSplit.class); + + @Test + public void testJsonRoundTrip() + { + MongoSplit expected = new MongoSplit("connectorId", new SchemaTableName("schema1", "table1"), TupleDomain.all(), ImmutableList.of()); + + String json = codec.toJson(expected); + MongoSplit actual = codec.fromJson(json); + + assertEquals(actual.getConnectorId(), expected.getConnectorId()); + assertEquals(actual.getSchemaTableName(), expected.getSchemaTableName()); + assertEquals(actual.getTupleDomain(), TupleDomain.all()); + assertEquals(actual.getAddresses(), ImmutableList.of()); + } +} diff --git a/presto-mongodb/src/test/java/com/facebook/presto/mongodb/TestMongoTableHandle.java b/presto-mongodb/src/test/java/com/facebook/presto/mongodb/TestMongoTableHandle.java new file mode 100644 index 000000000000..165ab6953e9d --- /dev/null +++ b/presto-mongodb/src/test/java/com/facebook/presto/mongodb/TestMongoTableHandle.java @@ -0,0 +1,37 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.mongodb; + +import com.facebook.presto.spi.SchemaTableName; +import io.airlift.json.JsonCodec; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + +public class TestMongoTableHandle +{ + private final JsonCodec codec = JsonCodec.jsonCodec(MongoTableHandle.class); + + @Test + public void testRoundTrip() + { + MongoTableHandle expected = new MongoTableHandle("client", new SchemaTableName("schema", "table")); + + String json = codec.toJson(expected); + MongoTableHandle actual = codec.fromJson(json); + + assertEquals(actual.getConnectorId(), expected.getConnectorId()); + assertEquals(actual.getSchemaTableName(), expected.getSchemaTableName()); + } +} diff --git a/presto-server/src/main/provisio/presto.xml b/presto-server/src/main/provisio/presto.xml index 5d37ab286ca7..ece242d4622a 100644 --- a/presto-server/src/main/provisio/presto.xml +++ b/presto-server/src/main/provisio/presto.xml @@ -121,4 +121,10 @@ + + + + + +