# Introduction to Spark ML

**Apache Spark MLlib** is the machine learning (ML) library of **Apache Spark**, which provides high-performance, scalable, and distributed machine learning algorithms. Spark's MLlib is designed for processing large datasets using distributed computing, which makes it suitable for big data applications.

Spark MLlib supports a wide range of machine learning tasks, including classification, regression, clustering, recommendation, and more. It is built on top of Spark’s powerful data processing capabilities, allowing it to handle vast amounts of data efficiently.

---

## Spark ML Components

Spark MLlib is structured into two main components:

1. **DataFrames-based API (spark.ml)**: This is the newer, high-level API, which integrates better with Spark's DataFrame API. It is the recommended approach for most users.
2. **RDD-based API (spark.mllib)**: This is the older API, which is based on Spark's Resilient Distributed Dataset (RDD). It is now in maintenance mode, and the DataFrames-based API is preferred.

We will focus on the **DataFrames-based API (spark.ml)**.

---

## Features of Spark ML

- **Scalability**: Spark ML is designed for distributed computing, making it highly scalable for large datasets.
- **Ease of Use**: The DataFrame API provides an easy-to-use and intuitive interface for machine learning pipelines.
- **Integration**: It integrates seamlessly with other components of Spark, such as SQL, streaming, and graph processing.
- **Built-in Algorithms**: It offers a wide range of out-of-the-box algorithms for classification, regression, clustering, recommendation, and more.
- **Pipeline Support**: Spark ML provides pipeline support for building end-to-end machine learning workflows, including data transformation, feature engineering, model training, and evaluation.

---

## Machine Learning Pipeline in Spark ML

A machine learning pipeline in Spark ML consists of a series of **stages** that are executed in sequence to process data and train a model. The typical stages in a pipeline are:

1. **Data Preparation**: Loading and cleaning data, handling missing values, and normalizing data.
2. **Feature Engineering**: Transforming raw data into meaningful features using techniques such as one-hot encoding, vectorizing, and scaling.
3. **Model Training**: Applying machine learning algorithms to train models.
4. **Model Evaluation**: Evaluating the model's performance using appropriate metrics (e.g., accuracy, precision, recall).
5. **Model Tuning**: Optimizing the model using hyperparameter tuning (e.g., cross-validation or grid search).



In [1]:
# Install Pyspark
! pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
     ---------------------------------------- 0.0/317.3 MB ? eta -:--:--
     --------------------------------------- 1.6/317.3 MB 10.5 MB/s eta 0:00:31
     --------------------------------------- 3.7/317.3 MB 10.9 MB/s eta 0:00:29
      -------------------------------------- 6.8/317.3 MB 11.3 MB/s eta 0:00:28
     - ------------------------------------- 8.7/317.3 MB 10.7 MB/s eta 0:00:29
     - ------------------------------------ 10.5/317.3 MB 10.4 MB/s eta 0:00:30
     - ------------------------------------ 13.1/317.3 MB 10.5 MB/s eta 0:00:29
     - ------------------------------------ 14.9/317.3 MB 10.2 MB/s eta 0:00:30
     -- ------------------------------------ 16.5/317.3 MB 9.9 MB/s eta 0:00:31
     -- ------------------------------------ 18.6/317.3 MB 9.9 MB/s eta 0:00:31
     -- ------------------------------------ 20.7/317.3 MB 9.9 MB/s eta 0:00:30
     -- ----------------------------------- 23.3/317.3 MB 10.1

In [1]:
# Create a SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("spark").getOrCreate()


In [4]:
# Clone the dataset
! git clone https://github.com/education454/admission_dataset

Cloning into 'admission_dataset'...


In [19]:
import os
from pathlib import Path
os.listdir('admission_dataset')
cwd = os.getcwd()
cwd


'c:\\Users\\Mehdi\\Desktop\\Admission-Prediction-With-Pyspark-ML'

In [26]:
# Create a spark dataframe
df = spark.read.csv('admission_dataset/Admission_Predict_Ver1.1.csv', header=True, inferSchema=True)

In [27]:
#display the dataframe
df.show()

