In [None]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
[K     |████████████████████████████████| 204.2MB 65kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 39.0MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=0fb85e9e42afab5b4ae068a4d60799218bd7738f20d02e9db1c98ab6096e6ec2
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


In [None]:
import numpy as np
import pandas as pd
import pyspark
import os
import urllib
import sys

from pyspark.sql.functions import *
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.ml.feature import *


In [None]:
# Create the Spark session for this notebook, or reuse one that already exists.
spark = pyspark.sql.SparkSession.builder.appName('Iris').getOrCreate()

print('Python version: {}'.format(sys.version))
print('Spark version: {}'.format(spark.version))

# The CSV is read without a header row, so column names are given explicitly.
# It is loaded with pandas first and then converted to a Spark DataFrame.
iris_url = 'https://raw.githubusercontent.com/jbrownlee/Datasets/master/iris.csv'
iris_columns = ['sepal-length', 'sepal-width', 'petal-length', 'petal-width', 'class']
iris_pdf = pd.read_csv(iris_url, header=None, names=iris_columns)
data = spark.createDataFrame(iris_pdf)

print("First 10 rows of Iris dataset:")
data.show(10)

Python version: 3.6.9 (default, Oct  8 2020, 12:12:24) 
[GCC 8.4.0]
Spark version: 3.0.1
First 10 rows of Iris dataset:
+------------+-----------+------------+-----------+-----------+
|sepal-length|sepal-width|petal-length|petal-width|      class|
+------------+-----------+------------+-----------+-----------+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|
|         5.4|        3.9|         1.7|        0.4|Iris-setosa|
|         4.6|        3.4|         1.4|        0.3|Iris-setosa|
|         5.0|        3.4|         1.5|        0.2|Iris-setosa|
|         4.4|        2.9|         1.4|        0.2|Iris-setosa|
|         4.9|        3.1|         1.5|        0.1|Iris-setosa|
+------------+-----------+------------+---------

In [None]:
# Print the column names and types. The method must be called; referencing
# it without parentheses only displays the bound-method repr.
data.printSchema()

<bound method DataFrame.printSchema of DataFrame[sepal-length: double, sepal-width: double, petal-length: double, petal-width: double, class: string]>

In [None]:
# Display the first rows of the DataFrame. The method must be called;
# referencing it without parentheses only displays the bound-method repr.
data.show()

<bound method DataFrame.show of DataFrame[sepal-length: double, sepal-width: double, petal-length: double, petal-width: double, class: string]>

In [None]:
# Summary statistics for every column; show up to 5 rows and truncate each
# cell to 15 characters so the table stays narrow.
summary_stats = data.describe()
summary_stats.show(n=5, truncate=15)

+-------+---------------+---------------+---------------+---------------+--------------+
|summary|   sepal-length|    sepal-width|   petal-length|    petal-width|         class|
+-------+---------------+---------------+---------------+---------------+--------------+
|  count|            150|            150|            150|            150|           150|
|   mean|5.8433333333...|3.0540000000...|3.7586666666...|1.1986666666...|          null|
| stddev|0.8280661279...|0.4335943113...|1.7644204199...|0.7631607417...|          null|
|    min|            4.3|            2.0|            1.0|            0.1|   Iris-setosa|
|    max|            7.9|            4.4|            6.9|            2.5|Iris-virginica|
+-------+---------------+---------------+---------------+---------------+--------------+



In [None]:
# Combine every column except the last one into a single vector column
# named 'features'.
feature_cols = data.columns[:-1]
assembler = pyspark.ml.feature.VectorAssembler(
    inputCols=feature_cols,
    outputCol='features',
)
data = assembler.transform(data)

In [None]:
# Keep only the feature vector and the text label, then fit a StringIndexer
# that writes a numeric 'label' column derived from the 'class' strings.
data = data.select('features', 'class')
label_indexer = pyspark.ml.feature.StringIndexer(
    inputCol='class',
    outputCol='label',
).fit(data)
data = label_indexer.transform(data)

In [None]:
# Drop the text 'class' column: only the feature vector and the numeric
# label are kept for training.
data = data.select('features', 'label')
print("Reading for machine learning")
data.show(n=10)

Reading for machine learning
+-----------------+-----+
|         features|label|
+-----------------+-----+
|[5.1,3.5,1.4,0.2]|  0.0|
|[4.9,3.0,1.4,0.2]|  0.0|
|[4.7,3.2,1.3,0.2]|  0.0|
|[4.6,3.1,1.5,0.2]|  0.0|
|[5.0,3.6,1.4,0.2]|  0.0|
|[5.4,3.9,1.7,0.4]|  0.0|
|[4.6,3.4,1.4,0.3]|  0.0|
|[5.0,3.4,1.5,0.2]|  0.0|
|[4.4,2.9,1.4,0.2]|  0.0|
|[4.9,3.1,1.5,0.1]|  0.0|
+-----------------+-----+
only showing top 10 rows



In [None]:
# Split the data roughly 70/30 into training and test sets. A fixed seed is
# passed so the split (and therefore the reported accuracy) does not change
# from one run of the notebook to the next.
train, test = data.randomSplit([0.70, 0.30], seed=42)

# Fit a regularised logistic regression model on the training set only.
lr = pyspark.ml.classification.LogisticRegression(regParam=0.01)
model = lr.fit(train)

In [None]:
# Apply the fitted model to the held-out test rows and preview the result.
prediction = model.transform(test)
print("Prediction")
prediction.show(n=10)

Prediction
+-----------------+-----+--------------------+--------------------+----------+
|         features|label|       rawPrediction|         probability|prediction|
+-----------------+-----+--------------------+--------------------+----------+
|[4.6,3.1,1.5,0.2]|  0.0|[5.76853978965701...|[0.96722815173249...|       0.0|
|[4.6,3.4,1.4,0.3]|  0.0|[6.32679184959567...|[0.98554190859180...|       0.0|
|[4.7,3.2,1.6,0.2]|  0.0|[5.77016271259579...|[0.96768338746233...|       0.0|
|[4.8,3.0,1.4,0.1]|  0.0|[5.6037764048288,...|[0.94966572389106...|       0.0|
|[4.8,3.0,1.4,0.3]|  0.0|[5.17322867975136...|[0.93756072310826...|       0.0|
|[4.8,3.1,1.6,0.2]|  0.0|[5.41870286606336...|[0.94930583022936...|       0.0|
|[4.8,3.4,1.6,0.2]|  0.0|[6.09466808123252...|[0.97792488230617...|       0.0|
|[4.9,2.4,3.3,1.0]|  1.0|[0.33458966464309...|[0.15190041328107...|       1.0|
|[5.0,3.2,1.2,0.2]|  0.0|[5.78199121740053...|[0.96131648151059...|       0.0|
|[5.0,3.5,1.3,0.3]|  0.0|[6.1451218627227

In [None]:
# Score the predictions by accuracy. The column names are the evaluator's
# defaults, spelled out here so the inputs are visible.
evaluator = pyspark.ml.evaluation.MulticlassClassificationEvaluator(
    predictionCol='prediction',
    labelCol='label',
    metricName='accuracy',
)
accuracy = evaluator.evaluate(prediction)

In [None]:
# Report the accuracy computed by the evaluator.
print(f"Accuracy is {accuracy}")

Accuracy is 0.9583333333333334
