### Loading the libraries

In [1]:
%classpath add mvn com.salesforce.transmogrifai transmogrifai-core_2.11 0.6.0

In [2]:
%classpath add mvn org.apache.spark spark-mllib_2.11 2.3.2

### Import the classes

In [3]:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkContext
import org.apache.spark.sql.functions.udf

import com.salesforce.op._
import com.salesforce.op.features._
import com.salesforce.op.features.types._
import com.salesforce.op.stages.impl.classification._
import com.salesforce.op.evaluators.Evaluators

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkContext
import org.apache.spark.sql.functions.udf
import com.salesforce.op._
import com.salesforce.op.features._
import com.salesforce.op.features.types._
import com.salesforce.op.stages.impl.classification._
import com.salesforce.op.evaluators.Evaluators


In [4]:
import com.salesforce.op.OpWorkflow
import com.salesforce.op.evaluators.Evaluators
import com.salesforce.op.readers.DataReaders

import com.salesforce.op.OpWorkflow
import com.salesforce.op.evaluators.Evaluators
import com.salesforce.op.readers.DataReaders


### Create the Spark Session

In [5]:
// Local Spark configuration: run in-process with as many worker threads as logical cores.
val conf: SparkConf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("SimpleRegression")

// Declared implicit so later cells can pick it up without passing it explicitly
// (presumably consumed by `workflow.train()` below — confirm against the TransmogrifAI API).
implicit val spark: SparkSession = SparkSession.builder.config(conf).getOrCreate()

org.apache.spark.sql.SparkSession@6c425e72

### Schema and Feature Creation

In [6]:
/** One training record read from the CSV file.
  *
  * @param population predictor value (turned into a feature with `asPredictor`)
  * @param profit     response value the model learns to predict (`asResponse`)
  *
  * NOTE(review): `csvCase` presumably maps CSV columns onto these fields by position,
  * so the field order must match the file's column order — confirm.
  */
case class SimpleRegression(population: Double, profit: Double)

defined class SimpleRegression


In [7]:
// Predictor feature: the record's population, as a non-nullable real.
val population = FeatureBuilder.RealNN[SimpleRegression]
  .extract(record => record.population.toRealNN)
  .asPredictor

// Response (label) feature: the record's profit, which the model learns to predict.
val profit = FeatureBuilder.RealNN[SimpleRegression]
  .extract(record => record.profit.toRealNN)
  .asResponse

Feature(name = profit, uid = RealNN_000000000002, isResponse = true, originStage = FeatureGeneratorStage_000000000002, parents = [], distributions = [])

#### Load the data

In [8]:
import spark.implicits._

// Relative path, resolved against the kernel's working directory
// (presumably the notebook's folder — adjust if the kernel is started elsewhere).
val trainFilePath = "../src/main/resources/SimpleRegressionDataset/simple_regression.csv"

// Reader that parses each CSV row into a SimpleRegression record.
val trainDataReader = DataReaders.Simple.csvCase[SimpleRegression](path = Some(trainFilePath))

org.apache.spark.sql.SparkSession$implicits$@1e8ac1e6

In [9]:
// Check that the training CSV exists and is a regular file (a directory at this path
// would also pass a plain `exists` check but cannot be read as CSV).
// Uses java.nio instead of scala.reflect.io, which is an experimental compiler-internal API.
import java.nio.file.{Files, Paths}
Files.isRegularFile(Paths.get(trainFilePath))

true

In [10]:
import com.salesforce.op.stages.impl.tuning.{DataCutter, DataSplitter}
// Fixed seed shared by the splitter and the model selector so runs are reproducible.
val randomSeed = 42L

// Combine the predictor(s) into a single vectorized feature for the model.
val features = Seq(population).transmogrify()

// Train/validation splitter used by the model selector's cross-validation.
val splitter = DataSplitter(seed = randomSeed)

DataSplitter_000000000005

#### Model Selector
The `RegressionModelSelector` is an Estimator that uses the training data (here via cross-validation) to find the best model among the candidate model types.

In [11]:
import com.salesforce.op.stages.impl.regression.RegressionModelsToTry.{OpGBTRegressor, OpRandomForestRegressor,OpLinearRegression}
import com.salesforce.op.stages.impl.regression.RegressionModelSelector

