Hello.

Spark can evaluate models. Let's try it. The evaluation classes are imported as follows:


```
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator
```

In particular, see [RegressionEvaluator](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.evaluation.RegressionEvaluator.html#pyspark.ml.evaluation.RegressionEvaluator.metricName).
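The `metricName` parameter selects which metric `evaluate()` returns. For `RegressionEvaluator` the default is `"rmse"`, and `"mse"`, `"r2"` and `"mae"` are among the other options. As a plain-Python illustration (no Spark needed), the sketch below computes those four metrics from their textbook definitions. It assumes Spark uses the same definitions, and `regression_metrics` is a hypothetical helper, not part of pyspark.

```python
import math


def regression_metrics(labels, predictions):
    """Hypothetical helper: textbook versions of the metrics chosen via metricName."""
    n = len(labels)
    errors = [y - p for y, p in zip(labels, predictions)]
    mae = sum(abs(e) for e in errors) / n          # mean absolute error
    mse = sum(e * e for e in errors) / n           # mean squared error
    mean_y = sum(labels) / n
    ss_res = mse * n                               # residual sum of squares
    ss_tot = sum((y - mean_y) ** 2 for y in labels)
    r2 = 1.0 - ss_res / ss_tot                     # coefficient of determination
    return {"mae": mae, "mse": mse, "rmse": math.sqrt(mse), "r2": r2}


labels = [3.0, -0.5, 2.0, 7.0]
predictions = [2.5, 0.0, 2.0, 8.0]
metrics = regression_metrics(labels, predictions)
print(metrics)
```

MAE and RMSE are in the units of the target, so they are easy to read directly; R² is unitless and higher is better.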

# Task
Below, a model is trained and evaluated.

Rewrite this code as a Pipeline (you will need a VectorAssembler), then compute the MAE with Spark.
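For reference, the mean absolute error over $n$ rows with true values $y_i$ (the `target` column) and predictions $\hat{y}_i$ (the `prediction` column) is:

$$
\mathrm{MAE} = \frac{1}{n} \sum_{i=1}^{n} \left| y_i - \hat{y}_i \right|
$$

Lower is better, and the value is in the same units as `target`.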


In [None]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
Collecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=5f726be1fc7ad11dab06bed60e027d70a2347ef9c30d0f2aa4a8d1d30c9694e8
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [1]:
# https://scikit-learn.org/stable/datasets/toy_dataset.html#boston-dataset

from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator

In [2]:
import pandas as pd
# load_boston was deprecated in scikit-learn 1.0 and removed in 1.2,
# so this cell needs scikit-learn < 1.2 to run as written.
from sklearn.datasets import load_boston
from sklearn.metrics import mean_absolute_error

from pyspark.sql import SparkSession
from pyspark.ml.regression import RandomForestRegressor

# Local session with 2 worker threads. In local mode the driver JVM does all
# the work, so the spark.executor.* settings are largely ignored here; they
# only take effect on a cluster.
spark = SparkSession.builder\
    .master("local[2]")\
    .appName("Lesson_2")\
    .config("spark.executor.instances",2)\
    .config("spark.executor.memory",'2g')\
    .config("spark.executor.cores",1)\
    .getOrCreate()
sc = spark.sparkContext

21/08/28 18:47:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

In [4]:
data = load_boston()
dataset = pd.DataFrame(data['data'], columns=data['feature_names'])
dataset['target'] = data['target']

In [5]:
spark_dataset_2 = spark.createDataFrame(dataset)
spark_dataset_2.printSchema()

root
 |-- CRIM: double (nullable = true)
 |-- ZN: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- CHAS: double (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: double (nullable = true)
 |-- TAX: double (nullable = true)
 |-- PTRATIO: double (nullable = true)
 |-- B: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- target: double (nullable = true)



In [6]:
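# Combine the 13 numeric feature columns into one vector column "features",
# the input format Spark ML estimators expect.
data_v = VectorAssembler(
    inputCols=["CRIM", "ZN", "INDUS", "CHAS", "NOX", "RM", "AGE", "DIS", "RAD", "TAX", "PTRATIO", "B", "LSTAT"],
    outputCol="features")
# Standalone check of the assembler; the Pipeline below applies it itself,
# so out_data is not used further.
out_data = data_v.transform(spark_dataset_2).select('features', 'target')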
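Conceptually, `VectorAssembler` concatenates the values of `inputCols`, in the listed order, into one vector per row and keeps the original columns. The plain-Python sketch below mimics that on dictionaries standing in for rows; `assemble_features` is a hypothetical helper and the values are made up. The real transformer also accepts vector-typed inputs and emits a Spark `Vector` (dense or sparse), which this sketch ignores.

```python
def assemble_features(rows, input_cols, output_col="features"):
    """Hypothetical stand-in for VectorAssembler.transform on plain dict rows."""
    assembled = []
    for row in rows:
        new_row = dict(row)  # keep the original columns, as transform() does
        new_row[output_col] = [float(row[c]) for c in input_cols]
        assembled.append(new_row)
    return assembled


# Illustrative (made-up) rows with a subset of the Boston column names.
rows = [
    {"CRIM": 0.1, "RM": 6.0, "LSTAT": 5.0, "target": 24.0},
    {"CRIM": 0.2, "RM": 6.5, "LSTAT": 9.0, "target": 21.5},
]
out = assemble_features(rows, ["CRIM", "RM", "LSTAT"])
print([(r["features"], r["target"]) for r in out])
```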

In [7]:
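# 70/30 split of the raw columns; the assembler runs inside the Pipeline.
# No seed is set, so the split (and the scores below) vary between runs;
# pass e.g. seed=42 to randomSplit for reproducible results.
(train, test) = spark_dataset_2.randomSplit([0.7, 0.3])

# Note: despite the name, `lr` is a random forest, not a linear regression.
lr = RandomForestRegressor(featuresCol='features', labelCol='target')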

In [8]:
train.printSchema()

root
 |-- CRIM: double (nullable = true)
 |-- ZN: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- CHAS: double (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: double (nullable = true)
 |-- TAX: double (nullable = true)
 |-- PTRATIO: double (nullable = true)
 |-- B: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- target: double (nullable = true)



In [9]:
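# Chain the assembler and the regressor: fit() assembles the features of
# `train` and then trains the random forest on them.
pipeline = Pipeline(stages=[data_v, lr])
model = pipeline.fit(train)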


In [10]:
prediction_train = model.transform(train)
prediction_test = model.transform(test)
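`Pipeline.fit` runs the stages in order: a transformer such as `VectorAssembler` is applied to the data, while an estimator such as `RandomForestRegressor` is fitted and replaced by the model it produces. The resulting `PipelineModel.transform` then applies every fitted stage in turn to new data, which is why `model.transform(test)` above needs only the raw columns. The toy sketch below reproduces that contract in plain Python. All classes (`ToyAssembler`, `MeanRegressor`, `MeanModel`, `ToyPipeline`) are hypothetical, and the "regressor" simply predicts the training mean.

```python
class ToyAssembler:
    """Transformer: builds a 'features' list from the given columns."""
    def __init__(self, input_cols):
        self.input_cols = input_cols

    def transform(self, rows):
        return [dict(r, features=[r[c] for c in self.input_cols]) for r in rows]


class MeanModel:
    """Fitted model: predicts the same constant for every row."""
    def __init__(self, value):
        self.value = value

    def transform(self, rows):
        return [dict(r, prediction=self.value) for r in rows]


class MeanRegressor:
    """Estimator: fit() returns a MeanModel that predicts the mean label."""
    def __init__(self, label_col):
        self.label_col = label_col

    def fit(self, rows):
        labels = [r[self.label_col] for r in rows]
        return MeanModel(sum(labels) / len(labels))


class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if hasattr(stage, "fit"):     # estimator: train it and keep the model
                stage = stage.fit(rows)
            fitted.append(stage)
            rows = stage.transform(rows)  # the next stage sees the transformed data
        return ToyPipeline(fitted)        # plays the role of PipelineModel

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


train_rows = [{"x": 1.0, "target": 2.0}, {"x": 3.0, "target": 4.0}]
test_rows = [{"x": 5.0, "target": 6.0}]
toy_model = ToyPipeline([ToyAssembler(["x"]), MeanRegressor("target")]).fit(train_rows)
result = toy_model.transform(test_rows)
print(result)
# → [{'x': 5.0, 'target': 6.0, 'features': [5.0], 'prediction': 3.0}]
```

Spark skips the final `transform` during `fit` when the last stage is an estimator; the toy version does not bother with that optimization.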

In [11]:
pr_train = prediction_train.toPandas()
pr_test = prediction_test.toPandas()

# Compute MAE locally with scikit-learn on the collected pandas frames.
mae_train = mean_absolute_error(pr_train['target'], pr_train['prediction'])
mae_test = mean_absolute_error(pr_test['target'], pr_test['prediction'])
print(f'''
    Scores:: 
        train: {mae_train},
        test: {mae_test}
    ''')

21/08/28 18:48:02 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.



    Scores:: 
        train: 1.8798731513644906,
        test: 2.4565464207541012
    


In [17]:
# The same metric computed by Spark itself, without collecting the data to pandas.
evaluator = RegressionEvaluator(labelCol="target", predictionCol="prediction", metricName="mae")
mae = evaluator.evaluate(prediction_test)
print(f'MAE: {mae}')

MAE: 2.4565464207541012
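The Spark MAE equals the scikit-learn test score printed in the previous cell (2.4565464207541012) because both compute the mean of |target - prediction| over the same test rows. The difference is where the work happens: a distributed evaluator can compute a partial sum and row count per partition and merge them, so the data never has to be collected to the driver as `toPandas()` does. The plain-Python sketch below illustrates that idea only; the partitions and the `partial_abs_error`/`merge` helpers are hypothetical, and Spark's internal implementation differs in detail.

```python
import math
from functools import reduce


def partial_abs_error(partition):
    """Per-partition state: (sum of |label - prediction|, row count)."""
    return (sum(abs(y - p) for y, p in partition), len(partition))


def merge(a, b):
    """Combine two partial states; associative up to floating-point rounding."""
    return (a[0] + b[0], a[1] + b[1])


# Hypothetical (label, prediction) pairs spread over three "partitions".
partitions = [
    [(3.0, 2.5), (-0.5, 0.0)],
    [(2.0, 2.0)],
    [(7.0, 8.0), (1.0, 1.5)],
]

total, count = reduce(merge, map(partial_abs_error, partitions))
distributed_mae = total / count

# Reference: the same metric computed in one pass over all rows.
all_pairs = [pair for part in partitions for pair in part]
local_mae = sum(abs(y - p) for y, p in all_pairs) / len(all_pairs)
print(distributed_mae, local_mae, math.isclose(distributed_mae, local_mae))
```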
