In [1]:
import org.apache.spark.sql.SparkSession;

//import statistics.functions._;

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.types._;
import org.apache.spark.sql.functions._;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.Model;
import org.apache.spark.ml.classification.{LogisticRegression, RandomForestClassifier};
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator,MulticlassClassificationEvaluator};
import org.apache.spark.ml.feature.Binarizer;
import org.apache.spark.ml.feature.{RegexTokenizer, NGram};
import org.apache.spark.ml.feature.{HashingTF, IDF};
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder};

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.Model
import org.apache.spark.ml.classification.{LogisticRegression, RandomForestClassifier}
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, MulticlassClassificationEvaluator}
import org.apache.spark.ml.feature.Binarizer
import org.apache.spark.ml.feature.{RegexTokenizer, NGram}
import org.apache.spark.ml.feature.{HashingTF, IDF}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}


In [3]:
// HDFS directory holding the tab-separated, pre-tokenised reviews.
val path: String = "hdfs://localhost:9000/TextMining/tokens/"

// Explicit column layout of the input files. "rate" is declared as a Double
// so that it can be handed straight to Spark's Binarizer later on.
val original_schema = StructType(Seq(
  StructField("product",       StringType,  nullable = true),
  StructField("votes",         IntegerType, nullable = true),
  StructField("rate",          DoubleType,  nullable = true),
  StructField("original_text", StringType,  nullable = true),
  StructField("text",          StringType,  nullable = true),
  StructField("summary",       StringType,  nullable = true)
))

// Read the files with the schema above and discard every row that contains a null.
val original_data: DataFrame = spark.read
  .option("delimiter", "\t")
  .schema(original_schema)
  .csv(path)
  .na.drop()

path: String = hdfs://localhost:9000/TextMining/tokens/
original_schema: org.apache.spark.sql.types.StructType = StructType(StructField(product,StringType,true), StructField(votes,IntegerType,true), StructField(rate,DoubleType,true), StructField(original_text,StringType,true), StructField(text,StringType,true), StructField(summary,StringType,true))
original_data: org.apache.spark.sql.DataFrame = [product: string, votes: int ... 4 more fields]


In [4]:
// Quick sanity check: print the first five rows, with long cells truncated.
original_data.show(numRows = 5, truncate = true)

+----------+-----+----+--------------------+--------------------+--------------------+
|   product|votes|rate|       original_text|                text|             summary|
+----------+-----+----+--------------------+--------------------+--------------------+
|0143065971|    4| 5.0|This is a masterp...|masterpiec someon...|Fantastic Book Ab...|
|0143065971|    1| 5.0|Great condition a...| great condit great |          Five Stars|
|1423600150|    1| 5.0|Excellent book on...|excel sauc fun tr...|           excellent|
|1423600150|    1| 5.0|          Great book|              great |          Five Stars|
|1423600150|    1| 5.0|   Great mexi stuff.|   great mexi stuff |          Five Stars|
+----------+-----+----+--------------------+--------------------+--------------------+
only showing top 5 rows



In [4]:
/* original_data.select("summary","rate","votes")
 *    .groupBy("summary")
 *    .agg(mean("rate"), mean("votes"), count("summary"))
 *   .orderBy(desc("count(summary)"))
 */

In [5]:
// Derive the binary target column "label" from the numeric "rate" column,
// splitting at a threshold of 3.5.
val binarizer = new Binarizer()
  .setThreshold(3.5)
  .setInputCol("rate")
  .setOutputCol("label")

// Split the cleaned text on non-word characters ...
val tokenizer = new RegexTokenizer()
  .setPattern("\\W")
  .setInputCol("text")
  .setOutputCol("tokens")

// ... and assemble the tokens into n-grams (n is tuned by the grid below).
val ngrams = new NGram()
  .setInputCol("tokens")
  .setOutputCol("n-grams")

// Term frequencies via feature hashing.
val tf = new HashingTF()
  .setInputCol("n-grams")
  .setOutputCol("tf")

// Inverse-document-frequency weighting of the hashed term frequencies.
val idf = new IDF()
  .setMinDocFreq(3)
  .setInputCol("tf")
  .setOutputCol("tf-idf")

// Logistic regression on the TF-IDF vectors, predicting the binarized rate.
val classifierMod = new LogisticRegression()
  .setFeaturesCol("tf-idf")
  .setLabelCol("label")
  .setMaxIter(10)

// Full chain applied to every row: label -> tokens -> n-grams -> TF -> IDF -> classifier.
val pipeline = new Pipeline()
  .setStages(Array(binarizer, tokenizer, ngrams, tf, idf, classifierMod))

// Hyper-parameter search space: regularisation strength x n-gram size.
val paramGrid = new ParamGridBuilder()
  .addGrid(classifierMod.regParam, Array(0.01, 0.05, 0.1))
  .addGrid(ngrams.n, Array(1, 2, 3))
  .build()

// 3-fold cross-validation over the whole pipeline for every grid point.
val cv = new CrossValidator()
  .setNumFolds(3)
  .setEstimatorParamMaps(paramGrid)
  .setEvaluator(new BinaryClassificationEvaluator())
  .setEstimator(pipeline)

