In [1]:
import org.apache.spark.sql.SparkSession

val spark = SparkSession
    .builder
    .appName("spark-model")
    .config("spark.driver.cores", "5")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.cores", "5")
    .config("spark.executor.memory", "8g")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "32g")
    .config("spark.dynamicAllocation.enabled", "true")
    .getOrCreate()

spark = org.apache.spark.sql.SparkSession@1845b51a


org.apache.spark.sql.SparkSession@1845b51a

In [2]:
println(s"Current spark version is ${spark.version}")

Current spark version is 2.4.4


In [3]:
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, LongType, StringType}

val dataSchema = new StructType()
    .add("target", IntegerType)
    .add("id", LongType)
    .add("raw_timestamp", StringType)
    .add("query_status", StringType)
    .add("author", StringType)
    .add("tweet", StringType)

    
val dataPath= "/home/jovyan/data/training.1600000.processed.noemoticon.csv"

val raw_sentiment = spark.read
    .format("csv")
    .option("header",false)
    .schema(dataSchema)
    .load(dataPath)
    .selectExpr("(case when target=4 then 1 else 0 end) as label","tweet")

raw_sentiment.groupBy($"label").count.show

+-----+------+
|label| count|
+-----+------+
|    1|800000|
|    0|800000|
+-----+------+



dataSchema = StructType(StructField(target,IntegerType,true), StructField(id,LongType,true), StructField(raw_timestamp,StringType,true), StructField(query_status,StringType,true), StructField(author,StringType,true), StructField(tweet,StringType,true))
dataPath = /home/jovyan/data/training.1600000.processed.noemoticon.csv
raw_sentiment = [label: int, tweet: string]


[label: int, tweet: string]

Use Train-Validation Split to get better model.

In [4]:
val Array(training, test) = raw_sentiment.randomSplit(Array(0.9, 0.1), seed = 12345)
training.cache

training = [label: int, tweet: string]
test = [label: int, tweet: string]


[label: int, tweet: string]

In [5]:
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

val tokenizer = new Tokenizer()
    .setInputCol("tweet")
    .setOutputCol("words")

val hashingTF = new HashingTF()
    .setNumFeatures(1000)
    .setInputCol(tokenizer.getOutputCol)
    .setOutputCol("features")

val rf = new RandomForestClassifier()
    .setLabelCol("label")
    .setFeaturesCol("features")
    .setNumTrees(10)

val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, rf))

tokenizer = tok_ac27dce37eaf
hashingTF = hashingTF_1e1a6b41b7dd
rf = rfc_c5d3f6700f16
pipeline = pipeline_e9786cc35e88


pipeline_e9786cc35e88

Use a ParamGridBuilder to construct a grid of parameters to search over.

In [6]:
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}

val paramGrid = new ParamGridBuilder()
  .addGrid(rf.maxDepth, Array(5, 10))
  .addGrid(rf.numTrees, Array(20, 50))
  .build()

paramGrid = 


Array({
	rfc_c5d3f6700f16-maxDepth: 5,
	rfc_c5d3f6700f16-numTrees: 20
}, {
	rfc_c5d3f6700f16-maxDepth: 5,
	rfc_c5d3f6700f16-numTrees: 50
}, {
	rfc_c5d3f6700f16-maxDepth: 10,
	rfc_c5d3f6700f16-numTrees: 20
}, {
	rfc_c5d3f6700f16-maxDepth: 10,
	rfc_c5d3f6700f16-numTrees: 50
})


TrainValidationSplit will try all combinations of values and determine best model using the evaluator.

In [7]:
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

val trainValidationSplit = new TrainValidationSplit()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator)
  .setEstimatorParamMaps(paramGrid)
  // 80% of the data will be used for training and the remaining 20% for validation.
  .setTrainRatio(0.8)
  // Evaluate up to 2 parameter settings in parallel
  .setParallelism(2)

trainValidationSplit = tvs_96c9401d91cd


tvs_96c9401d91cd

Run train validation split, and choose the best set of parameters.

In [8]:
val model = trainValidationSplit.fit(training).bestModel.asInstanceOf[PipelineModel]

model = pipeline_e9786cc35e88


pipeline_e9786cc35e88

Let's look on params for our best model

In [9]:
model.stages(2).asInstanceOf[RandomForestClassificationModel].extractParamMap

{
	rfc_c5d3f6700f16-cacheNodeIds: false,
	rfc_c5d3f6700f16-checkpointInterval: 10,
	rfc_c5d3f6700f16-featureSubsetStrategy: auto,
	rfc_c5d3f6700f16-featuresCol: features,
	rfc_c5d3f6700f16-impurity: gini,
	rfc_c5d3f6700f16-labelCol: label,
	rfc_c5d3f6700f16-maxBins: 32,
	rfc_c5d3f6700f16-maxDepth: 10,
	rfc_c5d3f6700f16-maxMemoryInMB: 256,
	rfc_c5d3f6700f16-minInfoGain: 0.0,
	rfc_c5d3f6700f16-minInstancesPerNode: 1,
	rfc_c5d3f6700f16-numTrees: 50,
	rfc_c5d3f6700f16-predictionCol: prediction,
	rfc_c5d3f6700f16-probabilityCol: probability,
	rfc_c5d3f6700f16-rawPredictionCol: rawPrediction,
	rfc_c5d3f6700f16-seed: 207336481,
	rfc_c5d3f6700f16-subsamplingRate: 1.0
}


