Датасет взял с Kaggle ([ссылка](https://www.kaggle.com/datasets/rkiattisak/salaly-prediction-for-beginer)). Перед нами задача регресси с предсказанием зарплаты.

В датасете есть такие столбцы:

- ```Age``` - возраст
- ```Gender``` - пол
- ```Education Level``` - уровень образования
- ```Job Title``` -должность
- ```Years of Experience``` - стаж в годах
- ```Salary``` - зарплата

импортируем все необходимые библиотеки:

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import pandas as pd
import subprocess

открываем сессию, скачиваем датасет и смотрим на него:

In [2]:
spark = SparkSession.builder \
    .appName('create_model') \
    .config('spark.executor.instances', 4) \
    .getOrCreate()

23/09/09 13:18:32 WARN Utils: Your hostname, mysha-Inspiron-11-3147 resolves to a loopback address: 127.0.1.1; using 192.168.1.75 instead (on interface wlp1s0)


Ivy Default Cache set to: /root/.ivy2/cache


In [3]:
df = spark.read.csv('/mysha/dz8/salary_data.csv', header=True, inferSchema=True)
df.printSchema()

root
 |-- Age: double (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Education Level: string (nullable = true)
 |-- Job Title: string (nullable = true)
 |-- Years of Experience: double (nullable = true)
 |-- Salary: double (nullable = true)



In [4]:
df.describe().show()

+-------+-----------------+------+---------------+---------------+-------------------+------------------+
|summary|              Age|Gender|Education Level|      Job Title|Years of Experience|            Salary|
+-------+-----------------+------+---------------+---------------+-------------------+------------------+
|  count|              373|   373|            373|            373|                373|               373|
|   mean|37.43163538873995|  null|           null|           null|  10.03083109919571|100577.34584450402|
| stddev|7.069072938567491|  null|           null|           null|  6.557007136414239|48240.013481882655|
|    min|             23.0|Female|     Bachelor's|Account Manager|                0.0|             350.0|
|    max|             53.0|  Male|            PhD|  Web Developer|               25.0|          250000.0|
+-------+-----------------+------+---------------+---------------+-------------------+------------------+



добавим в датасет два новых признака и переименуем ```Salary``` в ```label```:
- ```Age Group``` - категория возраста, сделаем их три
- ```Title Length``` - длина строки названия должности, а что, вдруг найдётся какая-то взаимосвязь

посмотрим на результат:

In [5]:
# новый признак - три категории возрастов
df = df.withColumn('Age Group', F.when(df['Age'] <= 30, 'young')
                               .when((df['Age'] > 30) & (df['Age'] <= 45), 'midage')
                               .otherwise('odl'))

# новый признак - количество символов в названии должности
df = df.withColumn('Title Length', F.length(df['Job Title']))

df = df.withColumnRenamed("Salary", "label")
df.show(3)

+----+------+---------------+-----------------+-------------------+--------+---------+------------+
| Age|Gender|Education Level|        Job Title|Years of Experience|   label|Age Group|Title Length|
+----+------+---------------+-----------------+-------------------+--------+---------+------------+
|32.0|  Male|     Bachelor's|Software Engineer|                5.0| 90000.0|   midage|          17|
|28.0|Female|       Master's|     Data Analyst|                3.0| 65000.0|    young|          12|
|45.0|  Male|            PhD|   Senior Manager|               15.0|150000.0|   midage|          14|
+----+------+---------------+-----------------+-------------------+--------+---------+------------+
only showing top 3 rows



делим датасет на train и test для поиска лучших гиперпараметров по сетке, а для итогового обучения модели создадим копию датафрейма ```save_df```, на нём сделаем финальный фит модели со всеми трансформациями для сохранения её ф директорию на HDFS:

In [6]:
train, test = df.randomSplit([0.7, 0.3], seed=0)
save_df = train.alias("save_df")
save_df.show(2)

+----+------+---------------+----------------+-------------------+-------+---------+------------+
| Age|Gender|Education Level|       Job Title|Years of Experience|  label|Age Group|Title Length|
+----+------+---------------+----------------+-------------------+-------+---------+------------+
|24.0|  Male|     Bachelor's|Junior Developer|                1.0|40000.0|    young|          16|
|25.0|Female|     Bachelor's|Data Entry Clerk|                0.0|35000.0|    young|          16|
+----+------+---------------+----------------+-------------------+-------+---------+------------+
only showing top 2 rows



во так будем преобразовывать данные в пайплайне - создадим ```features``` - вектор с трансформированными категориальными и числовыми признаками. А все шаги трансформаций сохраним в ```stages```

*handleInvalid='keep'* в *StringIndexer* нужен на тот случай, если *StringIndexer* обнаружит какие-то новые данные и не будет знать, как их обработать. Так код не будет падать:

In [7]:
numCols = ['Age', 'Years of Experience', 'Title Length']
catCols = ['Gender', 'Education Level', 'Job Title', 'Age Group']

stages = []
for c in catCols:
    st = StringIndexer(inputCol=c, outputCol=c+'_Index', handleInvalid='keep')
    encoder = OneHotEncoder(inputCols=[st.getOutputCol()], outputCols=[c+'_classVec'])
    stages += [st, encoder]
    
assemblerInputs = [i+'_classVec' for i in catCols] + numCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol='features')
stages += [assembler]

pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(train)

pipelineModel.write().overwrite().save("/mysha/dz8/pipelineModel")

train = pipelineModel.transform(train)
test = pipelineModel.transform(test)

pd.DataFrame(train.take(5), columns=train.columns).T

Unnamed: 0,0,1,2,3,4
Age,24.0,25.0,25.0,25.0,26.0
Gender,Male,Female,Female,Male,Female
Education Level,Bachelor's,Bachelor's,Master's,Bachelor's,Bachelor's
Job Title,Junior Developer,Data Entry Clerk,Junior Marketing Specialist,Sales Representative,Customer Success Rep
Years of Experience,1.0,0.0,1.5,0.0,1.0
label,40000.0,35000.0,40000.0,30000.0,40000.0
Age Group,young,young,young,young,young
Title Length,16,16,27,20,20
Gender_Index,0.0,1.0,1.0,0.0,1.0
Gender_classVec,"(1.0, 0.0)","(0.0, 1.0)","(0.0, 1.0)","(1.0, 0.0)","(0.0, 1.0)"


In [8]:
selectedCols = ['label', 'features'] + catCols + numCols
train.select(selectedCols).show(3)

+-------+--------------------+------+---------------+--------------------+---------+----+-------------------+------------+
|  label|            features|Gender|Education Level|           Job Title|Age Group| Age|Years of Experience|Title Length|
+-------+--------------------+------+---------------+--------------------+---------+----+-------------------+------------+
|40000.0|(153,[0,2,92,148,...|  Male|     Bachelor's|    Junior Developer|    young|24.0|                1.0|          16|
|35000.0|(153,[1,2,71,148,...|Female|     Bachelor's|    Data Entry Clerk|    young|25.0|                0.0|          16|
|40000.0|(153,[1,3,44,148,...|Female|       Master's|Junior Marketing ...|    young|25.0|                1.5|          27|
+-------+--------------------+------+---------------+--------------------+---------+----+-------------------+------------+
only showing top 3 rows



самописная функция ```get_model``` принимает алгоритм и в кросс-валидации по сетке параметров выбирает лучшие по значению MAE, пересоздаёт пайплайн, обучает модель и сохраняет обученную модель в именную папку на HDFS:

In [9]:
def get_model(ml, grid, evaluator, folderName, stages=stages, train=train, test=test, save_df=save_df):
    
    cv = CrossValidator(estimator=ml, estimatorParamMaps=grid, evaluator=evaluator)
    model = cv.fit(train)

    bestModel = model.bestModel
    
    train_predictions = model.transform(train)
    test_predictions = model.transform(test)
    print(f'модель {folderName}')
    print(f'MAE на тренировочной выборке = {evaluator.evaluate(bestModel.transform(train))}')
    print(f'MAE на тестовой выборке = {evaluator.evaluate(bestModel.transform(test))}')
    try:
        subprocess.call(['hdfs', 'dfs', '-mkdir', f'/mysha/dz8/model_{folderName}'])
        subprocess.call(['hdfs', 'dfs', '-rm', '-r', '-f', '-skipTrash', f'/mysha/dz8/model_{folderName}/*'])
        stages += [bestModel]
        pipeline = Pipeline(stages=stages)
        save_model = pipeline.fit(save_df)
        save_model.write().overwrite().save(f'/mysha/dz8/model_{folderName}')
        print(f"модель сохранена на HDFS в директорию: '/mysha/dz8/model_{folderName}'")
    except:
        print('что-то пошло не так')

переберём три алгоритма. Все три обучанные модели сохраним, а использовать в дальнейшем будет лучшую по MAE на тестовой выборке:

In [10]:
%%time
evaluator = RegressionEvaluator(metricName='mae')

lr = LinearRegression()
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

get_model(lr, paramGrid, evaluator, 'LinearRegression')

модель LinearRegression
MAE на тренировочной выборке = 3347.3892690685316
MAE на тестовой выборке = 13582.545809830764
модель сохранена на HDFS в директорию: '/mysha/dz8/model_LinearRegression'
CPU times: user 3.59 s, sys: 1.15 s, total: 4.75 s
Wall time: 1min 51s


In [11]:
%%time
rf = RandomForestRegressor()

paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20, 30]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .addGrid(rf.minInstancesPerNode, [1, 5, 10]) \
    .addGrid(rf.featureSubsetStrategy, ['auto', 'sqrt', 'log2']) \
    .build()

get_model(rf, paramGrid, evaluator, 'RandomForestRegressor')

модель RandomForestRegressor
MAE на тренировочной выборке = 3904.9648227237863
MAE на тестовой выборке = 9580.307245477587
модель сохранена на HDFS в директорию: '/mysha/dz8/model_RandomForestRegressor'
CPU times: user 34.7 s, sys: 11.3 s, total: 45.9 s
Wall time: 10min 11s


In [12]:
%%time
gbt = GBTRegressor()

paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxIter, [10, 20, 30]) \
    .addGrid(gbt.maxDepth, [5, 10, 15]) \
    .addGrid(gbt.stepSize, [0.1, 0.01, 0.001]) \
    .addGrid(gbt.subsamplingRate, [0.5, 0.75, 1.0]) \
    .build()

get_model(gbt, paramGrid, evaluator, 'GBTRegressor')

модель GBTRegressor
MAE на тренировочной выборке = 177.2288715811082
MAE на тестовой выборке = 11977.694564979498
модель сохранена на HDFS в директорию: '/mysha/dz8/model_GBTRegressor'
CPU times: user 42.2 s, sys: 14.4 s, total: 56.6 s
Wall time: 2h 8min 58s


лучше всех мнбя показал лес, его и будем использовать. А пока закроем сессию:

In [13]:
spark.stop()

_____