### Creating Spark Session

In [1]:
from pyspark.sql import SparkSession
# Reuse an already-running session if there is one, otherwise create it.
spark = (
    SparkSession.builder
    .appName('Linear_Regression')
    .getOrCreate()
)
spark  # last expression of the cell: display the session

### Imports 

In [2]:
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.regression import LinearRegression

In [3]:
# Read the fish measurements: the first CSV row is the header and the column
# types are inferred from the data.
main_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('Fish.csv')
)
main_df.show()
main_df.printSchema()

+-------+------+-------+-------+-------+-------+------+
|Species|Weight|Length1|Length2|Length3| Height| Width|
+-------+------+-------+-------+-------+-------+------+
|  Bream| 242.0|   23.2|   25.4|   30.0|  11.52|  4.02|
|  Bream| 290.0|   24.0|   26.3|   31.2|  12.48|4.3056|
|  Bream| 340.0|   23.9|   26.5|   31.1|12.3778|4.6961|
|  Bream| 363.0|   26.3|   29.0|   33.5|  12.73|4.4555|
|  Bream| 430.0|   26.5|   29.0|   34.0| 12.444| 5.134|
|  Bream| 450.0|   26.8|   29.7|   34.7|13.6024|4.9274|
|  Bream| 500.0|   26.8|   29.7|   34.5|14.1795|5.2785|
|  Bream| 390.0|   27.6|   30.0|   35.0|  12.67|  4.69|
|  Bream| 450.0|   27.6|   30.0|   35.1|14.0049|4.8438|
|  Bream| 500.0|   28.5|   30.7|   36.2|14.2266|4.9594|
|  Bream| 475.0|   28.4|   31.0|   36.2|14.2628|5.1042|
|  Bream| 500.0|   28.7|   31.0|   36.2|14.3714|4.8146|
|  Bream| 500.0|   29.1|   31.5|   36.4|13.7592| 4.368|
|  Bream| 340.0|   29.5|   32.0|   37.3|13.9129|5.0728|
|  Bream| 600.0|   29.4|   32.0|   37.2|14.9544|

In [4]:
# Print, for every column of main_df, how many rows hold a null in it.
for column_name in main_df.columns:
    null_rows = main_df.filter(col(column_name).isNull())
    print(column_name, ':', null_rows.count())

Species : 0
Weight : 0
Length1 : 0
Length2 : 0
Length3 : 0
Height : 0
Width : 0


In [5]:
# Pack the five numeric measurement columns into a single vector column
# named 'Features'.
assembler = VectorAssembler(
    inputCols=['Length1', 'Length2', 'Length3', 'Height', 'Width'],
    outputCol='Features',
)

assembled_df = assembler.transform(main_df)

In [6]:
# Keep only the feature vector and the regression target.
model_df = assembled_df.select(['Features', 'Weight'])
model_df.show()

