# Что такое Spark?
Spark - одна из новейших технологий, используемых для быстрой и простой обработки больших данных
Это проект с открытым исходным кодом
Впервые он был выпущен в феврале 2013 года и приобрел огромную популярность благодаря простоте использования и скорости
Spark в 100 раз быстрее, чем Hadoop MapReduce
Spark ничего не хранит, если к данным не применено какое-либо действие

# Импортируем все необходимые библиотеки

In [94]:
import os
import warnings

warnings.filterwarnings('ignore')
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, FloatType
from pyspark.sql.functions import split, count, when, isnan, col, regexp_replace
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

for dirname, _, filenames in os.walk('Datasets'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

Datasets\auto-mpg.csv
Datasets\online_retail.csv


# Создадим SparkSession

In [95]:
spark = SparkSession.builder.appName('First Session').getOrCreate()

print('Spark Version: {}'.format(spark.version))

Spark Version: 3.5.0


# Загузим данные

In [96]:
schema = StructType([StructField('mpg', FloatType(), nullable=True),
                     StructField('cylinders', IntegerType(), nullable=True),
                     StructField('displacement', FloatType(), nullable=True),
                     StructField('horsepower', StringType(), nullable=True),
                     StructField('weight', IntegerType(), nullable=True),
                     StructField('acceleration', FloatType(), nullable=True),
                     StructField('model year', IntegerType(), nullable=True),
                     StructField('origin', IntegerType(), nullable=True),
                     StructField('car name', StringType(), nullable=True)])

file_path = 'Datasets/auto-mpg.csv'

df = spark.read.csv(file_path, header=True, inferSchema=True, nanValue='?')

df.show(5)

+----+---------+------------+----------+------+------------+----------+------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model year|origin|            car name|
+----+---------+------------+----------+------+------------+----------+------+--------------------+
|18.0|        8|       307.0|     130.0|  3504|        12.0|        70|     1|chevrolet chevell...|
|15.0|        8|       350.0|     165.0|  3693|        11.5|        70|     1|   buick skylark 320|
|18.0|        8|       318.0|     150.0|  3436|        11.0|        70|     1|  plymouth satellite|
|16.0|        8|       304.0|     150.0|  3433|        12.0|        70|     1|       amc rebel sst|
|17.0|        8|       302.0|     140.0|  3449|        10.5|        70|     1|         ford torino|
+----+---------+------------+----------+------+------------+----------+------+--------------------+


# Просмотрим пропущенные значения

In [97]:
def check_missing(dataframe):
    return dataframe.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in dataframe.columns]).show()


'''
1. dataframe.columns - свойство DataFrame, возвращающее список всех столбцов в DataFrame.
2. isnan(c) - функция, которая проверяет, является ли значение столбца NaN.
3. col(c).isNull() - функция, которая проверяет, является ли значение столбца нулевым (null).
4. when(isnan(c) | col(c).isNull(), c) - функция when, используемая для создания условного выражения. В данном случае, если значение столбца c является NaN или нулевым (null), то возвращается само значение c.
5. count(when(isnan(c) | col(c).isNull(), c)).alias(c) - функция count, используемая для подсчета количества пропущенных значений. Здесь мы подсчитываем количество значений, для которых условие в пункте 5 возвращает True, и назначаем данному столбцу имя c.
6. [count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in dataframe.columns] - генератор списка, который создает список выражений подсчета пропущенных значений для каждого столбца в DataFrame.
7. dataframe.select(...).show() - функция select, используемая для выбора указанных столбцов в DataFrame. В качестве аргумента передается список выражений подсчета пропущенных значений, созданный в пункте 6. Затем вызывается функция show(), которая выводит результат на консоль.
'''

check_missing(df)

+---+---------+------------+----------+------+------------+----------+------+--------+
|mpg|cylinders|displacement|horsepower|weight|acceleration|model year|origin|car name|
+---+---------+------------+----------+------+------------+----------+------+--------+
|  0|        0|           0|         6|     0|           0|         0|     0|       0|
+---+---------+------------+----------+------+------------+----------+------+--------+


# Обработаем пропущенные значения

In [98]:
df = df.na.drop()

df = df.withColumn("horsepower", df["horsepower"].cast(IntegerType()))  # сменим лошадиные силы на тип данных int

df.show(5)

