In [0]:
import org.apache.spark.sql.types._
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.classification._
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.ml.Model
import org.apache.spark.sql._

In [1]:
/**
 * Reads the CSV file at `path` through the notebook's `spark` session.
 *
 * The reader is configured with a header row, `,` as delimiter, `"` as both
 * quote and escape character, the utf-8 charset and the `multiline` option
 * switched on.
 *
 * @param path location of the CSV file to load
 * @return the DataFrame produced by the CSV reader
 */
def readCsv(path: String) = {
    // Boolean options are written as strings because `options` takes a Map[String, String].
    val csvOptions = Map(
        "header" -> "true",
        "quote" -> "\"",
        "delimiter" -> ",",
        "charset" -> "utf-8",
        "escape" -> "\"",
        "multiline" -> "true"
    )
    spark.read.options(csvOptions).csv(path)
}

/**
 * Returns the dimensions of a dataset as a `(rows, columns)` pair.
 *
 * @param dataset the DataFrame to measure
 * @return the row count (computed with `count()`) and the number of columns
 */
def getShape(dataset: DataFrame) = {
    val rowCount = dataset.count()
    val columnCount = dataset.columns.length
    (rowCount, columnCount)
}

/**
 * Builds a boolean column that flags "missing" values of `column`.
 *
 * A value counts as missing when it is null, equal to the empty string,
 * NaN, or equal to the literal string "nan".
 *
 * @param column the column to inspect
 * @return a predicate column combining the four checks with `or`
 */
def isNullOrEmpty(column:Column): Column = {
    val missingChecks = Seq(
        column.isNull,
        column <=> lit(""),
        column.isNaN,
        column <=> lit("nan")
    )
    // Left-to-right reduction yields ((a or b) or c) or d.
    missingChecks.reduce(_ or _)
}

In [2]:
// Each input file is declared next to the DataFrame loaded from it.

// Submission file.
val submitDataPath = "/data/submit.csv"
val submitData = readCsv(submitDataPath)

// Test file.
val testDataPath = "/data/test.csv"
val testData = readCsv(testDataPath)

// Training file.
val trainDataPath = "/data/train.csv"
val trainData = readCsv(trainDataPath)

In [3]:
// Preview the first 10 rows of every dataset.
for (dataset <- Seq(trainData, testData, submitData)) dataset.show(10)

// Then report each dataset's (rows, columns) shape.
for ((name, dataset) <- Seq("Train" -> trainData, "Test" -> testData, "Submit" -> submitData)) {
    println(s"$name Shape : ${getShape(dataset)}")
}

In [4]:
// Overview of the training set: schema, overall counts, and per-column
// counts of present vs. missing values (as defined by isNullOrEmpty).
println("Train data summary")

println("Schema")
trainData.printSchema()

println("Summary (all)")
trainData.summary("count").show()

println("Summary (not nulls)")
for (name <- Seq("id", "title", "author", "text", "label")) {
    val presentCount = trainData.select(name).filter(!isNullOrEmpty(col(name))).count()
    println(s"$name - $presentCount")
}

println("Summary (nulls)")
for (name <- Seq("id", "title", "author", "text", "label")) {
    val missingCount = trainData.select(name).filter(isNullOrEmpty(col(name))).count()
    println(s"$name - $missingCount")
}

In [5]:
// Attach the columns of the submission file to the test rows by matching ids.
// The submission id is renamed first so the helper column can be dropped
// after the join.
val joinedTestData = testData
    .join(submitData.withColumnRenamed("id", "id2"), col("id") === col("id2"), "inner")
    .drop("id2")

// Replace missing title/author/text values with an empty string (train set).
val filledTrainData = Seq("title", "author", "text").foldLeft(trainData) { (frame, name) =>
    frame.withColumn(name, when(isNullOrEmpty(col(name)), "").otherwise(col(name)))
}

// Same replacement for the joined test set.
val filledTestData = Seq("title", "author", "text").foldLeft(joinedTestData) { (frame, name) =>
    frame.withColumn(name, when(isNullOrEmpty(col(name)), "").otherwise(col(name)))
}

// "total" is title, author and text joined with single spaces.
val trainDataWithTotal = filledTrainData
    .withColumn("total", concat_ws(" ", col("title"), col("author"), col("text")))

val testDataWithTotal = filledTestData
    .withColumn("total", concat_ws(" ", col("title"), col("author"), col("text")))

// Keep only the label (cast to int) and the combined text.
val trainingData = trainDataWithTotal
    .select(col("label").cast("int").as("label"), col("total"))

val testingData = testDataWithTotal
    .select(col("label").cast("int").as("label"), col("total"))

In [6]:
// Preview the first 10 rows of the prepared training and testing sets.
Seq(trainingData, testingData).foreach(_.show(10))

In [7]:
// Feature-extraction stages: "total" -> "words" -> "rawFeatures" -> "features".
val tokenizer = new Tokenizer()
    .setInputCol("total")
    .setOutputCol("words")

// NOTE(review): 30 hash buckets is very small for free text — confirm this is intentional.
val hashingTF = new HashingTF()
    .setNumFeatures(30)
    .setInputCol("words")
    .setOutputCol("rawFeatures")

val idf = new IDF()
    .setMinDocFreq(5)
    .setInputCol("rawFeatures")
    .setOutputCol("features")

// Stages run in the listed order.
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, idf))

In [8]:
// Fit the feature pipeline on the training set only, then apply the fitted
// model to both the training and the testing set.
val model: PipelineModel = pipeline.fit(trainingData)
val transformedTrainingData: DataFrame = model.transform(trainingData)
val transformedTestingData: DataFrame = model.transform(testingData)

