# Chapter 2 – End-to-end Machine Learning project

*Welcome to Machine Learning Housing Corp.! Your task is to predict median house values in Californian districts, given a number of features from these districts, using Java.*



# Setup

First, let's make sure this notebook works well in a Java Jupyter kernel by loading the dependencies and helpers it needs:

In [None]:
%maven commons-io:commons-io:jar:2.6
%maven io.vavr:vavr:jar:0.10.0
%maven org.apache.commons:commons-compress:1.18
%maven tech.tablesaw:tablesaw-core:jar:0.32.7
%maven tech.tablesaw:tablesaw-jsplot:jar:0.32.7   
%maven nz.ac.waikato.cms.weka:weka-stable:jar:3.8.3
%maven nz.ac.waikato.cms.weka:wekaDeeplearning4j:jar:1.5.13
    
    
import org.apache.commons.io.*;
import java.io.*;
import io.vavr.control.*;
import org.apache.commons.compress.archivers.tar.*;
import org.apache.commons.compress.compressors.gzip.*;

// Upstream location of the dataset and the local directory it is unpacked into.
String DOWNLOAD_ROOT = "https://raw.githubusercontent.com/ageron/handson-ml/master/";
String HOUSING_URL = DOWNLOAD_ROOT + "datasets/housing/housing.tgz";
String PROJECT_ROOT_DIR = ".";
String CHAPTER_ID = "end_to_end_project";
String HOUSING_PATH = FilenameUtils.concat("datasets", "housing");
// Chunk size (bytes) used by extractTarGZ when copying archive entries to disk.
int BUFFER_SIZE = 1024;
/**
 * Downloads the housing tarball and unpacks it into {@code housingPath}.
 *
 * <p>Any failure (directory creation, download, extraction) is rethrown to the caller
 * instead of being silently dropped, so a broken download is noticed immediately.
 *
 * @param housingUrl  URL of the {@code housing.tgz} archive; {@code null} falls back to {@code HOUSING_URL}
 * @param housingPath directory receiving the archive and its extracted contents; must not be {@code null}
 * @throws NullPointerException if {@code housingPath} is {@code null}
 */
void fetch_housing_data(String housingUrl, File housingPath){
   Objects.requireNonNull(housingPath, "housingPath");
   var effectiveUrl = Objects.toString(housingUrl, HOUSING_URL);
   if(!housingPath.exists()){
       // Try.get() rethrows the captured exception, so a failed mkdir stops here.
       Try.run(() -> FileUtils.forceMkdir(housingPath)).get();
   }
   var tgzPath = new File(FilenameUtils.concat(housingPath.getPath(), "housing.tgz"));
   Try.run(() -> FileUtils.copyURLToFile(new URL(effectiveUrl), tgzPath)).get();
   Try.run(() -> extractTarGZ(tgzPath, housingPath)).get();
}

/**
 * Extracts a gzip-compressed tar archive into {@code destDir}.
 *
 * <p>Every entry is resolved against {@code destDir} and rejected if it would land outside of it,
 * because the archive comes from the network and its entry names cannot be trusted.
 * Missing parent directories of file entries are created on demand.
 *
 * @param in      the {@code .tgz} file to read
 * @param destDir directory that receives the extracted entries
 * @throws Exception if the archive cannot be read, an entry escapes {@code destDir},
 *                   or a file cannot be written
 */