+----+---------+------------+----------+------+------------+----------+------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model year|origin|            car name|
+----+---------+------------+----------+------+------------+----------+------+--------------------+
|18.0|        8|       307.0|       130|  3504|        12.0|        70|     1|chevrolet chevell...|
|15.0|        8|       350.0|       165|  3693|        11.5|        70|     1|   buick skylark 320|
|18.0|        8|       318.0|       150|  3436|        11.0|        70|     1|  plymouth satellite|
|16.0|        8|       304.0|       150|  3433|        12.0|        70|     1|       amc rebel sst|
|17.0|        8|       302.0|       140|  3449|        10.5|        70|     1|         ford torino|
+----+---------+------------+----------+------+------------+----------+------+--------------------+


# Выведем имена столбцов

In [99]:
df.columns

['mpg',
 'cylinders',
 'displacement',
 'horsepower',
 'weight',
 'acceleration',
 'model year',
 'origin',
 'car name']

# Можем отображать данные в pandas формате

In [100]:
df.toPandas().head()

Unnamed: 0,mpg,cylinders,displacement,horsepower,weight,acceleration,model year,origin,car name
0,18.0,8,307.0,130,3504,12.0,70,1,chevrolet chevelle malibu
1,15.0,8,350.0,165,3693,11.5,70,1,buick skylark 320
2,18.0,8,318.0,150,3436,11.0,70,1,plymouth satellite
3,16.0,8,304.0,150,3433,12.0,70,1,amc rebel sst
4,17.0,8,302.0,140,3449,10.5,70,1,ford torino


# Можем выводить информацию о датасете

In [101]:
df.printSchema()

root
 |-- mpg: double (nullable = true)
 |-- cylinders: integer (nullable = true)
 |-- displacement: double (nullable = true)
 |-- horsepower: integer (nullable = true)
 |-- weight: integer (nullable = true)
 |-- acceleration: double (nullable = true)
 |-- model year: integer (nullable = true)
 |-- origin: integer (nullable = true)
 |-- car name: string (nullable = true)


# Переименуем столбцы

In [102]:
df = df.withColumnRenamed('model year', 'model_year')

df = df.withColumnRenamed('car name', 'car_name')

df.show(3)

+----+---------+------------+----------+------+------------+----------+------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model_year|origin|            car_name|
+----+---------+------------+----------+------+------------+----------+------+--------------------+
|18.0|        8|       307.0|       130|  3504|        12.0|        70|     1|chevrolet chevell...|
|15.0|        8|       350.0|       165|  3693|        11.5|        70|     1|   buick skylark 320|
|18.0|        8|       318.0|       150|  3436|        11.0|        70|     1|  plymouth satellite|
+----+---------+------------+----------+------+------------+----------+------+--------------------+


# Выведем информацию по строкам

In [103]:
for car in df.head(4):
    print(car, '\n')

Row(mpg=18.0, cylinders=8, displacement=307.0, horsepower=130, weight=3504, acceleration=12.0, model_year=70, origin=1, car_name='chevrolet chevelle malibu') 

Row(mpg=15.0, cylinders=8, displacement=350.0, horsepower=165, weight=3693, acceleration=11.5, model_year=70, origin=1, car_name='buick skylark 320') 

Row(mpg=18.0, cylinders=8, displacement=318.0, horsepower=150, weight=3436, acceleration=11.0, model_year=70, origin=1, car_name='plymouth satellite') 

Row(mpg=16.0, cylinders=8, displacement=304.0, horsepower=150, weight=3433, acceleration=12.0, model_year=70, origin=1, car_name='amc rebel sst') 


# Получим статистическую сводку по датафрейму

In [104]:
df.describe().show()

+-------+-----------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+--------------------+
|summary|              mpg|         cylinders|      displacement|        horsepower|            weight|      acceleration|       model_year|            origin|            car_name|
+-------+-----------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+--------------------+
|  count|              392|               392|               392|               392|               392|               392|              392|               392|                 392|
|   mean|23.44591836734694| 5.471938775510204|194.41198979591837|104.46938775510205|2977.5841836734694|15.541326530612228| 75.9795918367347|1.5765306122448979|                NULL|
| stddev|7.805007486571802|1.7057832474527845|104.64400390890465| 38.49115993282846| 849.402560

# Также можем получить сводку по конкретным столбцам

In [105]:
df.describe(['mpg', 'horsepower']).show()

