Skip to content
This repository has been archived by the owner on Jan 29, 2022. It is now read-only.

Commit

Permalink
New Spark examples, including Dataframes & SparkSQL, using Enron email dataset.
Browse files Browse the repository at this point in the history
  • Loading branch information
Bryan Reinero committed Dec 3, 2015
1 parent 9905150 commit 9ef021f
Show file tree
Hide file tree
Showing 5 changed files with 385 additions and 1 deletion.
25 changes: 25 additions & 0 deletions build.gradle
Expand Up @@ -447,6 +447,31 @@ project(":examples/enron") {
}
}

// Build module for the Enron Spark examples (DataframeExample, Enron, Message).
project(":examples/enron/spark") {
// Artifact name for the jar produced by this subproject.
archivesBaseName = "mongo-spark-enron"

dependencies {
// Needs the connector core and the Spark bridge from this repository,
// plus Spark SQL for the DataFrame example (Scala 2.10 build, Spark 1.4.1).
compile project(":core")
compile project(":spark")
compile 'org.apache.spark:spark-sql_2.10:1.4.1'

// Test classpath reuses :core's compiled main and test output directly.
testCompile files(project(':core').sourceSets.main.output)
testCompile files(project(':core').sourceSets.test.output)

}

jar {
// Bundle this module's classes together with the :spark bridge classes.
from sourceSets.main.output
from project(':spark').sourceSets.main.output

// Inline (unzip) the MongoDB Java driver into the jar so the examples can
// be submitted to Spark as a single self-contained artifact.
// NOTE(review): only artifacts whose names start with 'mongo-java-driver'
// are merged; all other compile deps must be on the cluster classpath.
configurations.compile.filter {
it.name.startsWith('mongo-java-driver')
}.each {
from zipTree(it)
}
}
}

