## First, add the following artifacts to your interpreter

Artifact: edu.columbia.tjw:item-spark:1.5.1-spark3
Exclude: com.\*, org.\*, io.\*

Artifact: edu.columbia.tjw:item:1.5.1
Exclude: 

## Now do all the imports to set up the code we will use later. 

In [2]:
%spark

import org.apache.spark.SparkContext
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.classification.LogisticRegressionModel
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.feature.StandardScaler
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.DenseVector
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions
import org.apache.spark.sql.types.BooleanType;
import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
import org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import org.apache.spark.storage.StorageLevel
import scala.collection.immutable.TreeSet;
import scala.collection.mutable.SortedSet;
import scala.collection.GenSeq;

import scala.collection.mutable.ListBuffer;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.feature.OneHotEncoder;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.ml.linalg.Vector;

import org.apache.spark.ml.classification.ProbabilisticClassificationModel;
import org.apache.spark.ml.classification.ProbabilisticClassifier;




In [3]:
%spark


## print versions

In [5]:
%spark

// Report the Scala, Spark and JVM versions this interpreter is running on.

println(s"Scala version: ${util.Properties.versionString}")
println(s"Spark version: ${sc.version}")
println(s"Java version: ${System.getProperty("java.version")}")

## Assemble functions to be used later. 


In [7]:
%spark

// Maps a Boolean flag to 1.0 (true) or 0.0 (false); b2dUdf wraps the same mapping as a Spark UDF.
val b2d = (flag: Boolean) => flag match {
    case true  => 1.0
    case false => 0.0
}
val b2dUdf = udf(b2d)

/**
 * Keeps only the observations that pass the range checks below, then appends
 * indicator columns for the realised next status.
 *
 * Retained rows satisfy: 0 < MTM_LTV < 2, INCENTIVE > -1, CREDIT_SCORE > 0,
 * 0 <= NEXT_STATUS <= 2, STATUS = 1, and a non-negative value in each of AGE, TERM,
 * MI_PERCENT, UNIT_COUNT, ORIG_LTV, ORIG_CLTV, ORIG_DTI, ORIG_UPB and ORIG_INTRATE.
 *
 * @param raw unfiltered input observations
 * @return the retained rows plus actual_p / actual_c / actual_3, which are 1.0 when
 *         next_status is 0 / 1 / 2 respectively and 0.0 otherwise
 */
def filterData (raw : DataFrame) : DataFrame = {
    // Columns whose only requirement is to be >= 0.
    val nonNegativeCols = Seq(
        "AGE", "TERM", "MI_PERCENT", "UNIT_COUNT", "ORIG_LTV",
        "ORIG_CLTV", "ORIG_DTI", "ORIG_UPB", "ORIG_INTRATE")

    val inRange = raw
        .filter($"MTM_LTV" > 0.0 && $"MTM_LTV" < 2.0)
        .filter($"INCENTIVE" > -1.0 && $"CREDIT_SCORE" > 0)
        .filter($"NEXT_STATUS" >= 0 && $"NEXT_STATUS" <= 2)
        .filter($"STATUS" === 1)

    val cleaned = nonNegativeCols.foldLeft(inRange) { (frame, name) =>
        frame.filter(functions.col(name) >= 0)
    }

    // Add the realised-outcome indicator columns here so every downstream frame has them.
    cleaned
        .withColumn("actual_p", expr(" case when next_status = 0 then 1.0 else 0.0 end"))
        .withColumn("actual_c", expr(" case when next_status = 1 then 1.0 else 0.0 end"))
        .withColumn("actual_3", expr(" case when next_status = 2 then 1.0 else 0.0 end"))
}

/**
 * Replaces every Boolean-typed column named in `enumColumns` with a Double column
 * (1.0 for true, 0.0 for false) via `b2dUdf`; columns of any other type are left untouched.
 *
 * @param data        frame containing all of `enumColumns`
 * @param enumColumns names of the columns to inspect; a name missing from `data`
 *                    fails the type lookup with NoSuchElementException
 * @return `data` with the Boolean columns among `enumColumns` converted in place
 */
def convertBoolColumns(data: DataFrame, enumColumns: Array[String]) : DataFrame = {
    // Column name -> Spark type names, taken from the input frame's schema.
    val typesByColumn = data.dtypes
        .groupBy { case (name, _) => name }
        .map { case (name, pairs) => name -> pairs.map { case (_, typeName) => typeName } }

    enumColumns.foldLeft(data) { (frame, cname) =>
        if (typesByColumn(cname).head == "BooleanType") frame.withColumn(cname, b2dUdf(col(cname)))
        else frame
    }
}

    

In [8]:
%spark


/**
 * Fits the categorical encoders for a frame: one StringIndexer per enum column
 * (`<col>` -> `<col>_Index`, labels ordered by ascending frequency) followed by a single
 * OneHotEncoder (`<col>_Index` -> `<col>_Vec`, last category dropped).
 *
 * @param data        frame used to fit the indexers
 * @param enumColumns names of the categorical columns to encode
 * @return the fitted pipeline containing the indexers and the one-hot encoder
 */