void extractTarGZ(File in, File destDir) throws Exception {
    File root = destDir.getCanonicalFile();
    // All three streams are managed so nothing leaks if a later constructor throws.
    try (FileInputStream fileIn = new FileInputStream(in);
         GzipCompressorInputStream gzipIn = new GzipCompressorInputStream(fileIn);
         TarArchiveInputStream tarIn = new TarArchiveInputStream(gzipIn)) {
        TarArchiveEntry entry;

        while ((entry = (TarArchiveEntry) tarIn.getNextEntry()) != null) {
            File target = new File(root, entry.getName()).getCanonicalFile();
            // Guard against "../" style entry names writing outside the destination.
            if (!target.toPath().startsWith(root.toPath())) {
                throw new IOException("Archive entry '" + entry.getName()
                        + "' resolves outside of '" + root + "'");
            }

            if (entry.isDirectory()) {
                // mkdirs() returns false for an already existing directory, which is not an error.
                if (!target.isDirectory() && !target.mkdirs()) {
                    System.out.printf("Unable to create directory '%s', during extraction of archive contents.\n",
                            target.getAbsolutePath());
                }
            } else {
                // Tarballs are not required to list a directory entry before its files.
                File parent = target.getParentFile();
                if (parent != null && !parent.isDirectory() && !parent.mkdirs()) {
                    throw new IOException("Unable to create directory '" + parent.getAbsolutePath() + "'");
                }
                byte[] data = new byte[BUFFER_SIZE];
                int count;
                try (BufferedOutputStream dest =
                         new BufferedOutputStream(new FileOutputStream(target, false), BUFFER_SIZE)) {
                    // read() is scoped to the current entry and returns -1 at its end.
                    while ((count = tarIn.read(data, 0, BUFFER_SIZE)) != -1) {
                        dest.write(data, 0, count);
                    }
                }
            }
        }
    }
}


In [None]:
//fetch_housing_data(HOUSING_URL, new File(HOUSING_PATH));

In [None]:
import tech.tablesaw.api.Table;

/**
 * Reads the housing CSV into a Tablesaw table.
 *
 * @param housingPathCsv the CSV file to parse
 * @return the parsed table; a read failure is rethrown by {@code Try.get()}
 */
Table load_housing_data(File housingPathCsv){
    var parsed = Try.of(() -> Table.read().csv(housingPathCsv));
    return parsed.get();
}


In [None]:

// Load <HOUSING_PATH>/housing.csv into the notebook-wide `housing` table and peek at the first rows.
var housing = load_housing_data(new File(FilenameUtils.concat(HOUSING_PATH,"housing.csv")));
housing.first(4);

In [None]:
// Planning to replicate pandas' housing.info(): show the table's structure, shape and summary.
display(housing.structure());
display(housing.shape());
display(housing.summary());

In [None]:
// Cross-tabulated counts for the ocean_proximity column.
housing.xTabCounts("ocean_proximity")

In [None]:


import tech.tablesaw.plotly.components.Figure;
import tech.tablesaw.plotly.components.Layout;
import tech.tablesaw.plotly.api.Histogram;  
import tech.tablesaw.plotly.Plot;
import javax.imageio.ImageIO;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.UUID;
import tech.tablesaw.plotly.components.Page;

/**
 * Displays a Tablesaw Plotly figure in the notebook output.
 *
 * <p>The figure is wrapped in a page bound to the element id {@code "target"} and the
 * page's JavaScript is emitted with the {@code text/html} MIME type.
 *
 * @param fig the figure to display
 */
void renderPlotly(Figure fig){
    var markup = Page.pageBuilder(fig, "target").build().asJavascript();
    display(markup, "text/html");
}

//renderPlotly(Histogram.create("Distribution of total_rooms", housing, "total_rooms"));

// housing.numericColumns().forEach(f ->{
//          HistogramTrace trace = HistogramTrace.builder(f.asDoubleArray()).build();
//          Plot.show(new Figure(Layout.builder(f.name()).build(), trace));
        
// });


# Preparing for stratified sampling
*I have skipped random sampling in the book*

In [None]:
import java.util.function.ToDoubleFunction;
// Income category = ceil(median_income / 1.5), capped at 5; computed in a single pass.
var incomeCat = housing.doubleColumn("median_income")
    .map((ToDoubleFunction<Double>) income -> {
        double category = Math.ceil(income / 1.5);
        return category > 5 ? 5 : category;
    });
housing.addColumns(incomeCat.setName("income_cat"));
housing.first(2)

