In [1]:
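# One-time environment setup: uncomment to install the pinned versions into the kernel's environment.
# %pip install pyspark==3.0.1 py4j==0.10.9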

In [2]:
# Import libraries
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pandas as pd

In [3]:
# Create the Spark application: start (or reuse) a session on a local master
# configured as "local[2]".
spark = (
    SparkSession.builder
    .master("local[2]")
    .appName('PySpark_Iris')
    .getOrCreate()
)

# Загрузить в pyspark iris.csv.

In [4]:
# Read the CSV file: comma-separated, first row is the header, column types
# are inferred from the data. Then print the resulting schema.
csv_file = './Data/iris_csv.csv'
df = (
    spark.read
    .options(sep=',', inferSchema=True, header=True)
    .csv(csv_file)
)
df.printSchema()

root
 |-- sepallength: double (nullable = true)
 |-- sepalwidth: double (nullable = true)
 |-- petallength: double (nullable = true)
 |-- petalwidth: double (nullable = true)
 |-- class: string (nullable = true)



In [5]:
# Preview the first three rows of the loaded frame.
df.show(n=3)

+-----------+----------+-----------+----------+-----------+
|sepallength|sepalwidth|petallength|petalwidth|      class|
+-----------+----------+-----------+----------+-----------+
|        5.1|       3.5|        1.4|       0.2|Iris-setosa|
|        4.9|       3.0|        1.4|       0.2|Iris-setosa|
|        4.7|       3.2|        1.3|       0.2|Iris-setosa|
+-----------+----------+-----------+----------+-----------+
only showing top 3 rows



In [6]:
# Summary statistics (count / mean / stddev / min / max) for every column.
print('Статистика датафрейма')
summary_stats = df.describe()
summary_stats.show()

Статистика датафрейма
+-------+------------------+-------------------+------------------+------------------+--------------+
|summary|       sepallength|         sepalwidth|       petallength|        petalwidth|         class|
+-------+------------------+-------------------+------------------+------------------+--------------+
|  count|               150|                150|               150|               150|           150|
|   mean| 5.843333333333335| 3.0540000000000007|3.7586666666666693|1.1986666666666672|          null|
| stddev|0.8280661279778637|0.43359431136217375| 1.764420419952262|0.7631607417008414|          null|
|    min|               4.3|                2.0|               1.0|               0.1|   Iris-setosa|
|    max|               7.9|                4.4|               6.9|               2.5|Iris-virginica|
+-------+------------------+-------------------+------------------+------------------+--------------+



In [7]:
# Number of rows per value of the 'class' column (class-balance check).
print('разделение по группам:')
class_counts = df.groupBy('class').count()
class_counts.show()

разделение по группам:
+---------------+-----+
|          class|count|
+---------------+-----+
| Iris-virginica|   50|
|    Iris-setosa|   50|
|Iris-versicolor|   50|
+---------------+-----+



# При помощи VectorAssembler преобразовать все колонки с признаками в одну (использовать Pipeline)

In [8]:
# Stage 1: encode the string 'class' column as a numeric label index 'out_ind'.
label_indexer = StringIndexer(inputCol='class', outputCol='out_ind')

# Stage 2: pack the four measurement columns into a single 'features' vector.
feature_assembler = VectorAssembler(
    inputCols=['sepallength', 'sepalwidth', 'petallength', 'petalwidth'],
    outputCol='features',
)

# Preprocessing pipeline: the stages run in the order listed.
pipeline = Pipeline(stages=[label_indexer, feature_assembler])

In [9]:
# Fit the preprocessing pipeline on the whole frame (before the train/test
# split), then apply it to add the 'out_ind' and 'features' columns.
pipeline_trained = pipeline.fit(dataset=df)
df1 = pipeline_trained.transform(dataset=df)

# Preview the first three transformed rows.
df1.show(n=3)

+-----------+----------+-----------+----------+-----------+-------+-----------------+
|sepallength|sepalwidth|petallength|petalwidth|      class|out_ind|         features|
+-----------+----------+-----------+----------+-----------+-------+-----------------+
|        5.1|       3.5|        1.4|       0.2|Iris-setosa|    0.0|[5.1,3.5,1.4,0.2]|
|        4.9|       3.0|        1.4|       0.2|Iris-setosa|    0.0|[4.9,3.0,1.4,0.2]|
|        4.7|       3.2|        1.3|       0.2|Iris-setosa|    0.0|[4.7,3.2,1.3,0.2]|
+-----------+----------+-----------+----------+-----------+-------+-----------------+
only showing top 3 rows



# Разбить данные на train и test

In [10]:
# Random split with weights 0.8 / 0.2; the fixed seed is passed so the split
# can be repeated on re-run.
train, test = df1.randomSplit(weights=[0.8, 0.2], seed=55)

# Row count per class in the test part.
test_class_counts = test.groupBy('class').count()
test_class_counts.show()

+---------------+-----+
|          class|count|
+---------------+-----+
| Iris-virginica|   14|
|    Iris-setosa|   14|
|Iris-versicolor|   15|
+---------------+-----+



# Создать модель логистической регрессии и обучить ее

In [11]:
# Logistic regression on the assembled 'features' vector, predicting the
# indexed label 'out_ind'; all other estimator parameters keep their defaults.
lr = LogisticRegression(labelCol='out_ind', featuresCol='features')

# Fit on the training part only.
lr_model = lr.fit(train)

# Add the model's prediction columns to both parts.
train_res = lr_model.transform(train)
test_res = lr_model.transform(test)

# Confusion-style table on the test part: rows per (class, label, prediction).
test_confusion = test_res.groupBy('class', 'out_ind', 'prediction').count()
test_confusion.show()

+---------------+-------+----------+-----+
|          class|out_ind|prediction|count|
+---------------+-------+----------+-----+
|    Iris-setosa|    0.0|       0.0|   14|
|Iris-versicolor|    1.0|       2.0|    1|
|Iris-versicolor|    1.0|       1.0|   14|
| Iris-virginica|    2.0|       2.0|   13|
| Iris-virginica|    2.0|       1.0|    1|
+---------------+-------+----------+-----+



# Воспользоваться MulticlassClassificationEvaluator для оценки качества на train и test множествах

## train

In [12]:
# Evaluate predictions on the training part.
# No metricName is passed, so the evaluator's default metric is used
# (F1 according to the PySpark docs; confirm for the installed version).
my_mc_lr = MulticlassClassificationEvaluator(
    labelCol='out_ind',
    predictionCol='prediction',
)
train_score = my_mc_lr.evaluate(train_res)
train_score

0.9813084112149533

## test

In [13]:
# Evaluate predictions on the held-out test part.
# No metricName is passed, so the evaluator's default metric is used
# (F1 according to the PySpark docs; confirm for the installed version).
my_mc_lr = MulticlassClassificationEvaluator(
    labelCol='out_ind',
    predictionCol='prediction',
)
test_score = my_mc_lr.evaluate(test_res)
test_score

0.9534883720930233