In [9]:
// Preview the first 10 rows of the training set after feature extraction.
transformedTrainingData.show(numRows = 10)

In [10]:
/**
 * Trains a logistic-regression classifier on `trainData`, scores `testData`
 * and prints two quality metrics.
 *
 * Both inputs must contain a "label" column and a "features" column.
 *
 * The value previously printed as "accuracy" was the output of
 * BinaryClassificationEvaluator, whose default metric is areaUnderROC. It is
 * now printed under its real name, and the true accuracy (share of correctly
 * predicted labels) is computed separately.
 *
 * @param trainData data used to fit the classifier
 * @param testData  data used to evaluate the fitted classifier
 */
def logisticRegressionModelLearning(trainData: DataFrame, testData: DataFrame): Unit = {
    // Local import keeps this cell self-contained; the file-level imports
    // only bring in BinaryClassificationEvaluator.
    import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

    val classifier = new LogisticRegression()
        .setLabelCol("label")
        .setFeaturesCol("features")
        .setAggregationDepth(2)
        .setThreshold(0.5)
        .setFamily("auto")
        .setStandardization(true)
        .setFitIntercept(true)
        .setMaxIter(100)
        .setTol(1E-6)
        .setRegParam(0.0)
        .setElasticNetParam(0.0)
    val model = classifier.fit(trainData)
    val predictions = model.transform(testData)

    // Ranking quality, computed from the "rawPrediction" column.
    val areaUnderROC = new BinaryClassificationEvaluator()
        .setLabelCol("label")
        .setMetricName("areaUnderROC")
        .evaluate(predictions)

    // Actual accuracy, computed from the "prediction" column.
    val accuracy = new MulticlassClassificationEvaluator()
        .setLabelCol("label")
        .setPredictionCol("prediction")
        .setMetricName("accuracy")
        .evaluate(predictions)

    println(s"Model areaUnderROC: ${areaUnderROC}")
    println(s"Model accuracy: ${accuracy}")
}
logisticRegressionModelLearning(transformedTrainingData, transformedTestingData)

In [11]:
/**
 * Trains a random-forest classifier on `trainData`, scores `testData` and
 * prints two quality metrics.
 *
 * Both inputs must contain a "label" column and a "features" column.
 *
 * The value previously printed as "accuracy" was the output of
 * BinaryClassificationEvaluator, whose default metric is areaUnderROC. It is
 * now printed under its real name, and the true accuracy (share of correctly
 * predicted labels) is computed separately.
 *
 * @param trainData data used to fit the classifier
 * @param testData  data used to evaluate the fitted classifier
 */
def randomForestModelLearning(trainData: DataFrame, testData: DataFrame): Unit = {
    // Local import keeps this cell self-contained; the file-level imports
    // only bring in BinaryClassificationEvaluator.
    import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

    val classifier = new RandomForestClassifier()
        .setLabelCol("label")
        .setFeaturesCol("features")
    val model = classifier.fit(trainData)
    val predictions = model.transform(testData)

    // Ranking quality, computed from the "rawPrediction" column.
    val areaUnderROC = new BinaryClassificationEvaluator()
        .setLabelCol("label")
        .setMetricName("areaUnderROC")
        .evaluate(predictions)

    // Actual accuracy, computed from the "prediction" column.
    val accuracy = new MulticlassClassificationEvaluator()
        .setLabelCol("label")
        .setPredictionCol("prediction")
        .setMetricName("accuracy")
        .evaluate(predictions)

    println(s"Model areaUnderROC: ${areaUnderROC}")
    println(s"Model accuracy: ${accuracy}")
}
randomForestModelLearning(transformedTrainingData, transformedTestingData)

In [12]:
/**
 * Trains a linear SVM classifier on `trainData`, scores `testData` and
 * prints two quality metrics.
 *
 * Both inputs must contain a "label" column and a "features" column.
 *
 * The value previously printed as "accuracy" was the output of
 * BinaryClassificationEvaluator, whose default metric is areaUnderROC. It is
 * now printed under its real name, and the true accuracy (share of correctly
 * predicted labels) is computed separately.
 *
 * @param trainData data used to fit the classifier
 * @param testData  data used to evaluate the fitted classifier
 */
def svmModelLearning(trainData: DataFrame, testData: DataFrame): Unit = {
    // Local import keeps this cell self-contained; the file-level imports
    // only bring in BinaryClassificationEvaluator.
    import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

    val classifier = new LinearSVC()
        .setLabelCol("label")
        .setFeaturesCol("features")
        .setAggregationDepth(2)
        .setStandardization(true)
        .setTol(1E-6)
        .setFitIntercept(true)
        .setMaxIter(100)
        .setRegParam(0.0)
    val model = classifier.fit(trainData)
    val predictions = model.transform(testData)

    // Ranking quality, computed from the "rawPrediction" column.
    val areaUnderROC = new BinaryClassificationEvaluator()
        .setLabelCol("label")
        .setMetricName("areaUnderROC")
        .evaluate(predictions)

    // Actual accuracy, computed from the "prediction" column.
    val accuracy = new MulticlassClassificationEvaluator()
        .setLabelCol("label")
        .setPredictionCol("prediction")
        .setMetricName("accuracy")
        .evaluate(predictions)

    println(s"Model areaUnderROC: ${areaUnderROC}")
    println(s"Model accuracy: ${accuracy}")
}
svmModelLearning(transformedTrainingData, transformedTestingData)