In [None]:
import tech.tablesaw.api.CategoricalColumn;
import tech.tablesaw.columns.Column;
import static tech.tablesaw.aggregate.AggregateFunctions.*;
/**
 * Splits {@code table} into two tables while sampling each stratum of {@code column} separately.
 *
 * <p>The table is partitioned by the values of {@code column}; each partition is split with
 * {@code sampleSplit(table1Proportion)} and the halves are appended to the two results.
 * If {@code column} is not categorical, a temporary string copy named
 * {@code <column>_stringified} is added to {@code table} for the duration of the call and is
 * always removed again, even when splitting fails.
 *
 * @param table            the table to split; left unchanged once the call returns
 * @param column           name of the column whose values define the strata
 * @param table1Proportion proportion handed to {@code sampleSplit} for the first result
 * @return a two-element array: {@code [first, second]}
 */
Table[] stratifiedSampleSplit(Table table, String column, double table1Proportion){
    final Table first = table.emptyCopy();
    final Table second = table.emptyCopy();
    final Column<?> col = table.column(column);
    final boolean needsHelperColumn = !(col instanceof CategoricalColumn);
    final String categoricalColumn = needsHelperColumn ? column + "_stringified" : column;
    if(needsHelperColumn){
        table.addColumns(col.asStringColumn().setName(categoricalColumn));
    }
    try {
        for(Table stratum : table.splitOn(categoricalColumn).asTableList()){
            Table[] splits = stratum.sampleSplit(table1Proportion);
            first.append(splits[0]);
            second.append(splits[1]);
        }
    } finally {
        // Never leave the helper column behind on the caller's table.
        if(needsHelperColumn){
            table.removeColumns(table.column(categoricalColumn));
        }
    }
    return new Table[]{first, second};
}

// Stratify on income_cat with a 0.2 proportion for strats[0]; strats[1] receives the remainder.
var strats = stratifiedSampleSplit(housing,"income_cat", 0.2);
// income_cat was only needed for stratification, so drop it from both splits.
strats[1].removeColumns(strats[1].column("income_cat"));
strats[0].removeColumns(strats[0].column("income_cat"));
display(strats[1].shape());
display(strats[0].shape());
// Compare mean/count of longitude and median_income between the two splits.
display(strats[0].summarize("longitude","median_income", mean, count).apply());
strats[1].summarize("longitude","median_income", mean, count).apply();


# Discover and visualize the data to gain insights

**Skipped for now.**

In [None]:
// From here on `housing` is a working copy of strats[1]; the copy keeps strats[1] itself untouched.
housing = strats[1].copy();
housing.shape();

**Tablesaw doesn't handle missing values very well, so we will set missing values to 0.**

In [None]:
// Missing-value counts before any clean-up.
housing.missingValueCounts();

In [None]:
// Overwrite the missing total_bedrooms entries with 0 and put the column back into the table.
var tBRoom = housing.intColumn("total_bedrooms");
housing.replaceColumn("total_bedrooms",tBRoom.set(tBRoom.isMissing(),0));
// NOTE(review): the aggregates below are computed after the zero fill, so the mean includes those zeros.
var summarizer = housing.summarize("total_bedrooms", mean, sum, count).apply();
summarizer

In [None]:
// Re-check the missing-value counts after the fill above.
housing.missingValueCounts();

**Let's save the mean because we will need it later.**

In [None]:
// First cell of the first summary column; presumably the mean (first aggregate requested) - TODO confirm.
var totalBedroomsMean = summarizer.column(0).get(0);
totalBedroomsMean;

**Looking for correlations**

In [None]:
import java.util.stream.*;
import org.apache.commons.math3.stat.correlation.PearsonsCorrelation;
import io.vavr.Tuple;  

// Pearson correlation of every numeric column against median_house_value, sorted ascending.
var medianVector  = housing.intColumn("median_house_value").asDoubleColumn();