def generateEncoders(data: DataFrame, enumColumns: Array[String]) : PipelineModel = {
    val indexedNames = enumColumns.map(_ + "_Index")
    val encodedNames = enumColumns.map(_ + "_Vec")

    val indexers: Seq[PipelineStage] = enumColumns.toSeq.zip(indexedNames).map {
        case (source, target) =>
            new StringIndexer()
                .setInputCol(source)
                .setOutputCol(target)
                .setStringOrderType("frequencyAsc")
    }

    val oneHot = new OneHotEncoder()
        .setInputCols(indexedNames)
        .setOutputCols(encodedNames)
        .setDropLast(true)

    new Pipeline()
        .setStages((indexers :+ oneHot).toArray)
        .fit(data)
}


/**
 * Applies a fitted encoder pipeline and unpacks each `<col>_Vec` one-hot vector into
 * separate 0/1 columns named `<col>_<label>`, where characters of the label outside
 * [a-zA-Z0-9_] are replaced by '_'. The last label of every indexer gets no column,
 * matching the encoder's dropped last category.
 *
 * @param data       frame to encode
 * @param model      pipeline produced by generateEncoders
 * @param binaryCols output parameter: cleared, then filled with the names of the added columns
 * @return the transformed frame including the added indicator columns
 */
def applyEncoders(data: DataFrame, model: PipelineModel, binaryCols : ListBuffer[String]) : DataFrame = {
    binaryCols.clear()

    // ml Vector -> array<double>, so element_at can pick individual slots.
    val vectorAsArray = udf((v: Any) => v.asInstanceOf[Vector].toArray)

    model.stages.foldLeft(model.transform(data)) {
        case (frame, indexer: StringIndexerModel) =>
            val source = indexer.getInputCol
            // One column per label except the last.
            indexer.labels.dropRight(1).zipWithIndex.foldLeft(frame) {
                case (acc, (label, position)) =>
                    val flagName = source + "_" + label.replaceAll("[^a-zA-Z0-9_]", "_")
                    // element_at is 1-based.
                    val widened = acc.withColumn(
                        flagName,
                        element_at(vectorAsArray(col(source + "_Vec")), position + 1))
                    binaryCols += flagName
                    widened
            }
        case (frame, _) => frame
    }
}


In [9]:
%spark

import org.apache.spark.ml.functions.vector_to_array
import org.apache.spark.ml.Estimator

// The "probability" column is an ML vector, which the regular SQL array functions cannot
// read, so a plain array copy is exposed as "prob_array". `table_name` is not used here.
def convertProbArray (raw : DataFrame, table_name : String) : DataFrame = {
    val asArray = vector_to_array(functions.col("probability"))
    raw.withColumn("prob_array", asArray)
}

/**
 * Adds per-row scoring columns to a frame of model predictions and registers the result
 * as a temporary view.
 *
 * Added columns:
 *  - prob_array: the "probability" vector as a plain array
 *  - prob_p / prob_c / prob_3: elements 1, 2 and 3 of prob_array
 *  - h_g: entropy (natural log) of the predicted distribution
 *  - h_fg: -log of the probability assigned to the row's next_status
 *  - table_name: the literal `table_name`
 *
 * @param raw        predictions containing "probability" and next_status columns
 * @param table_name name of the temporary view to (re)register and value of the label column
 * @return the expanded frame
 */
def expandColumns (raw : DataFrame, table_name : String) : DataFrame = {
    // Entropy of one distribution; non-positive entries are skipped so they add 0, not NaN.
    val distEntropy = (x: GenSeq[Double]) =>
        -1.0 * x.foldLeft(0.0) { (acc, p) => if (p > 0) acc + (p * Math.log(p)) else acc }
    val deUdf = udf(distEntropy)

    val expanded = convertProbArray(raw, table_name)
        // Split out the individual class probabilities (element_at is 1-based).
        .withColumn("prob_p", element_at($"prob_array", 1))
        .withColumn("prob_c", element_at($"prob_array", 2))
        .withColumn("prob_3", element_at($"prob_array", 3))
        .withColumn("h_g", deUdf(col("prob_array")))
        .withColumn("h_fg", expr(" -1.0 * log(prob_array[next_status])"))
        .withColumn("table_name", functions.lit(table_name))

    // createOrReplaceTempView is the non-deprecated equivalent of registerTempTable.
    expanded.createOrReplaceTempView(table_name)
    expanded
}

/**
 * Fits a probabilistic classifier on an already-assembled frame and prints the wall-clock
 * fitting time. The classifier is configured to read its label from "NEXT_STATUS" and its
 * features from "scaledFeatures".
 *
 * @param classifier      classifier to configure and fit
 * @param assembled_train training frame that already contains "scaledFeatures"
 * @return the fitted model
 */
def doFit[A <: ProbabilisticClassifier[Vector,A,B],B <: ProbabilisticClassificationModel[Vector,B]](classifier : ProbabilisticClassifier[Vector,A,B], assembled_train : DataFrame) : B = {
    val configured = classifier
        .setLabelCol("NEXT_STATUS")
        .setFeaturesCol("scaledFeatures")

    val startedAt = System.currentTimeMillis()
    val fitted = configured.fit(assembled_train)
    val elapsedMs = System.currentTimeMillis() - startedAt

    println(s"Fitting Time[${classifier.getClass()}] (ms): $elapsedMs")
    fitted
}

