# Pyspark Dataframe Preprocessing

## Preprocessing 
1. Converting String in to Float
2. Checking missing values
3. Treating missing values
4. Staticstics
5. Checking correlation using pearson method
6. VectorAssembler
7. Standard Scaling
8. PCA

## Import packages

In [1]:
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [2]:
import pyspark
import numpy as np
import pandas as pd

from pyspark.sql import SparkSession

from pyspark.mllib.feature import StandardScaler,PCA
from pyspark.mllib.stat import Statistics

from pyspark.ml.feature import VectorAssembler

from pyspark.sql.functions import col,count,isnan,isnull,when

## SparkSession

In [3]:
spark = SparkSession.builder.appName('PySpark Dataframe Preprocessing').getOrCreate()

## Read Dataset

In [4]:
dataset = spark.read.csv('Admission_Predict.csv',header=True)

In [5]:
dataset.show()

+---------+-----------+-----------------+---+----+----+--------+----------------+
|GRE Score|TOEFL Score|University Rating|SOP|LOR |CGPA|Research|Chance of Admit |
+---------+-----------+-----------------+---+----+----+--------+----------------+
|      337|        118|                4|4.5| 4.5|9.65|       1|            0.92|
|      324|        107|                4|  4| 4.5|8.87|       1|            0.76|
|      316|        104|                3|  3| 3.5|   8|       1|            0.72|
|      322|        110|                3|3.5| 2.5|8.67|       1|             0.8|
|      314|        103|                2|  2|   3|8.21|       0|            0.65|
|      330|        115|                5|4.5|   3|9.34|       1|             0.9|
|      321|        109|                3|  3|   4| 8.2|       1|            0.75|
|      308|        101|                2|  3|   4| 7.9|       0|            0.68|
|      302|        102|                1|  2| 1.5|   8|       0|             0.5|
|      323|     

In [6]:
type(dataset)

pyspark.sql.dataframe.DataFrame

In [7]:
dataset.printSchema()

root
 |-- GRE Score: string (nullable = true)
 |-- TOEFL Score: string (nullable = true)
 |-- University Rating: string (nullable = true)
 |-- SOP: string (nullable = true)
 |-- LOR : string (nullable = true)
 |-- CGPA: string (nullable = true)
 |-- Research: string (nullable = true)
 |-- Chance of Admit : string (nullable = true)



In [8]:
dataset.columns

['GRE Score',
 'TOEFL Score',
 'University Rating',
 'SOP',
 'LOR ',
 'CGPA',
 'Research',
 'Chance of Admit ']

## DataFrame Preprocessing

### 1. Converting String in to Float

In [9]:
new_data = dataset.select(*(col(c).cast('float').alias(c) for c in dataset.columns))

In [10]:
new_data.printSchema()

root
 |-- GRE Score: float (nullable = true)
 |-- TOEFL Score: float (nullable = true)
 |-- University Rating: float (nullable = true)
 |-- SOP: float (nullable = true)
 |-- LOR : float (nullable = true)
 |-- CGPA: float (nullable = true)
 |-- Research: float (nullable = true)
 |-- Chance of Admit : float (nullable = true)



### 2. Checking missing values

In [11]:
## Deleteing null value from any place
# data_without_missing_any = dataset.dropna(how='any')

## Deleteing null value from all place
# data_without_missing_all = dataset.dropna(how='all')

In [12]:
## check for null or nan type
new_data.select([count(when(col(c).isNull(),c)).alias(c) for c in new_data.columns]).show()

+---------+-----------+-----------------+---+----+----+--------+----------------+
|GRE Score|TOEFL Score|University Rating|SOP|LOR |CGPA|Research|Chance of Admit |
+---------+-----------+-----------------+---+----+----+--------+----------------+
|        0|          0|                0|  0|   0|   0|       0|               0|
+---------+-----------+-----------------+---+----+----+--------+----------------+



### 3. Treating missing values

In [13]:
from pyspark.ml.feature import Imputer

In [14]:
imputer = Imputer(inputCols=["GRE Score","TOEFL Score","University Rating"],
                 outputCols=["GRE Score","TOEFL Score","University Rating"])
model = imputer.fit(new_data)
imputed_data=model.transform(new_data)