+---------+---------+-----------+-----------------+---+---+----+--------+---------------+
|Serial No|GRE Score|TOEFL Score|University Rating|SOP|LOR|CGPA|Research|Chance of Admit|
+---------+---------+-----------+-----------------+---+---+----+--------+---------------+
|        1|      337|        118|                4|4.5|4.5|9.65|       1|           0.92|
|        2|      324|        107|                4|4.0|4.5|8.87|       1|           0.76|
|        3|      316|        104|                3|3.0|3.5| 8.0|       1|           0.72|
|        4|      322|        110|                3|3.5|2.5|8.67|       1|            0.8|
|        5|      314|        103|                2|2.0|3.0|8.21|       0|           0.65|
|        6|      330|        115|                5|4.5|3.0|9.34|       1|            0.9|
|        7|      321|        109|                3|3.0|4.0| 8.2|       1|           0.75|
|        8|      308|        101|                2|3.0|4.0| 7.9|       0|           0.68|
|        9

In [28]:
#get the number of rows and columns
print((df.count(), len(df.columns)))

(500, 9)


In [29]:
#print schema
df.printSchema()

root
 |-- Serial No: integer (nullable = true)
 |-- GRE Score: integer (nullable = true)
 |-- TOEFL Score: integer (nullable = true)
 |-- University Rating: integer (nullable = true)
 |-- SOP: double (nullable = true)
 |-- LOR: double (nullable = true)
 |-- CGPA: double (nullable = true)
 |-- Research: integer (nullable = true)
 |-- Chance of Admit: double (nullable = true)



In [30]:
# get the summary statistics
df.describe().show()

+-------+-----------------+------------------+-----------------+-----------------+------------------+------------------+------------------+------------------+-------------------+
|summary|        Serial No|         GRE Score|      TOEFL Score|University Rating|               SOP|               LOR|              CGPA|          Research|    Chance of Admit|
+-------+-----------------+------------------+-----------------+-----------------+------------------+------------------+------------------+------------------+-------------------+
|  count|              500|               500|              500|              500|               500|               500|               500|               500|                500|
|   mean|            250.5|           316.472|          107.192|            3.114|             3.374|             3.484| 8.576440000000003|              0.56| 0.7217399999999996|
| stddev|144.4818327679989|11.295148372354712|6.081867659564538|1.143511800759815|0.9910036207566072|0.92

In [33]:
#drop the unnecessary columns
df = df.drop('Serial No.')

In [34]:
#display the dataframe
df.show()

+---------+---------+-----------+-----------------+---+---+----+--------+---------------+
|Serial No|GRE Score|TOEFL Score|University Rating|SOP|LOR|CGPA|Research|Chance of Admit|
+---------+---------+-----------+-----------------+---+---+----+--------+---------------+
|        1|      337|        118|                4|4.5|4.5|9.65|       1|           0.92|
|        2|      324|        107|                4|4.0|4.5|8.87|       1|           0.76|
|        3|      316|        104|                3|3.0|3.5| 8.0|       1|           0.72|
|        4|      322|        110|                3|3.5|2.5|8.67|       1|            0.8|
|        5|      314|        103|                2|2.0|3.0|8.21|       0|           0.65|
|        6|      330|        115|                5|4.5|3.0|9.34|       1|            0.9|
|        7|      321|        109|                3|3.0|4.0| 8.2|       1|           0.75|
|        8|      308|        101|                2|3.0|4.0| 7.9|       0|           0.68|
|        9

In [35]:
#check for null values
from pyspark.sql.functions import isnan, when, count, col
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()



+---------+---------+-----------+-----------------+---+---+----+--------+---------------+
|Serial No|GRE Score|TOEFL Score|University Rating|SOP|LOR|CGPA|Research|Chance of Admit|
+---------+---------+-----------+-----------------+---+---+----+--------+---------------+
|        0|        0|          0|                0|  0|  0|   0|       0|              0|
+---------+---------+-----------+-----------------+---+---+----+--------+---------------+



In [39]:
# correlation analysis
from pyspark.sql.functions import corr

for col in df.columns:
    print('Correlation to Chance of Admit for', col, df.stat.corr('Chance of Admit', col))