/**
 * Fits `assembler` followed by `classifier` as a single pipeline and prints the wall-clock
 * fitting time; used to measure the fitting cost of each pipeline model. The classifier is
 * configured to read its label from "NEXT_STATUS" and its features from "scaledFeatures".
 *
 * @param classifier     classifier to configure and fit as the final stage
 * @param df_train_limit training frame fed to the pipeline
 * @param assembler      fitted feature pipeline used as the first stage
 * @return the fitted two-stage pipeline
 */
def doFit2[A <: ProbabilisticClassifier[Vector,A,B],B <: ProbabilisticClassificationModel[Vector,B]](classifier : ProbabilisticClassifier[Vector,A,B], df_train_limit : DataFrame, assembler : PipelineModel) : PipelineModel = {
    val configured = classifier
        .setLabelCol("NEXT_STATUS")
        .setFeaturesCol("scaledFeatures")
    val stages = Array[PipelineStage](assembler, configured)

    val startedAt = System.currentTimeMillis()
    val fitted = new Pipeline().setStages(stages).fit(df_train_limit)
    val elapsedMs = System.currentTimeMillis() - startedAt

    println(s"Fitting Time[${classifier.getClass()}] : $elapsedMs ms.")
    fitted
}

// Scores df_test with a fitted pipeline, then adds the probability/entropy columns and
// registers the result as the temporary view `tableName` (see expandColumns).
def testClassifier (df_test : DataFrame, model : PipelineModel, tableName: String) : DataFrame = {
    val scored = model.transform(df_test)
    expandColumns(scored, tableName)
}

// Same as testClassifier, but for a bare classification model rather than a pipeline.
def testClassifier2 (df_test : DataFrame, model : ProbabilisticClassificationModel[Vector,_], tableName: String) : DataFrame = {
    val scored = model.transform(df_test)
    expandColumns(scored, tableName)
}


In [10]:
%spark

/**
 * Fits the feature-preparation pipeline: a VectorAssembler that packs the selected columns
 * into "features", followed by a StandardScaler (centered and scaled to unit standard
 * deviation) that writes "scaledFeatures".
 *
 * Each requested feature resolves to the column of that name if present, otherwise to
 * `<name>_Vec`. The resolved names are de-duplicated and assembled in sorted order.
 *
 * @param data        frame used to resolve the columns and fit the scaler
 * @param featureCols requested feature names
 * @return the fitted assembler + scaler pipeline
 * @throws IllegalArgumentException if a feature has neither a plain nor a `_Vec` column
 */
def generateAssembler(data: DataFrame, featureCols: Array[String]) : PipelineModel = {
    val available = data.columns

    val resolved = featureCols.map { name =>
        if (available.contains(name)) name
        else if (available.contains(name + "_Vec")) name + "_Vec"
        else throw new IllegalArgumentException("Invalid column: " + name)
    }

    val vectorize = new VectorAssembler()
        .setInputCols(resolved.distinct.sorted)
        .setOutputCol("features")

    val standardize = new StandardScaler()
        .setInputCol("features")
        .setOutputCol("scaledFeatures")
        .setWithStd(true)
        .setWithMean(true)

    new Pipeline()
        .setStages(Array[PipelineStage](vectorize, standardize))
        .fit(data)
}




In [11]:
%spark

// Column groups: categorical inputs (indexed and one-hot encoded below) and numeric inputs.
val enumCols = SortedSet[String]() ++ Array("OCCUPANCY_STATUS", "FIRSTTIME_BUYER", "TERM", "UNIT_COUNT", "PREPAYMENT_PENALTY")
val numericCols = SortedSet[String]() ++ Array("AGE", "MTM_LTV", "INCENTIVE", "CREDIT_SCORE", "MI_PERCENT", "ORIG_CLTV", "ORIG_DTI", "ORIG_UPB", "ORIG_INTRATE")

// Load the parquet data, drop out-of-range rows, and turn Boolean enum columns into doubles.
var df_raw : DataFrame = convertBoolColumns(filterData(spark.read.parquet("/Applications/zeppelin-0.10.1-bin-netinst/notebook/HW/df_c_1")), enumCols.toArray)
// createOrReplaceTempView is the non-deprecated equivalent of registerTempTable.
// The view is bound to the frame as it is here, i.e. before the encoder columns are added.
df_raw.createOrReplaceTempView("df_raw")

// Fit the categorical encoders and apply them; binaryCols receives the names of the
// per-label indicator columns that applyEncoders adds.
var binaryCols = new ListBuffer[String]();
var pipeline = generateEncoders(df_raw, enumCols.toArray);
df_raw = applyEncoders(df_raw, pipeline, binaryCols);

println("Cols: " + binaryCols)



In [12]:
%sql 
-- Row count per loan age in the filtered df_raw view, ordered by age.
select age, count(1) AS value
from df_raw
group by age
order by age

## Prepare the data


In [14]:
%spark

