In [2]:
!pip install findspark
#!pip install numpy

Defaulting to user installation because normal site-packages is not writeable
Collecting findspark
  Downloading findspark-1.4.2-py2.py3-none-any.whl (4.2 kB)
Installing collected packages: findspark
Successfully installed findspark-1.4.2
You should consider upgrading via the '/usr/bin/python3 -m pip install --upgrade pip' command.


In [3]:
# Make the local Spark installation importable. findspark.init() is called
# before `import pyspark` below; presumably the pyspark import depends on it
# (findspark adds pyspark to sys.path) — verify if the environment changes.
import findspark
findspark.init()
# NOTE(review): the return value of find() is discarded and this is not the
# cell's last expression, so the call has no visible effect here — consider removing.
findspark.find()
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

In [4]:
# Start (or reuse) the Spark session used by every later cell.
# There is deliberately no `if __name__ == '__main__':` guard: notebook cells
# always run at top level. If the guard were ever false, it would only leave
# `spark` undefined for all downstream cells.
spark = (
    SparkSession.builder
    .appName('PySparkLinReg')
    .getOrCreate()
)

In [7]:
import os

# Location of the admissions CSV. To run on another machine, set the
# ADMISSION_CSV environment variable instead of editing this machine-specific
# default path.
DATA_PATH = os.environ.get(
    "ADMISSION_CSV",
    "/home/sidhandsome/ML_PySpark/pyspark_examples/Admission_Prediction.csv",
)

# header=True takes column names from the first row. No schema inference is
# requested, so every column is read as a string (see the displayed schema).
dataset = spark.read.csv(DATA_PATH, header=True)
dataset

DataFrame[GRE Score: string, TOEFL Score: string, University Rating: string, SOP: string, LOR: string, CGPA: string, Research: string, Chance of Admit: string]

In [8]:
# Preview the raw rows. These are show()'s defaults: first 20 rows, long values truncated.
dataset.show(n=20, truncate=True)

+---------+-----------+-----------------+----+----+----+--------+---------------+
|GRE Score|TOEFL Score|University Rating| SOP| LOR|CGPA|Research|Chance of Admit|
+---------+-----------+-----------------+----+----+----+--------+---------------+
|   337.00|     118.00|                4|4.50|4.50|9.65|    1.00|           0.92|
|   324.00|     107.00|                4|4.00|4.50|8.87|    1.00|           0.76|
|     null|     104.00|                3|3.00|3.50|8.00|    1.00|           0.72|
|   322.00|     110.00|                3|3.50|2.50|8.67|    1.00|           0.80|
|   314.00|     103.00|                2|2.00|3.00|8.21|    0.00|           0.65|
|   330.00|     115.00|                5|4.50|3.00|9.34|    1.00|           0.90|
|   321.00|     109.00|             null|3.00|4.00|8.20|    1.00|           0.75|
|   308.00|     101.00|                2|3.00|4.00|7.90|    0.00|           0.68|
|   302.00|     102.00|                1|2.00|1.50|8.00|    0.00|           0.50|
|   323.00|     

In [9]:
# Print the schema as a tree, then display the column names as the cell's output.
dataset.printSchema()
list(dataset.columns)

root
 |-- GRE Score: string (nullable = true)
 |-- TOEFL Score: string (nullable = true)
 |-- University Rating: string (nullable = true)
 |-- SOP: string (nullable = true)
 |-- LOR: string (nullable = true)
 |-- CGPA: string (nullable = true)
 |-- Research: string (nullable = true)
 |-- Chance of Admit: string (nullable = true)



['GRE Score',
 'TOEFL Score',
 'University Rating',
 'SOP',
 'LOR',
 'CGPA',
 'Research',
 'Chance of Admit']

In [10]:
from pyspark.sql.functions import col

# Every column was read as a string; cast each one to float, keeping its name.
float_columns = [col(name).cast('float').alias(name) for name in dataset.columns]
new_data = dataset.select(*float_columns)
new_data.show()