Correlation to Chance of Admit for Serial No 0.00850504936113174
Correlation to Chance of Admit for GRE Score 0.8103506354632598
Correlation to Chance of Admit for TOEFL Score 0.7922276143050823
Correlation to Chance of Admit for University Rating 0.6901323687886892
Correlation to Chance of Admit for SOP 0.6841365241316723
Correlation to Chance of Admit for LOR 0.6453645135280112
Correlation to Chance of Admit for CGPA 0.882412574904574
Correlation to Chance of Admit for Research 0.5458710294711379
Correlation to Chance of Admit for Chance of Admit 1.0


In [42]:
# feature selection
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['GRE Score', 'TOEFL Score', 'CGPA'], outputCol='features')
output_data = assembler.transform(df)
output_data.show()

+---------+---------+-----------+-----------------+---+---+----+--------+---------------+------------------+
|Serial No|GRE Score|TOEFL Score|University Rating|SOP|LOR|CGPA|Research|Chance of Admit|          features|
+---------+---------+-----------+-----------------+---+---+----+--------+---------------+------------------+
|        1|      337|        118|                4|4.5|4.5|9.65|       1|           0.92|[337.0,118.0,9.65]|
|        2|      324|        107|                4|4.0|4.5|8.87|       1|           0.76|[324.0,107.0,8.87]|
|        3|      316|        104|                3|3.0|3.5| 8.0|       1|           0.72| [316.0,104.0,8.0]|
|        4|      322|        110|                3|3.5|2.5|8.67|       1|            0.8|[322.0,110.0,8.67]|
|        5|      314|        103|                2|2.0|3.0|8.21|       0|           0.65|[314.0,103.0,8.21]|
|        6|      330|        115|                5|4.5|3.0|9.34|       1|            0.9|[330.0,115.0,9.34]|
|        7|      32

In [43]:
# Import LinearRegression and create findal data
from pyspark.ml.regression import LinearRegression

final_data = output_data.select('features', 'Chance of Admit')
final_data.show()

+------------------+---------------+
|          features|Chance of Admit|
+------------------+---------------+
|[337.0,118.0,9.65]|           0.92|
|[324.0,107.0,8.87]|           0.76|
| [316.0,104.0,8.0]|           0.72|
|[322.0,110.0,8.67]|            0.8|
|[314.0,103.0,8.21]|           0.65|
|[330.0,115.0,9.34]|            0.9|
| [321.0,109.0,8.2]|           0.75|
| [308.0,101.0,7.9]|           0.68|
| [302.0,102.0,8.0]|            0.5|
| [323.0,108.0,8.6]|           0.45|
| [325.0,106.0,8.4]|           0.52|
| [327.0,111.0,9.0]|           0.84|
| [328.0,112.0,9.1]|           0.78|
| [307.0,109.0,8.0]|           0.62|
| [311.0,104.0,8.2]|           0.61|
| [314.0,105.0,8.3]|           0.54|
| [317.0,107.0,8.7]|           0.66|
| [319.0,106.0,8.0]|           0.65|
| [318.0,110.0,8.8]|           0.63|
| [303.0,102.0,8.5]|           0.62|
+------------------+---------------+
only showing top 20 rows



In [44]:
# Print the schema of the final data
final_data.printSchema()

root
 |-- features: vector (nullable = true)
 |-- Chance of Admit: double (nullable = true)



In [45]:
# Split the dataset into training and testing data
train_data, test_data = final_data.randomSplit([0.7, 0.3])

In [46]:
#build the linear regression model
lr = LinearRegression(featuresCol='features' ,labelCol='Chance of Admit')
model = lr.fit(train_data)

In [47]:
# Get coefficients and intercept
print('Coefficients: {}'.format(model.coefficients))
print('Intercept: {}'.format(model.intercept))

Coefficients: [0.0024357089411697662,0.003274343953603397,0.13880979615213976]
Intercept: -1.5886664006130302


In [49]:
# get summary of the model
training_summary = model.summary
print('RMSE: {}'.format(training_summary.rootMeanSquaredError))
print('R2: {}'.format(training_summary.r2))

RMSE: 0.05967141273543487
R2: 0.8176572801166437


In [50]:
# Transform the test data
predictions = model.transform(test_data)

In [51]:
# Display the predictions
predictions.show()