var corr = new PearsonsCorrelation();
housing.numericColumns().stream()
    .map(i -> Tuple.of(i.name(),corr.correlation( i.asDoubleArray(), medianVector.asDoubleArray() )))
    .sorted((a, b) -> {
        // `x == Double.NaN` is always false, so NaN must be detected with Double.isNaN.
        // NaN correlations sort before every real value, as the original branches intended.
        boolean aNaN = Double.isNaN(a._2);
        boolean bNaN = Double.isNaN(b._2);
        if (aNaN || bNaN) {
            return aNaN == bNaN ? 0 : (aNaN ? -1 : 1);
        }
        return Double.compare(a._2, b._2);
    })
    .collect(Collectors.toList());

**Plot Pandas correlation later.**

In [None]:
// Derived ratio features. population_per_household must be built from `population`
// (it was previously a copy-paste of the total_bedrooms ratio).
housing.addColumns(
    housing.nCol("total_rooms").divide(housing.nCol("households")).setName("rooms_per_household"),
    housing.nCol("total_bedrooms").divide(housing.nCol("total_rooms")).setName("bedrooms_per_room"),
    housing.nCol("population").divide(housing.nCol("households")).setName("population_per_household")
);
housing.summary()

In [None]:
// Same correlation ranking as before, now including the derived ratio columns.
var medianVector  = housing.intColumn("median_house_value").asDoubleColumn();

var corr = new PearsonsCorrelation();
housing.numericColumns().stream()
    .map(i -> Tuple.of(i.name(),corr.correlation( i.asDoubleArray(), medianVector.asDoubleArray() )))
    .sorted((a, b) -> {
        // `x == Double.NaN` is always false, so NaN must be detected with Double.isNaN.
        // NaN correlations sort before every real value, as the original branches intended.
        boolean aNaN = Double.isNaN(a._2);
        boolean bNaN = Double.isNaN(b._2);
        if (aNaN || bNaN) {
            return aNaN == bNaN ? 0 : (aNaN ? -1 : 1);
        }
        return Double.compare(a._2, b._2);
    })
    .collect(Collectors.toList());

In [None]:
// Missing-value counts once the ratio columns have been added.
housing.missingValueCounts();

**I am going with option 1 (get rid of the corresponding districts).**

In [None]:
// Start again from an untouched copy of strats[1] and re-create the ratio features.
housing = strats[1].copy();
housing.addColumns(
    housing.nCol("total_rooms").divide(housing.nCol("households")).setName("rooms_per_household"),
    housing.nCol("total_bedrooms").divide(housing.nCol("total_rooms")).setName("bedrooms_per_room"),
    // Built from `population`; it was previously a copy-paste of the total_bedrooms ratio.
    housing.nCol("population").divide(housing.nCol("households")).setName("population_per_household")
);
//housing =housing.dropRowsWithMissingValues();
housing.missingValueCounts();

In [None]:

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import weka.core.Attribute;
import tech.tablesaw.api.ColumnType;
import tech.tablesaw.api.NumericColumn;
import tech.tablesaw.api.StringColumn;
import tech.tablesaw.columns.Column;
import tech.tablesaw.table.Relation;
import weka.core.DenseInstance;
import weka.core.Instance;
import weka.core.Instances;
import weka.core.Utils;

/**
 * Converts a Tablesaw {@link Relation} into a Weka {@link Instances} dataset.
 *
 * <p>String columns become nominal attributes, all other columns are read as numbers, and the
 * class (response) attribute is always placed last. The attribute structure of the first
 * dataset built by an instance is cached and reused for every later conversion, so that
 * subsequent relations are encoded against the same header. Callers are expected to pass the
 * same variable columns, in the same order, on later calls.
 *
 * @author James Akinniranye
 */
public class WekaConverter {

    private Relation table;
    /** Header of the first dataset produced; {@code null} until then. */
    private Instances structure;

    public WekaConverter() {

    }