+--------------------+------+
|            Features|Weight|
+--------------------+------+
|[23.2,25.4,30.0,1...| 242.0|
|[24.0,26.3,31.2,1...| 290.0|
|[23.9,26.5,31.1,1...| 340.0|
|[26.3,29.0,33.5,1...| 363.0|
|[26.5,29.0,34.0,1...| 430.0|
|[26.8,29.7,34.7,1...| 450.0|
|[26.8,29.7,34.5,1...| 500.0|
|[27.6,30.0,35.0,1...| 390.0|
|[27.6,30.0,35.1,1...| 450.0|
|[28.5,30.7,36.2,1...| 500.0|
|[28.4,31.0,36.2,1...| 475.0|
|[28.7,31.0,36.2,1...| 500.0|
|[29.1,31.5,36.4,1...| 500.0|
|[29.5,32.0,37.3,1...| 340.0|
|[29.4,32.0,37.2,1...| 600.0|
|[29.4,32.0,37.2,1...| 600.0|
|[30.4,33.0,38.3,1...| 700.0|
|[30.4,33.0,38.5,1...| 700.0|
|[30.9,33.5,38.6,1...| 610.0|
|[31.0,33.5,38.7,1...| 650.0|
+--------------------+------+
only showing top 20 rows



### Data Splitting

In [7]:
# 70 % / 30 % random split with a fixed seed.
train_df, test_df = model_df.randomSplit([0.7, 0.3], seed=5)

### Linear Regression Model Building

In [8]:
# Fit on the training split only, then score both splits with the fitted model.
linear_regression = LinearRegression(labelCol='Weight', featuresCol='Features')
lr_model = linear_regression.fit(train_df)
lr_train_preds, lr_test_preds = (
    lr_model.transform(split) for split in (train_df, test_df)
)

In [9]:
# Peek at the first five predictions of each split (train first, then test).
lr_train_preds.show(n=5)
lr_test_preds.show(n=5)

+--------------------+------+-------------------+
|            Features|Weight|         prediction|
+--------------------+------+-------------------+
|[7.5,8.4,8.8,2.11...|   5.9|-246.23504533648466|
|[9.3,9.8,10.8,1.7...|   6.7| -219.8115009685074|
|[10.1,10.6,11.6,1...|   7.0|-196.13796151882838|
|[10.4,11.0,12.0,2...|   9.7|-172.42336123496653|
|[10.7,11.2,12.4,2...|   9.8|-172.06086090385747|
+--------------------+------+-------------------+
only showing top 5 rows

+--------------------+------+-------------------+
|            Features|Weight|         prediction|
+--------------------+------+-------------------+
|[10.0,10.5,11.6,1...|   7.5| -194.2894341801678|
|[11.3,11.8,13.1,2...|   9.9|-159.00869100174367|
|[12.5,13.7,14.7,3...|  32.0| -87.93299634375677|
|[13.8,15.0,16.0,3...|  40.0| -32.24639856669933|
|[17.2,19.0,20.2,5...|  80.0|  94.02371436605779|
+--------------------+------+-------------------+
only showing top 5 rows



### Evaluation of Linear Regression

In [10]:
from pyspark.ml.evaluation import RegressionEvaluator

In [11]:
# One evaluator per regression metric; each compares 'prediction' with 'Weight'.
rmse_evaluator, mae_evaluator, mse_evaluator, r2_evaluator = (
    RegressionEvaluator(labelCol='Weight', predictionCol='prediction', metricName=metric)
    for metric in ('rmse', 'mae', 'mse', 'r2')
)

def myfun(model_pred):
    """Print regression metrics for a predictions DataFrame.

    Prints rmse, mae, mse and r2 (in that order) using the notebook-level
    evaluators, then shows up to five rows of the record count per distinct
    ('prediction', 'Weight') pair.

    model_pred: DataFrame containing 'prediction' and 'Weight' columns.
    """
    named_evaluators = (
        ('rmse', rmse_evaluator),
        ('mae', mae_evaluator),
        ('mse', mse_evaluator),
        ('r2', r2_evaluator),
    )
    for metric_name, evaluator in named_evaluators:
        print(metric_name + ' :', evaluator.evaluate(model_pred))

    pair_counts = model_pred.groupBy('prediction', 'Weight').count()
    pair_counts.show(5)
    

In [12]:
# lr_test_preds holds the predictions made on test_df, so these are the
# held-out (testing) metrics; the printed label now says so.
print('Testing Data Evaluation')
myfun(lr_test_preds)

Trainig Data Evaluation
rmse : 108.08482429134455
mae : 76.64044664951015
mse : 11682.329242090824
r2 : 0.8851459855410021
+------------------+------+-----+
|        prediction|Weight|count|
+------------------+------+-----+
|198.69423017714462| 140.0|    1|
|471.81795878787455| 450.0|    1|
| 280.1823975275746| 188.0|    1|
|-194.2894341801678|   7.5|    1|
| 94.02371436605779|  80.0|    1|
+------------------+------+-----+
only showing top 5 rows



In [13]:
# lr_train_preds holds the predictions made on train_df, so these are the
# training metrics; the printed label now says so.
print('Training Data Evaluation')
myfun(lr_train_preds)

Testing Data Evalaution
rmse : 125.88014721403526
mae : 99.4528579694665
mse : 15845.81146262719
r2 : 0.8841567265205021
+-------------------+------+-----+
|         prediction|Weight|count|
+-------------------+------+-----+
| 380.59116768539445| 306.0|    1|
|  721.4472355186133| 540.0|    1|
| 144.34539681096123| 125.0|    1|
|-172.06086090385747|   9.8|    1|
| 19.705640830143125|  55.0|    1|
+-------------------+------+-----+
only showing top 5 rows



### Naive Bayes Classification Algorithm

In [14]:
from pyspark.ml.classification import NaiveBayes

In [15]:
# Encode the species name as a numeric label column.
# Note: despite its name, indexed_df is the fitted indexer model (the result
# of fit()), not a DataFrame; final_df is the transformed DataFrame.
indexer = StringIndexer(outputCol='indexed_Species', inputCol='Species')
indexed_df = indexer.fit(assembled_df)
final_df = indexed_df.transform(assembled_df)

In [16]:
# Classifier input: the numeric species label plus the feature vector.
nb_df = final_df.select(['indexed_Species', 'Features'])
nb_df.show(n=5)

+---------------+--------------------+
|indexed_Species|            Features|
+---------------+--------------------+
|            1.0|[23.2,25.4,30.0,1...|
|            1.0|[24.0,26.3,31.2,1...|
|            1.0|[23.9,26.5,31.1,1...|
|            1.0|[26.3,29.0,33.5,1...|
|            1.0|[26.5,29.0,34.0,1...|
+---------------+--------------------+
only showing top 5 rows



In [17]:
# 80 % / 20 % random split with a fixed seed.
df_train2, df_test2 = nb_df.randomSplit([0.8, 0.2], seed=10)

In [18]:
# Fit Naive Bayes on the training split, then score both splits.
naive_bayes = NaiveBayes(labelCol='indexed_Species', featuresCol='Features')
nv_model = naive_bayes.fit(df_train2)
nv_train_preds = nv_model.transform(df_train2)
nv_test_preds = nv_model.transform(df_test2)

In [19]:
# Peek at the first five predictions of each split (train first, then test).
nv_train_preds.show(n=5)
nv_test_preds.show(n=5)

+---------------+--------------------+--------------------+--------------------+----------+
|indexed_Species|            Features|       rawPrediction|         probability|prediction|
+---------------+--------------------+--------------------+--------------------+----------+
|            0.0|[7.5,8.4,8.8,2.11...|[-41.201359458005...|[0.37086682415316...|       0.0|
|            0.0|[13.8,15.0,16.0,3...|[-73.448074904102...|[0.40064267030956...|       0.0|
|            0.0|[15.7,17.4,18.5,4...|[-85.287370758661...|[0.41508589844416...|       0.0|
|            0.0|[16.2,18.0,19.2,5...|[-90.236739592703...|[0.42045276151081...|       0.0|
|            0.0|[17.2,19.0,20.2,5...|[-94.178254000578...|[0.41030626995115...|       0.0|
+---------------+--------------------+--------------------+--------------------+----------+
only showing top 5 rows

+---------------+--------------------+--------------------+--------------------+----------+
|indexed_Species|            Features|       rawPredict

### Evaluation of Naive Bayes

In [20]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [21]:
# One evaluator per classification metric; each compares 'prediction' with
# 'indexed_Species'.
f1_evaluator, accuracy_evaluator, precision_evaluator, recall_evaluator = (
    MulticlassClassificationEvaluator(
        labelCol='indexed_Species',
        predictionCol='prediction',
        metricName=metric,
    )
    for metric in ('f1', 'accuracy', 'weightedPrecision', 'weightedRecall')
)

def myfun2(model_pred2):
    """Print classification metrics for a predictions DataFrame.

    Prints F1, accuracy, weighted precision and weighted recall (in that
    order) using the notebook-level evaluators, then shows up to five rows of
    the record count per ('prediction', 'indexed_Species') pair.

    model_pred2: DataFrame containing 'prediction' and 'indexed_Species' columns.
    """
    labelled_evaluators = (
        ('F1', f1_evaluator),
        ('accuracy', accuracy_evaluator),
        ('precision', precision_evaluator),
        ('recall', recall_evaluator),
    )
    for label, evaluator in labelled_evaluators:
        print(label + ' :', evaluator.evaluate(model_pred2))

    pair_counts = model_pred2.groupBy('prediction', 'indexed_Species').count()
    pair_counts.show(5)

In [22]:
# Naive Bayes metrics on df_train2, the split the model was fitted on.
print('Training Evaluation')
myfun2(nv_train_preds)

Training Evaluation
F1 : 0.5071954710088322
accuracy : 0.6328125
precision : 0.4388530865975422
recall : 0.6328125
+----------+---------------+-----+
|prediction|indexed_Species|count|
+----------+---------------+-----+
|       0.0|            5.0|    2|
|       1.0|            1.0|   25|
|       0.0|            4.0|   13|
|       1.0|            5.0|    6|
|       0.0|            6.0|    5|
+----------+---------------+-----+
only showing top 5 rows



In [23]:
# Naive Bayes metrics on df_test2, the split held out from fitting.
print('Testing Evaluation')
myfun2(nv_test_preds)

Testing Evaluation
F1 : 0.7660026291839418
accuracy : 0.8387096774193549
precision : 0.7063172043010754
recall : 0.8387096774193549
+----------+---------------+-----+
|prediction|indexed_Species|count|
+----------+---------------+-----+
|       0.0|            5.0|    1|
|       1.0|            1.0|   10|
|       0.0|            4.0|    1|
|       1.0|            5.0|    2|
|       0.0|            6.0|    1|
+----------+---------------+-----+
only showing top 5 rows



### Giving Unseen Data to the Trained Model


In [25]:
# Read the "unseen" data: the first CSV row is the header and the column types
# are inferred from the data.
# NOTE(review): this file's name and the columns handled in later cells
# (Sepal*/Petal*) differ from the Fish.csv columns the models above were fitted
# on - confirm this is the intended dataset for scoring with those models.
unseen_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('Iris.csv')
)

In [26]:
# Preview the first four rows.
unseen_df.show(n=4)

+---+-------------+------------+-------------+------------+-----------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|
+---+-------------+------------+-------------+------------+-----------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|
|  4|          4.6|         3.1|          1.5|         0.2|Iris-setosa|
+---+-------------+------------+-------------+------------+-----------+
only showing top 4 rows



In [27]:
# Print the column names and inferred types of the unseen data.
unseen_df.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- SepalLengthCm: double (nullable = true)
 |-- SepalWidthCm: double (nullable = true)
 |-- PetalLengthCm: double (nullable = true)
 |-- PetalWidthCm: double (nullable = true)
 |-- Species: string (nullable = true)



In [30]:
# Print, for every column of unseen_df, how many rows hold a null in it.
for column_name in unseen_df.columns:
    null_rows = unseen_df.filter(col(column_name).isNull())
    print(column_name, ':', null_rows.count())

Id : 0
SepalLengthCm : 0
SepalWidthCm : 0
PetalLengthCm : 0
PetalWidthCm : 0
Species : 0


In [31]:
# Encode the species name of the unseen data as a numeric label column.
# NOTE(review): this fits a brand-new StringIndexer on unseen_df, so the
# resulting indices are derived from this data alone and are unrelated to the
# ones produced by the indexer fitted on assembled_df - confirm that is intended
# before comparing them with nv_model's predictions.
indexer2 = StringIndexer(inputCol='Species', outputCol='indexed_Species')
indexed2 = indexer2.fit(unseen_df).transform(unseen_df)


In [34]:
# Preview the first three rows, now including 'indexed_Species'.
indexed2.show(n=3)

+---+-------------+------------+-------------+------------+-----------+---------------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|indexed_Species|
+---+-------------+------------+-------------+------------+-----------+---------------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|            0.0|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|            0.0|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|            0.0|
+---+-------------+------------+-------------+------------+-----------+---------------+
only showing top 3 rows



In [37]:
# Pack the four numeric measurement columns of the unseen data into a single
# vector column named 'Features'.
assembler2 = VectorAssembler(
    inputCols=['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm'],
    outputCol='Features',
)

assembled2_df = assembler2.transform(indexed2)


In [39]:
# Preview the first five rows, now including the 'Features' vector.
assembled2_df.show(n=5)

+---+-------------+------------+-------------+------------+-----------+---------------+-----------------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|indexed_Species|         Features|
+---+-------------+------------+-------------+------------+-----------+---------------+-----------------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|            0.0|[5.1,3.5,1.4,0.2]|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|            0.0|[4.9,3.0,1.4,0.2]|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|            0.0|[4.7,3.2,1.3,0.2]|
|  4|          4.6|         3.1|          1.5|         0.2|Iris-setosa|            0.0|[4.6,3.1,1.5,0.2]|
|  5|          5.0|         3.6|          1.4|         0.2|Iris-setosa|            0.0|[5.0,3.6,1.4,0.2]|
+---+-------------+------------+-------------+------------+-----------+---------------+-----------------+
only showing top 5 rows



In [52]:
# Keep only the numeric label and the feature vector.
unseen_df2 = assembled2_df.select(['indexed_Species', 'Features'])

In [54]:
# Preview the first three rows.
unseen_df2.show(n=3)

+---------------+-----------------+
|indexed_Species|         Features|
+---------------+-----------------+
|            0.0|[5.1,3.5,1.4,0.2]|
|            0.0|[4.9,3.0,1.4,0.2]|
|            0.0|[4.7,3.2,1.3,0.2]|
+---------------+-----------------+
only showing top 3 rows



In [55]:
# Validate the input before scoring. nv_model was fitted on 'Features' vectors
# built by `assembler` (five input columns), while unseen_df2's vectors come
# from `assembler2` (four input columns). transform() only builds a lazy plan,
# so without this check a length mismatch would presumably surface later, as a
# less readable error when the predictions are actually computed.
expected_dim = nv_model.numFeatures  # -1 when the model does not know it
sample_row = unseen_df2.select('Features').first()  # None for an empty DataFrame
if sample_row is not None and expected_dim >= 0:
    actual_dim = len(sample_row['Features'])
    if actual_dim != expected_dim:
        raise ValueError(
            "'Features' vectors have {} elements but nv_model was trained on {}; "
            "score data assembled from the same columns as the training set".format(
                actual_dim, expected_dim
            )
        )

nb_unseen_preds = nv_model.transform(unseen_df2)