+-------+-----------------+------------------+
|summary|              mpg|        horsepower|
+-------+-----------------+------------------+
|  count|              392|               392|
|   mean|23.44591836734694|104.46938775510205|
| stddev|7.805007486571802| 38.49115993282846|
|    min|              9.0|                46|
|    max|             46.6|               230|
+-------+-----------------+------------------+


In [106]:
def get_num_cols(dataframe):
    num_cols = [col for col in dataframe.columns if dataframe.select(col).dtypes[0][1] in ['double', 'int']]
    return num_cols


num_cols = get_num_cols(df)
df.describe(num_cols).show()

+-------+-----------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+
|summary|              mpg|         cylinders|      displacement|        horsepower|            weight|      acceleration|       model_year|            origin|
+-------+-----------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+
|  count|              392|               392|               392|               392|               392|               392|              392|               392|
|   mean|23.44591836734694| 5.471938775510204|194.41198979591837|104.46938775510205|2977.5841836734694|15.541326530612228| 75.9795918367347|1.5765306122448979|
| stddev|7.805007486571802|1.7057832474527845|104.64400390890465| 38.49115993282846| 849.4025600429486|  2.75886411918808|3.683736543577868|0.8055181834183057|
|    min|              9.0|             

# Базовые операции с Датафреймами

In [107]:
df.filter(df['mpg'] > 23).show(5)  # Возьмем автомобили с пробегом более 23 миль на галлон топлива

+----+---------+------------+----------+------+------------+----------+------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model_year|origin|            car_name|
+----+---------+------------+----------+------+------------+----------+------+--------------------+
|24.0|        4|       113.0|        95|  2372|        15.0|        70|     3|toyota corona mar...|
|27.0|        4|        97.0|        88|  2130|        14.5|        70|     3|        datsun pl510|
|26.0|        4|        97.0|        46|  1835|        20.5|        70|     2|volkswagen 1131 d...|
|25.0|        4|       110.0|        87|  2672|        17.5|        70|     2|         peugeot 504|
|24.0|        4|       107.0|        90|  2430|        14.5|        70|     2|         audi 100 ls|
+----+---------+------------+----------+------+------------+----------+------+--------------------+


In [108]:
df.filter((df['horsepower'] > 80) & (df['weight'] > 2000)).select('car_name').show(
    5)  # Рассмотрим множественные условия.
# Тут мы выбрали названия автомобилей, у которых количество лошадиных сил больше 80 и вес больше 2 тонн

+--------------------+
|            car_name|
+--------------------+
|chevrolet chevell...|
|   buick skylark 320|
|  plymouth satellite|
|       amc rebel sst|
|         ford torino|
+--------------------+


In [109]:
df.filter((df['mpg'] > 25) & (df['origin'] == 2)).orderBy('mpg', ascending=False).show(
    5)  # Выполним сортировку по расходу топлива для автомобилей с расходом более 25 миль.

+----+---------+------------+----------+------+------------+----------+------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model_year|origin|            car_name|
+----+---------+------------+----------+------+------------+----------+------+--------------------+
|44.3|        4|        90.0|        48|  2085|        21.7|        80|     2|vw rabbit c (diesel)|
|44.0|        4|        97.0|        52|  2130|        24.6|        82|     2|           vw pickup|
|43.4|        4|        90.0|        48|  2335|        23.7|        80|     2|  vw dasher (diesel)|
|43.1|        4|        90.0|        48|  1985|        21.5|        78|     2|volkswagen rabbit...|
|41.5|        4|        98.0|        76|  2144|        14.7|        80|     2|           vw rabbit|
+----+---------+------------+----------+------+------------+----------+------+--------------------+


In [110]:
# Найдем автомобили с буквой "фольксваген" в названии и отсортируем их по году выпуска и мощности двигателя
df.filter(df['car_name'].contains('volkswagen')).orderBy(['model_year', 'horsepower'], ascending=[False, False]).show(5)

+----+---------+------------+----------+------+------------+----------+------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model_year|origin|            car_name|
+----+---------+------------+----------+------+------------+----------+------+--------------------+
|36.0|        4|       105.0|        74|  1980|        15.3|        82|     2| volkswagen rabbit l|
|33.0|        4|       105.0|        74|  2190|        14.2|        81|     2|    volkswagen jetta|
|31.5|        4|        89.0|        71|  1990|        14.9|        78|     2| volkswagen scirocco|
|43.1|        4|        90.0|        48|  1985|        21.5|        78|     2|volkswagen rabbit...|
|29.0|        4|        97.0|        78|  1940|        14.5|        77|     2|volkswagen rabbit...|
+----+---------+------------+----------+------+------------+----------+------+--------------------+