    public WekaConverter(Relation table) {
        this.table = table;
    }

    /**
     * Sets the relation to convert.
     *
     * @return this converter, for chaining
     */
    public WekaConverter setRelation(Relation table) {
        this.table = table;
        return this;
    }

    /**
     * Returns a dataset where the response column is numeric, e.g. for a regression.
     * Every other numeric column of the relation is used as a variable.
     */
    public Instances numericDataset(String classColName) {
        return dataset(
                table.numberColumn(classColName),
                AttributeType.NUMERIC,
                table.numericColumns().stream().filter(c -> !c.name().equals(classColName)).collect(Collectors.toList()));
    }

    /**
     * Returns a dataset where the response column is numeric, e.g. for a regression.
     * Columns are selected by index.
     */
    public Instances numericDataset(int classColIndex, int... variablesColIndices) {
        return dataset(table.numberColumn(classColIndex), AttributeType.NUMERIC, table.columns(variablesColIndices));
    }

    /**
     * Returns a dataset where the response column is numeric, e.g. for a regression.
     * Columns are selected by name.
     */
    public Instances numericDataset(String classColName, String... variablesColNames) {
        return dataset(table.numberColumn(classColName), AttributeType.NUMERIC, table.columns(variablesColNames));
    }

    /**
     * Returns a dataset where the response column is nominal, e.g. for a classification.
     * Every other numeric column of the relation is used as a variable.
     */
    public Instances nominalDataset(String classColName) {
        return dataset(
                table.numberColumn(classColName),
                AttributeType.NOMINAL,
                table.numericColumns().stream().filter(c -> !c.name().equals(classColName)).collect(Collectors.toList()));
    }

    /**
     * Returns a dataset where the response column is nominal, e.g. for a classification.
     * Columns are selected by index.
     */
    public Instances nominalDataset(int classColIndex, int... variablesColIndices) {
        return dataset(table.numberColumn(classColIndex), AttributeType.NOMINAL, table.columns(variablesColIndices));
    }

    /**
     * Returns a dataset where the response column is nominal, e.g. for a classification.
     * Columns are selected by name.
     */
    public Instances nominalDataset(String classColName, String... variablesColNames) {
        return dataset(table.numberColumn(classColName), AttributeType.NOMINAL, table.columns(variablesColNames));
    }

    /**
     * Builds the Weka dataset: one instance per row, variables first, class attribute last.
     * The header is created on the first call and reused from the cache afterwards.
     */
    private Instances dataset(NumericColumn<?> classCol, AttributeType type, List<Column<?>> variableCols) {
        List<Column<?>> convertedVariableCols = variableCols.stream()
                .map(col -> col.type() == ColumnType.STRING ? col : table.nCol(col.name()))
                .collect(Collectors.toList());

        Instances dataset;
        if (structure == null) {
            Attribute classAttribute = type == AttributeType.NOMINAL
                    ? colAsNominalAttribute(classCol) : new Attribute(classCol.name());
            ArrayList<Attribute> attributes = new ArrayList<>(
                    convertedVariableCols.stream().map(col -> colAsAttribute(col)).collect(Collectors.toList()));
            attributes.add(classAttribute);
            dataset = new Instances(table.name(), attributes, classCol.size());
            dataset.setClass(classAttribute);
        } else {
            dataset = new Instances(structure, classCol.size());
        }

        final int classIndex = dataset.numAttributes() - 1;
        for (int row = 0; row < classCol.size(); row++) {
            Instance inst = new DenseInstance(dataset.numAttributes());
            inst.setDataset(dataset);
            for (int c = 0; c < classIndex; c++) {
                inst.setValue(c, getDouble(convertedVariableCols.get(c), dataset.attribute(c), row));
            }
            inst.setValue(classIndex, getDouble(classCol, dataset.classAttribute(), row));
            dataset.add(inst);
        }
        if (structure == null) {
            structure = dataset.stringFreeStructure();
        }
        dataset.compactify();
        return dataset;
    }