binarizer: org.apache.spark.ml.feature.Binarizer = Binarizer: uid=binarizer_f6c1a0c2d18d
tokenizer: org.apache.spark.ml.feature.RegexTokenizer = regexTok_c5e5c52b6619
ngrams: org.apache.spark.ml.feature.NGram = NGram: uid=ngram_5145290d4b78, n=2
tf: org.apache.spark.ml.feature.HashingTF = HashingTF: uid=hashingTF_3d6a0d0708ea, binary=false, numFeatures=262144
idf: org.apache.spark.ml.feature.IDF = idf_c79ca263202a
classifierMod: org.apache.spark.ml.classification.LogisticRegression = logreg_337cbecdac6b
pipeline: org.apache.spark.ml.Pipeline = pipeline_0abfdf22723d
paramGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	ngram_5145290d4b78-n: 1,
	logreg_337cbecdac6b-regParam: 0.01
}, {
	ngram_5145290d4b78-n: 2,
	logreg_337cbecdac6b-regParam: 0.01
}, {
	ngram_5145290d4b78-n: 3,
	l...


In [6]:
// original_data.createOrReplaceTempView("original_data")
// spark.sql("SELECT summary, COUNT(*) FROM original_data GROUP BY summary")

In [7]:
// Run the cross-validated grid search on the loaded data set.
println("Training... ")
val model = cv.fit(original_data)
println("done!")

// Report every hyper-parameter combination next to its metric averaged over the folds.
model.getEstimatorParamMaps.zip(model.avgMetrics).foreach { case (params, avgMetric) =>
  println("\n\n")
  println(params)
  println(avgMetric)
}

Training... 
done!



{
	ngram_5145290d4b78-n: 1,
	logreg_337cbecdac6b-regParam: 0.01
}
0.9019483361008155



{
	ngram_5145290d4b78-n: 2,
	logreg_337cbecdac6b-regParam: 0.01
}
0.8985675928328255



{
	ngram_5145290d4b78-n: 3,
	logreg_337cbecdac6b-regParam: 0.01
}
0.7627563338905388



{
	ngram_5145290d4b78-n: 1,
	logreg_337cbecdac6b-regParam: 0.05
}
0.9036257241517349



{
	ngram_5145290d4b78-n: 2,
	logreg_337cbecdac6b-regParam: 0.05
}
0.9044877488592841



{
	ngram_5145290d4b78-n: 3,
	logreg_337cbecdac6b-regParam: 0.05
}
0.7693852143472046



{
	ngram_5145290d4b78-n: 1,
	logreg_337cbecdac6b-regParam: 0.1
}
0.9034636511888169



{
	ngram_5145290d4b78-n: 2,
	logreg_337cbecdac6b-regParam: 0.1
}
0.906886374283741



{
	ngram_5145290d4b78-n: 3,
	logreg_337cbecdac6b-regParam: 0.1
}
0.7727091640831082


model: org.apache.spark.ml.tuning.CrossValidatorModel = CrossValidatorModel: uid=cv_02ece6023b0e, bestModel=pipeline_0abfdf22723d, numFolds=3


# Random forest multiclass classifier

In [11]:
// Split the cleaned text on non-word characters ...
val tokenizer = new RegexTokenizer()
  .setPattern("\\W")
  .setInputCol("text")
  .setOutputCol("tokens")

// ... and assemble the tokens into n-grams (n is tuned by the grid below).
val ngrams = new NGram()
  .setInputCol("tokens")
  .setOutputCol("n-grams")

// Term frequencies via feature hashing.
val tf = new HashingTF()
  .setInputCol("n-grams")
  .setOutputCol("tf")

// Inverse-document-frequency weighting of the hashed term frequencies.
val idf = new IDF()
  .setMinDocFreq(3)
  .setInputCol("tf")
  .setOutputCol("tf-idf")

// Random forest on the TF-IDF vectors; the un-binarized "rate" column is used
// directly as the (multiclass) label.
val classifierMod = new RandomForestClassifier()
  .setLabelCol("rate")
  .setFeaturesCol("tf-idf")

// Full chain applied to every row: tokens -> n-grams -> TF -> IDF -> classifier.
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, ngrams, tf, idf, classifierMod))

// Hyper-parameter search space: forest size x n-gram size.
val paramGrid = new ParamGridBuilder()
  .addGrid(classifierMod.numTrees, Array(3, 10, 20, 50, 100))
  .addGrid(ngrams.n, Array(1, 2, 3))
  .build()

// Multiclass evaluator comparing predictions against the "rate" column.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("rate")

// 3-fold cross-validation over the whole pipeline for every grid point.
val cv = new CrossValidator()
  .setNumFolds(3)
  .setEstimatorParamMaps(paramGrid)
  .setEvaluator(evaluator)
  .setEstimator(pipeline)

tokenizer: org.apache.spark.ml.feature.RegexTokenizer = regexTok_45f24f1e0b77
ngrams: org.apache.spark.ml.feature.NGram = NGram: uid=ngram_9e38137db5e3, n=2
tf: org.apache.spark.ml.feature.HashingTF = HashingTF: uid=hashingTF_71d6484d363e, binary=false, numFeatures=262144
idf: org.apache.spark.ml.feature.IDF = idf_69ddbc2541bb
classifierMod: org.apache.spark.ml.classification.RandomForestClassifier = rfc_72eb5de8f492
pipeline: org.apache.spark.ml.Pipeline = pipeline_2abf875b320e
paramGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	ngram_9e38137db5e3-n: 1,
	rfc_72eb5de8f492-numTrees: 3
}, {
	ngram_9e38137db5e3-n: 2,
	rfc_72eb5de8f492-numTrees: 3
}, {
	ngram_9e38137db5e3-n: 3,
	rfc_72eb5de8f492-numTrees: 3
}, {
	ngram_9e38137db5e3-n: 1,
	rfc_72eb5de8f492-numTrees: 10
}, {
	ngra...


In [None]:
// Run the random-forest grid search; the two messages share one output line
// because neither print emits a newline.
Console.print("Training... ")
val model = cv.fit(original_data)
Console.print("done!")

Training... 