+---------+-----------+-----------------+---+---+----+--------+---------------+
|GRE Score|TOEFL Score|University Rating|SOP|LOR|CGPA|Research|Chance of Admit|
+---------+-----------+-----------------+---+---+----+--------+---------------+
|    337.0|      118.0|              4.0|4.5|4.5|9.65|     1.0|           0.92|
|    324.0|      107.0|              4.0|4.0|4.5|8.87|     1.0|           0.76|
|     null|      104.0|              3.0|3.0|3.5| 8.0|     1.0|           0.72|
|    322.0|      110.0|              3.0|3.5|2.5|8.67|     1.0|            0.8|
|    314.0|      103.0|              2.0|2.0|3.0|8.21|     0.0|           0.65|
|    330.0|      115.0|              5.0|4.5|3.0|9.34|     1.0|            0.9|
|    321.0|      109.0|             null|3.0|4.0| 8.2|     1.0|           0.75|
|    308.0|      101.0|              2.0|3.0|4.0| 7.9|     0.0|           0.68|
|    302.0|      102.0|              1.0|2.0|1.5| 8.0|     0.0|            0.5|
|    323.0|      108.0|              3.0

In [11]:
# Count missing values per column. After the float cast, a value can be missing
# either as null or as NaN, so count both (the Imputer below treats both as missing).
from pyspark.sql.functions import col, count, isnan, when

missing_counts = new_data.select([
    count(when(col(c).isNull() | isnan(col(c)), c)).alias(c)
    for c in new_data.columns
])
missing_counts.show()

+---------+-----------+-----------------+---+---+----+--------+---------------+
|GRE Score|TOEFL Score|University Rating|SOP|LOR|CGPA|Research|Chance of Admit|
+---------+-----------+-----------------+---+---+----+--------+---------------+
|       15|         10|               15|  0|  0|   0|       0|              0|
+---------+-----------+-----------------+---+---+----+--------+---------------+



In [12]:
# Handling missing values: fill nulls/NaNs in the columns that have them with
# the column mean, writing the result back over the same column names.
from pyspark.ml.feature import Imputer
from pyspark.sql.functions import col, count, isnan, when

impute_cols = ['GRE Score', 'TOEFL Score', 'University Rating']
# 'mean' is Imputer's default, stated explicitly here.
# NOTE(review): mean produces non-integer values for the ordinal
# 'University Rating' (e.g. 3.12...); consider strategy='median' if that matters.
imputer = Imputer(inputCols=impute_cols, outputCols=impute_cols, strategy='mean')

imputed_data = imputer.fit(new_data).transform(new_data)

# Sanity check: no null or NaN should remain in any column.
imputed_data.select([
    count(when(col(c).isNull() | isnan(col(c)), c)).alias(c)
    for c in imputed_data.columns
]).show()

+---------+-----------+-----------------+---+---+----+--------+---------------+
|GRE Score|TOEFL Score|University Rating|SOP|LOR|CGPA|Research|Chance of Admit|
+---------+-----------+-----------------+---+---+----+--------+---------------+
|        0|          0|                0|  0|  0|   0|       0|              0|
+---------+-----------+-----------------+---+---+----+--------+---------------+



In [13]:
# Feature frame: every column except the regression target 'Chance of Admit'.
features = imputed_data.select(
    [name for name in imputed_data.columns if name != 'Chance of Admit']
)
features

DataFrame[GRE Score: float, TOEFL Score: float, University Rating: float, SOP: float, LOR: float, CGPA: float, Research: float]

In [14]:
# Vector Assembler: pack the feature columns into one vector column named
# 'features', the input LinearRegression reads below. Then keep only that
# vector and the label.
assembler = VectorAssembler(inputCols=features.columns, outputCol='features')
output = (
    assembler.transform(imputed_data)
    .select('features', 'Chance of Admit')
)
output.show()