    /**
     * Returns the Weka-internal double for row {@code r} of {@code col}: the label index for
     * string columns, the numeric value otherwise. A label the attribute does not know
     * (missing cell, or a value unseen when the cached header was built) is encoded as a
     * Weka missing value rather than as the invalid index -1.
     */
    private double getDouble(Column<?> col, Attribute attr, int r) {
        if (col.type() == ColumnType.STRING) {
            int labelIndex = attr.indexOfValue(Utils.unquote(((StringColumn) col).get(r)));
            return labelIndex < 0 ? Utils.missingValue() : labelIndex;
        }
        if (col instanceof NumericColumn) {
            return ((NumericColumn<?>) col).getDouble(r);
        }
        throw new IllegalStateException("Error converting " + col.type() + " column " + col.name() + " to Weka");
    }

    /** Maps a string column to a nominal attribute and anything else to a numeric attribute. */
    private Attribute colAsAttribute(Column<?> col) {
        return col.type() == ColumnType.STRING ? colAsNominalAttribute(col) : new Attribute(col.name());
    }

    /** Builds a nominal attribute whose labels are the distinct, non-missing values of {@code col}. */
    private Attribute colAsNominalAttribute(Column<?> col) {
        Column<?> unique = col.unique().removeMissing();
        Attribute att = new Attribute(col.name(),
                unique.mapInto(o -> Utils.unquote(o.toString()), StringColumn.create(col.name(), unique.size())).asList());
        //att.setWeight(1.0);
        return att;
    }

    private enum AttributeType {
        NUMERIC,
        NOMINAL
    }
}


In [None]:
// Use every column except the target as a variable; median_house_value is the numeric class.
var cols= housing.columnNames().stream().filter(c -> !c.equals("median_house_value")).toArray(String[]::new);
var wekaConverter = new WekaConverter(housing);
var housingMl = wekaConverter.numericDataset("median_house_value",cols);
housingMl.toSummaryString();

In [None]:
import weka.filters.supervised.attribute.NominalToBinary;
import weka.filters.unsupervised.attribute.Remove;
import weka.filters.Filter;


// Run housingMl through Weka's supervised NominalToBinary filter; the result is kept in h2.
NominalToBinary nom = new NominalToBinary();

nom.setInputFormat(housingMl);
var h2 = Filter.useFilter(housingMl, nom);

h2.toSummaryString();

In [None]:
// Inspect one filtered instance.
h2.firstInstance();

In [None]:
import weka.filters.unsupervised.attribute.ReplaceMissingValues;
// Pass h2 through Weka's ReplaceMissingValues filter and keep the filtered data as the new h2.
var rpl = new ReplaceMissingValues();
rpl.setInputFormat(h2);

h2 = Filter.useFilter(h2, rpl);

h2.toSummaryString();

In [None]:
import weka.classifiers.functions.SimpleLinearRegression;
import weka.filters.unsupervised.instance.Resample;
import weka.classifiers.evaluation.EvaluationUtils;
import weka.filters.Filter;

// First model: Weka's SimpleLinearRegression.
var linerReg = new SimpleLinearRegression();

// Resample h2 down to a tiny sample: the percentage below is the share that 5 instances represent.
Resample resample = new Resample();
resample.setInputFormat(h2);
resample.setSampleSizePercent((double)5*100/h2.size());
var evalUtil = new EvaluationUtils();
var testH2 = Filter.useFilter(h2, resample);
// NOTE(review): testH2 is drawn from h2 itself, so these predictions are not on held-out data.
evalUtil.getTrainTestPredictions(linerReg, h2, testH2 )
    .forEach(c -> display("test "+ c.actual() + " predicted = "+ c.predicted() ));

In [None]:
import weka.classifiers.functions.LinearRegression;