// For now, just allow all features. 
val featureList = Array("OCCUPANCY_STATUS", "FIRSTTIME_BUYER", "TERM", "UNIT_COUNT", "PREPAYMENT_PENALTY", "AGE", "MTM_LTV", "INCENTIVE", "CREDIT_SCORE", "MI_PERCENT", "ORIG_CLTV", "ORIG_DTI", "ORIG_UPB", "ORIG_INTRATE");
// Upper bound on the number of training rows passed to each fit (via df_train.limit).
val fittingSample : Int = 500 * 1000;

// Fit the assembler + scaler on the full frame, then materialise the assembled frame.
val assembler = generateAssembler(df_raw, featureList);
val df_assembled = assembler.transform(df_raw);

// Split these 25% / 75% into train / test, allocating about 500k observations for training.
// NOTE(review): the raw and assembled frames are split independently with the same seed;
// presumably this is meant to select the same rows in both — confirm.
var Array(df_train, df_test) = df_raw.randomSplit(Array(0.25, 0.75), seed=12345);
var Array(assembled_train, assembled_test) = df_assembled.randomSplit(Array(0.25, 0.75), seed= 12345);

// Randomize the order of the training data. 
// This is small enough that we can pretty easily cache the whole thing.
// rand() is unseeded here, so the resulting order differs between runs.
df_train = df_train.orderBy(rand()).persist();
assembled_train = assembled_train.orderBy(rand()).persist();


## Fit LR and RF Models


In [16]:
%spark 

/**
 * Scores `df_test` with `model` (registering the scored rows as the view `tableName`) and
 * collapses the result into one summary row per table_name: the row count plus the means of
 * h_g (reported as distEntropy), h_fg, the predicted probabilities and the actual outcomes.
 *
 * @param df_test   frame to score
 * @param model     fitted pipeline
 * @param tableName view name and value of the table_name grouping column
 * @return the summary frame
 */
def generate_summary(df_test : DataFrame, model : PipelineModel, tableName: String) : DataFrame = {
    // (source column, output name) for every averaged metric, in output order.
    val averagedMetrics = Seq(
        "h_g" -> "distEntropy", "h_fg" -> "h_fg",
        "prob_p" -> "prob_p", "prob_c" -> "prob_c", "prob_3" -> "prob_3",
        "actual_p" -> "actual_p", "actual_c" -> "actual_c", "actual_3" -> "actual_3")
    val means = averagedMetrics.map { case (source, alias) => functions.avg(source).as(alias) }

    testClassifier(df_test, model, tableName)
        .groupBy("table_name")
        .agg(functions.count("*").as("count"), means: _*)
}

/**
 * Builds the summary rows for a model on both the training and the test frame
 * (tagged `<tableName>_train` and `<tableName>_test`) and caches their union.
 */
def summarize_assembled(df_train : DataFrame, df_test: DataFrame, model : PipelineModel, tableName: String) : DataFrame = {
    val inSample = generate_summary(df_train, model, s"${tableName}_train")
    val outOfSample = generate_summary(df_test, model, s"${tableName}_test")
    inSample.union(outOfSample).persist()
}



In [17]:
%spark

// var lr_classifier = new LogisticRegression().setMaxIter(100).setRegParam(0.001).setElasticNetParam(0.001).setLabelCol("NEXT_STATUS").setFeaturesCol("scaledFeatures")
// var lr_pipeline = new Pipeline().setStages(Array(assembler, lr_classifier))
// var lrModel = lr_pipeline.fit(df_train.limit(fittingSample))


In [18]:
%spark

// var rf_classifier = new RandomForestClassifier().setSeed(12345).setNumTrees(12).setMaxDepth(10)
// rf_classifier = rf_classifier.setLabelCol("NEXT_STATUS").setFeaturesCol("scaledFeatures")
// var rf_pipeline = new Pipeline().setStages(Array(assembler, rf_classifier))
// var rf_model = rf_pipeline.fit(df_train.limit(fittingSample))

In [19]:
%spark

// var rf_insample = testClassifier(df_train.limit(fittingSample), rf_model,"rf_insample");
// var lr_insample = testClassifier(df_train.limit(fittingSample), lrModel,"lr_insample");
// var rf_indist = testClassifier(df_test, rf_model,"rf_indist");
// var lr_indist = testClassifier(df_test, lrModel,"lr_indist");


// var df_combined = summarize_assembled(df_train.limit(fittingSample), df_test, lrModel, "base_lr")
// df_combined = df_combined.union(summarize_assembled(df_train.limit(fittingSample), df_test, rf_model, "base_rf"))

// df_combined = df_combined.persist()
// df_combined.createOrReplaceTempView("df_combined")
// z.show(df_combined)

In [20]:
%spark

// df_combined = df_combined.persist()

// z.show(df_combined)

In [21]:
%spark
// used to measure performance of each pipeline model
// def doFit2[A <: ProbabilisticClassifier[Vector,A,B],B <: ProbabilisticClassificationModel[Vector,B]](classifier : ProbabilisticClassifier[Vector,A,B], df_train_limit : DataFrame, assembler : PipelineModel) : PipelineModel = {
//     var updated = classifier.setLabelCol("NEXT_STATUS").setFeaturesCol("scaledFeatures");
    
//     var mlp_pipeline = new Pipeline().setStages(Array(assembler, updated));

