In [0]:
## INSTALLING ALL THE NECESSARY MODULES 
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf /content/spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
## SETTING UP THE ENVIROMENT
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [0]:
## INITIALIZING THE VARIABLES AND CONFIGURATIONS
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('iris_test').master("local[*]").getOrCreate()

In [6]:
#### IMPORTING DATA FROM PANDAS LIBRARY TO FEED PYSPARK DATAFRAME 
import pandas as pd 
url = "https://gist.githubusercontent.com/curran/a08a1080b88344b0c8a7/raw/639388c2cbc2120a14dcf466e85730eb8be498bb/iris.csv"
df = pd.read_csv(url, header = 'infer')
df.head()

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width,species
0,5.1,3.5,1.4,0.2,setosa
1,4.9,3.0,1.4,0.2,setosa
2,4.7,3.2,1.3,0.2,setosa
3,4.6,3.1,1.5,0.2,setosa
4,5.0,3.6,1.4,0.2,setosa


In [7]:
#### MAKING PYSPARK DATAFRAME FROM PANDAS DATAFRAME 
py_df= spark.createDataFrame(df)
py_df.show()

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
|         5.4|        3.9|         1.7|        0.4| setosa|
|         4.6|        3.4|         1.4|        0.3| setosa|
|         5.0|        3.4|         1.5|        0.2| setosa|
|         4.4|        2.9|         1.4|        0.2| setosa|
|         4.9|        3.1|         1.5|        0.1| setosa|
|         5.4|        3.7|         1.5|        0.2| setosa|
|         4.8|        3.4|         1.6|        0.2| setosa|
|         4.8|        3.0|         1.4|        0.1| setosa|
|         4.3|        3.0|         1.1| 

In [9]:
### CHECKING THE SCHEMA OF PYSPARK DATAFRAME
py_df.printSchema()

root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- species: string (nullable = true)



In [0]:
#### IMPORTING VECTOR ASSEMBLER FOR CREATING A FEATURE COLUMN WHICH WILL WORK AS TRAINING DATA (VECTOR REQUIRED FOR ML FORMAT)
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['sepal_length','sepal_width','petal_length','petal_width'],
                           outputCol = 'features')

In [0]:
### IMPORTING NECESSARY LIBRARIES FOR CONVERTING DATA INTO ML FORMAT THAT IS FORMATTING TARGET VARIABLE
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer,OneHotEncoderEstimator

In [0]:
#### TRANSFORMING DATA INTO REQUIRED FORMAT WHICH IS STRING TO INDEX VARIABLE
target = StringIndexer(inputCol = 'species', outputCol = 'species_index')
py_df = target.fit(py_df).transform(py_df)

In [14]:
py_df.show()

+------------+-----------+------------+-----------+-------+-------------+
|sepal_length|sepal_width|petal_length|petal_width|species|species_index|
+------------+-----------+------------+-----------+-------+-------------+
|         5.1|        3.5|         1.4|        0.2| setosa|          2.0|
|         4.9|        3.0|         1.4|        0.2| setosa|          2.0|
|         4.7|        3.2|         1.3|        0.2| setosa|          2.0|
|         4.6|        3.1|         1.5|        0.2| setosa|          2.0|
|         5.0|        3.6|         1.4|        0.2| setosa|          2.0|
|         5.4|        3.9|         1.7|        0.4| setosa|          2.0|
|         4.6|        3.4|         1.4|        0.3| setosa|          2.0|
|         5.0|        3.4|         1.5|        0.2| setosa|          2.0|
|         4.4|        2.9|         1.4|        0.2| setosa|          2.0|
|         4.9|        3.1|         1.5|        0.1| setosa|          2.0|
|         5.4|        3.7|         1.5

In [15]:
py_df.select('species','species_index').show()

+-------+-------------+
|species|species_index|
+-------+-------------+
| setosa|          2.0|
| setosa|          2.0|
| setosa|          2.0|
| setosa|          2.0|
| setosa|          2.0|
| setosa|          2.0|
| setosa|          2.0|
| setosa|          2.0|
| setosa|          2.0|
| setosa|          2.0|
| setosa|          2.0|
| setosa|          2.0|
| setosa|          2.0|
| setosa|          2.0|
| setosa|          2.0|
| setosa|          2.0|
| setosa|          2.0|
| setosa|          2.0|
| setosa|          2.0|
| setosa|          2.0|
+-------+-------------+
only showing top 20 rows