// Same train/predict round as above, now with Weka's LinearRegression (reuses evalUtil and testH2).
var linerReg = new LinearRegression();
evalUtil.getTrainTestPredictions(linerReg, h2, testH2)
    .forEach(c -> display("test "+ c.actual() + " predicted = "+ c.predicted() ));


In [None]:
import weka.filters.Filter;
import java.util.function.Function;
import java.util.HashMap;
/**
 * Minimal pre-processing pipeline: optional table-level pre-processing, conversion of the
 * relation to Weka {@link Instances}, then a fixed chain of Weka filters applied in order.
 *
 * <p>Each filter is initialised once, the first time it is used, with the format of the data
 * it actually receives (the output of the preceding filter). Later calls reuse the
 * already-initialised filters.
 */
class Pipeline{

    private final Filter[] filters;
    private final String[] attributeCols;
    private final String classAttribute;
    private WekaConverter converter;
    private Function<Relation, Relation> preProcessors;
    /** Filters whose input format has already been set; keyed by the filter itself, not its hash. */
    private final HashMap<Filter,Boolean> checks = new HashMap<>();

    /**
     * @param attributeCols  names of the variable columns handed to the converter
     * @param classAttribute name of the numeric response column
     * @param filters        filters to apply, in order
     */
    public Pipeline( String[] attributeCols, String classAttribute, Filter... filters ){
        this.attributeCols = attributeCols;
        this.classAttribute = classAttribute;
        this.filters = filters;
    }

    /** Registers a function applied to the relation before it is converted; may be {@code null}. */
    public void setPreProcessing(Function<Relation, Relation> preProcessors){
        this.preProcessors = preProcessors;
    }

    /**
     * Pre-processes and converts {@code data}, then runs it through every filter.
     * (The method name keeps its historical spelling so existing callers continue to work.)
     *
     * @param data the relation to transform
     * @return the filtered instances; any filter failure is rethrown by {@code Try.get()}
     */
    public Instances fitTransom(Relation data){
        if(converter == null){
            converter = new WekaConverter();
        }
        if(preProcessors != null){
            data = preProcessors.apply(data);
        }
        Instances result = converter.setRelation(data).numericDataset(classAttribute,attributeCols);
        for(Filter filter : filters){
            final Instances input = result;
            if(!checks.containsKey(filter)){
                // The input format must match what this filter receives, i.e. the previous
                // filter's output, which may differ from the raw converted instances.
                Try.run(() -> filter.setInputFormat(input)).get();
                checks.put(filter, true);
            }
            result = Try.of(() -> Filter.useFilter(input, filter)).get();
        }
        return result;
    }
}

In [None]:
import weka.filters.unsupervised.attribute.Standardize;

// Build the pre-processing pipeline: ratio features, then missing-value replacement,
// standardisation and nominal-to-binary encoding.
var cols= housing.columnNames().stream().filter(c -> !c.equals("median_house_value")).toArray(String[]::new);
var pipe = new Pipeline(cols, "median_house_value",
    new ReplaceMissingValues(), new Standardize(), new NominalToBinary() );
Function<Relation,Relation> func = (hous) -> hous.addColumns(
    hous.nCol("total_rooms").divide(hous.nCol("households")).setName("rooms_per_household"),
    hous.nCol("total_bedrooms").divide(hous.nCol("total_rooms")).setName("bedrooms_per_room"),
    // Built from `population`; it was previously a copy-paste of the total_bedrooms ratio.
    hous.nCol("population").divide(hous.nCol("households")).setName("population_per_household")
);
pipe.setPreProcessing(func);
var d = pipe.fitTransom(strats[1].copy());
// Tiny sample of d: the percentage is the share that 5 instances represent.
Resample resample = new Resample();
resample.setInputFormat(d);
resample.setSampleSizePercent((double)5*100/d.size());

var dtest = Filter.useFilter(d, resample);
dtest;