project(":examples/sensors") {
uploadArchives.onlyIf { false }
dependencies {
Expand Down
@@ -0,0 +1,80 @@
package com.mongodb.spark.examples.enron;


import com.mongodb.hadoop.MongoInputFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

import org.bson.BSONObject;
import scala.Tuple2;


/**
* Created by bryan on 12/3/15.
*/
/**
 * Spark SQL example over the Enron e-mail corpus.
 *
 * <p>Loads the {@code enron_mail.messages} collection from a local MongoDB
 * instance as an RDD of BSON documents, projects each document onto a
 * {@link Message} bean, registers the result as a temporary SQL table and
 * runs a DataFrame query against it.
 */
public class DataframeExample {

    public static void main(String[] args) {

        JavaSparkContext context = new JavaSparkContext(new SparkConf());

        // Hadoop-side configuration consumed by the MongoDB connector.
        Configuration inputConfig = new Configuration();

        // MongoInputFormat reads from a live MongoDB instance; a BSON snapshot
        // could be read with BSONFileInputFormat instead.
        inputConfig.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");

        // Connection string naming the database and collection to read. With
        // BSON snapshots, "mapred.input.dir" would locate the files instead.
        inputConfig.set("mongo.input.uri",
            "mongodb://localhost:27017/enron_mail.messages");

        // RDD of (key, document) pairs backed by the MongoDB collection.
        JavaPairRDD<Object, BSONObject> documents = context.newAPIHadoopRDD(
            inputConfig,            // Configuration
            MongoInputFormat.class, // InputFormat: read from a live cluster.
            Object.class,           // Key class
            BSONObject.class        // Value class
        );

        // Map each raw BSON document onto a Message bean; Spark SQL later
        // infers the DataFrame schema from the bean's getters.
        JavaRDD<Message> messages = documents.map(
            new Function<Tuple2<Object, BSONObject>, Message>() {

                public Message call(Tuple2<Object, BSONObject> pair) {
                    BSONObject headers = (BSONObject) pair._2.get("headers");

                    Message message = new Message();
                    message.setTo((String) headers.get("To"));
                    message.setX_From((String) headers.get("From"));
                    message.setMessage_ID((String) headers.get("Message-ID"));
                    message.setBody((String) pair._2.get("body"));
                    return message;
                }
            }
        );

        SQLContext sqlContext = new org.apache.spark.sql.SQLContext(context);

        // Expose the bean RDD as a temp table so it can be queried with SQL.
        DataFrame messagesSchema = sqlContext.createDataFrame(messages, Message.class);
        messagesSchema.registerTempTable("messages");

        DataFrame ericsMessages = sqlContext.sql("SELECT to, body FROM messages WHERE to = \"eric.bass@enron.com\" ");
        ericsMessages.show();

        messagesSchema.printSchema();
    }

}
@@ -0,0 +1,109 @@
package com.mongodb.spark.examples.enron;

import com.mongodb.hadoop.MongoInputFormat;

import com.mongodb.hadoop.MongoOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import org.bson.BSONObject;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

/**
* Created by bryan on 12/3/15.
*/
/**
 * Spark example that counts sender/recipient pairs in the Enron e-mail
 * corpus.
 *
 * <p>Reads the {@code enron_mail.messages} collection from MongoDB, emits one
 * {@code "from|to"} edge string per individual recipient, reduces the edges to
 * per-pair counts, and writes the counts back to MongoDB as the
 * {@code enron_mail.message_pairs} collection.
 */
public class Enron {

    public static void main(String[] args) {

        JavaSparkContext sc = new JavaSparkContext(new SparkConf());
        // Set configuration options for the MongoDB Hadoop Connector.
        Configuration mongodbConfig = new Configuration();
        // MongoInputFormat allows us to read from a live MongoDB instance.
        // We could also use BSONFileInputFormat to read BSON snapshots.
        mongodbConfig.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");

        // MongoDB connection string naming a collection to use.
        // If using BSON, use "mapred.input.dir" to configure the directory
        // where BSON files are located instead.
        mongodbConfig.set("mongo.input.uri",
            "mongodb://localhost:27017/enron_mail.messages");

        // Create an RDD backed by the MongoDB collection.
        JavaPairRDD<Object, BSONObject> documents = sc.newAPIHadoopRDD(
            mongodbConfig,          // Configuration
            MongoInputFormat.class, // InputFormat: read from a live cluster.
            Object.class,           // Key class
            BSONObject.class        // Value class
        );

        // Flatten each message into zero or more "from|to" edge strings,
        // one per comma-separated recipient in the "To" header.
        JavaRDD<String> edges = documents.flatMap(

            new FlatMapFunction<Tuple2<Object, BSONObject>, String>() {

                @Override
                public Iterable<String> call(Tuple2<Object, BSONObject> t) throws Exception {

                    List<String> tuples = new ArrayList<String>();

                    // FIX: guard against documents with no "headers"
                    // sub-document (previously an NPE that killed the job).
                    BSONObject header = (BSONObject) t._2.get("headers");
                    if (header == null) {
                        return tuples;
                    }

                    String to = (String) header.get("To");
                    String from = (String) header.get("From");

                    // FIX: skip messages with no usable sender — the original
                    // code emitted bogus "null|recipient" edges when the
                    // "From" header was absent.
                    if (from == null || from.isEmpty() || to == null || to.isEmpty()) {
                        return tuples;
                    }

                    // Each tuple in the result is an individual from|to pair.
                    for (String recipient : to.split(",")) {
                        String s = recipient.trim();
                        if (s.length() > 0) {
                            tuples.add(from + "|" + s);
                        }
                    }
                    return tuples;
                }
            }
        );

        // Classic word-count pattern: tag every edge with 1 ...
        JavaPairRDD<String, Integer> pairs = edges.mapToPair(
            new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String s) {
                    return new Tuple2<String, Integer>(s, 1);
                }
            }
        );

        // ... then sum the tags per distinct "from|to" key.
        JavaPairRDD<String, Integer> counts = pairs.reduceByKey(
            new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer a, Integer b) {
                    return a + b;
                }
            }
        );

        // Create a separate Configuration for saving data back to MongoDB.
        Configuration outputConfig = new Configuration();
        outputConfig.set("mongo.output.uri",
            "mongodb://localhost:27017/enron_mail.message_pairs");

        // Save this RDD as a Hadoop "file".
        // The path argument is unused; all documents will go to 'mongo.output.uri'.
        counts.saveAsNewAPIHadoopFile(
            "file:///this-is-completely-unused",
            Object.class,
            BSONObject.class,
            MongoOutputFormat.class,
            outputConfig
        );

    }
}
@@ -0,0 +1,170 @@
package com.mongodb.spark.examples.enron;