In [15]:
imputed_data.show()

+---------+-----------+-----------------+---+----+----+--------+----------------+
|GRE Score|TOEFL Score|University Rating|SOP|LOR |CGPA|Research|Chance of Admit |
+---------+-----------+-----------------+---+----+----+--------+----------------+
|    337.0|      118.0|              4.0|4.5| 4.5|9.65|     1.0|            0.92|
|    324.0|      107.0|              4.0|4.0| 4.5|8.87|     1.0|            0.76|
|    316.0|      104.0|              3.0|3.0| 3.5| 8.0|     1.0|            0.72|
|    322.0|      110.0|              3.0|3.5| 2.5|8.67|     1.0|             0.8|
|    314.0|      103.0|              2.0|2.0| 3.0|8.21|     0.0|            0.65|
|    330.0|      115.0|              5.0|4.5| 3.0|9.34|     1.0|             0.9|
|    321.0|      109.0|              3.0|3.0| 4.0| 8.2|     1.0|            0.75|
|    308.0|      101.0|              2.0|3.0| 4.0| 7.9|     0.0|            0.68|
|    302.0|      102.0|              1.0|2.0| 1.5| 8.0|     0.0|             0.5|
|    323.0|     

In [16]:
imputed_data.count()

400

In [17]:
imputed_data.corr('SOP','Research')

0.4440288093566173

In [18]:
features = imputed_data.drop('Chance of Admit ')

In [19]:
# We need to convert dataframe into a RDD to check for correlation
col_names = features.columns
features_rdd = features.rdd

In [20]:
features_rdd.collect()