In [16]:
## COUNTING THE NUMBER OF TARGET COUNT PRESENT 
py_df.groupBy('species_index').count().show()

+-------------+-----+
|species_index|count|
+-------------+-----+
|          0.0|   50|
|          1.0|   50|
|          2.0|   50|
+-------------+-----+



In [21]:
## ONE HOT ENCODING OF TARGET VARIABLE 
OHE = OneHotEncoderEstimator(inputCols=['species_index'],outputCols=['species_ohe'])
py_df = OHE.fit(py_df).transform(py_df)

+------------+-----------+------------+-----------+-------+-------------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|species|species_index|species_ohe|
+------------+-----------+------------+-----------+-------+-------------+-----------+
|         5.1|        3.5|         1.4|        0.2| setosa|          2.0|  (2,[],[])|
|         4.9|        3.0|         1.4|        0.2| setosa|          2.0|  (2,[],[])|
|         4.7|        3.2|         1.3|        0.2| setosa|          2.0|  (2,[],[])|
|         4.6|        3.1|         1.5|        0.2| setosa|          2.0|  (2,[],[])|
|         5.0|        3.6|         1.4|        0.2| setosa|          2.0|  (2,[],[])|
|         5.4|        3.9|         1.7|        0.4| setosa|          2.0|  (2,[],[])|
|         4.6|        3.4|         1.4|        0.3| setosa|          2.0|  (2,[],[])|
|         5.0|        3.4|         1.5|        0.2| setosa|          2.0|  (2,[],[])|
|         4.4|        2.9|         1.4|        0.2| se

In [20]:
py_df.select('species','species_index','species_ohe').show()

+-------+-------------+-----------+
|species|species_index|species_ohe|
+-------+-------------+-----------+
| setosa|          2.0|  (2,[],[])|
| setosa|          2.0|  (2,[],[])|
| setosa|          2.0|  (2,[],[])|
| setosa|          2.0|  (2,[],[])|
| setosa|          2.0|  (2,[],[])|
| setosa|          2.0|  (2,[],[])|
| setosa|          2.0|  (2,[],[])|
| setosa|          2.0|  (2,[],[])|
| setosa|          2.0|  (2,[],[])|
| setosa|          2.0|  (2,[],[])|
| setosa|          2.0|  (2,[],[])|
| setosa|          2.0|  (2,[],[])|
| setosa|          2.0|  (2,[],[])|
| setosa|          2.0|  (2,[],[])|
| setosa|          2.0|  (2,[],[])|
| setosa|          2.0|  (2,[],[])|
| setosa|          2.0|  (2,[],[])|
| setosa|          2.0|  (2,[],[])|
| setosa|          2.0|  (2,[],[])|
| setosa|          2.0|  (2,[],[])|
+-------+-------------+-----------+
only showing top 20 rows



In [27]:
########### CHANGING THE NAME OF TARGET VARIABLE TO 'label' AS ML LIBRARY TAKES TARGET VARIABLE FOR THIS NAME ONLY
# py_df = py_df.select(*[col(s).alias('label') if s == 'species_index' else s for s in py_df.columns])
import pyspark.sql.functions as F
py_df = py_df.select( '*', F.col('species_index').alias('label') ).drop('species_index')
py_df.show()