+--------------------+---------------+
|            features|Chance of Admit|
+--------------------+---------------+
|[337.0,118.0,4.0,...|           0.92|
|[324.0,107.0,4.0,...|           0.76|
|[316.558776855468...|           0.72|
|[322.0,110.0,3.0,...|            0.8|
|[314.0,103.0,2.0,...|           0.65|
|[330.0,115.0,5.0,...|            0.9|
|[321.0,109.0,3.12...|           0.75|
|[308.0,101.0,2.0,...|           0.68|
|[302.0,102.0,1.0,...|            0.5|
|[323.0,108.0,3.0,...|           0.45|
|[325.0,106.0,3.0,...|           0.52|
|[327.0,111.0,4.0,...|           0.84|
|[316.558776855468...|           0.78|
|[307.0,109.0,3.0,...|           0.62|
|[311.0,104.0,3.0,...|           0.61|
|[314.0,105.0,3.0,...|           0.54|
|[317.0,107.0,3.0,...|           0.66|
|[319.0,106.0,3.0,...|           0.65|
|[318.0,110.0,3.0,...|           0.63|
|[303.0,102.0,3.0,...|           0.62|
+--------------------+---------------+
only showing top 20 rows



In [15]:
# Splitting Dataset: roughly 70% train / 30% test. A fixed seed makes the split,
# and therefore every metric reported below, reproducible on Restart & Run All.
SPLIT_SEED = 42
train_df, test_df = output.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

In [16]:
# Linear Regression: regress 'Chance of Admit' on the assembled feature
# vector, fitting on the training split only.
lin_reg = LinearRegression(
    labelCol='Chance of Admit',
    featuresCol='features',
    predictionCol='prediction',
)
lin_model = lin_reg.fit(train_df)
lin_model

LinearRegressionModel: uid=LinearRegression_ea5a2eba3572, numFeatures=7

In [18]:
# Coefficients and intercept: one coefficient per assembled feature, followed by
# the intercept.
# NOTE(review): the saved output shows a third element (the training summary),
# so it predates this code; re-run to refresh it.
coefficients_and_intercept = (lin_model.coefficients, lin_model.intercept)
coefficients_and_intercept

(DenseVector([0.0017, 0.0024, 0.0052, 0.004, 0.0154, 0.1249, 0.0173]),
 -1.2552154559416933,
 <pyspark.ml.regression.LinearRegressionTrainingSummary at 0x7f66980a4cc0>)

In [19]:
# Summary: training-set fit quality. R² is printed on the first line, RMSE on the second.
lin_summ = lin_model.summary
for training_metric in (lin_summ.r2, lin_summ.rootMeanSquaredError):
    print(training_metric)

0.8173134580594349
0.05977111016235504


In [21]:
# Prediction: score the held-out test split. The model adds a 'prediction' column.
predictions = lin_model.transform(dataset=test_df)
predictions.show()

+--------------------+---------------+-------------------+
|            features|Chance of Admit|         prediction|
+--------------------+---------------+-------------------+
|[290.0,100.0,1.0,...|           0.47| 0.4813704051059602|
|[295.0,99.0,1.0,2...|           0.37|0.48322184134274604|
|[295.0,101.0,2.0,...|           0.69| 0.5392101869519299|
|[296.0,97.0,2.0,1...|           0.49| 0.5196377872455871|
|[296.0,99.0,2.0,2...|           0.61| 0.5649913903186292|
|[297.0,96.0,2.0,2...|           0.43| 0.5265302171037796|
|[297.0,96.0,2.0,2...|           0.34|0.47674769494093483|
|[298.0,99.0,1.0,1...|           0.53|0.49575585326409644|
|[298.0,100.0,3.0,...|           0.58|  0.606491770871874|
|[298.0,107.187751...|           0.46| 0.5331446476962975|
|[299.0,94.0,1.0,1...|           0.42| 0.4375059882146879|
|[299.0,96.0,2.0,1...|           0.54| 0.5299188747187391|
|[299.0,97.0,3.0,5...|           0.38| 0.5497365157493308|
|[300.0,95.0,2.0,3...|           0.62| 0.589903117287613

In [26]:
# Test-set R², comparable to the training R² from the model summary above.
from pyspark.ml.evaluation import RegressionEvaluator

pred_evaluator = RegressionEvaluator(
    metricName='r2',
    labelCol='Chance of Admit',
    predictionCol='prediction',
)
test_r2 = pred_evaluator.evaluate(predictions)
test_r2

0.8186736250171593