/**
* Created by bryan on 12/3/15.
*/
/**
 * JavaBean representing one Enron e-mail message.
 *
 * <p>The bean property names (derived from the getter names) mirror the mail
 * header fields stored in MongoDB; Spark SQL builds the DataFrame schema from
 * these getters via bean introspection, so the accessor names must not change.
 * The private backing fields use conventional lowerCamelCase — they are
 * invisible to introspection.
 */
public class Message {

    private String id;
    private String body;
    private String mailbox;
    private String filename;
    private String xCc;
    private String subject;
    private String xFolder;
    private String contentTransferEncoding;
    private String xBcc;
    private String to;
    private String xOrigin;
    private String xFileName;
    private String xFrom;
    private String date;
    private String xTo;
    private String messageId;
    private String contentType;
    private String mimeVersion;

    public String getId() { return id; }

    public void setId(String id) { this.id = id; }

    public String getBody() { return body; }

    public void setBody(String body) { this.body = body; }

    public String getMailbox() { return mailbox; }

    public void setMailbox(String mailbox) { this.mailbox = mailbox; }

    public String getFilename() { return filename; }

    public void setFilename(String filename) { this.filename = filename; }

    public String getX_cc() { return xCc; }

    public void setX_cc(String xCc) { this.xCc = xCc; }

    public String getSubject() { return subject; }

    public void setSubject(String subject) { this.subject = subject; }

    public String getX_Folder() { return xFolder; }

    public void setX_Folder(String xFolder) { this.xFolder = xFolder; }

    public String getContent_Transfer_Encoding() { return contentTransferEncoding; }

    public void setContent_Transfer_Encoding(String contentTransferEncoding) {
        this.contentTransferEncoding = contentTransferEncoding;
    }

    public String getX_bcc() { return xBcc; }

    public void setX_bcc(String xBcc) { this.xBcc = xBcc; }

    public String getTo() { return to; }

    public void setTo(String to) { this.to = to; }

    public String getX_Origin() { return xOrigin; }

    public void setX_Origin(String xOrigin) { this.xOrigin = xOrigin; }

    public String getX_FileName() { return xFileName; }

    public void setX_FileName(String xFileName) { this.xFileName = xFileName; }

    public String getX_From() { return xFrom; }

    public void setX_From(String xFrom) { this.xFrom = xFrom; }

    public String getDate() { return date; }

    public void setDate(String date) { this.date = date; }

    public String getX_To() { return xTo; }

    public void setX_To(String xTo) { this.xTo = xTo; }

    public String getMessage_ID() { return messageId; }

    public void setMessage_ID(String messageId) { this.messageId = messageId; }

    public String getContent_Type() { return contentType; }

    public void setContent_Type(String contentType) { this.contentType = contentType; }

    public String getMime_Version() { return mimeVersion; }

    public void setMime_Version(String mimeVersion) { this.mimeVersion = mimeVersion; }
}
2 changes: 1 addition & 1 deletion settings.gradle
@@ -1,2 +1,2 @@
include 'core', 'hive', 'pig', 'streaming', 'flume',
'spark', 'examples/treasury_yield', 'examples/enron', 'examples/sensors'
'spark', 'examples/treasury_yield', 'examples/enron', 'examples/enron/spark', 'examples/sensors'

0 comments on commit 9ef021f

Please sign in to comment.