//     val start =  System.currentTimeMillis();
//     val model = mlp_pipeline.fit(df_train_limit);
//     val elapsed = System.currentTimeMillis() - start;
//     println("Fitting Time[" + classifier.getClass() + "] : " + elapsed + " ms.");
//     return model;
// }


In [22]:
%spark
// Default network shape: one input per entry of featureList, hidden layers of 8 and 6 units,
// and one output per status class.
// NOTE(review): assumes the assembled feature vector has exactly featureList.size entries — confirm.
val status_count = 3
val layers: Array[Int] = Array(featureList.length, 8, 6, status_count)
// var mlp_classifier = new MultilayerPerceptronClassifier().setLayers(layers).setSeed(1234L).setMaxIter(100).setSolver("l-bfgs");
// mlp_classifier = mlp_classifier.setLabelCol("NEXT_STATUS").setFeaturesCol("scaledFeatures");
// var mlp_pipeline = new Pipeline().setStages(Array(assembler, mlp_classifier));

// val start =  System.currentTimeMillis();
// var mlp_mode1 = mlp_pipeline.fit(df_train.limit(fittingSample));
// val elapsed = System.currentTimeMillis() - start;
// println("Fitting Time[ mlp_model ] (ms): " + elapsed);
   
   // need to use assembled_train
// val mlp_model = doFit(new MultilayerPerceptronClassifier().setLayers(layers).setSeed(1234L).setMaxIter(100).setSolver("l-bfgs"), assembled_train.limit(fittingSample));


In [23]:
%spark
// Fit the default MLP (layers, l-bfgs, 100 iterations) through the assembler pipeline.
// The definition must be live: the cells below reference mlp_model, and no other
// definition of it exists in the notebook.
val mlp_model = doFit2(new MultilayerPerceptronClassifier().setLayers(layers).setSeed(1234L).setMaxIter(100).setSolver("l-bfgs"), df_train.limit(fittingSample),assembler);

// Score the training sample and the test set; testClassifier also registers the
// temporary views "mlp_insample" and "mlp_indist" queried by the %sql cells.
var mlp_insample = testClassifier(df_train.limit(fittingSample), mlp_model,"mlp_insample");
var mlp_indist = testClassifier(df_test, mlp_model,"mlp_indist");

In [24]:
%spark
// First live use of df_combined (the earlier LR/RF cells that created it are commented out),
// so it is declared here rather than unioned onto a variable that does not exist yet.
// summarize_assembled already persists its result.
var df_combined : DataFrame = summarize_assembled(df_train.limit(fittingSample), df_test, mlp_model, "base_mlp")
// z.show(df_combined)

In [25]:
%sql
-- One summary row per view for the default MLP: row count, mean entropy (h_g), total and
-- mean -log-likelihood (h_fg), and mean predicted vs. actual frequency for each status.
(select "mlp_indist" AS label,count(*) AS count,  avg(h_g),sum(h_fg), avg(h_fg), avg(prob_p), avg(actual_p), avg(prob_c), avg(actual_c), avg(prob_3), avg(actual_3) from mlp_indist ) 
UNION
(select "mlp_insample" AS label,count(*) AS count,  avg(h_g),sum(h_fg), avg(h_fg), avg(prob_p), avg(actual_p), avg(prob_c), avg(actual_c), avg(prob_3), avg(actual_3) from mlp_insample )


In [26]:
%spark
// Variant 2: same layer sizes as the default network, but trained with the "gd" solver.
val layers2 = Array[Int](featureList.size,8 ,6, status_count)
val mlp_model2 = doFit2(new MultilayerPerceptronClassifier().setLayers(layers2).setSeed(1234L).setMaxIter(100).setSolver("gd"), df_train.limit(fittingSample),assembler);

// Persist the fitted pipeline; the random suffix gives every run a new file name.
mlp_model2.save("/Applications/zeppelin-0.10.1-bin-netinst/notebook/HW/savedModel_151p_" + edu.columbia.tjw.item.util.random.RandomTool.randomString(10) + ".dat");

// testClassifier registers the scored frames as views "mlp_insample_gd" / "mlp_indist_gd".
var mlp_insample2 = testClassifier(df_train.limit(fittingSample), mlp_model2,"mlp_insample_gd");
var mlp_indist2 = testClassifier(df_test, mlp_model2,"mlp_indist_gd");

// register this DataFrame first before querying it via %spark.sql
// (the same frames are additionally exposed under the names used by the %sql cell)
mlp_insample2.createOrReplaceTempView("mlp_insample2")
mlp_indist2.createOrReplaceTempView("mlp_indist2")
// df_combined = df_combined.union(summarize_assembled(df_train.limit(fittingSample), df_test, mlp_model2, "mlp_gd")).persist()
// z.show(df_combined)

In [27]:
%sql
-- Summary rows for the "gd"-solver MLP: count, mean h_g, total/mean h_fg, and mean
-- predicted vs. actual frequency per status, for the test and training views.
(select "mlp_indist2" AS label,count(*) AS count,  avg(h_g), sum(h_fg),avg(h_fg), avg(prob_p), avg(actual_p), avg(prob_c), avg(actual_c), avg(prob_3), avg(actual_3) from mlp_indist2 ) 
UNION
(select "mlp_insample2" AS label,count(*) AS count,  avg(h_g),sum(h_fg), avg(h_fg), avg(prob_p), avg(actual_p), avg(prob_c), avg(actual_c), avg(prob_3), avg(actual_3) from mlp_insample2 )


