In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
!wget -q www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz

In [None]:
!tar -xvf spark-2.4.5-bin-hadoop2.7.tgz

In [None]:
!pip install -q findspark

In [None]:
import os

# Point PySpark at the JDK installed by apt-get and at the unpacked Spark distribution.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.4.5-bin-hadoop2.7",
})

In [None]:
import findspark

# Make the pyspark package from SPARK_HOME importable before importing it.
findspark.init()

from pyspark.sql import SparkSession

# Local-mode session using all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

**Домашнее задание**

2 варианта.

Вариант легкий: 
Решите задачу классификации цветков ирисов с использованием PySpark

Вариант сложный: 
Решите задачу классификации пассажиров титаника с использованием PySpark (https://www.kaggle.com/c/titanic)

При выполнении ДЗ не разрешается:
* Использовать библиотеку pandas
* Использовать библиотеку sklearn

Полезные импорты:
* from pyspark.ml.classification import LogisticRegression
* from pyspark.ml.evaluation import MulticlassClassificationEvaluator - для оценки качества работы алгоритма
* from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler - для предобработки признаков

Полезные ссылки:

https://spark.apache.org/docs/latest/ml-classification-regression.html#classification - алгоритмы классификации в pyspark

https://towardsdatascience.com/machine-learning-with-pyspark-and-mllib-solving-a-binary-classification-problem-96396065d2aa - пример решения задачи классификации на pyspark

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler # для предобработки признаков
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator # для оценки качества работы алгоритма

**Iris Dataset**


In [None]:
# Read iris.csv (relative path, resolved against the working directory), taking column
# names from the header row and inferring column types from the data.
iris = spark.read.options(header=True, inferSchema=True).csv('iris.csv')

In [None]:
# Print the column names and the types inferred when reading the CSV.
iris.printSchema()

root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- variety: string (nullable = true)



In [None]:
# Peek at the first three rows.
iris.show(n=3)

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|variety|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| Setosa|
|         4.9|        3.0|         1.4|        0.2| Setosa|
|         4.7|        3.2|         1.3|        0.2| Setosa|
+------------+-----------+------------+-----------+-------+
only showing top 3 rows



In [None]:
# Basic per-column statistics (count, mean, stddev, min, max). The count row also
# reveals missing values: a column whose count is below the row total contains nulls.
iris.describe().show()

+-------+------------------+-------------------+------------------+------------------+---------+
|summary|      sepal_length|        sepal_width|      petal_length|       petal_width|  variety|
+-------+------------------+-------------------+------------------+------------------+---------+
|  count|               150|                150|               150|               150|      150|
|   mean| 5.843333333333335|  3.057333333333334|3.7580000000000027| 1.199333333333334|     null|
| stddev|0.8280661279778637|0.43586628493669793|1.7652982332594662|0.7622376689603467|     null|
|    min|               4.3|                2.0|               1.0|               0.1|   Setosa|
|    max|               7.9|                4.4|               6.9|               2.5|Virginica|
+-------+------------------+-------------------+------------------+------------------+---------+



Preparing Data for Machine Learning

In [None]:
# Stage 1: turn the string class name in `variety` into a numeric `label` column.
label_stringIdx = StringIndexer(inputCol='variety', outputCol='label')

# Stage 2: pack the four numeric measurements into a single `features` vector column.
numericCols = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']
assemblerInputs = numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

stages = [label_stringIdx, assembler]

Pipeline

In [None]:
# Remember the original column names so they can be kept next to label/features.
cols = list(iris.columns)

In [None]:
# Fit the indexing + assembly stages, apply them, and keep label/features first,
# followed by the original columns.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(iris)
selectedCols = ['label', 'features'] + cols
df = pipelineModel.transform(iris).select(selectedCols)
df.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- variety: string (nullable = true)



In [None]:
# Split the rows roughly 70/30 into train/test; the fixed seed makes the split reproducible.
# DataFrame.randomSplit(weights, seed=None): weights are normalized if they don't sum to 1.0.
train, test = df.randomSplit([0.7, 0.3], seed=100)
print(f"Training Dataset Count: {train.count()}")
print(f"Test Dataset Count: {test.count()}")

Training Dataset Count: 103
Test Dataset Count: 47


Logistic Regression Model

In [None]:
# Logistic regression on the assembled feature vector, limited to 10 optimizer iterations.
lr = LogisticRegression(labelCol='label', featuresCol='features', maxIter=10)
lrModel = lr.fit(train)

Make predictions on the test set

In [None]:
# Score the held-out rows; the output adds rawPrediction, probability and prediction columns.
predictions = lrModel.transform(test)
predictions.show(n=10)

+-----+-----------------+------------+-----------+------------+-----------+---------+--------------------+--------------------+----------+
|label|         features|sepal_length|sepal_width|petal_length|petal_width|  variety|       rawPrediction|         probability|prediction|
+-----+-----------------+------------+-----------+------------+-----------+---------+--------------------+--------------------+----------+
|  0.0|[5.8,2.7,5.1,1.9]|         5.8|        2.7|         5.1|        1.9|Virginica|[9.35042575194685...|[0.97384677344424...|       0.0|
|  0.0|[6.0,2.2,5.0,1.5]|         6.0|        2.2|         5.0|        1.5|Virginica|[10.1267477518120...|[0.92536200916523...|       0.0|
|  0.0|[6.0,3.0,4.8,1.8]|         6.0|        3.0|         4.8|        1.8|Virginica|[6.23132593810634...|[0.64331785535649...|       0.0|
|  0.0|[6.1,2.6,5.6,1.4]|         6.1|        2.6|         5.6|        1.4|Virginica|[8.14063822205556...|[0.70054278256136...|       0.0|
|  0.0|[6.3,2.8,5.1,1.5]|  

In [None]:
# MulticlassClassificationEvaluator defaults to metricName="f1" (weighted F1), not ROC AUC.
# ROC AUC is a binary-classification metric (BinaryClassificationEvaluator), so the metric
# is set explicitly here and labelled accordingly.
evaluator = MulticlassClassificationEvaluator(
    labelCol='label', predictionCol='prediction', metricName='f1')
print('Test F1', '%.3f' % evaluator.evaluate(predictions))

# Accuracy via a one-off param override, leaving `evaluator` configured for F1.
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: 'accuracy'})
print('Test Accuracy', '%.3f' % accuracy)

Test Area Under ROC 0.895