Make predictions on test data.

In [10]:
import org.apache.spark.sql.functions._

val getProbability = udf((prediction: org.apache.spark.ml.linalg.Vector) => prediction(1))

getProbability = UserDefinedFunction(<function1>,DoubleType,Some(List(org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7)))


UserDefinedFunction(<function1>,DoubleType,Some(List(org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7)))

In [13]:
val testResult = model.transform(test)

testResult.select($"tweet", $"label", getProbability($"probability").alias("clean_probability"), $"prediction").show

+--------------------+-----+-------------------+----------+
|               tweet|label|  clean_probability|prediction|
+--------------------+-----+-------------------+----------+
|      My current ...|    0|0.45799854653578903|       0.0|
|     I dont like ...|    0|0.47154405148038614|       0.0|
|  Jus Got Hom Fr....|    0| 0.4388917102765312|       0.0|
|  in cab headed t...|    0| 0.5341072869493625|       1.0|
|  now the pic won...|    0| 0.5360566658737981|       1.0|
|             over it|    0| 0.5308792350458302|       1.0|
|  went to maggie'...|    0|  0.447332356227742|       0.0|
| ... had weird en...|    0|  0.513398097735378|       1.0|
|      ... that's all|    0| 0.5358657398890049|       1.0|
| .... Life can be...|    0| 0.4628345037954727|       0.0|
|          @I_am_delo|    0| 0.5358657398890049|       1.0|
| Archie's annoyin...|    0| 0.5014728836966545|       1.0|
| Argh, the speake...|    0| 0.4720658465457641|       0.0|
| B- in genetics s...|    0|0.4374742823

testResult = [label: int, tweet: string ... 5 more fields]


[label: int, tweet: string ... 5 more fields]

Let's evaluate our best model

In [15]:
val bce = new BinaryClassificationEvaluator()
    .setLabelCol("label")
    .setMetricName("areaUnderROC")
    .setRawPredictionCol("probability")

bce.evaluate(testResult)

bce = binEval_ad71352aba2c


0.7231869604660494

AUROC = 0.72 - Good enough

In [16]:
model.write.overwrite().save("/home/jovyan/models/spark-ml-model")

In [17]:
val sameModel = PipelineModel.load("/home/jovyan/models/spark-ml-model")

sameModel = pipeline_e9786cc35e88


pipeline_e9786cc35e88

In [18]:
val predictionsDF = sameModel.transform(raw_sentiment)
predictionsDF.show()

+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|label|               tweet|               words|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|    0|@switchfoot http:...|[@switchfoot, htt...|(1000,[7,14,21,54...|[21.0526438619503...|[0.42105287723900...|       1.0|
|    0|is upset that he ...|[is, upset, that,...|(1000,[170,193,22...|[25.0385188841850...|[0.50077037768370...|       0.0|
|    0|@Kenichan I dived...|[@kenichan, i, di...|(1000,[10,36,77,1...|[29.3783772814382...|[0.58756754562876...|       0.0|
|    0|my whole body fee...|[my, whole, body,...|(1000,[82,191,296...|[25.5832912735477...|[0.51166582547095...|       0.0|
|    0|@nationwideclass ...|[@nationwideclass...|(1000,[18,96,130,...|[27.4134510551815...|[0.54826902110363...|       0.0|
|    0|@

predictionsDF = [label: int, tweet: string ... 5 more fields]


[label: int, tweet: string ... 5 more fields]

In [20]:
predictionsDF
    .select($"tweet", $"label", getProbability($"probability").alias("clean_probability"), $"prediction")
    .show

+--------------------+-----+-------------------+----------+
|               tweet|label|  clean_probability|prediction|
+--------------------+-----+-------------------+----------+
|@switchfoot http:...|    0| 0.5789471227609927|       1.0|
|is upset that he ...|    0| 0.4992296223162992|       0.0|
|@Kenichan I dived...|    0|0.41243245437123427|       0.0|
|my whole body fee...|    0|0.48833417452904576|       0.0|
|@nationwideclass ...|    0| 0.4517309788963685|       0.0|
|@Kwesidei not the...|    0| 0.4878082939150626|       0.0|
|         Need a hug |    0| 0.5252106617974605|       1.0|
|@LOLTrish hey  lo...|    0| 0.5764273715757433|       1.0|
|@Tatiana_K nope t...|    0| 0.5026701540637027|       1.0|
|@twittera que me ...|    0| 0.5313245588887434|       1.0|
|spring break in p...|    0| 0.5343905845287309|       1.0|
|I just re-pierced...|    0| 0.4581465868830846|       0.0|
|@caregiving I cou...|    0| 0.4375072123793634|       0.0|
|@octolinz16 It it...|    0| 0.520684284