In [None]:
// Fit LinearRegression on the pipeline output d and print predictions for the dtest sample.
var linerReg = new LinearRegression();
linerReg.buildClassifier(d);
evalUtil.getTestPredictions(linerReg, dtest)
    .forEach(c -> display("test "+ c.actual() + " predicted = "+ c.predicted() ));

In [None]:
 import weka.classifiers.Evaluation;
 import java.util.Random;

// 10-fold cross-validation of LinearRegression on d, with a fixed seed (1) for reproducible folds.
var linerReg = new LinearRegression();
Evaluation eval = new Evaluation(d);
eval.crossValidateModel(linerReg, d, 10, new Random(1));
display("** Linear Regression Evaluation with Datasets **");
display(eval.toSummaryString(false));


In [None]:
import weka.classifiers.rules.DecisionTable;

// 10-fold cross-validation of a DecisionTable on d, same seed as the other models.
var desc = new DecisionTable();
Evaluation eval = new Evaluation(d);
eval.crossValidateModel(desc, d, 10, new Random(1));
display("** DecisionTable Evaluation with Datasets **");
display(eval.toSummaryString(false));

In [None]:
// Train the DecisionTable on all of d and print its predictions for the dtest sample.
desc.buildClassifier(d);
evalUtil.getTestPredictions(desc,  dtest)
    .forEach(c -> display("test "+ c.actual() + " predicted = "+ c.predicted() ));

In [None]:
import weka.classifiers.trees.RandomForest;

// 10-fold cross-validation of a RandomForest on d.
RandomForest forest=new RandomForest();
// "-I 10": presumably the number of iterations/trees - confirm against the Weka RandomForest options.
//increasing i to 100 makes the model better
forest.setOptions(new String[]{"-I", "10"});
Evaluation eval = new Evaluation(d);
eval.crossValidateModel(forest, d, 10, new Random(1));
display("** RandomForest Regression Evaluation with Datasets **");
display(eval.toSummaryString(false));

In [None]:

//RandomForest forest=new RandomForest();
// Options must be set before training: setting "-I" after buildClassifier
// leaves the already-built model unchanged.
forest.setOptions(new String[]{"-I", "20"});
forest.buildClassifier(d);
evalUtil.getTestPredictions(forest,  dtest)
    .forEach(c -> display("test "+ c.actual() + " predicted = "+ c.predicted() ));


In [None]:
import weka.classifiers.functions.SMOreg;

// 10-fold cross-validation of the SMOreg support-vector regressor on d.
var svm = new SMOreg();
svm.setOptions(new String[]{"-N","0"});
Evaluation eval = new Evaluation(d);
// Evaluate `svm` itself; the DecisionTable `desc` was previously passed here by mistake.
// NOTE(review): cross-validating SMOreg can take considerably longer than the other models.
eval.crossValidateModel(svm, d, 10, new Random(1));
display("** SMO SVM Evaluation with Datasets **");
display(eval.toSummaryString(false));

In [None]:
// Train SMOreg on all of d and print its predictions for the dtest sample.
svm.buildClassifier(d);
evalUtil.getTestPredictions(svm,  dtest)
    .forEach(c -> display("test "+ c.actual() + " predicted = "+ c.predicted() ));

In [None]:
import weka.classifiers.meta.GridSearch;
// Grid search on d; "-E RMSE" presumably selects RMSE as the evaluation measure and "-D" debug output - confirm.
var grid = new GridSearch();
grid.setOptions(new String[]{"-E","RMSE","-D"});
grid.buildClassifier(d);
grid.enumerateMeasures();

In [None]:
// Classifier selected by the grid search.
grid.getBestClassifier();

In [None]:
// Filter selected by the grid search.
grid.getBestFilter();

In [None]:
// Evaluation setting reported by the grid search.
grid.getEvaluation();

In [None]:
// Print the best grid-search classifier's predictions for the dtest sample.
evalUtil.getTestPredictions(grid.getBestClassifier(),  dtest)
    .forEach(c -> display("test "+ c.actual() + " predicted = "+ c.predicted() ));