+------------------+---------------+-------------------+
|          features|Chance of Admit|         prediction|
+------------------+---------------+-------------------+
|[290.0,100.0,7.56]|           0.47|0.49452564659671827|
|[290.0,104.0,7.46]|           0.45|0.49374204279591805|
| [294.0,93.0,7.36]|           0.46|0.45358611545574545|
|[295.0,101.0,7.86]|           0.69| 0.5516214741018124|
| [296.0,95.0,7.54]|           0.44|  0.489991984552677|
| [297.0,96.0,7.89]|           0.43| 0.5442854661006993|
| [298.0,92.0,7.88]|           0.51| 0.5322357012659338|
| [298.0,97.0,7.21]|           0.45| 0.4556048576120171|
|  [298.0,99.0,7.6]|           0.46| 0.5162893660185586|
|[298.0,101.0,7.86]|           0.54| 0.5589286009253218|
| [299.0,94.0,7.34]|           0.42|0.46626280819215493|
|[299.0,100.0,7.88]|           0.68| 0.5608661618359307|
|[299.0,100.0,7.89]|           0.59| 0.5622542597974522|
|[299.0,100.0,8.02]|           0.63| 0.5802995332972305|
|[299.0,102.0,8.62]|           

In [53]:
# Evaluate the model
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol='Chance of Admit', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(predictions)
r2 = evaluator.evaluate(predictions, {evaluator.metricName: 'r2'})
print('RMSE: {}'.format(rmse))
print('R2: {}'.format(r2))


RMSE: 0.06755054685537806
R2: 0.7741648382556023


## Try with scaled data

In [59]:
# scale the features 
from pyspark.ml.feature import MinMaxScaler
scaler = MinMaxScaler(inputCol='features', outputCol='scaledFeatures')
assembler = VectorAssembler(inputCols=['GRE Score', 'TOEFL Score', 'CGPA', 'University Rating'], outputCol='features')
output_data = assembler.transform(df)
output_data.show()
final_data = output_data.select('features', 'Chance of Admit')
scaler_model = scaler.fit(final_data)
final_data = scaler_model.transform(final_data)
final_data.show()

# Split the dataset into training and testing data
train_data, test_data = final_data.randomSplit([0.7, 0.3])

#build the linear regression model
lr = LinearRegression(featuresCol='scaledFeatures' ,labelCol='Chance of Admit')
model = lr.fit(train_data)

# Get coefficients and intercept
print('Coefficients: {}'.format(model.coefficients))
print('Intercept: {}'.format(model.intercept))

# get summary of the model
training_summary = model.summary
print('RMSE: {}'.format(training_summary.rootMeanSquaredError))
print('R2: {}'.format(training_summary.r2))

# Transform the test data
predictions = model.transform(test_data)
predictions.show()


+---------+---------+-----------+-----------------+---+---+----+--------+---------------+--------------------+
|Serial No|GRE Score|TOEFL Score|University Rating|SOP|LOR|CGPA|Research|Chance of Admit|            features|
+---------+---------+-----------+-----------------+---+---+----+--------+---------------+--------------------+
|        1|      337|        118|                4|4.5|4.5|9.65|       1|           0.92|[337.0,118.0,9.65...|
|        2|      324|        107|                4|4.0|4.5|8.87|       1|           0.76|[324.0,107.0,8.87...|
|        3|      316|        104|                3|3.0|3.5| 8.0|       1|           0.72|[316.0,104.0,8.0,...|
|        4|      322|        110|                3|3.5|2.5|8.67|       1|            0.8|[322.0,110.0,8.67...|
|        5|      314|        103|                2|2.0|3.0|8.21|       0|           0.65|[314.0,103.0,8.21...|
|        6|      330|        115|                5|4.5|3.0|9.34|       1|            0.9|[330.0,115.0,9.34...|
|

In [60]:
# Evaluate the model
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol='Chance of Admit', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(predictions)
r2 = evaluator.evaluate(predictions, {evaluator.metricName: 'r2'})
print('RMSE: {}'.format(rmse))
print('R2: {}'.format(r2))

RMSE: 0.06494788804472544
R2: 0.8045369387130764


In [None]:
# save the model
model.save('linear_regression_model')


In [None]:
# load the model
# from pyspark.ml.regression import LinearRegressionModel
# model = LinearRegressionModel.load('linear_regression_model')