In [28]:
%spark
// Variant 3: three hidden layers (4, 4, 6), l-bfgs, 100 iterations.
val layers3 = Array[Int](featureList.size, 4, 4, 6, status_count)
val mlp_model3 = doFit2(new MultilayerPerceptronClassifier().setLayers(layers3).setSeed(1234L).setMaxIter(100).setSolver("l-bfgs"), df_train.limit(fittingSample),assembler);
// testClassifier registers the scored frames as views "mlp_insample_3" / "mlp_indist_3".
var mlp_insample3 = testClassifier(df_train.limit(fittingSample), mlp_model3,"mlp_insample_3");
var mlp_indist3 = testClassifier(df_test, mlp_model3,"mlp_indist_3");

// Also expose the frames under the names used by the %sql cell.
mlp_insample3.createOrReplaceTempView("mlp_insample3")
mlp_indist3.createOrReplaceTempView("mlp_indist3")

// Append this model's train/test summary rows; requires df_combined to be defined already.
df_combined = df_combined.union(summarize_assembled(df_train.limit(fittingSample), df_test, mlp_model3, "mlp_3")).persist()
// z.show(df_combined)

In [29]:
%sql
-- Summary rows for MLP variant 3: count, mean h_g, total/mean h_fg, and mean predicted
-- vs. actual frequency per status, for the test and training views.
(select "mlp_indist3" AS label,count(*) AS count,  avg(h_g),sum(h_fg), avg(h_fg), avg(prob_p), avg(actual_p), avg(prob_c), avg(actual_c), avg(prob_3), avg(actual_3) from mlp_indist3 ) 
UNION
(select "mlp_insample3" AS label,count(*) AS count,  avg(h_g),sum(h_fg), avg(h_fg), avg(prob_p), avg(actual_p), avg(prob_c), avg(actual_c), avg(prob_3), avg(actual_3) from mlp_insample3 )


In [30]:
%spark
// Variant 4: same layer sizes as variant 3, but with up to 300 l-bfgs iterations.
val layers4  = Array[Int](featureList.size, 4, 4, 6, status_count)
// setMaxIter(50

val mlp_model4 = doFit2(new MultilayerPerceptronClassifier().setLayers(layers4).setSeed(1234L).setMaxIter(300).setSolver("l-bfgs"), df_train.limit(fittingSample),assembler);

// Persist the fitted pipeline; the random suffix gives every run a new file name.
mlp_model4.save("/Applications/zeppelin-0.10.1-bin-netinst/notebook/HW/savedModel_151p_" + edu.columbia.tjw.item.util.random.RandomTool.randomString(10) + ".dat");

// testClassifier registers the scored frames as views "mlp_insample_4" / "mlp_indist_4".
var mlp_insample4 = testClassifier(df_train.limit(fittingSample), mlp_model4,"mlp_insample_4");
var mlp_indist4 = testClassifier(df_test, mlp_model4,"mlp_indist_4");

// Also expose the frames under the names used by the %sql cell.
mlp_insample4.createOrReplaceTempView("mlp_insample4")
mlp_indist4.createOrReplaceTempView("mlp_indist4")

// var df_combined = summarize_assembled(df_train.limit(fittingSample), df_test, mlp_model4, "mlp_4")
// df_combined.persist()
// df_combined = df_combined.union(summarize_assembled(df_train.limit(fittingSample), df_test, mlp_model4, "mlp_4")).persist()
// z.show(df_combined)


In [31]:
%sql
-- Summary rows for MLP variant 4: count, mean h_g, total/mean h_fg, and mean predicted
-- vs. actual frequency per status, for the test and training views.
(select "mlp_indist4" AS label,count(*) AS count,  avg(h_g),sum(h_fg), avg(h_fg), avg(prob_p), avg(actual_p), avg(prob_c), avg(actual_c), avg(prob_3), avg(actual_3) from mlp_indist4 ) 
UNION
(select "mlp_insample4" AS label,count(*) AS count,  avg(h_g),sum(h_fg), avg(h_fg), avg(prob_p), avg(actual_p), avg(prob_c), avg(actual_c), avg(prob_3), avg(actual_3) from mlp_insample4 )



In [32]:
%spark

// Variant 5: four hidden layers (4, 4, 3, 3), l-bfgs, 200 iterations.
val layers5 = Array[Int](featureList.size, 4, 4, 3, 3, status_count)
val mlp_model5 = doFit2(new MultilayerPerceptronClassifier().setLayers(layers5).setSeed(1234L).setMaxIter(200).setSolver("l-bfgs"), df_train.limit(fittingSample),assembler);
// testClassifier registers the scored frames as views "mlp_insample_5" / "mlp_indist_5".
var mlp_insample5 = testClassifier(df_train.limit(fittingSample), mlp_model5,"mlp_insample_5");
var mlp_indist5 = testClassifier(df_test, mlp_model5,"mlp_indist_5");
// Also expose the frames under the names used by the %sql cell.
mlp_insample5.createOrReplaceTempView("mlp_insample5")
mlp_indist5.createOrReplaceTempView("mlp_indist5")