In [111]:
df.filter(df['car_name'].like('%volkswagen%')).show(3)

+----+---------+------------+----------+------+------------+----------+------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model_year|origin|            car_name|
+----+---------+------------+----------+------+------------+----------+------+--------------------+
|26.0|        4|        97.0|        46|  1835|        20.5|        70|     2|volkswagen 1131 d...|
|27.0|        4|        97.0|        60|  1834|        19.0|        71|     2|volkswagen model 111|
|23.0|        4|        97.0|        54|  2254|        23.5|        72|     2|   volkswagen type 3|
+----+---------+------------+----------+------+------------+----------+------+--------------------+


# Фильтрация с SQL

In [112]:
df.filter("car_name like '%toyota%'").show(5)

+----+---------+------------+----------+------+------------+----------+------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model_year|origin|            car_name|
+----+---------+------------+----------+------+------------+----------+------+--------------------+
|24.0|        4|       113.0|        95|  2372|        15.0|        70|     3|toyota corona mar...|
|25.0|        4|       113.0|        95|  2228|        14.0|        71|     3|       toyota corona|
|31.0|        4|        71.0|        65|  1773|        19.0|        71|     3| toyota corolla 1200|
|24.0|        4|       113.0|        95|  2278|        15.5|        72|     3|toyota corona har...|
|27.0|        4|        97.0|        88|  2100|        16.5|        72|     3|toyota corolla 16...|
+----+---------+------------+----------+------+------------+----------+------+--------------------+


In [113]:
df.filter('mpg > 22').show(5)

+----+---------+------------+----------+------+------------+----------+------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model_year|origin|            car_name|
+----+---------+------------+----------+------+------------+----------+------+--------------------+
|24.0|        4|       113.0|        95|  2372|        15.0|        70|     3|toyota corona mar...|
|27.0|        4|        97.0|        88|  2130|        14.5|        70|     3|        datsun pl510|
|26.0|        4|        97.0|        46|  1835|        20.5|        70|     2|volkswagen 1131 d...|
|25.0|        4|       110.0|        87|  2672|        17.5|        70|     2|         peugeot 504|
|24.0|        4|       107.0|        90|  2430|        14.5|        70|     2|         audi 100 ls|
+----+---------+------------+----------+------+------------+----------+------+--------------------+


In [114]:
df.filter('mpg > 22 and acceleration < 15').show(5)

+----+---------+------------+----------+------+------------+----------+------+-------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model_year|origin|     car_name|
+----+---------+------------+----------+------+------------+----------+------+-------------+
|27.0|        4|        97.0|        88|  2130|        14.5|        70|     3| datsun pl510|
|24.0|        4|       107.0|        90|  2430|        14.5|        70|     2|  audi 100 ls|
|26.0|        4|       121.0|       113|  2234|        12.5|        70|     2|     bmw 2002|
|27.0|        4|        97.0|        88|  2130|        14.5|        71|     3| datsun pl510|
|25.0|        4|       113.0|        95|  2228|        14.0|        71|     3|toyota corona|
+----+---------+------------+----------+------+------------+----------+------+-------------+


In [115]:
df.filter('horsepower == 88 and weight between 2600 and 3000').select(['horsepower', 'weight', 'car_name']).show()

+----------+------+--------------------+
|horsepower|weight|            car_name|
+----------+------+--------------------+
|        88|  2957|         peugeot 504|
|        88|  2740|pontiac sunbird c...|
|        88|  2720| ford fairmont (man)|
|        88|  2890|     ford fairmont 4|
|        88|  2870|       ford fairmont|
|        88|  2605|  chevrolet cavalier|
|        88|  2640|chevrolet cavalie...|
+----------+------+--------------------+


# Групповка и агрегатные операции

In [116]:
df.createOrReplaceTempView('auto_mpg')

df = df.withColumn('brand', split(df['car_name'], ' ').getItem(0)).drop('car_name')  # Бренды

# Замена брендов с ошибками в написании
auto_misspelled = {'chevroelt': 'chevrolet',
                   'chevy': 'chevrolet',
                   'vokswagen': 'volkswagen',
                   'vw': 'volkswagen',
                   'hi': 'harvester',
                   'maxda': 'mazda',
                   'toyouta': 'toyota',
                   'mercedes-benz': 'mercedes'}

