In [1]:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml.regression.GBTRegressor
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
import org.apache.spark.sql.{Encoders, SparkSession}

Intitializing Scala interpreter ...

Spark Web UI available at http://FireBall:4040
SparkContext available as 'sc' (version = 2.4.0, master = local[*], app id = local-1548607726347)
SparkSession available as 'spark'


import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml.regression.GBTRegressor
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
import org.apache.spark.sql.{Encoders, SparkSession}


In [2]:
val schemaStruct = StructType(
            StructField("country", StringType) ::
            StructField("points", DoubleType) ::
            StructField("price", DoubleType) :: Nil
        )

schemaStruct: org.apache.spark.sql.types.StructType = StructType(StructField(country,StringType,true), StructField(points,DoubleType,true), StructField(price,DoubleType,true))


In [3]:
val df = spark.read
      .option("header", true)
      //.schema(schemaStruct)
      .option("inferSchema", true)
      .csv("file:///C:/Users/FireBall/IdeaProjects/SparkDemo/src/main/data/winereviews/wine-data.csv")
      .na.drop()

df: org.apache.spark.sql.DataFrame = [id: string, country: string ... 9 more fields]


In [4]:
val new_df = df.drop("id", "description", "designation", "province", "region_1", "region_2", "variety", "winery")

new_df: org.apache.spark.sql.DataFrame = [country: string, points: string ... 1 more field]


In [5]:
new_df.printSchema()
new_df.cache()
//df.printSchema()

root
 |-- country: string (nullable = true)
 |-- points: string (nullable = true)
 |-- price: string (nullable = true)



res0: new_df.type = [country: string, points: string ... 1 more field]


In [6]:
new_df.head(2)
//df.cache()
//df.head(2)

res1: Array[org.apache.spark.sql.Row] = Array([US,96,235], [US,96,90])


In [7]:
/*
import spark.implicits._
case class Initial(country: String , points: String , price: String )
case class Final(country: String, points: Double, price: Double)

def swapType(in: Initial) = Final(
    in.country,
    in.points.toDouble,
    in.price.toDouble
)

val df_2 = new_df.as[Initial].map(swapType(_))
df_2.printSchema()
*/

In [8]:
//new_df.printSchema()

In [9]:
val train_df = new_df.select( new_df("country"), 
                              new_df("points").cast(DoubleType).as("points"), 
                              new_df("price").cast(DoubleType).as("price") )

train_df: org.apache.spark.sql.DataFrame = [country: string, points: double ... 1 more field]


In [10]:
train_df.na.drop()

res4: org.apache.spark.sql.DataFrame = [country: string, points: double ... 1 more field]


In [11]:
train_df.show()

+-------+------+-----+
|country|points|price|
+-------+------+-----+
|     US|  96.0|235.0|
|     US|  96.0| 90.0|
|     US|  96.0| 65.0|
|     US|  95.0| 65.0|
|     US|  95.0| 60.0|
|     US|  95.0| 48.0|
|     US|  95.0| 48.0|
|     US|  95.0|185.0|
|     US|  95.0| 90.0|
|     US|  95.0|325.0|
|     US|  95.0| 75.0|
|     US|  95.0| 24.0|
|     US|  95.0| 60.0|
|     US|  95.0| 45.0|
|     US|  94.0|105.0|
|     US|  94.0| 60.0|
|     US|  94.0| 60.0|
|     US|  90.0| 37.0|
|     US|  90.0| 42.0|
|     US|  90.0| 60.0|
+-------+------+-----+
only showing top 20 rows



In [12]:
val Array(trainingData, testData) = train_df.randomSplit(Array(0.7, 0.3))

trainingData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [country: string, points: double ... 1 more field]
testData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [country: string, points: double ... 1 more field]


In [13]:
val labelColumn = "price"

labelColumn: String = price


In [14]:
val countryIndexer = new StringIndexer()
            .setInputCol("country")
            .setOutputCol("countryIndex")
            .setHandleInvalid("skip")

countryIndexer: org.apache.spark.ml.feature.StringIndexer = strIdx_4d94574d0867


In [25]:
val assembler = new VectorAssembler()
            .setInputCols(Array("points", "countryIndex"))
            .setOutputCol("features")
            .setHandleInvalid("skip")

assembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_baa0c40228de


In [26]:
val gbt = new GBTRegressor()
            .setLabelCol(labelColumn)
            .setFeaturesCol("features")
            .setPredictionCol("Predicted " + labelColumn)
            .setMaxIter(100)

gbt: org.apache.spark.ml.regression.GBTRegressor = gbtr_e0db4b568ef6


In [27]:
val stages = Array(
            countryIndexer,
            assembler,
            gbt
        )

stages: Array[org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable{def copy(extra: org.apache.spark.ml.param.ParamMap): org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable{def copy(extra: org.apache.spark.ml.param.ParamMap): org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable}}] = Array(strIdx_4d94574d0867, vecAssembler_baa0c40228de, gbtr_e0db4b568ef6)


In [28]:
val pipeline = new Pipeline().setStages(stages)

pipeline: org.apache.spark.ml.Pipeline = pipeline_0cd78c344e64


In [29]:
val model = pipeline.fit(trainingData)

model: org.apache.spark.ml.PipelineModel = pipeline_0cd78c344e64


In [30]:
val predictions = model.transform(testData)

predictions: org.apache.spark.sql.DataFrame = [country: string, points: double ... 4 more fields]


In [31]:
predictions.show()

+-------+------+-----+------------+----------+------------------+
|country|points|price|countryIndex|  features|   Predicted price|
+-------+------+-----+------------+----------+------------------+
|     US|  80.0| 12.0|         0.0|[80.0,0.0]|24.748344366963156|
|     US|  80.0| 22.0|         0.0|[80.0,0.0]|24.748344366963156|
|     US|  80.0| 25.0|         0.0|[80.0,0.0]|24.748344366963156|
|     US|  80.0| 30.0|         0.0|[80.0,0.0]|24.748344366963156|
|     US|  80.0| 36.0|         0.0|[80.0,0.0]|24.748344366963156|
|     US|  80.0| 40.0|         0.0|[80.0,0.0]|24.748344366963156|
|     US|  81.0|  8.0|         0.0|[81.0,0.0]|22.990867585442828|
|     US|  81.0| 13.0|         0.0|[81.0,0.0]|22.990867585442828|
|     US|  81.0| 16.0|         0.0|[81.0,0.0]|22.990867585442828|
|     US|  81.0| 19.0|         0.0|[81.0,0.0]|22.990867585442828|
|     US|  81.0| 19.0|         0.0|[81.0,0.0]|22.990867585442828|
|     US|  81.0| 20.0|         0.0|[81.0,0.0]|22.990867585442828|
|     US| 

In [32]:
val predictions_na = predictions.na.drop()

predictions_na: org.apache.spark.sql.DataFrame = [country: string, points: double ... 4 more fields]


In [33]:
val evaluator = new RegressionEvaluator()
            .setLabelCol(labelColumn)
            .setPredictionCol("Predicted " + labelColumn)
            .setMetricName("rmse")

evaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_5d5a5f14103f


In [34]:
val winePredError = evaluator.evaluate(predictions_na)

winePredError: Double = 27.66570715559826