+------------+-----------+------------+-----------+-------+-----------+-----+
|sepal_length|sepal_width|petal_length|petal_width|species|species_ohe|label|
+------------+-----------+------------+-----------+-------+-----------+-----+
|         5.1|        3.5|         1.4|        0.2| setosa|  (2,[],[])|  2.0|
|         4.9|        3.0|         1.4|        0.2| setosa|  (2,[],[])|  2.0|
|         4.7|        3.2|         1.3|        0.2| setosa|  (2,[],[])|  2.0|
|         4.6|        3.1|         1.5|        0.2| setosa|  (2,[],[])|  2.0|
|         5.0|        3.6|         1.4|        0.2| setosa|  (2,[],[])|  2.0|
|         5.4|        3.9|         1.7|        0.4| setosa|  (2,[],[])|  2.0|
|         4.6|        3.4|         1.4|        0.3| setosa|  (2,[],[])|  2.0|
|         5.0|        3.4|         1.5|        0.2| setosa|  (2,[],[])|  2.0|
|         4.4|        2.9|         1.4|        0.2| setosa|  (2,[],[])|  2.0|
|         4.9|        3.1|         1.5|        0.1| setosa|  (2,

In [0]:
### CHANGE FORMAT FOR MAHCINE LEARNING 
output = assembler.transform(py_df)

In [29]:
output.select('features','label').show()

+-----------------+-----+
|         features|label|
+-----------------+-----+
|[5.1,3.5,1.4,0.2]|  2.0|
|[4.9,3.0,1.4,0.2]|  2.0|
|[4.7,3.2,1.3,0.2]|  2.0|
|[4.6,3.1,1.5,0.2]|  2.0|
|[5.0,3.6,1.4,0.2]|  2.0|
|[5.4,3.9,1.7,0.4]|  2.0|
|[4.6,3.4,1.4,0.3]|  2.0|
|[5.0,3.4,1.5,0.2]|  2.0|
|[4.4,2.9,1.4,0.2]|  2.0|
|[4.9,3.1,1.5,0.1]|  2.0|
|[5.4,3.7,1.5,0.2]|  2.0|
|[4.8,3.4,1.6,0.2]|  2.0|
|[4.8,3.0,1.4,0.1]|  2.0|
|[4.3,3.0,1.1,0.1]|  2.0|
|[5.8,4.0,1.2,0.2]|  2.0|
|[5.7,4.4,1.5,0.4]|  2.0|
|[5.4,3.9,1.3,0.4]|  2.0|
|[5.1,3.5,1.4,0.3]|  2.0|
|[5.7,3.8,1.7,0.3]|  2.0|
|[5.1,3.8,1.5,0.3]|  2.0|
+-----------------+-----+
only showing top 20 rows



In [30]:
### SPLITING THE DATA RANDOMLY FOR TRAIN TEST SET  
final_data = output.select(['features','label'])
train,test = final_data.randomSplit([0.7,0.3])
train.describe().show()

+-------+------------------+
|summary|             label|
+-------+------------------+
|  count|                95|
|   mean|0.8736842105263158|
| stddev|0.8153070856413407|
|    min|               0.0|
|    max|               2.0|
+-------+------------------+



In [31]:
train.groupBy('label').count().show()

+-----+-----+
|label|count|
+-----+-----+
|  0.0|   38|
|  1.0|   31|
|  2.0|   26|
+-----+-----+



In [0]:
#### IMPORTING LOGISTIC REGRESSION FOR CLASSIFICATION 
from pyspark.ml.classification import LogisticRegression
lr_model = LogisticRegression()

fit_lr_model = lr_model.fit(train)

In [0]:
### PREDICTION FORM THE TRAINED MODEL
predictions_eval = fit_lr_model.evaluate(test)

In [34]:
predictions_eval.predictions.show()

+-----------------+-----+--------------------+--------------------+----------+
|         features|label|       rawPrediction|         probability|prediction|
+-----------------+-----+--------------------+--------------------+----------+
|[4.4,2.9,1.4,0.2]|  2.0|[-23.411977040831...|[3.32819137768198...|       2.0|
|[4.4,3.2,1.3,0.2]|  2.0|[-33.601267937026...|[1.07842806102753...|       2.0|
|[4.6,3.1,1.5,0.2]|  2.0|[-27.555289370891...|[6.46374740344420...|       2.0|
|[4.6,3.2,1.4,0.2]|  2.0|[-31.284067383370...|[1.96872169704188...|       2.0|
|[4.6,3.4,1.4,0.3]|  2.0|[-36.466061019235...|[5.19641342392006...|       2.0|
|[4.7,3.2,1.6,0.2]|  2.0|[-29.377684750610...|[2.94776716928976...|       2.0|
|[4.8,3.0,1.4,0.1]|  2.0|[-24.283394764468...|[1.27150065920258...|       2.0|
|[4.8,3.0,1.4,0.3]|  2.0|[-21.726356268768...|[7.82689399151179...|       2.0|
|[4.8,3.4,1.9,0.2]|  2.0|[-33.433293430945...|[5.02836138177326...|       2.0|
|[4.9,3.1,1.5,0.1]|  2.0|[-26.105790144187...|[5.798

In [35]:
### ROC FOR ACCURACY AND TRAINING 
my_eval = MulticlassClassificationEvaluator()
roc = my_eval.evaluate(predictions_eval.predictions)
roc

1.0