This repository has been archived by the owner on Jan 29, 2022. It is now read-only.

Spark Usage

Luke Lovett edited this page Mar 25, 2016 · 24 revisions

This page describes how to use the MongoDB Hadoop Connector with Spark.

Installation

  1. Obtain the MongoDB Hadoop Connector. You can either build it or download the jars. The releases page also includes instructions for use with Maven and Gradle. For Spark, all you need is the "spark" jar (called something like "mongo-hadoop-spark.jar").
  2. Get a JAR for the MongoDB Java Driver. The connector requires at least version 3.2.1 of the driver "uber" jar (called "mongo-java-driver.jar").

These are the only two dependencies needed to build a project using Spark and MongoDB.
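
As a minimal sketch of how the two jars might be handed to Spark, the snippet below assembles a `spark-submit` command line using its `--jars` option, which takes a comma-separated list of jars to place on the driver and executor classpaths. The helper `spark_submit_command`, the application name `my_app.py`, and the unversioned jar file names are assumptions for illustration; substitute the files you actually built or downloaded.

```python
import shlex


def spark_submit_command(app, jars):
    """Build a spark-submit argument list that ships the given jars."""
    # --jars expects one comma-separated string, not repeated flags.
    return ["spark-submit", "--jars", ",".join(jars), app]


# Hypothetical file names; real jars usually carry a version suffix.
cmd = spark_submit_command(
    "my_app.py",
    ["mongo-hadoop-spark.jar", "mongo-java-driver.jar"],
)

# Print a shell-safe version of the command for copy and paste.
print(" ".join(shlex.quote(part) for part in cmd))
```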

Basic Usage

See example code in Java, Scala or Python.

These examples go through the basics of setting up your Spark project to use MongoDB as a source or a sink. At a high level, here's what we're going to do:

  1. Create a new Configuration so we can set options on the MongoDB Hadoop Connector.
  2. Create a new RDD by calling the newAPIHadoopRDD method on a SparkContext object, passing in the Configuration and the InputFormat class we want to use, based on whether we want to read from a live cluster or a BSON snapshot.
  3. When we're ready to save data back into MongoDB or a BSON file, we'll call the saveAsNewAPIHadoopFile method on the RDD with the OutputFormat class we want.
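
The choice made in the first two steps can be sketched in plain Python, without a running cluster. The helper `input_settings` below is hypothetical and not part of the connector. It only pairs each kind of source with the InputFormat class name and configuration key used on this page, and it assumes that any source whose scheme is not `mongodb` is a directory of BSON files.

```python
from urllib.parse import urlparse

MONGO_INPUT_FORMAT = "com.mongodb.hadoop.MongoInputFormat"
BSON_INPUT_FORMAT = "com.mongodb.hadoop.BSONFileInputFormat"


def input_settings(source):
    """Return (InputFormat class name, configuration dict) for a source."""
    if urlparse(source).scheme == "mongodb":
        # Live cluster: the connection string names the collection to read.
        return MONGO_INPUT_FORMAT, {"mongo.input.uri": source}
    # Assumption: anything else is a directory holding BSON snapshots.
    return BSON_INPUT_FORMAT, {"mapred.input.dir": source}


live_format, live_conf = input_settings(
    "mongodb://localhost:27017/db.collection")
bson_format, bson_conf = input_settings(
    "hdfs://localhost:8020/user/spark/bson-demo")
```

The returned pair corresponds to the InputFormat class and the Configuration entries that are passed to `newAPIHadoopRDD`.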
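
Java Example

// Imports needed by this snippet.
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.bson.BSONObject;
import com.mongodb.BasicDBObject;
import com.mongodb.hadoop.BSONFileOutputFormat;
import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.io.MongoUpdateWritable;

// The snippet assumes `sc` is an existing JavaSparkContext.

// Set configuration options for the MongoDB Hadoop Connector.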
Configuration mongodbConfig = new Configuration();
// MongoInputFormat allows us to read from a live MongoDB instance.
// We could also use BSONFileInputFormat to read BSON snapshots.
mongodbConfig.set("mongo.job.input.format",
                  "com.mongodb.hadoop.MongoInputFormat");
// MongoDB connection string naming a collection to use.
// If using BSON, use "mapred.input.dir" to configure the directory
// where BSON files are located instead.
mongodbConfig.set("mongo.input.uri",
                  "mongodb://localhost:27017/db.collection");

// Create an RDD backed by the MongoDB collection.
JavaPairRDD<Object, BSONObject> documents = sc.newAPIHadoopRDD(
    mongodbConfig,            // Configuration
    MongoInputFormat.class,   // InputFormat: read from a live cluster.
    Object.class,             // Key class
    BSONObject.class          // Value class
);

// Create a separate Configuration for saving data back to MongoDB.
Configuration outputConfig = new Configuration();
outputConfig.set("mongo.output.uri",
                 "mongodb://localhost:27017/output.collection");

// Save this RDD as a Hadoop "file".
// The path argument is unused; all documents will go to 'mongo.output.uri'.
documents.saveAsNewAPIHadoopFile(
    "file:///this-is-completely-unused",
    Object.class,
    BSONObject.class,
    MongoOutputFormat.class,
    outputConfig
);

// We can also save this back to a BSON file.
documents.saveAsNewAPIHadoopFile(
    "hdfs://localhost:8020/user/spark/bson-demo",
    Object.class,
    BSONObject.class,
    BSONFileOutputFormat.class,
    new Configuration()
);

// We can choose to update documents in an existing collection by using the
// MongoUpdateWritable class instead of BSONObject. First, we have to create
// the update operations we want to perform by mapping them across our current
// RDD.
JavaPairRDD<Object, MongoUpdateWritable> updates = documents.mapValues(
    new Function<BSONObject, MongoUpdateWritable>() {
        public MongoUpdateWritable call(BSONObject value) {
            return new MongoUpdateWritable(
                new BasicDBObject("_id", value.get("_id")),  // Query
                new BasicDBObject("$set", new BasicDBObject("foo", "bar")),  // Update operation
                false,  // Upsert
                false   // Update multiple documents
            );
        }
    }
);

// Now we call saveAsNewAPIHadoopFile, using MongoUpdateWritable as the value
// class.
updates.saveAsNewAPIHadoopFile(
    "file:///this-is-completely-unused",
    Object.class,
    MongoUpdateWritable.class,
    MongoOutputFormat.class,
    outputConfig
);
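
The four arguments given to `MongoUpdateWritable` above (query, update operation, upsert, multiple) follow the usual MongoDB update semantics. The sketch below imitates those semantics on an in-memory list of dicts to show what each flag does. It is not the connector's implementation: `apply_update` is a hypothetical helper that handles only equality queries and the `$set` operator.

```python
def apply_update(docs, query, update, upsert=False, multi=False):
    """Apply a $set update to the dicts in docs that match an equality query.

    Returns the number of documents that were modified or inserted.
    """
    matched = [d for d in docs
               if all(d.get(k) == v for k, v in query.items())]
    if not matched and upsert:
        # Upsert: nothing matched, so insert a document seeded from the query.
        new_doc = dict(query)
        docs.append(new_doc)
        matched = [new_doc]
    # Without multi, only the first matching document is touched.
    targets = matched if multi else matched[:1]
    for doc in targets:
        doc.update(update.get("$set", {}))
    return len(targets)


docs = [{"_id": 1, "foo": "old"}, {"_id": 2}]
changed = apply_update(docs, {"_id": 1}, {"$set": {"foo": "bar"}})
```

With upsert and multiple both set to false, as in the Java example, a query on `_id` changes at most one existing document and never inserts a new one.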

Scala Example

import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.rdd.RDD

import org.bson.BSONObject
import com.mongodb.BasicDBObject
import com.mongodb.hadoop.{
  MongoInputFormat, MongoOutputFormat,
  BSONFileInputFormat, BSONFileOutputFormat}
import com.mongodb.hadoop.io.MongoUpdateWritable

object SparkExample extends App {
  // Set up the configuration for reading from MongoDB.
  val mongoConfig = new Configuration()
  // MongoInputFormat allows us to read from a live MongoDB instance.
  // We could also use BSONFileInputFormat to read BSON snapshots.
  // MongoDB connection string naming a collection to read.
  // If using BSON, use "mapred.input.dir" to configure the directory
  // where the BSON files are located instead.
  mongoConfig.set("mongo.input.uri",
    "mongodb://localhost:27017/db.collection")

  val sparkConf = new SparkConf()
  val sc = new SparkContext("local", "SparkExample", sparkConf)

  // Create an RDD backed by the MongoDB collection.
  val documents = sc.newAPIHadoopRDD(
    mongoConfig,                // Configuration
    classOf[MongoInputFormat],  // InputFormat
    classOf[Object],            // Key type
    classOf[BSONObject])        // Value type

  // Create a separate Configuration for saving data back to MongoDB.
  val outputConfig = new Configuration()
  outputConfig.set("mongo.output.uri",
    "mongodb://localhost:27017/output.collection")

  // Save this RDD as a Hadoop "file".
  // The path argument is unused; all documents will go to "mongo.output.uri".
  documents.saveAsNewAPIHadoopFile(
    "file:///this-is-completely-unused",
    classOf[Object],
    classOf[BSONObject],
    classOf[MongoOutputFormat[Object, BSONObject]],
    outputConfig)

  // We can also save this back to a BSON file.
  val bsonOutputConfig = new Configuration()
  documents.saveAsNewAPIHadoopFile(
    "hdfs://localhost:8020/user/spark/bson-demo",
    classOf[Object],
    classOf[BSONObject],
    classOf[BSONFileOutputFormat[Object, BSONObject]],
    bsonOutputConfig)

  // We can choose to update documents in an existing collection by using the
  // MongoUpdateWritable class instead of BSONObject. First, we have to create
  // the update operations we want to perform by mapping them across our current
  // RDD.
  val updates = documents.mapValues(
    value => new MongoUpdateWritable(
      new BasicDBObject("_id", value.get("_id")),  // Query
      new BasicDBObject("$set", new BasicDBObject("foo", "bar")),  // Update operation
      false,  // Upsert
      false   // Update multiple documents
    )
  )

  // Now we call saveAsNewAPIHadoopFile, using MongoUpdateWritable as the
  // value class.
  updates.saveAsNewAPIHadoopFile(
    "file:///this-is-completely-unused",
    classOf[Object],
    classOf[MongoUpdateWritable],
    classOf[MongoOutputFormat[Object, MongoUpdateWritable]],
    outputConfig)
}
Python Example

As of version 1.5.0, mongo-hadoop includes a Python package called pymongo-spark, which allows PySpark to interact with PyMongo, the MongoDB Python driver. We highly recommend using this library when working with PySpark and MongoDB.

This example gives a very basic demonstration of how to use the library. For other information, such as how to install pymongo-spark, please consult the README for the package.

from pyspark import SparkContext, SparkConf

import pymongo_spark
# Important: activate pymongo_spark.
pymongo_spark.activate()


def main():
    conf = SparkConf().setAppName("pyspark test")
    sc = SparkContext(conf=conf)

    # Create an RDD backed by the MongoDB collection.
    # This RDD *does not* contain key/value pairs, just documents.
    # If you want key/value pairs, use the mongoPairRDD method instead.
    rdd = sc.mongoRDD('mongodb://localhost:27017/db.collection')

    # Save this RDD back to MongoDB as a different collection.
    rdd.saveToMongoDB('mongodb://localhost:27017/db.other.collection')

    # You can also read and write BSON:
    bson_rdd = sc.BSONFileRDD('/path/to/file.bson')
    bson_rdd.saveToBSON('/path/to/bson/output')

if __name__ == '__main__':
    main()

Please find complete documentation in the README for pymongo-spark.

Python Example (pre-1.5.0)

from pyspark import SparkContext, SparkConf

def main():
    conf = SparkConf().setAppName("pyspark test")
    sc = SparkContext(conf=conf)

    # Create an RDD backed by the MongoDB collection.
    # MongoInputFormat allows us to read from a live MongoDB instance.
    # We could also use BSONFileInputFormat to read BSON snapshots.
    rdd = sc.newAPIHadoopRDD(
        inputFormatClass='com.mongodb.hadoop.MongoInputFormat',
        keyClass='org.apache.hadoop.io.Text',
        valueClass='org.apache.hadoop.io.MapWritable',
        conf={
            'mongo.input.uri': 'mongodb://localhost:27017/db.collection'
        }
    )

    # Save this RDD as a Hadoop "file".
    # The path argument is unused; all documents will go to "mongo.output.uri".
    rdd.saveAsNewAPIHadoopFile(
        path='file:///this-is-unused',
        outputFormatClass='com.mongodb.hadoop.MongoOutputFormat',
        keyClass='org.apache.hadoop.io.Text',
        valueClass='org.apache.hadoop.io.MapWritable',
        conf={
            'mongo.output.uri': 'mongodb://localhost:27017/output.collection'
        }
    )

    # We can also save this back to a BSON file.
    rdd.saveAsNewAPIHadoopFile(
        path='hdfs://localhost:8020/user/spark/bson-demo',
        outputFormatClass='com.mongodb.hadoop.BSONFileOutputFormat',
        keyClass='org.apache.hadoop.io.Text',
        valueClass='org.apache.hadoop.io.MapWritable'
    )


if __name__ == '__main__':
    main()
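
The two save calls in the pre-1.5.0 example differ only in the output format, the path, and whether `mongo.output.uri` is set. A minimal sketch of that difference follows. The helper `save_kwargs` is hypothetical and not part of PySpark or the connector, and it assumes that any target not starting with `mongodb://` is a path for BSON output.

```python
def save_kwargs(target):
    """Build keyword arguments for rdd.saveAsNewAPIHadoopFile."""
    kwargs = {
        "keyClass": "org.apache.hadoop.io.Text",
        "valueClass": "org.apache.hadoop.io.MapWritable",
    }
    if target.startswith("mongodb://"):
        # The path is ignored; all documents go to mongo.output.uri.
        kwargs["path"] = "file:///this-is-unused"
        kwargs["outputFormatClass"] = "com.mongodb.hadoop.MongoOutputFormat"
        kwargs["conf"] = {"mongo.output.uri": target}
    else:
        # Assumption: any other target is where BSON files are written.
        kwargs["path"] = target
        kwargs["outputFormatClass"] = "com.mongodb.hadoop.BSONFileOutputFormat"
    return kwargs


mongo_args = save_kwargs("mongodb://localhost:27017/output.collection")
bson_args = save_kwargs("hdfs://localhost:8020/user/spark/bson-demo")

# With an RDD in hand, the call would look like:
#     rdd.saveAsNewAPIHadoopFile(**mongo_args)
```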