for key in auto_misspelled.keys():
    df = df.withColumn('brand', regexp_replace('brand', key, auto_misspelled[key]))

df.show(5)

+----+---------+------------+----------+------+------------+----------+------+---------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model_year|origin|    brand|
+----+---------+------------+----------+------+------------+----------+------+---------+
|18.0|        8|       307.0|       130|  3504|        12.0|        70|     1|chevrolet|
|15.0|        8|       350.0|       165|  3693|        11.5|        70|     1|    buick|
|18.0|        8|       318.0|       150|  3436|        11.0|        70|     1| plymouth|
|16.0|        8|       304.0|       150|  3433|        12.0|        70|     1|      amc|
|17.0|        8|       302.0|       140|  3449|        10.5|        70|     1|     ford|
+----+---------+------------+----------+------+------------+----------+------+---------+


In [117]:
df.groupBy('brand').agg({'acceleration': 'mean'}).show(5)  # Среднее ускорение по маркам автомобилей

+--------+------------------+
|   brand| avg(acceleration)|
+--------+------------------+
|   buick|14.700000000000003|
| pontiac|14.081249999999999|
|mercedes| 19.53333333333333|
|  toyota| 16.03846153846154|
|    saab|            15.175|
+--------+------------------+


In [118]:
df.groupBy('brand').agg({'mpg': 'max'}).show(5)  # Максимальное количество миль на галлон по маркам автомобилей

+--------+--------+
|   brand|max(mpg)|
+--------+--------+
|   buick|    30.0|
| pontiac|    33.5|
|mercedes|    30.0|
|  toyota|    39.1|
|    saab|    25.0|
+--------+--------+


# Машинное обучение
- Машинное обучение - это метод анализа данных, который автоматизирует построение аналитической модели.
- Используя алгоритмы, которые итеративно извлекают уроки из данных, машинное обучение позволяет компьютерам находить скрытые идеи, не будучи явно запрограммированным, где искать.
# Обучение с учителем
MLlib от Spark в основном предназначен для задач обучения с учителем и без учителя, причем большинство его алгоритмов подпадают под эти две категории.
Алгоритмы обучения с учителем обучаются с использованием помеченных примеров, таких как входные данные, где известен желаемый результат.
Например, элемент оборудования может иметь точки данных, помеченные либо “F” (сбой), либо “R” (работает).
Алгоритм обучения получает набор входных данных вместе с соответствующими правильными выходными данными, и алгоритм учится, сравнивая свои фактические выходные данные с правильными выходными данными, чтобы найти ошибки.
Затем он соответствующим образом модифицирует модель.
С помощью таких методов, как классификация, регрессия, прогнозирование и повышение градиента, контролируемое обучение использует шаблоны для прогнозирования значений метки на дополнительных немаркированных данных.
Контролируемое обучение обычно используется в приложениях, где исторические данные предсказывают вероятные будущие события.
Например, оно может предвидеть, когда транзакции по кредитным картам могут быть мошенническими или какой страховой клиент, скорее всего, подаст претензию.
Или он может попытаться предсказать цену дома на основе различных характеристик для домов, для которых у нас есть исторические данные о ценах.
# Обучение без учителя
Неконтролируемое обучение используется для данных, которые не имеют исторических меток.
Системе не сообщается "правильный ответ". Алгоритм должен вычислить, что показывается.
Цель состоит в том, чтобы изучить данные и найти в них некоторую структуру.
Например, он может найти основные атрибуты, которые отделяют сегменты клиентов друг от друга.
Популярные методы включают самоорганизующиеся карты, сопоставление ближайших соседей, кластеризацию k-средних и декомпозицию по сингулярным значениям.
Одна из проблем заключается в том, что может быть трудно оценить результаты неконтролируемой модели!