// Append this model's train/test summary rows; requires df_combined to be defined already.
df_combined = df_combined.union(summarize_assembled(df_train.limit(fittingSample), df_test, mlp_model5, "mlp_5")).persist()
// z.show(df_combined)

In [33]:
%spark
// z.show(df_combined)

In [34]:
%sql
-- Summary rows for MLP variant 5: count, mean h_g, total/mean h_fg, and mean predicted
-- vs. actual frequency per status, for the test and training views.
(select "mlp_indist5" AS label,count(*) AS count,  avg(h_g),sum(h_fg), avg(h_fg), avg(prob_p), avg(actual_p), avg(prob_c), avg(actual_c), avg(prob_3), avg(actual_3) from mlp_indist5 ) 
UNION
(select "mlp_insample5" AS label,count(*) AS count,  avg(h_g),sum(h_fg), avg(h_fg), avg(prob_p), avg(actual_p), avg(prob_c), avg(actual_c), avg(prob_3), avg(actual_3) from mlp_insample5 )


In [35]:
%spark
// Variant 6: five hidden layers (4, 2, 2, 2, 4), l-bfgs, 200 iterations.
val layers6 = Array[Int](featureList.size, 4,2,2,2,4, status_count)
val mlp_model6 = doFit2(new MultilayerPerceptronClassifier().setLayers(layers6).setSeed(1234L).setMaxIter(200).setSolver("l-bfgs"), df_train.limit(fittingSample),assembler);
// testClassifier registers the scored frames as views "mlp_insample_6" / "mlp_indist_6".
var mlp_insample6 = testClassifier(df_train.limit(fittingSample), mlp_model6,"mlp_insample_6");
var mlp_indist6 = testClassifier(df_test, mlp_model6,"mlp_indist_6");
// Also expose the frames under the names used by the %sql cell.
mlp_insample6.createOrReplaceTempView("mlp_insample6")
mlp_indist6.createOrReplaceTempView("mlp_indist6")

// df_combined = df_combined.union(summarize_assembled(df_train.limit(fittingSample), df_test, mlp_model6, "mlp_6")).persist()
// z.show(df_combined)

In [36]:
%spark
// (Re)declares df_combined with only the variant-6 summary rows.
// NOTE(review): because this is a fresh `var`, any rows accumulated in df_combined by
// earlier cells are discarded here — confirm that a reset (not a union) is intended.
var df_combined = summarize_assembled(df_train.limit(fittingSample), df_test, mlp_model6, "mlp_6")
df_combined = df_combined.persist()

// df_combined = df_combined.union(summarize_assembled(df_train.limit(fittingSample), df_test, mlp_model6, "mlp_6")).persist()
z.show(df_combined)

In [37]:
%sql
-- Summary rows for MLP variant 6: count, mean h_g, total/mean h_fg, and mean predicted
-- vs. actual frequency per status, for the test and training views.
(select "mlp_indist6" AS label,count(*) AS count,  avg(h_g),sum(h_fg), avg(h_fg), avg(prob_p), avg(actual_p), avg(prob_c), avg(actual_c), avg(prob_3), avg(actual_3) from mlp_indist6 ) 
UNION
(select "mlp_insample6" AS label,count(*) AS count,  avg(h_g),sum(h_fg), avg(h_fg), avg(prob_p), avg(actual_p), avg(prob_c), avg(actual_c), avg(prob_3), avg(actual_3) from mlp_insample6 )



In [38]:
%spark


In [39]:
%spark


In [40]:
%spark


In [41]:
%spark


In [42]:
%spark


In [43]:
%spark


In [44]:
%spark


In [45]:
%spark


In [46]:
%spark


In [47]:
%spark


In [48]:
%spark


In [49]:
%spark


In [50]:
%spark


In [51]:
%spark


In [52]:
%spark


In [53]:
%spark


In [54]:
%spark


## Now try to fit an ITEM model

Make sure to save your fit: fitting is quite expensive, and you don't want to repeat it unnecessarily.

Note, however, that if you change the features column you will need to redo the fit.


In [56]:
%spark

// Load the item code. 
import edu.columbia.tjw.item.spark.ItemClassifier;
import edu.columbia.tjw.item.spark.ItemClassificationModel;
import edu.columbia.tjw.item.spark.ItemClassifierSettings;
import edu.columbia.tjw.item.base.SimpleStatus;
import edu.columbia.tjw.item.util.EnumFamily;
import edu.columbia.tjw.item.spark.ItemClassifierSettings;

/**
 * Builds the ItemClassifier settings for predicting "NEXT_STATUS" from `features`.
 * The curve set handed to ItemClassifier.prepareSettings is the subset of `features`
 * that also appears in `numericCols`.
 *
 * @param data        frame passed to ItemClassifier.prepareSettings
 * @param features    names of all feature columns
 * @param numericCols names of the numeric columns
 * @param paramCount  parameter limit forwarded to ItemClassifier.prepareSettings
 * @return the prepared settings
 */
