diff --git a/pom.xml b/pom.xml
index 4f1da6995e24..ec2cf765f486 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,6 +83,7 @@
presto-base-jdbcpresto-mysqlpresto-postgresql
+ presto-mongodbpresto-clientpresto-parserpresto-main
diff --git a/presto-docs/src/main/sphinx/connector.rst b/presto-docs/src/main/sphinx/connector.rst
index 10dbb9d6628e..e11859910119 100644
--- a/presto-docs/src/main/sphinx/connector.rst
+++ b/presto-docs/src/main/sphinx/connector.rst
@@ -14,6 +14,7 @@ from different data sources.
connector/jmx
connector/kafka
connector/kafka-tutorial
+ connector/mongodb
connector/mysql
connector/postgresql
connector/redis
diff --git a/presto-docs/src/main/sphinx/connector/mongodb.rst b/presto-docs/src/main/sphinx/connector/mongodb.rst
new file mode 100644
index 000000000000..cda6a20f20fc
--- /dev/null
+++ b/presto-docs/src/main/sphinx/connector/mongodb.rst
@@ -0,0 +1,244 @@
+=================
+MongoDB Connector
+=================
+
+This connector allows the use of Mongodb collections as tables in Presto.
+
+.. note::
+
+ Mongodb 2.6+ is supported although it is highly recommend to use 3.0 or later.
+
+Configuration
+-------------
+
+To configure the MongoDB connector, create a catalog properties file
+``etc/catalog/mongodb.properties`` with the following contents,
+replacing the properties as appropriate:
+
+.. code-block:: none
+
+ connector.name=mongodb
+ mongodb.seeds=host1,host:port
+
+Multiple MongoDB Clusters
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+You can have as many catalogs as you need, so if you have additional
+MongoDB clusters, simply add another properties file to ``etc/catalog``
+with a different name (making sure it ends in ``.properties``). For
+example, if you name the property file ``sales.properties``, Presto
+will create a catalog named ``sales`` using the configured connector.
+
+Configuration Properties
+------------------------
+
+The following configuration properties are available:
+
+===================================== ==============================================================
+Property Name Description
+===================================== ==============================================================
+``mongodb.seeds`` List of all mongod servers
+``mongodb.schema-collection`` A collection which contains schema information
+``mongodb.credentials`` List of credentials
+``mongodb.min-connections-per-host`` The minimum size of the connection pool per host
+``mongodb.connections-per-host`` The maximum size of the connection pool per host
+``mongodb.max-wait-time`` The maximum wait time
+``mongodb.connection-timeout`` The socket connect timeout
+``mongodb.socket-timeout`` The socket timeout
+``mongodb.socket-keep-alive`` Whether keep-alive is enabled on each socket
+``mongodb.read-preference`` The read preference
+``mongodb.write-concern`` The write concern
+``mongodb.required-replica-set`` The required replica set name
+``mongodb.cursor-batch-size`` The number of elements to return in a batch
+===================================== ==============================================================
+
+``Mongodb.seeds``
+^^^^^^^^^^^^^^^^^
+
+Comma-separated list of ``hostname[:port]`` all mongod servers in the same replica set or a list of mongos servers in the same sharded cluster. If port is not specified, port 27017 will be used.
+
+This property is required; there is no default and at least one seed must be defined.
+
+``mongodb.schema-collection``
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+As the MongoDB is a document database, there's no fixed schema information in the system. So a special collection in each MongoDB database should defines the schema of all tables. Please refer the :ref:`table-definition-label` section for the details.
+
+At startup, this plugin tries guessin fields' types, but it might not be correct for your collection. In that case, you need to modify it manually. ``CREATE TABLE`` and ``CREATE TABLE AS SELECT`` will create an entry for you.
+
+This property is optional; the default is ``_schema``.
+
+``mongodb.credentials``
+^^^^^^^^^^^^^^^^^^^^^^^
+
+A comma separated list of ``username:password@collection`` credentials
+
+This property is optional; no default value
+
+``mongodb.min-connections-per-host``
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The minimum number of connections per host for this MongoClient instance. Those connections will be kept in a pool when idle, and the pool will ensure over time that it contains at least this minimum number.
+
+This property is optional; the default is ``0``.
+
+``mongodb.connections-per-host``
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The maximum number of connections allowed per host for this MongoClient instance. Those connections will be kept in a pool when idle. Once the pool is exhausted, any operation requiring a connection will block waiting for an available connection.
+
+This property is optional; the default is ``100``.
+
+``mongodb.max-wait-time``
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The maximum wait time in milliseconds that a thread may wait for a connection to become available.
+A value of 0 means that it will not wait. A negative value means to wait indefinitely for a connection to become available.
+
+This property is optional; the default is ``120000``.
+
+``mongodb.connection-timeout``
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The connection timeout in milliseconds. A value of 0 means no timeout. It is used solely when establishing a new connection Socket.connect(java.net.SocketAddress, int)
+
+This property is optional; the default is ``10000``.
+
+``mongodb.socket-timeout``
+^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The socket timeout in milliseconds. It is used for I/O socket read and write operations Socket.setSoTimeout(int)
+
+This property is optional; the default is ``0`` and means no timeout.
+
+``mongodb.socket-keep-alive``
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+This flag controls the socket keep alive feature that keeps a connection alive through firewalls Socket.setKeepAlive(boolean)
+
+This property is optional; the default is ``false``.
+
+``mongodb.read-preference``
+^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The read preference to use for queries, map-reduce, aggregation, and count. The available values are PRIMARY, PRIMARY_PREFERRED, SECONDARY, SECONDARY_PREFERRED and NEAREST.
+
+This property is optional; the default is ``PRIMARY``.
+
+``mongodb.write-concern``
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The write concern to use. The available values are ACKNOWLEDGED, FSYNC_SAFE, FSYNCED, JOURNAL_SAFEY, JOURNALED, MAJORITY, NORMAL, REPLICA_ACKNOWLEDGED , REPLICAS_SAFE and UNACKNOWLEDGED.
+
+This property is optional; the default is ``ACKNOWLEDGED``.
+
+``mongodb.required-replica-set``
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The required replica set name. With this option set, the MongoClient instance will
+
+#. Connect in replica set mode, and discover all members of the set based on the given servers
+#. Make sure that the set name reported by all members matches the required set name.
+#. Refuse to service any requests if any member of the seed list is not part of a replica set with the required name.
+
+This property is optional; no default value
+
+``mongodb.required-replica-set``
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Limits the number of elements returned in one batch. A cursor typically fetches a batch of result objects and stores them locally.
+If batchSize is 0, Driver's default will be used.
+If batchSize is positive, it represents the size of each batch of objects retrieved. It can be adjusted to optimize performance and limit data transfer.
+If batchSize is negative, it will limit of number objects returned, that fit within the max batch size limit (usually 4MB), and cursor will be closed. For example if batchSize is -10, then the server will return a maximum of 10 documents and as many as can fit in 4MB, then close the cursor.
+
+.. note::
+ Do not use a batch size of 1.
+
+This property is optional; the default is ``0``.
+
+.. _table-definition-label:
+
+Table Definition
+----------------
+
+MongoDB maintains table definitions on the special collection where ``mongodb.schema-collection`` configuration value specifies.
+
+.. note::
+ There's no way for the plugin to detect a collection is deleted.
+ You need to delete the entry by ``db.getCollection("_schema").remove( { table: deleted_table_name })`` at the Mongo Shell. Or please drop a collection by ``drop table table_name`` at Presto shell.
+
+A schema collection consists of a MongoDB document for a table.
+
+.. code-block:: json
+
+ {
+ "table": ...,
+ "fields": [
+ { "name" : ...,
+ "type" : "varchar|bigint|boolean|double|date|array|...",
+ "hidden" : false },
+ ...
+ ]
+ }
+ }
+
+=============== ========= ============== =============================
+Field Required Type Description
+=============== ========= ============== =============================
+``table`` required string Presto table name
+``fields`` required array A list of field definitions. Each field definition creates a new column in the Presto table.
+=============== ========= ============== =============================
+
+Each field definition:
+
+.. code-block:: json
+
+ {
+ "name": ...,
+ "type": ...,
+ "hidden": ...
+ }
+
+=============== ========= ========= =============================
+Field Required Type Description
+=============== ========= ========= =============================
+``name`` required string Name of the column in the Presto table.
+``type`` required string Presto type of the column.
+``hidden`` optional boolean Hides the column from ``DESCRIBE
`` and ``SELECT *``. Defaults to ``false``.
+=============== ========= ========= =============================
+
+There is no limit on field descriptions for either key or message.
+
+ObjectId
+--------
+MongoDB collection has the special filed ``_id``. The plugin tries to follow the same rules for this special field, so there will be hidden field ``_id``.
+
+
+.. code-block:: sql
+
+ CREATE TABLE IF NOT EXISTS orders (
+ orderkey bigint,
+ orderstatus varchar,
+ totalprice double,
+ orderdate date
+ );
+
+ insert into orders values( 1, 'bad', 50.0, current_date);
+ insert into orders values( 2, 'good', 100.0, current_date);
+ select _id, * from orders3;
+
+ _id | orderkey | orderstatus | totalprice | orderdate
+ -------------------------------------+----------+-------------+------------+------------
+ 55 b1 51 63 38 64 d6 43 8c 61 a9 ce | 1 | bad | 50.0 | 2015-07-23
+ 55 b1 51 67 38 64 d6 43 8c 61 a9 cf | 2 | good | 100.0 | 2015-07-23
+ (2 rows)
+
+ select _id, * from orders3 where _id = ObjectId('55b151633864d6438c61a9ce');
+
+ _id | orderkey | orderstatus | totalprice | orderdate
+ -------------------------------------+----------+-------------+------------+------------
+ 55 b1 51 63 38 64 d6 43 8c 61 a9 ce | 1 | bad | 50.0 | 2015-07-23
+ (1 row)
+
+.. note::
+ Unfortunately there's no way to represent _id field more fancy like `55b151633864d6438c61a9ce`
diff --git a/presto-mongodb/pom.xml b/presto-mongodb/pom.xml
new file mode 100644
index 000000000000..1da615994784
--- /dev/null
+++ b/presto-mongodb/pom.xml
@@ -0,0 +1,218 @@
+
+
+ 4.0.0
+
+ com.facebook.presto
+ presto-root
+ 0.132-SNAPSHOT
+
+
+ presto-mongodb
+ Presto - mongodb Connector
+ presto-plugin
+
+
+ ${project.parent.basedir}
+ 3.1.0
+ 1.5.0
+ 4.0.32.Final
+
+
+
+
+ org.mongodb
+ mongo-java-driver
+ ${mongo-java.version}
+
+
+
+ joda-time
+ joda-time
+
+
+
+ javax.validation
+ validation-api
+
+
+
+ io.airlift
+ bootstrap
+
+
+
+ io.airlift
+ json
+
+
+
+ io.airlift
+ log
+
+
+
+ io.airlift
+ configuration
+
+
+
+
+ com.google.guava
+ guava
+
+
+
+ com.google.inject
+ guice
+
+
+
+
+ com.facebook.presto
+ presto-spi
+ provided
+
+
+
+ com.facebook.presto
+ presto-main
+ provided
+
+
+
+ io.airlift
+ slice
+ provided
+
+
+
+ javax.inject
+ javax.inject
+ provided
+
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+ provided
+
+
+
+ com.fasterxml.jackson.core
+ jackson-core
+ provided
+
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ provided
+
+
+
+
+ com.facebook.presto
+ presto-tests
+ test
+
+
+
+ com.facebook.presto
+ presto-tpch
+ test
+
+
+
+ io.airlift.tpch
+ tpch
+ test
+
+
+
+ io.airlift
+ testing
+ test
+
+
+
+ javax.annotation
+ javax.annotation-api
+ test
+
+
+
+ de.bwaldvogel
+ mongo-java-server
+ ${mongo-server.version}
+
+
+ org.mongodb
+ mongo-java-driver
+
+
+ test
+
+
+
+ de.bwaldvogel
+ mongo-java-server-core
+ ${mongo-server.version}
+ test
+
+
+
+ de.bwaldvogel
+ mongo-java-server-memory-backend
+ ${mongo-server.version}
+ test
+
+
+
+ org.testng
+ testng
+ test
+
+
+
+ io.netty
+ netty-transport
+ ${netty.version}
+ test
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+
+
+ 1
+
+ **/TestMongoDistributed.java
+
+
+
+
+
+
+
+
+ ci
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+
+
+
+
+
+
+
+
diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoClientConfig.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoClientConfig.java
new file mode 100644
index 000000000000..12208fac257b
--- /dev/null
+++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoClientConfig.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.mongodb;
+
+import com.google.common.base.Splitter;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.mongodb.MongoCredential;
+import com.mongodb.ServerAddress;
+import io.airlift.configuration.Config;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Size;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.mongodb.MongoCredential.createCredential;
+
+public class MongoClientConfig
+{
+ private static final Splitter SPLITTER = Splitter.on(',').trimResults().omitEmptyStrings();
+ private static final Splitter PORT_SPLITTER = Splitter.on(':').trimResults().omitEmptyStrings();
+ private static final Splitter USER_SPLITTER = Splitter.onPattern("[:@]").trimResults().omitEmptyStrings();
+
+ private String schemaCollection = "_schema";
+ private List seeds = ImmutableList.of();
+ private List credentials = ImmutableList.of();
+
+ private int minConnectionsPerHost = 0;
+ private int connectionsPerHost = 100;
+ private int maxWaitTime = 120_000;
+ private int connectionTimeout = 10_000;
+ private int socketTimeout = 0;
+ private boolean socketKeepAlive = false;
+
+ // query configurations
+ private int cursorBatchSize = 0; // use driver default
+
+ private ReadPreferenceType readPreference = ReadPreferenceType.PRIMARY;
+ private WriteConcernType writeConcern = WriteConcernType.ACKNOWLEDGED;
+ private String requiredReplicaSetName;
+ private String implicitRowFieldPrefix = "_pos";
+
+ @NotNull
+ public String getSchemaCollection()
+ {
+ return schemaCollection;
+ }
+
+ @Config("mongodb.schema-collection")
+ public MongoClientConfig setSchemaCollection(String schemaCollection)
+ {
+ this.schemaCollection = schemaCollection;
+ return this;
+ }
+
+ @NotNull
+ @Size(min = 1)
+ public List getSeeds()
+ {
+ return seeds;
+ }
+
+ @Config("mongodb.seeds")
+ public MongoClientConfig setSeeds(String commaSeparatedList)
+ {
+ this.seeds = buildSeeds(SPLITTER.split(commaSeparatedList));
+ return this;
+ }
+
+ public MongoClientConfig setSeeds(String... seeds)
+ {
+ this.seeds = buildSeeds(Arrays.asList(seeds));
+ return this;
+ }
+
+ @NotNull
+ @Size(min = 0)
+ public List getCredentials()
+ {
+ return credentials;
+ }
+
+ @Config("mongodb.credentials")
+ public MongoClientConfig setCredentials(String commaSeparatedList)
+ {
+ this.credentials = buildCredentials(SPLITTER.split(commaSeparatedList));
+ return this;
+ }
+
+ public MongoClientConfig setCredentils(String... seeds)
+ {
+ this.credentials = buildCredentials(Arrays.asList(seeds));
+ return this;
+ }
+
+ private List buildSeeds(Iterable hostPorts)
+ {
+ ImmutableList.Builder builder = ImmutableList.builder();
+ for (String hostPort : hostPorts) {
+ List values = PORT_SPLITTER.splitToList(hostPort);
+ checkArgument(values.size() == 1 || values.size() == 2, "Invalid ServerAddress format. Requires host[:port]");
+ try {
+ if (values.size() == 1) {
+ builder.add(new ServerAddress(values.get(0)));
+ }
+ else {
+ builder.add(new ServerAddress(values.get(0), Integer.parseInt(values.get(1))));
+ }
+ }
+ catch (NumberFormatException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+ return builder.build();
+ }
+
+ private List buildCredentials(Iterable userPasses)
+ {
+ ImmutableList.Builder builder = ImmutableList.builder();
+ for (String userPass : userPasses) {
+ List values = USER_SPLITTER.splitToList(userPass);
+ checkArgument(values.size() == 3, "Invalid Credential format. Requires user:password@collectioin");
+ builder.add(createCredential(values.get(0), values.get(2), values.get(1).toCharArray()));
+ }
+ return builder.build();
+ }
+
+ @Min(0)
+ public int getMinConnectionsPerHost()
+ {
+ return minConnectionsPerHost;
+ }
+
+ @Config("mongodb.min-connections-per-host")
+ public MongoClientConfig setMinConnectionsPerHost(int minConnectionsPerHost)
+ {
+ this.minConnectionsPerHost = minConnectionsPerHost;
+ return this;
+ }
+
+ @Min(1)
+ public int getConnectionsPerHost()
+ {
+ return connectionsPerHost;
+ }
+
+ @Config("mongodb.connection-per-host")
+ public MongoClientConfig setConnectionsPerHost(int connectionsPerHost)
+ {
+ this.connectionsPerHost = connectionsPerHost;
+ return this;
+ }
+
+ @Min(0)
+ public int getMaxWaitTime()
+ {
+ return maxWaitTime;
+ }
+
+ @Config("mongodb.max-wait-time")
+ public MongoClientConfig setMaxWaitTime(int maxWaitTime)
+ {
+ this.maxWaitTime = maxWaitTime;
+ return this;
+ }
+
+ @Min(0)
+ public int getConnectionTimeout()
+ {
+ return connectionTimeout;
+ }
+
+ @Config("mongodb.connection-timeout")
+ public MongoClientConfig setConnectionTimeout(int connectionTimeout)
+ {
+ this.connectionTimeout = connectionTimeout;
+ return this;
+ }
+
+ @Min(0)
+ public int getSocketTimeout()
+ {
+ return socketTimeout;
+ }
+
+ @Config("mongodb.socket-timeout")
+ public MongoClientConfig setSocketTimeout(int socketTimeout)
+ {
+ this.socketTimeout = socketTimeout;
+ return this;
+ }
+
+ public boolean getSocketKeepAlive()
+ {
+ return socketKeepAlive;
+ }
+
+ @Config("mongodb.socket-keep-alive")
+ public MongoClientConfig setSocketKeepAlive(boolean socketKeepAlive)
+ {
+ this.socketKeepAlive = socketKeepAlive;
+ return this;
+ }
+
+ public ReadPreferenceType getReadPreference()
+ {
+ return readPreference;
+ }
+
+ @Config("mongodb.read-preference")
+ public MongoClientConfig setReadPreference(ReadPreferenceType readPreference)
+ {
+ this.readPreference = readPreference;
+ return this;
+ }
+
+ public WriteConcernType getWriteConcern()
+ {
+ return writeConcern;
+ }
+
+ @Config("mongodb.write-concern")
+ public MongoClientConfig setWriteConcern(WriteConcernType writeConcern)
+ {
+ this.writeConcern = writeConcern;
+ return this;
+ }
+
+ public String getRequiredReplicaSetName()
+ {
+ return requiredReplicaSetName;
+ }
+
+ @Config("mongodb.required-replica-set")
+ public MongoClientConfig setRequiredReplicaSetName(String requiredReplicaSetName)
+ {
+ this.requiredReplicaSetName = requiredReplicaSetName;
+ return this;
+ }
+
+ public int getCursorBatchSize()
+ {
+ return cursorBatchSize;
+ }
+
+ @Config("mongodb.cursor-batch-size")
+ public MongoClientConfig setCursorBatchSize(int cursorBatchSize)
+ {
+ this.cursorBatchSize = cursorBatchSize;
+ return this;
+ }
+
+ public String getImplicitRowFieldPrefix()
+ {
+ return implicitRowFieldPrefix;
+ }
+
+ @Config("mongodb.implicit-row-field-prefix")
+ public MongoClientConfig setImplicitRowFieldPrefix(String implicitRowFieldPrefix)
+ {
+ this.implicitRowFieldPrefix = implicitRowFieldPrefix;
+ return this;
+ }
+}
diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoClientModule.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoClientModule.java
new file mode 100644
index 000000000000..cb5821377106
--- /dev/null
+++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoClientModule.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.mongodb;
+
+import com.facebook.presto.spi.type.TypeManager;
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import com.google.inject.Scopes;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientOptions;
+
+import javax.inject.Singleton;
+
+import static io.airlift.configuration.ConfigBinder.configBinder;
+import static java.util.Objects.requireNonNull;
+
+public class MongoClientModule
+ implements Module
+{
+ @Override
+ public void configure(Binder binder)
+ {
+ binder.bind(MongoConnector.class).in(Scopes.SINGLETON);
+ binder.bind(MongoMetadata.class).in(Scopes.SINGLETON);
+ binder.bind(MongoSplitManager.class).in(Scopes.SINGLETON);
+ binder.bind(MongoPageSourceProvider.class).in(Scopes.SINGLETON);
+ binder.bind(MongoPageSinkProvider.class).in(Scopes.SINGLETON);
+ binder.bind(MongoHandleResolver.class).in(Scopes.SINGLETON);
+
+ configBinder(binder).bindConfig(MongoClientConfig.class);
+ }
+
+ @Singleton
+ @Provides
+ public static MongoSession createMongoSession(
+ TypeManager typeManager,
+ MongoConnectorId connectorId,
+ MongoClientConfig config)
+ {
+ requireNonNull(config, "config is null");
+
+ MongoClientOptions.Builder options = MongoClientOptions.builder();
+
+ options.connectionsPerHost(config.getConnectionsPerHost())
+ .connectTimeout(config.getConnectionTimeout())
+ .socketTimeout(config.getSocketTimeout())
+ .socketKeepAlive(config.getSocketKeepAlive())
+ .maxWaitTime(config.getMaxWaitTime())
+ .minConnectionsPerHost(config.getMinConnectionsPerHost())
+ .readPreference(config.getReadPreference().getReadPreference())
+ .writeConcern(config.getWriteConcern().getWriteConcern());
+
+ if (config.getRequiredReplicaSetName() != null) {
+ options.requiredReplicaSetName(config.getRequiredReplicaSetName());
+ }
+
+ MongoClient client = new MongoClient(config.getSeeds(), config.getCredentials(), options.build());
+
+ return new MongoSession(
+ typeManager,
+ connectorId.toString(),
+ client,
+ config);
+ }
+}
diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoColumnHandle.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoColumnHandle.java
new file mode 100644
index 000000000000..4d65ad3b4006
--- /dev/null
+++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoColumnHandle.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.mongodb;
+
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ColumnMetadata;
+import com.facebook.presto.spi.type.Type;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.MoreObjects.ToStringHelper;
+import org.bson.Document;
+
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static java.util.Objects.requireNonNull;
+
+public class MongoColumnHandle
+ implements ColumnHandle
+{
+ public static final String SAMPLE_WEIGHT_COLUMN_NAME = "presto_sample_weight";
+
+ private final String connectorId;
+ private final String name;
+ private final Type type;
+ private final boolean hidden;
+
+ @JsonCreator
+ public MongoColumnHandle(
+ @JsonProperty("connectorId") String connectorId,
+ @JsonProperty("name") String name,
+ @JsonProperty("columnType") Type type,
+ @JsonProperty("hidden") boolean hidden)
+ {
+ this.connectorId = requireNonNull(connectorId, "connectorId is null");
+ this.name = requireNonNull(name, "name is null");
+ this.type = requireNonNull(type, "columnType is null");
+ this.hidden = hidden;
+ }
+
+ @JsonProperty
+ public String getConnectorId()
+ {
+ return connectorId;
+ }
+
+ @JsonProperty
+ public String getName()
+ {
+ return name;
+ }
+
+ @JsonProperty("columnType")
+ public Type getType()
+ {
+ return type;
+ }
+
+ @JsonProperty
+ public boolean isHidden()
+ {
+ return hidden;
+ }
+
+ public ColumnMetadata getColumnMetadata()
+ {
+ return new ColumnMetadata(name, type, false, null, hidden);
+ }
+
+ public Document getDocument()
+ {
+ return new Document().append("name", name)
+ .append("type", type.getTypeSignature().toString())
+ .append("hidden", hidden);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(
+ connectorId,
+ name,
+ hidden);
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ MongoColumnHandle other = (MongoColumnHandle) obj;
+ return Objects.equals(this.connectorId, other.connectorId) &&
+ Objects.equals(this.type, other.type) &&
+ Objects.equals(this.name, other.name);
+ }
+
+ @Override
+ public String toString()
+ {
+ ToStringHelper helper = toStringHelper(this)
+ .add("connectorId", connectorId)
+ .add("name", name)
+ .add("type", type)
+ .add("hidden", hidden);
+
+ return helper.toString();
+ }
+}
diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoConnector.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoConnector.java
new file mode 100644
index 000000000000..45943769cf58
--- /dev/null
+++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoConnector.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.mongodb;
+
+import com.facebook.presto.spi.Connector;
+import com.facebook.presto.spi.ConnectorHandleResolver;
+import com.facebook.presto.spi.ConnectorMetadata;
+import com.facebook.presto.spi.ConnectorPageSinkProvider;
+import com.facebook.presto.spi.ConnectorPageSourceProvider;
+import com.facebook.presto.spi.ConnectorSplitManager;
+
+import javax.inject.Inject;
+
+import static java.util.Objects.requireNonNull;
+
+public class MongoConnector
+ implements Connector
+{
+ private final MongoSession mongoSession;
+ private final MongoMetadata metadata;
+ private final MongoSplitManager splitManager;
+ private final MongoPageSourceProvider pageSourceProvider;
+ private final MongoPageSinkProvider pageSinkProvider;
+ private final MongoHandleResolver handleResolver;
+
+ @Inject
+ public MongoConnector(
+ MongoSession mongoSession,
+ MongoMetadata metadata,
+ MongoSplitManager splitManager,
+ MongoPageSourceProvider pageSourceProvider,
+ MongoPageSinkProvider pageSinkProvider,
+ MongoHandleResolver handleResolver)
+ {
+ this.mongoSession = mongoSession;
+ this.metadata = requireNonNull(metadata, "metadata is null");
+ this.splitManager = requireNonNull(splitManager, "splitManager is null");
+ this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
+ this.pageSinkProvider = requireNonNull(pageSinkProvider, "pageSinkProvider is null");
+ this.handleResolver = requireNonNull(handleResolver, "handleResolver is null");
+ }
+
+ @Override
+ public ConnectorMetadata getMetadata()
+ {
+ return metadata;
+ }
+
+ @Override
+ public ConnectorSplitManager getSplitManager()
+ {
+ return splitManager;
+ }
+
+ @Override
+ public ConnectorPageSourceProvider getPageSourceProvider()
+ {
+ return pageSourceProvider;
+ }
+
+ @Override
+ public ConnectorPageSinkProvider getPageSinkProvider()
+ {
+ return pageSinkProvider;
+ }
+
+ @Override
+ public ConnectorHandleResolver getHandleResolver()
+ {
+ return handleResolver;
+ }
+
+ @Override
+ public void shutdown()
+ {
+ mongoSession.shutdown();
+ }
+}
diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoConnectorFactory.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoConnectorFactory.java
new file mode 100644
index 000000000000..27f12613df69
--- /dev/null
+++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoConnectorFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.mongodb;
+
+import com.facebook.presto.spi.Connector;
+import com.facebook.presto.spi.ConnectorFactory;
+import com.facebook.presto.spi.type.TypeManager;
+import com.google.common.base.Throwables;
+import com.google.inject.Injector;
+import io.airlift.bootstrap.Bootstrap;
+import io.airlift.json.JsonModule;
+
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.util.Objects.requireNonNull;
+
+public class MongoConnectorFactory
+ implements ConnectorFactory
+{
+ private final String name;
+ private final Map optionalConfig;
+ private final TypeManager typeManager;
+
+ public MongoConnectorFactory(String name, TypeManager typeManager, Map optionalConfig)
+ {
+ checkArgument(!isNullOrEmpty(name), "name is null or empty");
+ this.name = name;
+ this.typeManager = requireNonNull(typeManager, "typeManager is null");
+ this.optionalConfig = requireNonNull(optionalConfig, "optionalConfig is null");
+ }
+
+ @Override
+ public String getName()
+ {
+ return name;
+ }
+
+ @Override
+ public Connector create(String connectorId, Map config)
+ {
+ requireNonNull(config, "config is null");
+
+ try {
+ Bootstrap app = new Bootstrap(
+ new JsonModule(),
+ new MongoClientModule(),
+ binder -> {
+ binder.bind(TypeManager.class).toInstance(typeManager);
+ binder.bind(MongoConnectorId.class).toInstance(new MongoConnectorId(connectorId));
+ });
+
+ Injector injector = app.strictConfig().doNotInitializeLogging()
+ .setRequiredConfigurationProperties(config)
+ .setOptionalConfigurationProperties(optionalConfig)
+ .initialize();
+
+ return injector.getInstance(MongoConnector.class);
+ }
+ catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+}
diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoConnectorId.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoConnectorId.java
new file mode 100644
index 000000000000..e2ceb4ece1a4
--- /dev/null
+++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoConnectorId.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.mongodb;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+public class MongoConnectorId
+{
+ private final String connectorId;
+
+ public MongoConnectorId(String connectorId)
+ {
+ requireNonNull(connectorId, "connectorId is null");
+ checkArgument(!connectorId.isEmpty(), "connectorId is empty");
+ this.connectorId = connectorId;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return connectorId.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ MongoConnectorId other = (MongoConnectorId) obj;
+ return this.connectorId.equals(other.connectorId);
+ }
+
+ @Override
+ public String toString()
+ {
+ return connectorId;
+ }
+}
diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoFunctionFactory.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoFunctionFactory.java
new file mode 100644
index 000000000000..1a84224b3562
--- /dev/null
+++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoFunctionFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.mongodb;
+
+import com.facebook.presto.metadata.FunctionFactory;
+import com.facebook.presto.metadata.FunctionListBuilder;
+import com.facebook.presto.metadata.SqlFunction;
+import com.facebook.presto.spi.type.TypeManager;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+public class MongoFunctionFactory
+ implements FunctionFactory
+{
+ private final TypeManager typeManager;
+
+ public MongoFunctionFactory(TypeManager typeManager)
+ {
+ this.typeManager = requireNonNull(typeManager, "typeManager is null");
+ }
+
+ @Override
+ public List listFunctions()
+ {
+ return new FunctionListBuilder(typeManager)
+ .scalar(ObjectIdFunctions.class)
+ .getFunctions();
+ }
+}
diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoHandleResolver.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoHandleResolver.java
new file mode 100644
index 000000000000..a9eb093a3feb
--- /dev/null
+++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoHandleResolver.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.mongodb;
+
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorHandleResolver;
+import com.facebook.presto.spi.ConnectorInsertTableHandle;
+import com.facebook.presto.spi.ConnectorOutputTableHandle;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+
+import javax.inject.Inject;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static java.util.Objects.requireNonNull;
+
+public class MongoHandleResolver
+ implements ConnectorHandleResolver
+{
+ private final String connectorId;
+
+ @Inject
+ public MongoHandleResolver(MongoConnectorId connectorId)
+ {
+ this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
+ }
+
+ @Override
+ public boolean canHandle(ConnectorTableHandle handle)
+ {
+ return handle instanceof MongoTableHandle && ((MongoTableHandle) handle).getConnectorId().equals(connectorId);
+ }
+
+ @Override
+ public boolean canHandle(ColumnHandle handle)
+ {
+ return handle instanceof MongoColumnHandle && ((MongoColumnHandle) handle).getConnectorId().equals(connectorId);
+ }
+
+ @Override
+ public boolean canHandle(ConnectorSplit split)
+ {
+ return split instanceof MongoSplit && ((MongoSplit) split).getConnectorId().equals(connectorId);
+ }
+
+ @Override
+ public boolean canHandle(ConnectorOutputTableHandle handle)
+ {
+ return handle instanceof MongoOutputTableHandle && ((MongoOutputTableHandle) handle).getConnectorId().equals(connectorId);
+ }
+
+ @Override
+ public boolean canHandle(ConnectorInsertTableHandle handle)
+ {
+ return handle instanceof MongoInsertTableHandle && ((MongoInsertTableHandle) handle).getConnectorId().equals(connectorId);
+ }
+
+ @Override
+ public boolean canHandle(ConnectorTableLayoutHandle handle)
+ {
+ return handle instanceof MongoTableLayoutHandle && ((MongoTableLayoutHandle) handle).getConnectorId().equals(connectorId);
+ }
+
+ @Override
+ public Class extends ConnectorTableHandle> getTableHandleClass()
+ {
+ return MongoTableHandle.class;
+ }
+
+ @Override
+ public Class extends ColumnHandle> getColumnHandleClass()
+ {
+ return MongoColumnHandle.class;
+ }
+
+ @Override
+ public Class extends ConnectorSplit> getSplitClass()
+ {
+ return MongoSplit.class;
+ }
+
+ public Class extends ConnectorOutputTableHandle> getOutputTableHandleClass()
+ {
+ return MongoOutputTableHandle.class;
+ }
+
+ public Class extends ConnectorInsertTableHandle> getInsertTableHandleClass()
+ {
+ return MongoInsertTableHandle.class;
+ }
+
+ public Class extends ConnectorTableLayoutHandle> getTableLayoutHandleClass()
+ {
+ return MongoTableLayoutHandle.class;
+ }
+
+ @Override
+ public String toString()
+ {
+ return toStringHelper(this)
+ .add("connectorId", connectorId)
+ .toString();
+ }
+}
diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoIndex.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoIndex.java
new file mode 100644
index 000000000000..dd12d72b99c3
--- /dev/null
+++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoIndex.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.mongodb;
+
+import com.facebook.presto.spi.block.SortOrder;
+import com.google.common.collect.ImmutableList;
+import com.mongodb.client.ListIndexesIterable;
+import org.bson.Document;
+
+import java.util.List;
+import java.util.Optional;
+
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+public class MongoIndex
+{
+ private final String name;
+ private final List keys;
+ private final boolean unique;
+
+ public static List parse(ListIndexesIterable indexes)
+ {
+ ImmutableList.Builder builder = ImmutableList.builder();
+ for (Document index : indexes) {
+ // TODO: v, ns, sparse fields
+ Document key = (Document) index.get("key");
+ String name = index.getString("name");
+ boolean unique = index.getBoolean("unique", false);
+
+ if (key.containsKey("_fts")) { // Full Text Search
+ continue;
+ }
+ builder.add(new MongoIndex(name, parseKey(key), unique));
+ }
+
+ return builder.build();
+ }
+
+ private static List parseKey(Document key)
+ {
+ ImmutableList.Builder builder = ImmutableList.builder();
+
+ for (String name : key.keySet()) {
+ Object value = key.get(name);
+ if (value instanceof Number) {
+ int order = ((Number) value).intValue();
+ checkState(order == 1 || order == -1, "Unknown index sort order");
+ builder.add(new MongodbIndexKey(name, order == 1 ? SortOrder.ASC_NULLS_LAST : SortOrder.DESC_NULLS_LAST));
+ }
+ else if (value instanceof String) {
+ builder.add(new MongodbIndexKey(name, (String) value));
+ }
+ else {
+ throw new UnsupportedOperationException("Unknown index type: " + value.toString());
+ }
+ }
+
+ return builder.build();
+ }
+
+ public MongoIndex(String name, List keys, boolean unique)
+ {
+ this.name = name;
+ this.keys = keys;
+ this.unique = unique;
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ public List getKeys()
+ {
+ return keys;
+ }
+
+ public boolean isUnique()
+ {
+ return unique;
+ }
+
+ public static class MongodbIndexKey
+ {
+ private final String name;
+ private final Optional sortOrder;
+ private final Optional type;
+
+ public MongodbIndexKey(String name, SortOrder sortOrder)
+ {
+ this(name, Optional.of(sortOrder), Optional.empty());
+ }
+
+ public MongodbIndexKey(String name, String type)
+ {
+ this(name, Optional.empty(), Optional.of(type));
+ }
+
+ public MongodbIndexKey(String name, Optional sortOrder, Optional type)
+ {
+ this.name = requireNonNull(name, "name is null");
+ this.sortOrder = sortOrder;
+ this.type = type;
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ public Optional getSortOrder()
+ {
+ return sortOrder;
+ }
+
+ public Optional getType()
+ {
+ return type;
+ }
+ }
+}
diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoInsertTableHandle.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoInsertTableHandle.java
new file mode 100644
index 000000000000..413780d640a7
--- /dev/null
+++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoInsertTableHandle.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.mongodb;
+
+import com.facebook.presto.spi.ConnectorInsertTableHandle;
+import com.facebook.presto.spi.SchemaTableName;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+public class MongoInsertTableHandle
+ implements ConnectorInsertTableHandle
+{
+ private final String connectorId;
+ private final SchemaTableName schemaTableName;
+ private final List columns;
+
+ @JsonCreator
+ public MongoInsertTableHandle(@JsonProperty("connectorId") String connectorId,
+ @JsonProperty("schemaTableName") SchemaTableName schemaTableName,
+ @JsonProperty("columns") List columns)
+ {
+ this.connectorId = requireNonNull(connectorId, "connectorId is null");
+ this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
+ this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
+ }
+
+ @JsonProperty
+ public String getConnectorId()
+ {
+ return connectorId;
+ }
+
+ @JsonProperty
+ public SchemaTableName getSchemaTableName()
+ {
+ return schemaTableName;
+ }
+
+ @JsonProperty
+ public List getColumns()
+ {
+ return columns;
+ }
+}
diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoMetadata.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoMetadata.java
new file mode 100644
index 000000000000..a1432837e214
--- /dev/null
+++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoMetadata.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.mongodb;
+
+import com.facebook.presto.mongodb.MongoIndex.MongodbIndexKey;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ColumnMetadata;
+import com.facebook.presto.spi.ConnectorInsertTableHandle;
+import com.facebook.presto.spi.ConnectorMetadata;
+import com.facebook.presto.spi.ConnectorOutputTableHandle;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.ConnectorTableLayout;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.ConnectorTableLayoutResult;
+import com.facebook.presto.spi.ConnectorTableMetadata;
+import com.facebook.presto.spi.Constraint;
+import com.facebook.presto.spi.LocalProperty;
+import com.facebook.presto.spi.NotFoundException;
+import com.facebook.presto.spi.SchemaTableName;
+import com.facebook.presto.spi.SchemaTablePrefix;
+import com.facebook.presto.spi.SortingProperty;
+import com.facebook.presto.spi.TableNotFoundException;
+import com.facebook.presto.spi.predicate.TupleDomain;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.airlift.log.Logger;
+import io.airlift.slice.Slice;
+
+import javax.inject.Inject;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static com.facebook.presto.mongodb.MongoColumnHandle.SAMPLE_WEIGHT_COLUMN_NAME;
+import static com.facebook.presto.mongodb.TypeUtils.checkType;
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static java.util.Locale.ENGLISH;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+
+public class MongoMetadata
+ implements ConnectorMetadata
+{
+ private static final Logger log = Logger.get(MongoMetadata.class);
+
+ private final String connectorId;
+ private final MongoSession mongoSession;
+
+ @Inject
+ public MongoMetadata(MongoConnectorId connectorId,
+ MongoSession mongoSession)
+ {
+ this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
+ this.mongoSession = requireNonNull(mongoSession, "mongoSession is null");
+ }
+
+ @Override
+ public List listSchemaNames(ConnectorSession session)
+ {
+ return mongoSession.getAllSchemas();
+ }
+
+ @Override
+ public MongoTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName)
+ {
+ requireNonNull(tableName, "tableName is null");
+ try {
+ return mongoSession.getTable(tableName).getTableHandle();
+ }
+ catch (TableNotFoundException e) {
+ log.debug(e, "Table(%s) not found", tableName);
+ return null;
+ }
+ }
+
+ @Override
+ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle tableHandle)
+ {
+ requireNonNull(tableHandle, "tableHandle is null");
+ SchemaTableName tableName = getTableName(tableHandle);
+ return getTableMetadata(session, tableName);
+ }
+
+ @Override
+ public List listTables(ConnectorSession session, String schemaNameOrNull)
+ {
+ ImmutableList.Builder tableNames = ImmutableList.builder();
+
+ for (String schemaName : listSchemas(session, schemaNameOrNull)) {
+ for (String tableName : mongoSession.getAllTables(schemaName)) {
+ tableNames.add(new SchemaTableName(schemaName, tableName.toLowerCase(ENGLISH)));
+ }
+ }
+ return tableNames.build();
+ }
+
+ @Override
+ public ColumnHandle getSampleWeightColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
+ {
+ return getColumnHandles(tableHandle, true).get(SAMPLE_WEIGHT_COLUMN_NAME);
+ }
+
+ @Override
+ public Map getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
+ {
+ return getColumnHandles(tableHandle, false);
+ }
+
+ private Map getColumnHandles(ConnectorTableHandle tableHandle, boolean includeSampleWeight)
+ {
+ MongoTableHandle table = checkType(tableHandle, MongoTableHandle.class, "tableHandle");
+ List columns = mongoSession.getTable(table.getSchemaTableName()).getColumns();
+
+ ImmutableMap.Builder columnHandles = ImmutableMap.builder();
+ for (MongoColumnHandle columnHandle : columns) {
+ if (includeSampleWeight || !columnHandle.getName().equals(SAMPLE_WEIGHT_COLUMN_NAME)) {
+ columnHandles.put(columnHandle.getName(), columnHandle);
+ }
+ }
+ return columnHandles.build();
+ }
+
+ @Override
+ public Map> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix)
+ {
+ requireNonNull(prefix, "prefix is null");
+ ImmutableMap.Builder> columns = ImmutableMap.builder();
+ for (SchemaTableName tableName : listTables(session, prefix)) {
+ try {
+ columns.put(tableName, getTableMetadata(session, tableName).getColumns());
+ }
+ catch (NotFoundException e) {
+ // table disappeared during listing operation
+ }
+ }
+ return columns.build();
+ }
+
+ @Override
+ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
+ {
+ checkType(tableHandle, MongoTableHandle.class, "tableHandle");
+ return checkType(columnHandle, MongoColumnHandle.class, "columnHandle").getColumnMetadata();
+ }
+
+ @Override
+ public List getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint constraint, Optional> desiredColumns)
+ {
+ MongoTableHandle tableHandle = checkType(table, MongoTableHandle.class, "table");
+
+ Optional> partitioningColumns = Optional.empty(); //TODO: sharding key
+ ImmutableList.Builder> localProperties = ImmutableList.builder();
+
+ MongoTable tableInfo = mongoSession.getTable(tableHandle.getSchemaTableName());
+ Map columns = getColumnHandles(session, tableHandle);
+
+ for (MongoIndex index : tableInfo.getIndexes()) {
+ for (MongodbIndexKey key : index.getKeys()) {
+ if (!key.getSortOrder().isPresent()) {
+ continue;
+ }
+ if (columns.get(key.getName()) != null) {
+ localProperties.add(new SortingProperty<>(columns.get(key.getName()), key.getSortOrder().get()));
+ }
+ }
+ }
+
+ ConnectorTableLayout layout = new ConnectorTableLayout(
+ new MongoTableLayoutHandle(tableHandle, constraint.getSummary()),
+ Optional.empty(),
+ TupleDomain.all(),
+ partitioningColumns,
+ Optional.empty(),
+ localProperties.build());
+
+ return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary()));
+ }
+
+ @Override
+ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle)
+ {
+ MongoTableLayoutHandle layout = checkType(handle, MongoTableLayoutHandle.class, "layout");
+
+ // tables in this connector have a single layout
+ return getTableLayouts(session, layout.getTable(), Constraint.alwaysTrue(), Optional.empty())
+ .get(0)
+ .getTableLayout();
+ }
+
+ @Override
+ public boolean canCreateSampledTables(ConnectorSession session)
+ {
+ return true;
+ }
+
+ @Override
+ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
+ {
+ mongoSession.createTable(tableMetadata.getTable(), buildColumnHandles(tableMetadata));
+ }
+
+ @Override
+ public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle)
+ {
+ MongoTableHandle table = checkType(tableHandle, MongoTableHandle.class, "tableHandle");
+
+ mongoSession.dropTable(table.getSchemaTableName());
+ }
+
+ @Override
+ public void renameTable(ConnectorSession session, ConnectorTableHandle tableHandle, SchemaTableName newTableName)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
+ {
+ List columns = buildColumnHandles(tableMetadata);
+
+ mongoSession.createTable(tableMetadata.getTable(), columns);
+
+ return new MongoOutputTableHandle(connectorId,
+ tableMetadata.getTable(),
+ columns.stream().filter(c -> !c.isHidden()).collect(toList()));
+ }
+
+ @Override
+ public void commitCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection fragments)
+ {
+ }
+
+ @Override
+ public void rollbackCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle)
+ {
+ MongoOutputTableHandle table = checkType(tableHandle, MongoOutputTableHandle.class, "tableHandle");
+ mongoSession.dropTable(table.getSchemaTableName());
+ }
+
+ @Override
+ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle)
+ {
+ MongoTableHandle table = checkType(tableHandle, MongoTableHandle.class, "tableHandle");
+ List columns = mongoSession.getTable(table.getSchemaTableName()).getColumns();
+
+ return new MongoInsertTableHandle(connectorId,
+ table.getSchemaTableName(),
+ columns.stream().filter(c -> !c.isHidden()).collect(toList()));
+ }
+
+ @Override
+ public void commitInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection fragments)
+ {
+ }
+
+ private static SchemaTableName getTableName(ConnectorTableHandle tableHandle)
+ {
+ return checkType(tableHandle, MongoTableHandle.class, "tableHandle").getSchemaTableName();
+ }
+
+ private ConnectorTableMetadata getTableMetadata(ConnectorSession session, SchemaTableName tableName)
+ {
+ MongoTableHandle tableHandle = mongoSession.getTable(tableName).getTableHandle();
+
+ List columns = ImmutableList.copyOf(
+ getColumnHandles(session, tableHandle).values().stream()
+ .map(MongoColumnHandle.class::cast)
+ .map(MongoColumnHandle::getColumnMetadata)
+ .collect(toList()));
+
+ return new ConnectorTableMetadata(tableName, columns);
+ }
+
+ private List listSchemas(ConnectorSession session, String schemaNameOrNull)
+ {
+ if (schemaNameOrNull == null) {
+ return listSchemaNames(session);
+ }
+ return ImmutableList.of(schemaNameOrNull);
+ }
+
+ private List listTables(ConnectorSession session, SchemaTablePrefix prefix)
+ {
+ if (prefix.getSchemaName() == null) {
+ return listTables(session, prefix.getSchemaName());
+ }
+ return ImmutableList.of(new SchemaTableName(prefix.getSchemaName(), prefix.getTableName()));
+ }
+
+ private List buildColumnHandles(ConnectorTableMetadata tableMetadata)
+ {
+ return tableMetadata.getColumns().stream()
+ .map(m -> new MongoColumnHandle(connectorId, m.getName(), m.getType(), m.isHidden()))
+ .collect(toList());
+ }
+
+ @Override
+ public String toString()
+ {
+ return toStringHelper(this)
+ .add("connectorId", connectorId)
+ .toString();
+ }
+}
diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoOutputTableHandle.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoOutputTableHandle.java
new file mode 100644
index 000000000000..d542f028a849
--- /dev/null
+++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoOutputTableHandle.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.mongodb;
+
+import com.facebook.presto.spi.ConnectorOutputTableHandle;
+import com.facebook.presto.spi.SchemaTableName;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+public class MongoOutputTableHandle
+ implements ConnectorOutputTableHandle
+{
+ private final String connectorId;
+ private final SchemaTableName schemaTableName;
+ private final List columns;
+
+ @JsonCreator
+ public MongoOutputTableHandle(@JsonProperty("connectorId") String connectorId,
+ @JsonProperty("schemaTableName") SchemaTableName schemaTableName,
+ @JsonProperty("columns") List columns)
+ {
+ this.connectorId = requireNonNull(connectorId, "connectorId is null");
+ this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
+ this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
+ }
+
+ @JsonProperty
+ public String getConnectorId()
+ {
+ return connectorId;
+ }
+
+ @JsonProperty
+ public SchemaTableName getSchemaTableName()
+ {
+ return schemaTableName;
+ }
+
+ @JsonProperty
+ public List getColumns()
+ {
+ return columns;
+ }
+}
diff --git a/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoPageSink.java b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoPageSink.java
new file mode 100644
index 000000000000..e77c3a59bd97
--- /dev/null
+++ b/presto-mongodb/src/main/java/com/facebook/presto/mongodb/MongoPageSink.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.mongodb;
+
+import com.facebook.presto.spi.ConnectorPageSink;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.Page;
+import com.facebook.presto.spi.SchemaTableName;
+import com.facebook.presto.spi.block.Block;
+import com.facebook.presto.spi.type.SqlDate;
+import com.facebook.presto.spi.type.SqlTime;
+import com.facebook.presto.spi.type.SqlTimestamp;
+import com.facebook.presto.spi.type.SqlTimestampWithTimeZone;
+import com.facebook.presto.spi.type.SqlVarbinary;
+import com.facebook.presto.spi.type.Type;
+import com.google.common.collect.ImmutableList;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.InsertManyOptions;
+import io.airlift.slice.Slice;
+import org.bson.Document;
+import org.bson.types.Binary;
+import org.bson.types.ObjectId;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static com.facebook.presto.mongodb.ObjectIdType.OBJECT_ID;
+import static com.facebook.presto.mongodb.TypeUtils.containsType;
+import static com.facebook.presto.mongodb.TypeUtils.isArrayType;
+import static com.facebook.presto.mongodb.TypeUtils.isMapType;
+import static com.facebook.presto.mongodb.TypeUtils.isRowType;
+import static com.facebook.presto.spi.type.VarbinaryType.VARBINARY;
+import static java.util.stream.Collectors.toList;
+
+public class MongoPageSink
+ implements ConnectorPageSink
+{
+ private final MongoSession mongoSession;
+ private final ConnectorSession session;
+ private final SchemaTableName schemaTableName;
+ private final List columns;
+ private final List requireTranslate;
+ private final String implicitPrefix;
+
+ public MongoPageSink(MongoClientConfig config,
+ MongoSession mongoSession,
+ ConnectorSession session,
+ SchemaTableName schemaTableName,
+ List columns)
+ {
+ this.mongoSession = mongoSession;
+ this.session = session;
+ this.schemaTableName = schemaTableName;
+ this.columns = columns;
+ this.requireTranslate = columns.stream()
+ .map(c -> containsType(c.getType(), TypeUtils::isDateType,
+ TypeUtils::isMapType, TypeUtils::isRowType,
+ OBJECT_ID::equals, VARBINARY::equals))
+ .collect(toList());
+ this.implicitPrefix = config.getImplicitRowFieldPrefix();
+ }
+
+ @Override
+ public void appendPage(Page page, Block sampleWeightBlock)
+ {
+ // TODO: handle sampleWeightBlock
+ MongoCollection collection = mongoSession.getCollection(schemaTableName);
+ List batch = new ArrayList<>(page.getPositionCount());
+
+ for (int position = 0; position < page.getPositionCount(); position++) {
+ Document doc = new Document();
+
+ for (int channel = 0; channel < page.getChannelCount(); channel++) {
+ MongoColumnHandle column = columns.get(channel);
+ doc.append(column.getName(), getObjectValue(columns.get(channel).getType(), page.getBlock(channel), position, requireTranslate.get(channel)));
+ }
+ batch.add(doc);
+ }
+
+ collection.insertMany(batch, new InsertManyOptions().ordered(true));
+ }
+
+ private Object getObjectValue(Type type, Block block, int position, boolean translate)
+ {
+ if (block.isNull(position)) {
+ return null;
+ }
+
+ Object value = type.getObjectValue(session, block, position);
+
+ if (translate) {
+ value = translateValue(type, value);
+ }
+
+ return value;
+ }
+
+ private Object translateValue(Type type, Object value)
+ {
+ if (type.equals(OBJECT_ID)) {
+ value = value == null ? new ObjectId() : new ObjectId(((SqlVarbinary) value).getBytes());
+ }
+
+ if (value == null) {
+ return null;
+ }
+
+ if (type.getJavaType() == long.class) {
+ if (value instanceof SqlDate) {
+ return new Date(TimeUnit.DAYS.toMillis(((SqlDate) value).getDays()));
+ }
+ if (value instanceof SqlTime) {
+ return new Date(((SqlTime) value).getMillisUtc());
+ }
+ if (value instanceof SqlTimestamp) {
+ return new Date(((SqlTimestamp) value).getMillisUtc());
+ }
+ if (value instanceof SqlTimestampWithTimeZone) {
+ return new Date(((SqlTimestampWithTimeZone) value).getMillisUtc());
+ }
+ }
+ else if (type.getJavaType() == Slice.class) {
+ if (type.equals(VARBINARY)) {
+ value = new Binary(((SqlVarbinary) value).getBytes());
+ }
+ }
+ else if (type.getJavaType() == Block.class) {
+ if (isArrayType(type)) {
+ value = ((List>) value).stream()
+ .map(v -> translateValue(type.getTypeParameters().get(0), v))
+ .collect(toList());
+ }
+ else if (isMapType(type)) {
+ // map type is converted into list of fixed keys document
+ ImmutableList.Builder