Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Initial checkin

  • Loading branch information...
commit 5e2bc87b5791832af649331fcb4ccdd9f3492736 0 parents
@tlockney authored
40 build.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0"?>
+<project name="mongodb-sink" default="jar">
+ <property name="javac.debug" value="on"/>
+ <property name="flume.base" value="../.."/>
+
+ <path id="classpath">
+ <!-- in case we are running in dev env -->
+ <pathelement location="${flume.base}/build/classes"/>
+ <fileset dir="${flume.base}/lib">
+ <include name="**/google-collect*.jar" />
+ <include name="**/guava*.jar" />
+ <include name="**/log4j-*.jar" />
+ <include name="**/slf4j-*.jar" />
+ </fileset>
+ <!-- in case we are running in release env -->
+ <fileset dir="${flume.base}">
+ <include name="flume-*.jar" />
+ </fileset>
+ <pathelement location="${flume.base}/lib/"/>
+ <pathelement location="./lib/mongo-2.3.jar"/>
+ </path>
+
+ <target name="jar">
+ <mkdir dir="build"/>
+ <mkdir dir="build/classes"/>
+
+ <javac srcdir="./src/java" destdir="build/classes" debug="${javac.debug}">
+ <classpath refid="classpath"/>
+ </javac>
+
+ <jar jarfile="mongodb-sink_plugin.jar" basedir="build/classes"/>
+ </target>
+
+ <target name="clean">
+ <echo message="Cleaning generated files and stuff"/>
+ <delete dir="build" />
+ <delete file="mongodb-sink_plugin.jar" />
+ </target>
+
+</project>
BIN  lib/mongo-2.3.jar
Binary file not shown
102 src/java/mongodb/MongoDBSink.java
@@ -0,0 +1,102 @@
+package mongodb;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.cloudera.flume.conf.Context;
+import com.cloudera.flume.conf.SinkFactory.SinkBuilder;
+import com.cloudera.flume.core.Event;
+import com.cloudera.flume.core.EventSink;
+import com.cloudera.util.Pair;
+import com.google.common.base.Preconditions;
+import com.mongodb.BasicDBObject;
+import com.mongodb.DB;
+import com.mongodb.DBCollection;
+import com.mongodb.Mongo;
+import com.mongodb.MongoException;
+
+public class MongoDBSink extends EventSink.Base {
+ static final Logger LOG = LoggerFactory.getLogger(MongoDBSink.class);
+
+ private String serverName;
+ private int serverPort;
+ private String dbName;
+ private String collName;
+ private Mongo mongo;
+ private DB db;
+ private DBCollection collection;
+
+ public MongoDBSink(String server, String port, String dbName, String collName) {
+ this.serverName = server;
+ this.serverPort = Integer.parseInt(port);
+ this.dbName = dbName;
+ this.collName = collName;
+ }
+
+ @Override
+ public synchronized void append(Event e) throws IOException {
+ BasicDBObject entry = new BasicDBObject();
+ entry.put("timestamp", new Date(e.getTimestamp()));
+ entry.put("hostname", e.getHost());
+ entry.put("priority", e.getPriority().name());
+ entry.put("message", new String(e.getBody()));
+ Map<String, byte[]> metadata = e.getAttrs();
+ if (!metadata.isEmpty()) {
+ BasicDBObject metadataEntry = new BasicDBObject();
+ for (String key: metadata.keySet()) {
+ metadataEntry.put(key, new String(metadata.get(key)));
+ }
+ entry.put("metadata", metadataEntry);
+ }
+ collection.insert(entry);
+ }
+
+ @Override
+ public void close() throws IOException {
+ mongo.close();
+ }
+
+ @Override
+ public void open() throws IOException {
+ try {
+ mongo = new Mongo(serverName, serverPort);
+ db = mongo.getDB(dbName);
+ collection = db.getCollection(collName);
+ } catch (UnknownHostException e) {
+ LOG.error("Could not find specified server.", e);
+ } catch (MongoException e) {
+ LOG.error("Error connecting to server.", e);
+ }
+ }
+
+ public static SinkBuilder builder() {
+ return new SinkBuilder() {
+ // construct a new parameterized sink
+ @Override
+ public EventSink build(Context context, String... argv) {
+ Preconditions.checkArgument(argv.length == 3,
+ "usage: mongoDBSink(\"server\",\"port\",\"db\",\"collection\")");
+
+ return new MongoDBSink(argv[0], argv[1], argv[2], argv[3]);
+ }
+ };
+ }
+
+ /**
+ * This is a special function used by the SourceFactory to pull in this class
+ * as a plugin sink.
+ */
+ public static List<Pair<String, SinkBuilder>> getSinkBuilders() {
+ List<Pair<String, SinkBuilder>> builders =
+ new ArrayList<Pair<String, SinkBuilder>>();
+ builders.add(new Pair<String, SinkBuilder>("mongoDBSink", builder()));
+ return builders;
+ }
+}
Please sign in to comment.
Something went wrong with that request. Please try again.