# ENRON EMAILS: ADVANCED FEATURE ENGINEERING AND MODEL BUILDING TECHNIQUES

In [1]:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.{VectorAssembler, ChiSqSelector, RegexTokenizer, StopWordsRemover, CountVectorizer, CountVectorizerModel, IDF, NGram, Word2Vec, Word2VecModel}
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel, RandomForestClassifier, RandomForestClassificationModel}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

import org.apache.spark.ml.linalg._

In [2]:
//I recommend using Brunel 2.2 with Spark 2.1.
%AddJar -magic https://brunelvis.org/jar/spark-kernel-brunel-all-2.2.jar

Using cached version of spark-kernel-brunel-all-2.2.jar


## 1. Load in Data

In [3]:
val rdd = sc.textFile("enron_textfile.txt").map(_.split("\\|")).map(arr => (arr(0).toInt, arr(1), arr(2).toDouble) )
rdd.take(1)

Array((1,North America's integrated electricity market requires cooperation on environmental policies Commission for Environmental Cooperation releases working paper on North America's electricity market Montreal, 27 November 2001 -- The North American Commission for Environmental Cooperation (CEC) is releasing a working paper highlighting the trend towards increasing trade, competition and cross-border investment in electricity between Canada, Mexico and the United States. It is hoped that the working paper, Environmental Challenges and Opportunities in the Evolving North American Electricity Market, will stimulate public discussion around a CEC symposium of the same title about the need to coordinate environmental policies trinationally as a North...

In [4]:
val docCleaner :String => String = doc => { doc.replaceAll("[^a-zA-Z0-9]", " ").replaceAll("\\s{2,}", " ").trim().toLowerCase() }
val UDF_docCleaner = udf(docCleaner)
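Because the cleaner is an ordinary Scala function, you can check its effect on plain strings before it is wrapped in a UDF. Here is a minimal sketch. The function body is copied from the cell above, and the sample inputs are hypothetical, modeled on the start of the first e-mails.

```scala
// Same cleaning logic as docCleaner above, usable without Spark
val docCleaner: String => String = doc =>
  doc.replaceAll("[^a-zA-Z0-9]", " ") // every non-alphanumeric character becomes a space
    .replaceAll("\\s{2,}", " ")        // collapse runs of whitespace into one space
    .trim()
    .toLowerCase()

// Hypothetical sample inputs
val samples = Seq("North America's integrated", "FYI -----Original Message-----")
samples.map(docCleaner).foreach(println)
```

Apostrophes become spaces, so "America's" leaves a stray "s". The `RegexTokenizer` later drops it because of `setMinTokenLength(2)`.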

In [5]:
val schema = new StructType().
    add(StructField("id", IntegerType, true)).
    add(StructField("email", StringType, true)).
    add(StructField("label", DoubleType, true))

In [6]:
val rowRDD = rdd.map(record => Row(record._1, record._2, record._3))

In [7]:
val corpus = spark.createDataFrame(rowRDD,schema).withColumn("doc", UDF_docCleaner($"email"))
corpus.persist()
println("First five documents: ")
corpus.limit(5).show()

First five documents: 
+---+--------------------+-----+--------------------+
| id|               email|label|                 doc|
+---+--------------------+-----+--------------------+
|  1|North America's i...|  0.0|north america s i...|
|  2|FYI -----Original...|  1.0|fyi original mess...|
|  3|14:13:53 Synchron...|  0.0|14 13 53 synchron...|
|  4|^ ----- Forwarded...|  1.0|forwarded by stev...|
|  5|----- Forwarded b...|  0.0|forwarded by stev...|
+---+--------------------+-----+--------------------+



In [8]:
val stats = corpus.agg(sum("label").as("N1"), count("*").as("N")).
    withColumn("N0", $"N"-$"N1").
    withColumn("event_rate", $"N1"/$"N").withColumn("baseline_Accuracy", when($"N0">$"N1", $"N0").otherwise($"N1")/$"N")
println("Label analysis:")
stats.show()

Label analysis:
+-----+---+-----+-------------------+------------------+
|   N1|  N|   N0|         event_rate| baseline_Accuracy|
+-----+---+-----+-------------------+------------------+
|139.0|855|716.0|0.16257309941520467|0.8374269005847953|
+-----+---+-----+-------------------+------------------+
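Both derived columns are simple ratios. `baseline_Accuracy` is the accuracy of a trivial classifier that always predicts the majority class (label 0.0 here). With only about 16% positives, accuracy is therefore a weak yardstick, and a ranking metric such as areaUnderROC, used below, is more informative. The sketch recomputes the figures from the counts in the table:

```scala
// Label counts copied from the stats table above
val n1 = 139.0   // e-mails with label 1.0
val n  = 855.0   // all e-mails
val n0 = n - n1  // e-mails with label 0.0

val eventRate        = n1 / n               // share of positive labels
val baselineAccuracy = math.max(n0, n1) / n // always predict the majority class
println(f"event rate = $eventRate%.4f, baseline accuracy = $baselineAccuracy%.4f")
```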



## 2. Feature Engineering

In [9]:
val regexTokenizer = new RegexTokenizer().
    setInputCol("doc").
    setOutputCol("tokens").
    setPattern("\\s+").
    setMinTokenLength(2)
val remover = new StopWordsRemover().setCaseSensitive(false).setInputCol("tokens").setOutputCol("tokens_rm")

val TF_1gram = new CountVectorizer().setInputCol("tokens_rm").setOutputCol("rawFeatures_1gram").
    setMinTF(2).
    setVocabSize(500)
val idf_1gram = new IDF().setInputCol("rawFeatures_1gram").setOutputCol("TFIDF1gramFeatures")

val _2grams = new NGram().setInputCol("tokens_rm").setOutputCol("tokens_2gram").setN(2)
val TF_2gram = new CountVectorizer().setInputCol("tokens_2gram").setOutputCol("rawFeatures_2gram").
    setMinTF(2).
    setVocabSize(500)
val idf_2gram = new IDF().setInputCol("rawFeatures_2gram").setOutputCol("TFIDF2gramFeatures")

val TF_2gramBinary = new CountVectorizer().setInputCol("tokens_2gram").setOutputCol("rawFeatures_2gramBin").
    setMinTF(2).
    setBinary(true).
    setVocabSize(100)
val chiSqSelector = new ChiSqSelector().setFeaturesCol("rawFeatures_2gramBin").setLabelCol("label").setOutputCol("TFBin2gramFeaturesChiSq").setSelectorType("percentile").setPercentile(0.1)

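// Note: this assembles the full binary 2-gram vector (rawFeatures_2gramBin), not the chi-squared
// selection TFBin2gramFeaturesChiSq, so no model below uses the selector's output.
val featAssembler = new VectorAssembler().setInputCols(Array("TFIDF1gramFeatures", "rawFeatures_2gramBin")).setOutputCol("TFIDFcombiFeatures")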

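// The pipeline is fit on the whole corpus, before the train/test split, so vocabularies and
// IDF weights are also learned from the test documents.
val basicFeatEng_stages = Array(regexTokenizer, remover, TF_1gram, idf_1gram, _2grams, TF_2gram, idf_2gram, TF_2gramBinary, chiSqSelector, featAssembler)
val basicFeatEng_pl = new Pipeline().setStages(basicFeatEng_stages).fit(corpus)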
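The cell above chains `CountVectorizer` and `IDF`. To show how `setVocabSize`, `setMinTF` and `setBinary` interact with IDF, the sketch below re-implements the arithmetic in plain Scala on a hypothetical toy corpus. It assumes Spark ML's documented smoothed IDF, log((m + 1) / (df + 1)), where m is the number of documents. It also assumes alphabetical tie-breaking for equally frequent terms, which Spark does not specify. Note that `minTF` only affects the transformation, so the vocabulary comes from raw corpus counts. Because IDF is fit on the filtered count vectors, df counts only the documents in which a term survived the `minTF` filter.

```scala
// Hypothetical toy corpus of already-tokenized documents (not Enron data)
val docs: Seq[Seq[String]] = Seq(
  Seq("enron", "power", "power", "market"),
  Seq("enron", "enron", "gas"),
  Seq("power", "market", "market", "enron")
)

// Vocabulary: terms ranked by total count across the corpus, cut to vocabSize.
// Ties are broken alphabetically here (assumption; Spark's tie order is unspecified).
def buildVocab(docs: Seq[Seq[String]], vocabSize: Int): IndexedSeq[String] =
  docs.flatten
    .groupBy(identity)
    .map { case (term, occurrences) => (term, occurrences.size) }
    .toIndexedSeq
    .sortBy { case (term, count) => (-count, term) }
    .take(vocabSize)
    .map(_._1)

// Count vector for one document: counts below minTF are dropped, binary maps the rest to 1.0
def termCounts(doc: Seq[String], vocab: IndexedSeq[String], minTF: Int, binary: Boolean): Array[Double] = {
  val counts = doc.groupBy(identity).map { case (term, occ) => (term, occ.size) }
  vocab.map { term =>
    val c = counts.getOrElse(term, 0)
    if (c == 0 || c < minTF) 0.0 else if (binary) 1.0 else c.toDouble
  }.toArray
}

// Smoothed IDF: log((m + 1) / (df + 1)), df = documents with a non-zero count for the term
def idfWeights(tf: Seq[Array[Double]]): Array[Double] = {
  val m = tf.size
  tf.head.indices.map { j =>
    val df = tf.count(v => v(j) > 0.0)
    math.log((m + 1.0) / (df + 1.0))
  }.toArray
}

val vocab = buildVocab(docs, vocabSize = 3)
val tf    = docs.map(d => termCounts(d, vocab, minTF = 2, binary = false))
val idf   = idfWeights(tf)
val tfidf = tf.map(v => v.zip(idf).map { case (c, w) => c * w })
```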



In [10]:
val basicFeatEng_docTerm =  basicFeatEng_pl.transform(corpus)
basicFeatEng_docTerm.persist()
basicFeatEng_docTerm.printSchema()

root
 |-- id: integer (nullable = true)
 |-- email: string (nullable = true)
 |-- label: double (nullable = true)
 |-- doc: string (nullable = true)
 |-- tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- tokens_rm: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- rawFeatures_1gram: vector (nullable = true)
 |-- TFIDF1gramFeatures: vector (nullable = true)
 |-- tokens_2gram: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- rawFeatures_2gram: vector (nullable = true)
 |-- TFIDF2gramFeatures: vector (nullable = true)
 |-- rawFeatures_2gramBin: vector (nullable = true)
 |-- TFBin2gramFeaturesChiSq: vector (nullable = true)
 |-- TFIDFcombiFeatures: vector (nullable = true)
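`ChiSqSelector` scores each feature with a chi-squared test of independence against the label, treating feature values as categories. With `setSelectorType("percentile")` and `setPercentile(0.1)`, it keeps a fraction (here 10%) of all features. For binary term indicators such as `rawFeatures_2gramBin`, each test reduces to a 2x2 contingency table. The plain-Scala sketch below computes the Pearson statistic for that case. It is illustrative only and is not Spark's implementation.

```scala
// Pearson chi-squared statistic for a binary feature against a binary label (2x2 table)
def chiSquared(feature: Seq[Double], label: Seq[Double]): Double = {
  require(feature.nonEmpty && feature.size == label.size, "need equal-length, non-empty inputs")
  val n = feature.size.toDouble
  val pairs = feature.zip(label)
  val values = Seq(0.0, 1.0)
  val terms = for (f <- values; l <- values) yield {
    val observed = pairs.count { case (a, b) => a == f && b == l }.toDouble
    // expected count under independence: row total * column total / n
    val expected = feature.count(_ == f) * label.count(_ == l) / n
    if (expected == 0.0) 0.0 else math.pow(observed - expected, 2) / expected
  }
  terms.sum
}

// Hypothetical examples: a perfectly associated feature vs. an independent one
println(chiSquared(Seq(1.0, 1.0, 0.0, 0.0), Seq(1.0, 1.0, 0.0, 0.0)))
println(chiSquared(Seq(1.0, 0.0, 1.0, 0.0), Seq(1.0, 1.0, 0.0, 0.0)))
```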



In [11]:
val _1gramVocabulary = basicFeatEng_pl.stages(2).asInstanceOf[CountVectorizerModel].vocabulary
_1gramVocabulary.slice(0,10).foreach(println)

enron
ect
com
hou
power
2000
subject
2001
energy
mail


In [12]:
val _2gramVocabulary = basicFeatEng_pl.stages(5).asInstanceOf[CountVectorizerModel].vocabulary
_2gramVocabulary.slice(0,10).foreach(println)

ect ect
hou ect
enron enron
enron com
cc subject
corp enron
na enron
enron development
ect cc
ees ees
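`NGram` slides a window of n tokens over `tokens_rm` and joins each window with a single space. A document with fewer than n tokens produces no n-grams. Because the input is `tokens_rm`, pairs are formed after stop-word removal, so two words separated only by a stop word in the original e-mail still form a bigram. A plain-Scala sketch of that behavior, with hypothetical tokens, shows how pairs such as "hou ect" and "ect ect" arise:

```scala
// Sliding-window n-grams joined with a single space; fewer than n tokens gives no output
def ngrams(tokens: Seq[String], n: Int): List[String] =
  tokens.sliding(n).filter(_.size == n).map(_.mkString(" ")).toList

// Hypothetical token sequence after stop-word removal
println(ngrams(Seq("hou", "ect", "ect", "cc", "subject"), 2))
```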


In [13]:
val split_weights = Array(0.7,0.3)
val splits = basicFeatEng_docTerm.randomSplit(split_weights, 123).zip(Array("train","test"))
val train = splits(0)._1
val test = splits(1)._1
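`randomSplit` normalizes the weights and assigns rows at random, so the 70/30 proportions are approximate. The seed (123) makes the split repeatable as long as the data and its partitioning do not change. The plain-Scala sketch below shows the idea with hypothetical rows; it is a simplification, not Spark's partition-wise sampler.

```scala
import scala.util.Random

// Weighted random split: normalize the weights, draw u ~ U[0, 1) per row,
// and send the row to the first split whose cumulative weight exceeds u
def weightedSplit[A](rows: Seq[A], weights: Seq[Double], seed: Long): Seq[Seq[A]] = {
  val total = weights.sum
  val bounds = weights.scanLeft(0.0)(_ + _).tail.map(_ / total) // cumulative, ends at 1.0
  val rng = new Random(seed)
  val assigned = rows.map { row =>
    val u = rng.nextDouble()
    val idx = bounds.indexWhere(u < _)
    (if (idx < 0) bounds.size - 1 else idx, row) // guard against rounding at the top end
  }
  weights.indices.map(i => assigned.filter(_._1 == i).map(_._2))
}

val splitSketch = weightedSplit(1 to 1000, Seq(0.7, 0.3), seed = 123L)
val trainRows = splitSketch(0)
val testRows  = splitSketch(1)
println(s"train = ${trainRows.size}, test = ${testRows.size}")
```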

## 3. Model Building and Word2Vec Featurization

### N-gram Pipelines

In [14]:
val cv_names = Array("1gram", "2gram", "combi")

In [15]:
val cv_ngrams = cv_names.map { name =>
    // Random forest on the TF-IDF (or assembled "combi") feature vector for this variant
    val rf = new RandomForestClassifier().setLabelCol("label").setFeaturesCol("TFIDF" + name + "Features").setNumTrees(20)
    val paramGrid = new ParamGridBuilder().addGrid(rf.maxDepth, Array(3, 6, 9)).build()
    val binClassEval = new BinaryClassificationEvaluator().setLabelCol("label").setRawPredictionCol("rawPrediction").setMetricName("areaUnderROC")
    // 3-fold cross-validation on the training split only
    val cv = new CrossValidator().setEstimator(rf).setEstimatorParamMaps(paramGrid).setEvaluator(binClassEval).setNumFolds(3).fit(train)
    // Parameter settings sorted by mean cross-validated AUC, best first
    val cvReport = cv.getEstimatorParamMaps.zip(cv.avgMetrics).sortBy(-_._2)
    println("")
    println(name)
    cvReport.foreach(println)
    (name, cv)
}

1gram
({
	rfc_932fd31a16ad-maxDepth: 6
},0.8687454634010725)
({
	rfc_932fd31a16ad-maxDepth: 9
},0.8597984709533524)
({
	rfc_932fd31a16ad-maxDepth: 3
},0.8505288738762268)

2gram
({
	rfc_b69c3a5b4b1c-maxDepth: 9
},0.7282307525606881)
({
	rfc_b69c3a5b4b1c-maxDepth: 6
},0.7181896844765344)
({
	rfc_b69c3a5b4b1c-maxDepth: 3
},0.697259190464961)

combi
({
	rfc_dc81a834333a-maxDepth: 9
},0.867121910553916)
({
	rfc_dc81a834333a-maxDepth: 6
},0.8653881242445118)
({
	rfc_dc81a834333a-maxDepth: 3
},0.8514992759213362)
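Each `CrossValidator` above trains on two folds, scores the held-out fold with areaUnderROC, and averages the three scores per `maxDepth` value; these averages are the printed `avgMetrics`. It then keeps the best setting. The plain-Scala sketch below makes that loop explicit. `fitAndScore` is a hypothetical callback standing in for training and evaluating a model. The fold assignment (shuffle, then round-robin) is a simplification of Spark's random fold sampling. After selection, Spark also refits the estimator with the best parameters on the whole dataset passed to `fit`, which the sketch omits.

```scala
import scala.util.Random

// k-fold cross-validation over a parameter grid. fitAndScore (hypothetical) trains on the
// first Seq and returns a metric on the second; larger is better, as with areaUnderROC.
def crossValidate[P, A](data: Seq[A], grid: Seq[P], k: Int, seed: Long)(
    fitAndScore: (P, Seq[A], Seq[A]) => Double): (P, Seq[(P, Double)]) = {
  // Simplified fold assignment: shuffle once, then deal rows round-robin into k folds
  val shuffled = new Random(seed).shuffle(data)
  val folds: IndexedSeq[Seq[A]] =
    (0 until k).map(f => shuffled.zipWithIndex.collect { case (a, i) if i % k == f => a })
  val avgMetrics = grid.map { p =>
    val scores = folds.indices.map { f =>
      val validation = folds(f)
      val training = folds.indices.filter(_ != f).flatMap(j => folds(j))
      fitAndScore(p, training, validation)
    }
    (p, scores.sum / scores.size) // mean metric across folds for this setting
  }
  (avgMetrics.maxBy(_._2)._1, avgMetrics)
}
```

In practice `fitAndScore` would fit the random forest with the given `maxDepth` and return the evaluator's AUC on the validation rows.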


### Word2Vec Pipeline

Word2Vec computes distributed vector representations of words. It learns them with a shallow neural network trained on the contexts (windows of neighboring words) in which each word appears. The result maps every word to a vector in a feature space that captures semantic relationships between words.

There are two main approaches:
* CBOW (Continuous Bag Of Words): predict a word from its context
* Skip-gram: predict the context words from each word

The neural network has a single hidden layer, and each word's vector representation is read from the learned weights. Words that occur in similar contexts (and therefore tend to have similar meanings) end up with similar vectors.
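Spark ML's `Word2Vec` is documented as using the skip-gram model. A skip-gram network is trained on (center word, context word) pairs taken from a window around each position, and `setWindowSize(5)` in the next cell controls that window. The sketch below only enumerates those training pairs for a fixed window, using hypothetical tokens. It leaves out the network, the training procedure, and refinements such as sampling a smaller effective window.

```scala
// (center, context) pairs a skip-gram model is trained on, for a fixed window size
def skipGramPairs(tokens: Seq[String], window: Int): Seq[(String, String)] =
  for {
    i <- tokens.indices
    j <- math.max(0, i - window) to math.min(tokens.size - 1, i + window)
    if j != i
  } yield (tokens(i), tokens(j))

// Hypothetical tokens with a window of 1
skipGramPairs(Seq("power", "market", "california"), 1).foreach(println)
```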

In [16]:
//estimator: learns word vectors and averages them per document
val word2Vec = new Word2Vec().
    setInputCol("tokens_rm").
    setOutputCol("w2vFeatures").
    setMinCount(5).
    setWindowSize(5)
//estimator
val rf_w2v = new RandomForestClassifier().setLabelCol("label").setFeaturesCol("w2vFeatures").setNumTrees(20)
//grid param: 3 maxDepth values x 3 vectorSize values
val paramGrid_w2v = new ParamGridBuilder().addGrid(rf_w2v.maxDepth, Array(3, 6, 9)).addGrid(word2Vec.vectorSize, Array(10, 20, 30)).build()
//evaluator
val binClassEval = new BinaryClassificationEvaluator().setLabelCol("label").setRawPredictionCol("rawPrediction").setMetricName("areaUnderROC")

val w2vStages = Array(word2Vec, rf_w2v)
val w2v_pl = new Pipeline().setStages(w2vStages)
// NOTE: the n-gram models are fit on `train`, but this is fit on basicFeatEng_docTerm (train + test).
// The outputs below come from this full-data fit; use .fit(train) for a like-for-like comparison.
val cv_w2v = new CrossValidator().setEstimator(w2v_pl).setEstimatorParamMaps(paramGrid_w2v).setEvaluator(binClassEval).setNumFolds(3).fit(basicFeatEng_docTerm)
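`w2vFeatures` is a document-level vector. Spark's `Word2VecModel` transforms each document into the average of the vectors of its words, so each e-mail becomes a dense vector of `vectorSize` values that the random forest consumes. Below is a plain-Scala sketch of that averaging, using hypothetical 2-dimensional vectors. It divides by the total number of tokens and treats tokens without a vector (for example, words below `minCount`) as zero vectors. Those two choices are assumptions about how such words are handled, based on the documented "average of all words".

```scala
// Average of all word vectors in a document; tokens without a vector count as zero vectors
// (assumption about how out-of-vocabulary words are handled)
def docVector(tokens: Seq[String], wordVectors: Map[String, Array[Double]], dim: Int): Array[Double] = {
  val acc = Array.fill(dim)(0.0)
  for (t <- tokens; v <- wordVectors.get(t); k <- 0 until dim) acc(k) += v(k)
  if (tokens.isEmpty) acc else acc.map(_ / tokens.size)
}

// Hypothetical word vectors
val vecs = Map("power" -> Array(1.0, 0.0), "market" -> Array(0.0, 1.0))
println(docVector(Seq("power", "market", "unseen", "power"), vecs, 2).mkString("[", ", ", "]"))
```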



In [17]:
val bestModel_w2v = cv_w2v.bestModel
val cvReport_w2v = cv_w2v.getEstimatorParamMaps.zip(cv_w2v.avgMetrics).sortBy(-_._2)
println("")
cvReport_w2v.foreach(println)

({
	rfc_dd40def74c5d-maxDepth: 3,
	w2v_6062a1fa540a-vectorSize: 20
},0.8418809269838038)
({
	rfc_dd40def74c5d-maxDepth: 6,
	w2v_6062a1fa540a-vectorSize: 30
},0.8412259838714485)
({
	rfc_dd40def74c5d-maxDepth: 6,
	w2v_6062a1fa540a-vectorSize: 20
},0.8387943326279337)
({
	rfc_dd40def74c5d-maxDepth: 9,
	w2v_6062a1fa540a-vectorSize: 20
},0.8350356015024923)
({
	rfc_dd40def74c5d-maxDepth: 3,
	w2v_6062a1fa540a-vectorSize: 30
},0.8306138328289846)
({
	rfc_dd40def74c5d-maxDepth: 9,
	w2v_6062a1fa540a-vectorSize: 30
},0.8274166202443218)
({
	rfc_dd40def74c5d-maxDepth: 3,
	w2v_6062a1fa540a-vectorSize: 10
},0.8264827398020201)
({
	rfc_dd40def74c5d-maxDepth: 6,
	w2v_6062a1fa540a-vectorSize: 10
},0.8152066378386517)
({
	rfc_dd40def74c5d-maxDepth: 9,
	w2v_6062a1fa540a-vectorSize: 10
},0.8073612937327077)
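The whole `Pipeline` (Word2Vec plus random forest) is the estimator here, so every grid point retrains the embedding as well as the forest. `ParamGridBuilder` produces the cartesian product of the grids added to it, which makes the cost easy to count. A small sketch of that arithmetic:

```scala
// Cartesian product of the two grids, as ParamGridBuilder builds it
val maxDepths   = Seq(3, 6, 9)
val vectorSizes = Seq(10, 20, 30)
val grid = for (d <- maxDepths; v <- vectorSizes) yield (d, v)

val numFolds     = 3
val fitsDuringCV = grid.size * numFolds // each fit trains Word2Vec and the random forest
val totalFits    = fitsDuringCV + 1     // plus one refit of the best setting on the full input
println(s"${grid.size} settings, $fitsDuringCV CV fits, $totalFits fits in total")
```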


## 4. Model Assessment

In [30]:
val cv_array :Array[(String, org.apache.spark.ml.tuning.CrossValidatorModel)] = cv_ngrams ++ Array( ("w2v", cv_w2v) )

In [31]:
case class evalRecord(predictions :org.apache.spark.sql.DataFrame, model :org.apache.spark.ml.tuning.CrossValidatorModel, modelDesc :String, dataset :String, evalMetric :Double)

val binEval = new BinaryClassificationEvaluator().setRawPredictionCol("rawPrediction").setLabelCol("label").setMetricName("areaUnderROC")

def CVEvaluatorBundle(binEval :BinaryClassificationEvaluator, cv_array: Array[(String, org.apache.spark.ml.tuning.CrossValidatorModel)], splits :Array[(org.apache.spark.sql.DataFrame, String)] ) :Array[evalRecord] = {
    // For every (modelDesc, cross-validator) pair, score each split with the best model
    cv_array.flatMap { case (modelDesc, cvModel) =>
        val bestModel = cvModel.bestModel
        splits.map { case (dataset, datasetName) =>
            val predictions = bestModel.transform(dataset)
            // evalRecord(predictions, model, modelDesc, dataset name, areaUnderROC)
            evalRecord(predictions, cvModel, modelDesc, datasetName, binEval.evaluate(predictions))
        }
    }.sortBy(-_.evalMetric) // best metric first
}

val cv_eval = CVEvaluatorBundle(binEval, cv_array, splits)
cv_eval

Array(evalRecord([id: int, email: string ... 15 more fields],cv_079ef8a4491d,combi,train,0.9766666666666665), evalRecord([id: int, email: string ... 15 more fields],cv_9fd85a2de99e,1gram,train,0.958122489959839), evalRecord([id: int, email: string ... 16 more fields],cv_2a772717c894,w2v,train,0.9057329317269073), evalRecord([id: int, email: string ... 16 more fields],cv_2a772717c894,w2v,test,0.8877911079745943), evalRecord([id: int, email: string ... 15 more fields],cv_cee8ee5a8b6d,2gram,train,0.8461144578313252), evalRecord([id: int, email: string ... 15 more fields],cv_9fd85a2de99e,1gram,test,0.8380969183721478), evalRecord([id: int, email: string ... 15 more fields],cv_079ef8a4491d,combi,test,0.7928134556574926), evalRecord([id: int, email: string ... 15 m...
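`BinaryClassificationEvaluator` with areaUnderROC measures how well `rawPrediction` ranks positives above negatives. A useful reading of the numbers in the report: AUC is the probability that a randomly chosen positive e-mail is scored above a randomly chosen negative one, with ties counting half. The O(P·N) sketch below uses that definition on hypothetical scores. Spark instead builds the ROC curve from score thresholds and integrates it with the trapezoidal rule, which gives the same value when every distinct score is used as a threshold.

```scala
// AUC as the probability that a random positive outranks a random negative (ties count 0.5)
def areaUnderROC(scores: Seq[Double], labels: Seq[Double]): Double = {
  val pos = scores.zip(labels).collect { case (s, 1.0) => s }
  val neg = scores.zip(labels).collect { case (s, 0.0) => s }
  require(pos.nonEmpty && neg.nonEmpty, "need both classes")
  val wins = for (p <- pos; n <- neg) yield if (p > n) 1.0 else if (p == n) 0.5 else 0.0
  wins.sum / (pos.size * neg.size)
}

// Hypothetical scores and labels
println(areaUnderROC(Seq(0.1, 0.4, 0.35, 0.8), Seq(0.0, 0.0, 1.0, 1.0)))
```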

In [36]:
val evalReport = cv_eval.map(record =>  (record.modelDesc, record.dataset, record.evalMetric) )
val evalReport_df = spark.createDataFrame(evalReport).toDF("modelDesc", "dataset", "evalMetric")
println("Full List of models: ")
evalReport_df.show()
println("Best Performant model in Test Dataset: ")
evalReport_df.filter($"dataset" === lit("test")).orderBy($"evalMetric".desc).limit(1).show()

Full List of models: 
+---------+-------+------------------+
|modelDesc|dataset|        evalMetric|
+---------+-------+------------------+
|    combi|  train|0.9766666666666665|
|    1gram|  train| 0.958122489959839|
|      w2v|  train|0.9057329317269073|
|      w2v|   test|0.8877911079745943|
|    2gram|  train|0.8461144578313252|
|    1gram|   test|0.8380969183721478|
|    combi|   test|0.7928134556574926|
|    2gram|   test|0.7754645965655139|
+---------+-------+------------------+

Best Performant model in Test Dataset: 
+---------+-------+------------------+
|modelDesc|dataset|        evalMetric|
+---------+-------+------------------+
|      w2v|   test|0.8877911079745943|
+---------+-------+------------------+



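So the best-performing model on the test dataset is not the one with the best cross-validation metric (1gram, random forest with maxDepth=6) but the Word2Vec model (vector size 20, random forest with maxDepth=3). This comparison needs a caveat. cv_w2v was fit on basicFeatEng_docTerm, which contains the test rows, and CrossValidator refits the best setting on its whole input, so the w2v model has already seen the test set and its test AUC is optimistic. The n-gram cross-validators, by contrast, were fit on train only.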


### Word2Vec Exploratory Analysis

In [45]:
val pl_model :PipelineModel = bestModel_w2v.asInstanceOf[PipelineModel]
val w2v_model :Word2VecModel = pl_model.stages(0).asInstanceOf[Word2VecModel]

In [47]:
println("Top ten most similar words to energy in w2v feature space: ")
w2v_model.findSynonyms("energy", 10).show()

Top ten most similar words to energy in w2v feature space: 
+------------+------------------+
|        word|        similarity|
+------------+------------------+
|       state| 0.851428330806497|
|       power|0.8225366866719859|
|        said|0.7708845933927029|
|    industry|0.7412072444116364|
|   wholesale|0.7246787905599046|
|  california|0.7244068972467216|
| electricity|0.7077743453046441|
|     markets|0.7022909007839018|
|deregulation|0.6939859421820851|
|    agreeing| 0.693162339732354|
+------------+------------------+



In [48]:
println("Top ten most similar words to power in w2v feature space: ")
w2v_model.findSynonyms("power", 10).show()

Top ten most similar words to power in w2v feature space: 
+------------+------------------+
|        word|        similarity|
+------------+------------------+
| electricity|0.9284407403160926|
|       state|0.9007867689385296|
|deregulation|0.8960683589269083|
|     markets|0.8602079498272223|
|        said| 0.853785631969866|
|   utilities|0.8537180649752815|
|   supplying|0.8535447682240689|
|     western|0.8519880326027605|
|    industry|0.8516263994010073|
| politicians|0.8496277609003039|
+------------+------------------+
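`findSynonyms` ranks the vocabulary by cosine similarity to the query word's vector; that is the `similarity` column in the tables above. High similarity reflects shared contexts in this corpus rather than dictionary synonymy, which is why words such as "said" or "state" rank highly. The plain-Scala sketch below reproduces the ranking over a hypothetical map of word vectors.

```scala
// Cosine similarity: dot product divided by the product of the vector norms
def cosine(a: Array[Double], b: Array[Double]): Double = {
  require(a.length == b.length)
  val dot = a.zip(b).map { case (x, y) => x * y }.sum
  val norms = math.sqrt(a.map(x => x * x).sum) * math.sqrt(b.map(x => x * x).sum)
  if (norms == 0.0) 0.0 else dot / norms
}

// Top-n words by cosine similarity to `word`, excluding the word itself
def synonyms(word: String, vectors: Map[String, Array[Double]], n: Int): Seq[(String, Double)] = {
  val target = vectors(word)
  vectors.toSeq
    .filter(_._1 != word)
    .map { case (w, v) => (w, cosine(target, v)) }
    .sortBy(-_._2)
    .take(n)
}

// Hypothetical 2-dimensional word vectors
val wordVecs = Map(
  "energy" -> Array(1.0, 0.0),
  "power"  -> Array(1.0, 1.0),
  "gas"    -> Array(0.0, 1.0),
  "oil"    -> Array(-1.0, 0.0)
)
synonyms("energy", wordVecs, 2).foreach(println)
```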