[Row(GRE Score=337.0, TOEFL Score=118.0, University Rating=4.0, SOP=4.5, LOR =4.5, CGPA=9.649999618530273, Research=1.0),
 Row(GRE Score=324.0, TOEFL Score=107.0, University Rating=4.0, SOP=4.0, LOR =4.5, CGPA=8.869999885559082, Research=1.0),
 Row(GRE Score=316.0, TOEFL Score=104.0, University Rating=3.0, SOP=3.0, LOR =3.5, CGPA=8.0, Research=1.0),
 Row(GRE Score=322.0, TOEFL Score=110.0, University Rating=3.0, SOP=3.5, LOR =2.5, CGPA=8.670000076293945, Research=1.0),
 Row(GRE Score=314.0, TOEFL Score=103.0, University Rating=2.0, SOP=2.0, LOR =3.0, CGPA=8.210000038146973, Research=0.0),
 Row(GRE Score=330.0, TOEFL Score=115.0, University Rating=5.0, SOP=4.5, LOR =3.0, CGPA=9.34000015258789, Research=1.0),
 Row(GRE Score=321.0, TOEFL Score=109.0, University Rating=3.0, SOP=3.0, LOR =4.0, CGPA=8.199999809265137, Research=1.0),
 Row(GRE Score=308.0, TOEFL Score=101.0, University Rating=2.0, SOP=3.0, LOR =4.0, CGPA=7.900000095367432, Research=0.0),
 Row(GRE Score=302.0, TOEFL Score=102.0

In [21]:
features_rdd = features_rdd.map(lambda row: row[0:])

In [22]:
features_rdd.collect()

[(337.0, 118.0, 4.0, 4.5, 4.5, 9.649999618530273, 1.0),
 (324.0, 107.0, 4.0, 4.0, 4.5, 8.869999885559082, 1.0),
 (316.0, 104.0, 3.0, 3.0, 3.5, 8.0, 1.0),
 (322.0, 110.0, 3.0, 3.5, 2.5, 8.670000076293945, 1.0),
 (314.0, 103.0, 2.0, 2.0, 3.0, 8.210000038146973, 0.0),
 (330.0, 115.0, 5.0, 4.5, 3.0, 9.34000015258789, 1.0),
 (321.0, 109.0, 3.0, 3.0, 4.0, 8.199999809265137, 1.0),
 (308.0, 101.0, 2.0, 3.0, 4.0, 7.900000095367432, 0.0),
 (302.0, 102.0, 1.0, 2.0, 1.5, 8.0, 0.0),
 (323.0, 108.0, 3.0, 3.5, 3.0, 8.600000381469727, 0.0),
 (325.0, 106.0, 3.0, 3.5, 4.0, 8.399999618530273, 1.0),
 (327.0, 111.0, 4.0, 4.0, 4.5, 9.0, 1.0),
 (328.0, 112.0, 4.0, 4.0, 4.5, 9.100000381469727, 1.0),
 (307.0, 109.0, 3.0, 4.0, 3.0, 8.0, 1.0),
 (311.0, 104.0, 3.0, 3.5, 2.0, 8.199999809265137, 1.0),
 (314.0, 105.0, 3.0, 3.5, 2.5, 8.300000190734863, 0.0),
 (317.0, 107.0, 3.0, 4.0, 3.0, 8.699999809265137, 0.0),
 (319.0, 106.0, 3.0, 4.0, 3.0, 8.0, 1.0),
 (318.0, 110.0, 3.0, 4.0, 3.0, 8.800000190734863, 0.0),
 (303.0

### 4. Staticstics

In [23]:
summary = Statistics.colStats(features_rdd)

In [24]:
print(summary.mean())
print(summary.variance())
print(summary.numNonzeros())
print(summary.normL1())

[316.8075     107.41         3.0875       3.4          3.4525
   8.59892501   0.5475    ]
[131.64455514  36.83899749   1.30811404   1.01378446   0.8072619
   0.35559406   0.24836466]
[400. 400. 400. 400. 400. 400. 219.]
[126723.          42964.           1235.           1360.
   1381.           3439.57000208    219.        ]


### 5. Checking correlation using pearson method

In [25]:
corr_mat = Statistics.corr(features_rdd,method='pearson')
corr_df = pd.DataFrame(corr_mat)
corr_df.index,corr_df.columns = col_names,col_names

In [26]:
corr_df

Unnamed: 0,GRE Score,TOEFL Score,University Rating,SOP,LOR,CGPA,Research
GRE Score,1.0,0.835977,0.668976,0.612831,0.557555,0.83306,0.580391
TOEFL Score,0.835977,1.0,0.69559,0.657981,0.567721,0.828417,0.489858
University Rating,0.668976,0.69559,1.0,0.734523,0.660123,0.746479,0.447783
SOP,0.612831,0.657981,0.734523,1.0,0.729593,0.718144,0.444029
LOR,0.557555,0.567721,0.660123,0.729593,1.0,0.670211,0.396859
CGPA,0.83306,0.828417,0.746479,0.718144,0.670211,1.0,0.521654
Research,0.580391,0.489858,0.447783,0.444029,0.396859,0.521654,1.0


In [27]:
corr_df.index

Index(['GRE Score', 'TOEFL Score', 'University Rating', 'SOP', 'LOR ', 'CGPA',
       'Research'],
      dtype='object')

In [28]:
corr_df.columns

Index(['GRE Score', 'TOEFL Score', 'University Rating', 'SOP', 'LOR ', 'CGPA',
       'Research'],
      dtype='object')

### 6. VectorAssembler

In [29]:
imputed_data.show()

+---------+-----------+-----------------+---+----+----+--------+----------------+
|GRE Score|TOEFL Score|University Rating|SOP|LOR |CGPA|Research|Chance of Admit |
+---------+-----------+-----------------+---+----+----+--------+----------------+
|    337.0|      118.0|              4.0|4.5| 4.5|9.65|     1.0|            0.92|
|    324.0|      107.0|              4.0|4.0| 4.5|8.87|     1.0|            0.76|
|    316.0|      104.0|              3.0|3.0| 3.5| 8.0|     1.0|            0.72|
|    322.0|      110.0|              3.0|3.5| 2.5|8.67|     1.0|             0.8|
|    314.0|      103.0|              2.0|2.0| 3.0|8.21|     0.0|            0.65|
|    330.0|      115.0|              5.0|4.5| 3.0|9.34|     1.0|             0.9|
|    321.0|      109.0|              3.0|3.0| 4.0| 8.2|     1.0|            0.75|
|    308.0|      101.0|              2.0|3.0| 4.0| 7.9|     0.0|            0.68|
|    302.0|      102.0|              1.0|2.0| 1.5| 8.0|     0.0|             0.5|
|    323.0|     

In [32]:
features.show()

+---------+-----------+-----------------+---+----+----+--------+
|GRE Score|TOEFL Score|University Rating|SOP|LOR |CGPA|Research|
+---------+-----------+-----------------+---+----+----+--------+
|    337.0|      118.0|              4.0|4.5| 4.5|9.65|     1.0|
|    324.0|      107.0|              4.0|4.0| 4.5|8.87|     1.0|
|    316.0|      104.0|              3.0|3.0| 3.5| 8.0|     1.0|
|    322.0|      110.0|              3.0|3.5| 2.5|8.67|     1.0|
|    314.0|      103.0|              2.0|2.0| 3.0|8.21|     0.0|
|    330.0|      115.0|              5.0|4.5| 3.0|9.34|     1.0|
|    321.0|      109.0|              3.0|3.0| 4.0| 8.2|     1.0|
|    308.0|      101.0|              2.0|3.0| 4.0| 7.9|     0.0|
|    302.0|      102.0|              1.0|2.0| 1.5| 8.0|     0.0|
|    323.0|      108.0|              3.0|3.5| 3.0| 8.6|     0.0|
|    325.0|      106.0|              3.0|3.5| 4.0| 8.4|     1.0|
|    327.0|      111.0|              4.0|4.0| 4.5| 9.0|     1.0|
|    328.0|      112.0|  

In [34]:
assembler = VectorAssembler(inputCols = features.columns, outputCol='features')

In [36]:
output= assembler.transform(imputed_data)

In [37]:
output.show()

+---------+-----------+-----------------+---+----+----+--------+----------------+--------------------+
|GRE Score|TOEFL Score|University Rating|SOP|LOR |CGPA|Research|Chance of Admit |            features|
+---------+-----------+-----------------+---+----+----+--------+----------------+--------------------+
|    337.0|      118.0|              4.0|4.5| 4.5|9.65|     1.0|            0.92|[337.0,118.0,4.0,...|
|    324.0|      107.0|              4.0|4.0| 4.5|8.87|     1.0|            0.76|[324.0,107.0,4.0,...|
|    316.0|      104.0|              3.0|3.0| 3.5| 8.0|     1.0|            0.72|[316.0,104.0,3.0,...|
|    322.0|      110.0|              3.0|3.5| 2.5|8.67|     1.0|             0.8|[322.0,110.0,3.0,...|
|    314.0|      103.0|              2.0|2.0| 3.0|8.21|     0.0|            0.65|[314.0,103.0,2.0,...|
|    330.0|      115.0|              5.0|4.5| 3.0|9.34|     1.0|             0.9|[330.0,115.0,5.0,...|
|    321.0|      109.0|              3.0|3.0| 4.0| 8.2|     1.0|         

In [38]:
output.select('features','Chance of Admit ').show(truncate=False)

+-----------------------------------------------+----------------+
|features                                       |Chance of Admit |
+-----------------------------------------------+----------------+
|[337.0,118.0,4.0,4.5,4.5,9.649999618530273,1.0]|0.92            |
|[324.0,107.0,4.0,4.0,4.5,8.869999885559082,1.0]|0.76            |
|[316.0,104.0,3.0,3.0,3.5,8.0,1.0]              |0.72            |
|[322.0,110.0,3.0,3.5,2.5,8.670000076293945,1.0]|0.8             |
|[314.0,103.0,2.0,2.0,3.0,8.210000038146973,0.0]|0.65            |
|[330.0,115.0,5.0,4.5,3.0,9.34000015258789,1.0] |0.9             |
|[321.0,109.0,3.0,3.0,4.0,8.199999809265137,1.0]|0.75            |
|[308.0,101.0,2.0,3.0,4.0,7.900000095367432,0.0]|0.68            |
|[302.0,102.0,1.0,2.0,1.5,8.0,0.0]              |0.5             |
|[323.0,108.0,3.0,3.5,3.0,8.600000381469727,0.0]|0.45            |
|[325.0,106.0,3.0,3.5,4.0,8.399999618530273,1.0]|0.52            |
|[327.0,111.0,4.0,4.0,4.5,9.0,1.0]              |0.84         

### 7. Standard Scaling

In [39]:
label = imputed_data.select('Chance of Admit ')

In [40]:
label.show()

+----------------+
|Chance of Admit |
+----------------+
|            0.92|
|            0.76|
|            0.72|
|             0.8|
|            0.65|
|             0.9|
|            0.75|
|            0.68|
|             0.5|
|            0.45|
|            0.52|
|            0.84|
|            0.78|
|            0.62|
|            0.61|
|            0.54|
|            0.66|
|            0.65|
|            0.63|
|            0.62|
+----------------+
only showing top 20 rows



In [41]:
features.show()

+---------+-----------+-----------------+---+----+----+--------+
|GRE Score|TOEFL Score|University Rating|SOP|LOR |CGPA|Research|
+---------+-----------+-----------------+---+----+----+--------+
|    337.0|      118.0|              4.0|4.5| 4.5|9.65|     1.0|
|    324.0|      107.0|              4.0|4.0| 4.5|8.87|     1.0|
|    316.0|      104.0|              3.0|3.0| 3.5| 8.0|     1.0|
|    322.0|      110.0|              3.0|3.5| 2.5|8.67|     1.0|
|    314.0|      103.0|              2.0|2.0| 3.0|8.21|     0.0|
|    330.0|      115.0|              5.0|4.5| 3.0|9.34|     1.0|
|    321.0|      109.0|              3.0|3.0| 4.0| 8.2|     1.0|
|    308.0|      101.0|              2.0|3.0| 4.0| 7.9|     0.0|
|    302.0|      102.0|              1.0|2.0| 1.5| 8.0|     0.0|
|    323.0|      108.0|              3.0|3.5| 3.0| 8.6|     0.0|
|    325.0|      106.0|              3.0|3.5| 4.0| 8.4|     1.0|
|    327.0|      111.0|              4.0|4.0| 4.5| 9.0|     1.0|
|    328.0|      112.0|  

In [42]:
col_names = features.columns

In [43]:
feature_rdd = features.rdd.map(lambda row:row[0:])

In [44]:
feature_rdd.collect()

[(337.0, 118.0, 4.0, 4.5, 4.5, 9.649999618530273, 1.0),
 (324.0, 107.0, 4.0, 4.0, 4.5, 8.869999885559082, 1.0),
 (316.0, 104.0, 3.0, 3.0, 3.5, 8.0, 1.0),
 (322.0, 110.0, 3.0, 3.5, 2.5, 8.670000076293945, 1.0),
 (314.0, 103.0, 2.0, 2.0, 3.0, 8.210000038146973, 0.0),
 (330.0, 115.0, 5.0, 4.5, 3.0, 9.34000015258789, 1.0),
 (321.0, 109.0, 3.0, 3.0, 4.0, 8.199999809265137, 1.0),
 (308.0, 101.0, 2.0, 3.0, 4.0, 7.900000095367432, 0.0),
 (302.0, 102.0, 1.0, 2.0, 1.5, 8.0, 0.0),
 (323.0, 108.0, 3.0, 3.5, 3.0, 8.600000381469727, 0.0),
 (325.0, 106.0, 3.0, 3.5, 4.0, 8.399999618530273, 1.0),
 (327.0, 111.0, 4.0, 4.0, 4.5, 9.0, 1.0),
 (328.0, 112.0, 4.0, 4.0, 4.5, 9.100000381469727, 1.0),
 (307.0, 109.0, 3.0, 4.0, 3.0, 8.0, 1.0),
 (311.0, 104.0, 3.0, 3.5, 2.0, 8.199999809265137, 1.0),
 (314.0, 105.0, 3.0, 3.5, 2.5, 8.300000190734863, 0.0),
 (317.0, 107.0, 3.0, 4.0, 3.0, 8.699999809265137, 0.0),
 (319.0, 106.0, 3.0, 4.0, 3.0, 8.0, 1.0),
 (318.0, 110.0, 3.0, 4.0, 3.0, 8.800000190734863, 0.0),
 (303.0

In [45]:
scaler1= StandardScaler().fit(feature_rdd)

In [48]:
scaled_features = scaler1.transform(feature_rdd)

In [49]:
for data in scaled_features.collect():
    print(data)

[29.37165715817819,19.441425512514545,3.4973346335427156,4.469301967216541,5.008472397132565,16.182665160837633,2.0065736213062157]
[28.23862587314461,17.629089235924205,3.4973346335427156,3.9727128597480363,5.008472397132565,14.874636663097856,2.0065736213062157]
[27.541375851585485,17.134815705945023,2.6230009751570367,2.9795346448110274,3.895478531103106,13.415681492681596,2.0065736213062157]
[28.064313367754828,18.12336276590339,2.6230009751570367,3.476123752279532,2.7824846650736474,14.53924494563559,2.0065736213062157]
[27.367063346195703,16.970057862618628,1.7486673167713578,1.9863564298740182,3.3389815980883766,13.767843195835443,0.0]
[28.761563389313956,18.947151982535363,4.371668291928394,4.469301967216541,3.3389815980883766,15.662808398589581,2.0065736213062157]
[27.97715711505994,17.958604922576995,2.6230009751570367,2.9795346448110274,4.451975464117836,13.751073210143865,2.0065736213062157]
[26.844125830026357,16.64054217596584,1.7486673167713578,2.9795346448110274,4.45197

### 8.PCA

In [50]:
pca = PCA(k=3)
pca_model = pca.fit(scaled_features)

In [52]:
result = pca_model.transform(scaled_features)

In [53]:
result.collect()

[DenseVector([-31.6384, 7.9988, -18.7787]),
 DenseVector([-29.7285, 7.5971, -17.3592]),
 DenseVector([-27.5299, 8.3553, -17.2499]),
 DenseVector([-28.395, 8.9767, -18.493]),
 DenseVector([-26.0402, 7.639, -18.911]),
 DenseVector([-30.726, 8.3012, -18.8943]),
 DenseVector([-28.3701, 8.3387, -17.6314]),
 DenseVector([-26.2593, 6.5891, -17.7328]),
 DenseVector([-24.4777, 8.2625, -19.0739]),
 DenseVector([-27.8625, 7.2561, -19.3362]),
 DenseVector([-28.6416, 8.211, -17.4853]),
 DenseVector([-30.1865, 7.7579, -17.8051]),
 DenseVector([-30.3569, 7.8067, -17.9521]),
 DenseVector([-27.7275, 8.1334, -17.3206]),
 DenseVector([-27.0918, 8.8134, -17.7399]),
 DenseVector([-26.9449, 7.2096, -18.9396]),
 DenseVector([-27.8484, 6.915, -18.9891]),
 DenseVector([-27.9469, 8.3731, -17.4675]),
 DenseVector([-28.1502, 7.0043, -19.2859]),
 DenseVector([-26.7041, 6.63, -18.2265]),
 DenseVector([-26.9239, 9.0629, -17.9997]),
 DenseVector([-27.9393, 7.8941, -20.3473]),
 DenseVector([-31.8164, 7.1023, -17.9254]

In [54]:
type(result)

pyspark.rdd.RDD

In [55]:
# Store dense vector in a dataframe
df = result.map(lambda x:(x,)).toDF(['PCA_features'])

In [56]:
df.show(truncate=False)

+------------------------------------------------------------+
|PCA_features                                                |
+------------------------------------------------------------+
|[-31.638357981616196,7.9988234243403165,-18.778736861264157]|
|[-29.7284816859805,7.5971270965983235,-17.359243547900057]  |
|[-27.529875295247994,8.355301308686364,-17.249917222979263] |
|[-28.395012107528025,8.976734714525021,-18.493044682330435] |
|[-26.04020972477991,7.639040673473518,-18.91101847286499]   |
|[-30.725950374683748,8.301164657578187,-18.894325694789]    |
|[-28.37009643334537,8.338728576348629,-17.63138877578692]   |
|[-26.259297092648367,6.58910354198373,-17.732814791595455]  |
|[-24.477674536234293,8.262519528093943,-19.07390743619906]  |
|[-27.862539246066593,7.256114085064626,-19.336244347295683] |
|[-28.641580861873667,8.210972734460288,-17.485276917457714] |
|[-30.186469010556298,7.757897469035917,-17.80514620005354]  |
|[-30.35689398226225,7.806689421638942,-17.952117566286