// NOTE(review): `cutter` is not passed to the selector below and is not used by any visible
// cell; it is kept only in case other cells reference it.
val cutter = DataCutter(reserveTestFraction = 0.2, seed = randomSeed)

// Cross-validated model selection over the candidate regressors. The output feature is the
// selected model's prediction of `profit` from the transmogrified `features`.
val prediction = RegressionModelSelector
  .withCrossValidation(
    dataSplitter = Some(splitter),
    seed = randomSeed,
    modelTypesToUse = Seq(OpGBTRegressor, OpRandomForestRegressor)
    // To try only a linear model instead: modelTypesToUse = Seq(OpLinearRegression)
  )
  .setInput(profit, features)
  .getOutput()

Feature(name = population-profit_3-stagesApplied_Prediction_00000000000f, uid = Prediction_00000000000f, isResponse = true, originStage = ModelSelector_00000000000f, parents = [RealNN_000000000002,OPVector_000000000004], distributions = [])

#### Evaluators and Workflow
`Evaluators.Regression()` is a factory for an evaluator that computes regression metrics. The metrics returned are RMSE, MSE, R² and MAE:
* Mean Squared Error (MSE)
* Root Mean Squared Error (RMSE)
* Mean Absolute Error (MAE)
* Coefficient of Determination (R²)

OpWorkflows create and transform the raw data needed to compute the Features fed into them. In addition, they optimize the application of the Stages needed to create the final Features, ensuring optimal computation within the full pipeline DAG. An OpWorkflow is fit to a dataset with the `.train()` method, which produces an OpWorkflowModel that can then be saved to disk and applied to another dataset.

In [12]:
// Regression evaluator comparing the `profit` label against the selector's `prediction`.
val evaluator = Evaluators.Regression()
  .setLabelCol(profit)
  .setPredictionCol(prediction)

OpRegressionEvaluator_000000000010

In [13]:
// Workflow whose results are the model's `prediction` plus the `profit` label,
// with raw records supplied by the CSV reader.
val workflow = new OpWorkflow()
  .setResultFeatures(prediction, profit)
  .setReader(trainDataReader)

// Fit all stages, including model selection. Presumably uses the implicit `spark` session.
val workflowModel = workflow.train()

com.salesforce.op.OpWorkflowModel@7eda8a5c

#### Score and evaluate

In [14]:
// Score the data with the fitted workflow and compute the evaluator's regression metrics.
val dfScoreAndEvaluate = workflowModel.scoreAndEvaluate(evaluator)

// Unpack the (scored DataFrame, metrics) pair by name instead of via ._1 / ._2.
val (dfScore, dfEvaluate) = dfScoreAndEvaluate

// Optional rename of the prediction column. Use `prediction.name` rather than a hard-coded
// column name: the generated uid suffix (e.g. `_00000000000f`) is not stable across sessions.
// val dfScoreRenamed = dfScore.withColumnRenamed(prediction.name, "predicted_profit")

println(s"Evaluate:\n$dfEvaluate")

dfScore.show(false)

Evaluate:
{
  "RootMeanSquaredError" : 3.172343537076648,
  "MeanSquaredError" : 10.06376351723198,
  "R2" : 0.6650990450368588,
  "MeanAbsoluteError" : 2.300052394510972
}
+--------------------+-------+---------------------------------------------------------+
|key                 |profit |population-profit_3-stagesApplied_Prediction_00000000000f|
+--------------------+-------+---------------------------------------------------------+
|-2820701744609922157|17.592 |[prediction -> 3.9116793861193053]                       |
|-1364364500464464697|9.1302 |[prediction -> 2.7915410250785895]                       |
|4775711067767085698 |13.662 |[prediction -> 7.463972327115551]                        |
|2713511938805709318 |11.854 |[prediction -> 4.915782218746902]                        |
|732041312525985608  |6.8233 |[prediction -> 2.998118778340556]                        |
|-2153771554033910381|11.886 |[prediction -> 7.225213414646878]                        |
|3708650087469259187 |4.34

null