# Машинное обучение с помощью PySpark
У Spark есть собственный MLlib для машинного обучения.
Будущее MLlib использует синтаксис Spark 2.0 DataFrame.
Одна из главных “причуд” использования MLlib заключается в том, что вам нужно отформатировать ваши данные так, чтобы в конечном итоге в них был только один или два столбца:
Функции, метки (контролируемые)
Функции (неконтролируемые)
Это требует немного больше работы по обработке данных, чем некоторые другие библиотеки машинного обучения, но большим плюсом является то, что точно такой же синтаксис работает с распределенными данными, что является немаловажным достижением для того, что происходит “под капотом”!
При работе с Python и Spark с MLlib примеры документации всегда содержат красиво отформатированные данные.
Огромная часть изучения MLlib - это освоение документации!
Умение овладеть навыком поиска информации (а не запоминания) - ключ к тому, чтобы стать отличным разработчиком Spark и Python!

# Preprocessing
Кодирование брендов

In [119]:
df.groupby('brand').count().orderBy('count', ascending=False).show(5)  # Сначала проверим частоты использования брендов

+---------+-----+
|    brand|count|
+---------+-----+
|     ford|   48|
|chevrolet|   47|
| plymouth|   31|
|    dodge|   28|
|      amc|   27|
+---------+-----+


In [120]:
def one_hot_encoder(dataframe, col):
    indexed = StringIndexer().setInputCol(col).setOutputCol(col + '_cat'). \
        fit(dataframe).transform(dataframe)  # преобразование категориальных значений в индексы категорий
    ohe = OneHotEncoder().setInputCol(col + '_cat').setOutputCol(col + '_OneHotEncoded'). \
        fit(indexed).transform(indexed)
    ohe = ohe.drop(*[col, col + '_cat'])
    return ohe


df = one_hot_encoder(df, col='brand')
df.show(5)

+----+---------+------------+----------+------+------------+----------+------+-------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model_year|origin|brand_OneHotEncoded|
+----+---------+------------+----------+------+------------+----------+------+-------------------+
|18.0|        8|       307.0|       130|  3504|        12.0|        70|     1|     (29,[1],[1.0])|
|15.0|        8|       350.0|       165|  3693|        11.5|        70|     1|     (29,[8],[1.0])|
|18.0|        8|       318.0|       150|  3436|        11.0|        70|     1|     (29,[2],[1.0])|
|16.0|        8|       304.0|       150|  3433|        12.0|        70|     1|     (29,[4],[1.0])|
|17.0|        8|       302.0|       140|  3449|        10.5|        70|     1|     (29,[0],[1.0])|
+----+---------+------------+----------+------+------------+----------+------+-------------------+


In [121]:
def vector_assembler(dataframe, indep_cols):
    assembler = VectorAssembler(inputCols = indep_cols, outputCol = 'features')
    output = assembler.transform(dataframe).drop(*indep_cols)
    return output

df = vector_assembler(df, indep_cols = df.drop('mpg').columns)
df.show(5)

+----+--------------------+
| mpg|            features|
+----+--------------------+
|18.0|(36,[0,1,2,3,4,5,...|
|15.0|(36,[0,1,2,3,4,5,...|
|18.0|(36,[0,1,2,3,4,5,...|
|16.0|(36,[0,1,2,3,4,5,...|
|17.0|(36,[0,1,2,3,4,5,...|
+----+--------------------+


# Train-Test Split

In [122]:
train_data, test_data = df.randomSplit([0.8, 0.2])

print('Train Shape: ({}, {})'.format(train_data.count(), len(train_data.columns)))
print('Test Shape: ({}, {})'.format(test_data.count(), len(test_data.columns)))

Train Shape: (317, 2)
Test Shape: (75, 2)


# Множественная линейная регрессия с помощью PySpark
Обучение модели

In [123]:
lr = LinearRegression(labelCol = 'mpg', featuresCol = 'features', regParam = 0.3) # избежим переобучения

lr = lr.fit(train_data)

Оценка модели

In [124]:
def evaluate_reg_model(model, test_data):
    print(model.__class__.__name__.center(70, '-'))
    model_results = model.evaluate(test_data)
    print('R2: {}'.format(model_results.r2))
    print('MSE: {}'.format(model_results.meanSquaredError))
    print('RMSE: {}'.format(model_results.rootMeanSquaredError))
    print('MAE: {}'.format(model_results.meanAbsoluteError))
    print(70*'-')

evaluate_reg_model(lr, test_data)

------------------------LinearRegressionModel-------------------------
R2: 0.7604419258990616
MSE: 13.947856398758665
RMSE: 3.7346829047134196
MAE: 2.7347807490894396
----------------------------------------------------------------------


# ОБЯЗАТЕЛЬНО заканчиваем spark сессию

In [125]:
spark.stop()