def generateItemSettings(data : DataFrame, features : Array[String], numericCols : Array[String], paramCount : Integer) : ItemClassifierSettings = {
    val featureNames = features.toList.asJava

    // Start from every feature, then keep only the numeric ones.
    val curveNames = new java.util.TreeSet[String](featureNames)
    curveNames.retainAll(numericCols.toList.asJava)

    ItemClassifier.prepareSettings(data, "NEXT_STATUS", featureNames, curveNames, paramCount)
}

In [57]:
%spark


// One-off ITEM fit, disabled by default: flip the condition to true to refit and save.
// NOTE(review): every variable below is local to this block (and `item_pipeline` here is the
// unfitted Pipeline, not the model). Later cells use a notebook-level `item_pipeline`, which
// is only produced by the commented-out PipelineModel.load line after this block — confirm
// that the load is re-enabled, with a valid path, before running them.
if(false) {
    // ITEM models are controlled by telling it how many parameters it can add. It might stop before (if it can't find any more good ones), so this is just an upper limit.
    // Setting this to a huge value won't matter beyond a certain point (probably around 30 or so for our dataset sizes)
    // N.B: This is the number allowed for curves, enum columns will not count against this limit, so the effective parameter count reported will often be higher. 
    var parametersAllowed = 30
    
    // Features are the numeric columns plus the per-label indicator columns from applyEncoders.
    var settings = generateItemSettings(df_raw, Array.concat((numericCols).toArray, binaryCols.toArray), numericCols.toArray, parametersAllowed);
    var assembler = ItemClassifier.prepareAssembler(settings, "features").setHandleInvalid("skip");
    var classifier = new ItemClassifier(settings)
    classifier = classifier.setLabelCol("NEXT_STATUS").setFeaturesCol("features")
    
    // Drop any existing features column so the assembler stage can create its own.
    var item_pipeline = new Pipeline().setStages(Array(assembler, classifier))
    var itemModel = item_pipeline.fit(df_train.limit(fittingSample).drop(classifier.getFeaturesCol))

    // Persist the fitted pipeline; the random suffix gives every run a new file name.
    itemModel.save("/Users/tyler/sync-workspace/code/spark-notebooks/savedModel_151p_" + edu.columbia.tjw.item.util.random.RandomTool.randomString(10) + ".dat");
}




// var item_pipeline = PipelineModel.load("/Users/tyler/sync-workspace/code/spark-notebooks/savedModel_151p_49dfa23b9b.dat")



In [58]:
%spark

// Score the training sample and the test set with the ITEM pipeline; testClassifier also
// registers the temporary views "item_insample" and "item_indist".
// NOTE(review): requires a notebook-level `item_pipeline` (a PipelineModel); the only
// visible definition is a commented-out PipelineModel.load — confirm it is loaded first.
var item_insample = testClassifier(df_train.limit(fittingSample), item_pipeline,"item_insample");
var item_indist = testClassifier(df_test, item_pipeline,"item_indist");

In [59]:
%spark


// Append the ITEM model's train/test summary rows to df_combined and display the table.
// NOTE(review): requires both `df_combined` and a loaded `item_pipeline` to exist — confirm.
df_combined = df_combined.union(summarize_assembled(df_train.limit(fittingSample), df_test, item_pipeline, "base_item")).persist()

z.show(df_combined)

## Now apply the models to the various datasets, and analyze the results. 


In [61]:
%sql

-- Per-age totals of predicted vs. actual outcomes and of the entropy measures for the
-- random-forest model on the training sample.
-- NOTE(review): the rf_insample view is only registered by a commented-out cell; this
-- query fails unless that cell is re-enabled and run.
SELECT age, sum(prob_c), sum(prob_p), sum(prob_3), sum(actual_c), sum(actual_p), sum(actual_3), sum(h_g), sum(h_fg)
FROM rf_insample
GROUP BY age
ORDER BY age


In [62]:
%sql

-- Per-age totals of predicted vs. actual outcomes and of the entropy measures for the
-- logistic-regression model on the training sample.
-- NOTE(review): the lr_insample view is only registered by a commented-out cell; this
-- query fails unless that cell is re-enabled and run.
SELECT age, sum(prob_c), sum(prob_p), sum(prob_3), sum(actual_c), sum(actual_p), sum(actual_3), sum(h_g), sum(h_fg)
FROM lr_insample
GROUP BY age
ORDER BY age


In [63]:
%sql

-- Per-age totals of predicted vs. actual outcomes and of the entropy measures for the
-- ITEM model on the training sample (view registered when item_insample is computed).
SELECT age, sum(prob_c), sum(prob_p), sum(prob_3), sum(actual_c), sum(actual_p), sum(actual_3), sum(h_g), sum(h_fg)
FROM item_insample
GROUP BY age
ORDER BY age

In [64]:
%sql

-- Per-age totals of predicted vs. actual outcomes and of the entropy measures for the
-- default MLP on the training sample (view registered when mlp_insample is computed).
SELECT age, sum(prob_c), sum(prob_p), sum(prob_3), sum(actual_c), sum(actual_p), sum(actual_3), sum(h_g), sum(h_fg)
FROM mlp_insample
GROUP BY age
ORDER BY age

### END
